在大数据时代,数据处理成为了一个重要的课题。PySpark,作为Apache Spark社区为Python用户设计的数据分析工具,因其在处理大规模数据集时的高效性而受到数据科学家的青睐。PySpark结合了Python语言的简洁性和Spark的高效性,允许用户在Python环境中操作RDD(弹性分布式数据集)和DataFrame。本文将探讨PySpark的基本概念、DataFrame操作以及一些实用的PySpark函数。
PySpark是一个强大的数据分析工具,它允许将数据和计算操作分散到多个集群中,从而显著提高性能。与传统的数据加工工具相比,PySpark在数据收集、存储和传输的成本上具有明显优势。在处理现实世界问题时,经常需要处理大量数据,因此像Hadoop、Spark这样的分布式引擎成为了数据科学生态系统中的主要工具。
DataFrame是一个分布式的数据集合,以行的形式组织,每行都有命名的列。简单来说,它类似于关系型数据库中的表或带有列标题的Excel表格。DataFrame主要设计用于处理大规模的结构化或半结构化数据集。
本文将讨论10个最常用和必要的PySpark函数,以实现对结构化数据的高效分析。将使用Google Colab作为IDE进行数据分析。首先,需要在Google Colab中安装PySpark,然后导入pyspark.sql模块,并创建一个SparkSession,这将是Spark SQL API的入口点。
# 安装PySpark
!pip install pyspark
# 导入pyspark
import pyspark
# 导入sparksession
from pyspark.sql import SparkSession
# 创建一个sparksession对象,并提供appName
spark = SparkSession.builder.appName("pysparkdf").getOrCreate()
这个SparkSession对象将与Spark SQL的函数和方法进行交互。现在,通过读取CSV文件来创建一个Spark DataFrame。将使用一个简单的数据集,即Kaggle上可用的80种谷物产品的营养成分数据。
# 通过spark对象读取csv文件创建dataframe
df = spark.read.option("header", "true").csv("/content/cereal.csv")
# 显示df创建的前10行
df.show(10)
这是用于数据分析的DataFrame。现在,打印DataFrame的模式以了解更多关于数据集的信息。
# 打印DataFrame的模式
df.printSchema()
DataFrame包含16个特征或列,每个列都包含字符串类型的值。让开始探讨函数:
select()函数帮助从整个DataFrame中显示所选列的子集,只需要传递所需的列名。让使用select()打印DataFrame的任意三列。
df.select('name', 'mfr', 'rating').show(10)
在输出中,得到了包含三个列name、mfr、rating的DataFrame子集。
withColumn()函数用于操纵列或创建一个新列与现有列。它是一个转换函数,也可以改变任何现有列的数据类型。在DataFrame模式中,看到所有列都是字符串类型。让将calorie列的数据类型更改为整数。
df.withColumn("Calories", df['calories'].cast("Integer")).printSchema()
在模式中,可以看到calories列的数据类型已更改为整数类型。
groupBy()函数用于将数据收集到DataFrame的组中,并允许对分组数据执行聚合函数。这是一个非常常见的数据分析操作,类似于SQL中的groupBy子句。让找出数据集中每种谷物的数量。
df.groupBy("name", "calories").count().show()
orderBy()函数用于根据DataFrame的特定列对整个DataFrame进行排序。它根据列值对DataFrame的行进行排序。默认情况下,它按升序排序。让根据数据集中的protein列对DataFrame进行排序。
df.orderBy("protein").show()
可以看到,整个DataFrame根据protein列进行了排序。
split()用于将DataFrame的字符串列分割成多个列。这个函数通过withColumn()和select()应用于DataFrame。DataFrame的name列包含两个字符串单词的值。让将name列从两个字符串之间的空格分割成两列。
from pyspark.sql.functions import split
df1 = df.withColumn('Name1', split(df['name'], " ").getItem(0)) \
.withColumn('Name2', split(df['name'], " ").getItem(1))
df1.select("name", "Name1", "Name2").show()
在这个输出中,可以看到name列被分割成了两列。
lit()函数用于向DataFrame添加一个新列,该列包含字面量或一些常数值。让添加一个“摄入量”列,该列包含每种谷物的常数值以及相应的谷物名称。
from pyspark.sql.functions import lit
df2 = df.select(col("name"), lit("75 gm").alias("intake quantity"))
df2.show()
在输出中,可以看到创建了一个新的列“摄入量”,它包含每种谷物的摄入量。
when()函数用于根据特定条件显示输出。它评估提供的条件,然后相应地返回值。这是一个SQL函数,支持PySpark按顺序检查多个条件并返回值。这个函数类似于if-then-else和switch语句。让看看哪些谷物富含维生素。
from pyspark.sql.functions import when
df.select("name", when(df.vitamins >= "25", "rich in vitamins")).show()
filter()函数用于根据特定列值过滤行中的数据。例如,可以过滤出含有100卡路里的谷物。
df.filter(df.calories == "100").show()
# 过滤非空值
df.filter(df.name.isNotNull()).show()
# 过滤空值
df.filter(df.name.isNull()).show()