使用PySpark在Google Colab中进行数据探索

对于数据科学家来说,当涉及到处理大型数据集和运行复杂模型时,Google Colab无疑是一个救星。而对于数据工程师来说,PySpark可以简单地被描述为一个半神!那么,当将这两个各自领域的佼佼者结合起来时,会发生什么呢?将得到一个几乎完美的解决方案,用于解决所有数据科学和机器学习问题!在本文中,将看到如何在Google Colaboratory笔记本中运行PySpark,并且执行一些大多数数据科学问题中常见的基本数据探索任务。那么,让开始吧!

目录

  • 连接Google Drive到Colab
  • 从Google Drive读取数据
  • Google Colab中设置PySpark
  • 将数据加载到PySpark
  • 理解数据
  • 使用PySpark DataFrame进行数据探索
  • 显示列详情
  • 显示行
  • DataFrame中的行数
  • 显示特定列
  • 描述列
  • 分类列的不同值
  • 使用Groupby聚合
  • 计数和移除空值
  • 保存到文件

连接Google Drive到Colab

在Colab工作时,首先想做的一件事就是将Google Drive挂载到Colab。这将使能够在Colab笔记本中访问Drive上的任何目录。

from google.colab import drive drive.mount('/content/drive')

完成此操作后,下一步显然是加载数据。

从Drive读取数据

现在,假设将使用足够大的数据集。因此,将数据上传到Drive的最佳方式是以zip格式。只需将zip文件夹拖放到Drive上的任何想要的目录中。解压这些数据一点也不麻烦。只需提供zip文件夹的路径以及!unzip命令即可。

!unzip "/content/drive/My Drive/AV articles/PySparkon Colab/black_friday_train.zip"

如果不确定文件夹的确切位置,可以从Colab的侧边栏中查看。

Google Colab中设置PySpark

Spark是用Scala编程语言编写的,需要Java虚拟机(JVM)来运行。因此,第一个任务是下载Java。

!apt-get install openjdk-8-jdk-headless -qq > /dev/null

接下来,将从此处安装Apache Spark 3.0.1与Hadoop 2.7。

!wget -q https://www-us.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz

现在,只需要解压该文件夹。

!tar xf spark-3.0.1-bin-hadoop2.7.tgz

注意 - 在撰写本文时,3.0.1是Apache Spark的最新版本。但Spark发展非常迅速。因此,如果在执行此代码时有更新的Spark版本,那么需要将3.0.1替换为最新版本。

需要安装的最后一项是findspark库。它将在系统中定位Spark并将其作为常规库导入。

!pip install -q findspark

现在已经在Colab中安装了所有必要的依赖项,是时候设置环境路径了。这将使能够在Colab环境中运行Pyspark。

import os os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

现在,需要在系统中定位Spark。为此,导入findspark并使用findspark.init()方法。

import findspark findspark.init()

现在,可以从pyspark.sql导入SparkSession并创建一个SparkSession,这是Spark的入口点。

from pyspark.sql import SparkSession spark = SparkSession.builder .master("local") .appName("Colab") .config('spark.ui.port', '4050') .getOrCreate()

最后,打印SparkSession变量。如果一切顺利,应该能够看到上面的输出。

加载数据到PySpark

首先,需要加载数据集。将使用read.csv模块。提供的inferSchema参数将使Spark能够自动确定每列的数据类型,但它必须先过一次数据。如果不想这样做,那么可以改为在schema参数中显式提供模式。

df = spark.read.csv("train.csv", header=True, inferSchema=True)

这将创建一个Spark dataframe。

理解数据

这里有来自DataHack平台的Black Friday数据集。它提供了过去一个月零售公司各种客户的购买摘要。提供了客户人口统计、购买详情和总购买金额。目标是预测每个客户对各种产品的购买金额。

现在是使用PySpark dataframe函数探索数据的时候了。在这个过程中,将不断将其与Pandas dataframe进行比较。

显示列详情

探索性数据分析的第一步是检查dataframe的模式。这将给一个关于dataframe中的列及其数据类型的鸟瞰图。

df.printSchema()

显示行

现在肯定想要查看实际数据。就像在Pandas DataFrame中有df.head()函数一样,这里有show()函数。可以在括号内提供想要打印的行数。

df.show(5)

DataFrame中的行数

如果想要知道dataframe中的总行数,只需使用count()函数。

df.count()

显示特定列

有时可能想要查看dataframe中的一些特定列。为此,可以利用Spark SQL的能力。使用select()函数,可以提到任何想要查看的列。

df.select("User_ID","Gender","Age","Occupation").show(5)

描述列

通常当处理数值特征时,想要查看关于dataframe的统计信息。describe()函数最适合此类目的。它与Pandas的describe函数非常相似,但统计值要少得多,字符串列也被描述。

df.describe().show()

分类列的不同值

当想要确定dataframe中分类列的唯一值时,distinct()将派上用场。

df.select("City_Category").distinct().show()

使用Groupby聚合

可以使用groupBy函数对dataframe的列值进行分组,然后对它们应用聚合函数以获得一些有用的见解。在这里,可以将dataframe中的各种城市类别进行分组,并确定每个城市类别的总购买量。为此,必须使用来自Spark SQL函数模块的sum聚合函数。

from pyspark.sql import functions as F df.groupBy("City_Category").agg(F.sum("Purchase")).show()

计数和移除空值

现在都知道现实世界的数据不会忽视缺失值。因此,总是检查缺失值并在存在时移除它们是明智的。

df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()

有一些列包含空值。因此,最好用一些值替换它们。根据数据集,Product Category列中的空值可能意味着用户没有购买该产品。因此,最好用0替换空值。

将使用fillna()函数替换空值。由于Spark dataframes是不可变的,需要将结果存储在一个新的dataframe中。

df = df.fillna({'Product_Category_2':0, 'Product_Category_3':0})

可以再次检查空值以验证更改。

df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()

完美!dataframe中再也没有空值了。

保存到文件

最后,在完成所有分析后,如果想将结果保存到一个新的CSV文件中,可以使用write.csv()函数。

df.write.csv("/content/drive/My Drive/AV articles/PySpark on Colab/preprocessed_data")

但这里有一个问题。不会只保存一个CSV,而是根据dataframe的分区数量保存多个。因此,如果有2个分区,那么每个分区将保存两个CSV文件。

df.rdd.getNumPartitions() # Spark df to Pandas df df_pd = df.toPandas() # Store result df_pd.to_csv("/content/drive/My Drive/AV articles/PySparkon Colab/pandas_preprocessed_data.csv")
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485