在分布式计算环境中,处理大规模数据集是一个常见的需求。为了简化这一过程,可以使用Apache Spark,这是一个能够让在一个集群的多个节点上分发数据和执行计算的框架。每个节点被称为一个独立的机器,它们各自处理数据的一个子集,并执行数据集操作的一部分计算。Spark主要使用Scala编写,但也支持Java、Python、R和SQL。
PySpark是为支持Python语言而引入的Spark接口。PySpark API包含了Scikit-learn和Pandas库的功能,实际上,最新版本的PySpark在计算能力上与用Scala编写的Spark相匹配。在PySpark中,数据框架主要可以通过以下两种方式创建:
以下代码和文件可以在Google Colaboratory中找到。将使用Google Colaboratory来实践如何将CSV文件导入PySpark。首先,需要在Google Colaboratory中安装pyspark库。
!pip install pyspark
要创建一个从现有RDD的PySpark数据框架,首先使用.parallelize()方法创建一个RDD,然后使用SparkSession的.createDataFrame()方法将其转换为PySpark数据框架。
要开始使用PySpark,首先需要创建一个Spark会话。通过导入库来创建Spark会话。
from pyspark import SparkContext
from pyspark.sql import SparkSession
创建SparkContext:
sc = SparkContext.getOrCreate()
使用SparkContext的.getOrCreate()方法来创建SparkContext,存储在变量sc中。如果之前已经创建过,.getOrCreate()方法会获取旧的实例。
创建SparkSession:
spark = SparkSession.builder.appName('PySpark DataFrame From RDD').getOrCreate()
在这里,通过传递一个字符串给.appName()来命名应用程序。接下来,使用.getOrCreate(),它会创建并实例化SparkSession到对象spark中。如果已经存在SparkSession,则会使用现有的。
创建弹性数据结构(RDD):
rdd = sc.parallelize([('C',85,76,87,91), ('B',85,76,87,91), ("A", 85,78,96,92), ("A", 92,76,89,96)], 4)
使用SparkContext sc的.parallelize()方法,传入学生成绩的元组。在后续步骤中,将这个RDD转换为PySpark数据框架。传入numSlices值为4,这是数据将并行化分成的分区数。
检查RDD的数据类型:
print(type(rdd))
可以通过检查变量rdd的数据类型来验证RDD是否创建成功。执行后,将得到pyspark.rdd.RDD。
将RDD转换为PySpark数据框架:
sub = ['Division','English','Mathematics','Physics','Chemistry']
marks_df = spark.createDataFrame(rdd, schema=sub)
SparkSession spark的.createDataFrame()方法接受数据作为RDD、Python列表或Pandas数据框架。这里传递RDD作为数据。还创建了一个字符串列表sub,它将被传递到.createDataFrame()方法的schema属性中。
检查PySpark数据框架的数据类型:
print(type(marks_df))
要验证操作是否成功,将检查marks_df的数据类型。执行后,将得到pyspark.sql.dataframe.DataFrame作为输出。
PySpark数据框架的模式:
marks_df.printSchema()
可以使用.printSchema()方法来检查文件的模式,这在有数十或数百列时非常有用。
PySpark数据框架的内容:
marks_df.show()
要查看文件的内容,将在PySpark数据框架对象上使用.show()方法。这将显示PySpark数据框架的前20行。
将所有内容整合在一起:
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName('PySpark DataFrame From RDD').getOrCreate()
rdd = sc.parallelize([('C',85,76,87,91), ('B',85,76,87,91), ("A", 85,78,96,92), ("A", 92,76,89,96)], 4)
sub = ['Division','English','Mathematics','Physics','Chemistry']
marks_df = spark.createDataFrame(rdd, schema=sub)
marks_df.show()
执行后,得到:
将使用SparkSession的.read()方法来导入外部文件。这将返回一个Spark数据框架对象。可以导入的外部文件格式包括JSON、TXT或CSV。导入每种文件类型的方法几乎相同,可以轻松导入。与从RDD创建PySpark数据框架的方法相比,这种方法更简单,只需要Spark会话。
导入库:
from pyspark.sql import SparkSession
创建Spark会话:
spark = SparkSession.builder.appName('PySpark DataFrame From External Files').getOrCreate()
在这里,通过传递一个字符串给.appName()来命名应用程序。接下来,使用.getOrCreate(),它会创建并实例化SparkSession到对象spark中。如果已经存在SparkSession,则会使用现有的。
将外部文件读入PySpark数据框架:
csv_file = spark.read.csv('Fish.csv', sep = ',', inferSchema = True, header = True)
txt_file = spark.read.text("example.txt")
json_file = spark.read.json("sample.json", multiLine=True)
检查PySpark数据框架的数据类型:
print(type(csv_file))
print(type(txt_file))
print(type(json_file))
检查PySpark数据框架的模式:
csv_file.printSchema()
txt_file.printSchema()
json_file.printSchema()
将所有内容整合在一起:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('PySpark DataFrame From External Files').getOrCreate()
csv_file = spark.read.csv('Fish.csv', sep = ',', inferSchema = True, header = True)
txt_file = spark.read.text("example.txt")
json_file = spark.read.json("sample.json", multiLine=True)
print(type(csv_file))
csv_file.printSchema()
PySpark数据框架到Pandas数据框架:
也可以将PySpark数据框架转换为Pandas数据框架。这使得可以在数据框架上使用Pandas方法,这可能非常有用。
df = csv_file.toPandas()
使用.toPandas()方法将PySpark数据框架转换为Pandas数据框架。这将返回一个Pandas数据框架。让检查新数据框架的数据类型以确认操作。
type(df)
一次性读取多个文件:
可以通过传递文件路径列表作为字符串类型,在.read()方法中一次性读取多个文件。例如:
files = ['Fish.csv', 'Salary.csv']
df = spark.read.csv(files, sep = ',' , inferSchema=True, header=True)
这将创建并分配一个PySpark数据框架到变量df。在一次性读取多个文件时,最好考虑具有相同模式的文件,因为联合数据框架不会增加任何意义。
在本文中,学习了PySpark数据框架以及创建它们的两种方法。.parallelize()是一个不错的选择,除了它需要比.read()方法更多的努力。.read()方法在要快速读取CSV文件时非常方便。一旦转换为PySpark数据框架,就可以在其上执行多种操作。其中一个广泛使用的应用程序是使用PySpark SQL进行查询。还查看了一些在执行PySpark任务时有用的附加方法。
在LinkedIn上与联系。
对于任何建议或文章请求,可以给发邮件。