PySpark数据科学工具详解

大数据时代,数据处理成为了一个重要的课题。PySpark,作为Apache Spark社区为Python用户设计的数据分析工具,因其在处理大规模数据集时的高效性而受到数据科学家的青睐。PySpark结合了Python语言的简洁性和Spark的高效性,允许用户在Python环境中操作RDD(弹性分布式数据集)和DataFrame。本文将探讨PySpark的基本概念、DataFrame操作以及一些实用的PySpark函数。

PySpark简介

PySpark是一个强大的数据分析工具,它允许将数据和计算操作分散到多个集群中,从而显著提高性能。与传统的数据加工工具相比,PySpark在数据收集、存储和传输的成本上具有明显优势。在处理现实世界问题时,经常需要处理大量数据,因此像Hadoop、Spark这样的分布式引擎成为了数据科学生态系统中的主要工具。

PySpark DataFrame操作

DataFrame是一个分布式的数据集合,以行的形式组织,每行都有命名的列。简单来说,它类似于关系型数据库中的表或带有列标题的Excel表格。DataFrame主要设计用于处理大规模的结构化或半结构化数据集。

PySpark函数解析

本文将讨论10个最常用和必要的PySpark函数,以实现对结构化数据的高效分析。将使用Google Colab作为IDE进行数据分析。首先,需要在Google Colab中安装PySpark,然后导入pyspark.sql模块,并创建一个SparkSession,这将是Spark SQL API的入口点。

# 安装PySpark !pip install pyspark # 导入pyspark import pyspark # 导入sparksession from pyspark.sql import SparkSession # 创建一个sparksession对象,并提供appName spark = SparkSession.builder.appName("pysparkdf").getOrCreate()

这个SparkSession对象将与Spark SQL的函数和方法进行交互。现在,通过读取CSV文件来创建一个Spark DataFrame。将使用一个简单的数据集,即Kaggle上可用的80种谷物产品的营养成分数据。

# 通过spark对象读取csv文件创建dataframe df = spark.read.option("header", "true").csv("/content/cereal.csv") # 显示df创建的前10行 df.show(10)

这是用于数据分析的DataFrame。现在,打印DataFrame的模式以了解更多关于数据集的信息。

# 打印DataFrame的模式 df.printSchema()

DataFrame包含16个特征或列,每个列都包含字符串类型的值。让开始探讨函数:

select()函数帮助从整个DataFrame中显示所选列的子集,只需要传递所需的列名。让使用select()打印DataFrame的任意三列。

df.select('name', 'mfr', 'rating').show(10)

在输出中,得到了包含三个列name、mfr、rating的DataFrame子集。

withColumn()函数用于操纵列或创建一个新列与现有列。它是一个转换函数,也可以改变任何现有列的数据类型。在DataFrame模式中,看到所有列都是字符串类型。让将calorie列的数据类型更改为整数。

df.withColumn("Calories", df['calories'].cast("Integer")).printSchema()

在模式中,可以看到calories列的数据类型已更改为整数类型。

groupBy()函数用于将数据收集到DataFrame的组中,并允许对分组数据执行聚合函数。这是一个非常常见的数据分析操作,类似于SQL中的groupBy子句。让找出数据集中每种谷物的数量。

df.groupBy("name", "calories").count().show()

orderBy()函数用于根据DataFrame的特定列对整个DataFrame进行排序。它根据列值对DataFrame的行进行排序。默认情况下,它按升序排序。让根据数据集中的protein列对DataFrame进行排序。

df.orderBy("protein").show()

可以看到,整个DataFrame根据protein列进行了排序。

split()用于将DataFrame的字符串列分割成多个列。这个函数通过withColumn()和select()应用于DataFrame。DataFrame的name列包含两个字符串单词的值。让将name列从两个字符串之间的空格分割成两列。

from pyspark.sql.functions import split df1 = df.withColumn('Name1', split(df['name'], " ").getItem(0)) \ .withColumn('Name2', split(df['name'], " ").getItem(1)) df1.select("name", "Name1", "Name2").show()

在这个输出中,可以看到name列被分割成了两列。

lit()函数用于向DataFrame添加一个新列,该列包含字面量或一些常数值。让添加一个“摄入量”列,该列包含每种谷物的常数值以及相应的谷物名称。

from pyspark.sql.functions import lit df2 = df.select(col("name"), lit("75 gm").alias("intake quantity")) df2.show()

在输出中,可以看到创建了一个新的列“摄入量”,它包含每种谷物的摄入量。

when()函数用于根据特定条件显示输出。它评估提供的条件,然后相应地返回值。这是一个SQL函数,支持PySpark按顺序检查多个条件并返回值。这个函数类似于if-then-else和switch语句。让看看哪些谷物富含维生素。

from pyspark.sql.functions import when df.select("name", when(df.vitamins >= "25", "rich in vitamins")).show()

filter()函数用于根据特定列值过滤行中的数据。例如,可以过滤出含有100卡路里的谷物。

df.filter(df.calories == "100").show() # 过滤非空值 df.filter(df.name.isNotNull()).show() # 过滤空值 df.filter(df.name.isNull()).show()
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485