对于数据科学家来说,当涉及到处理大型数据集和运行复杂模型时,Google Colab无疑是一个救星。而对于数据工程师来说,PySpark可以简单地被描述为一个半神!那么,当将这两个各自领域的佼佼者结合起来时,会发生什么呢?将得到一个几乎完美的解决方案,用于解决所有数据科学和机器学习问题!在本文中,将看到如何在Google Colaboratory笔记本中运行PySpark,并且执行一些大多数数据科学问题中常见的基本数据探索任务。那么,让开始吧!
在Colab工作时,首先想做的一件事就是将Google Drive挂载到Colab。这将使能够在Colab笔记本中访问Drive上的任何目录。
from google.colab import drive
drive.mount('/content/drive')
完成此操作后,下一步显然是加载数据。
现在,假设将使用足够大的数据集。因此,将数据上传到Drive的最佳方式是以zip格式。只需将zip文件夹拖放到Drive上的任何想要的目录中。解压这些数据一点也不麻烦。只需提供zip文件夹的路径以及!unzip命令即可。
!unzip "/content/drive/My Drive/AV articles/PySparkon Colab/black_friday_train.zip"
如果不确定文件夹的确切位置,可以从Colab的侧边栏中查看。
Spark是用Scala编程语言编写的,需要Java虚拟机(JVM)来运行。因此,第一个任务是下载Java。
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
接下来,将从此处安装Apache Spark 3.0.1与Hadoop 2.7。
!wget -q https://www-us.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
现在,只需要解压该文件夹。
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
注意 - 在撰写本文时,3.0.1是Apache Spark的最新版本。但Spark发展非常迅速。因此,如果在执行此代码时有更新的Spark版本,那么需要将3.0.1替换为最新版本。
需要安装的最后一项是findspark库。它将在系统中定位Spark并将其作为常规库导入。
!pip install -q findspark
现在已经在Colab中安装了所有必要的依赖项,是时候设置环境路径了。这将使能够在Colab环境中运行Pyspark。
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"
现在,需要在系统中定位Spark。为此,导入findspark并使用findspark.init()方法。
import findspark
findspark.init()
现在,可以从pyspark.sql导入SparkSession并创建一个SparkSession,这是Spark的入口点。
from pyspark.sql import SparkSession
spark = SparkSession.builder
.master("local")
.appName("Colab")
.config('spark.ui.port', '4050')
.getOrCreate()
最后,打印SparkSession变量。如果一切顺利,应该能够看到上面的输出。
首先,需要加载数据集。将使用read.csv模块。提供的inferSchema参数将使Spark能够自动确定每列的数据类型,但它必须先过一次数据。如果不想这样做,那么可以改为在schema参数中显式提供模式。
df = spark.read.csv("train.csv", header=True, inferSchema=True)
这将创建一个Spark dataframe。
这里有来自DataHack平台的Black Friday数据集。它提供了过去一个月零售公司各种客户的购买摘要。提供了客户人口统计、购买详情和总购买金额。目标是预测每个客户对各种产品的购买金额。
现在是使用PySpark dataframe函数探索数据的时候了。在这个过程中,将不断将其与Pandas dataframe进行比较。
显示列详情
探索性数据分析的第一步是检查dataframe的模式。这将给一个关于dataframe中的列及其数据类型的鸟瞰图。
df.printSchema()
显示行
现在肯定想要查看实际数据。就像在Pandas DataFrame中有df.head()函数一样,这里有show()函数。可以在括号内提供想要打印的行数。
df.show(5)
DataFrame中的行数
如果想要知道dataframe中的总行数,只需使用count()函数。
df.count()
显示特定列
有时可能想要查看dataframe中的一些特定列。为此,可以利用Spark SQL的能力。使用select()函数,可以提到任何想要查看的列。
df.select("User_ID","Gender","Age","Occupation").show(5)
描述列
通常当处理数值特征时,想要查看关于dataframe的统计信息。describe()函数最适合此类目的。它与Pandas的describe函数非常相似,但统计值要少得多,字符串列也被描述。
df.describe().show()
分类列的不同值
当想要确定dataframe中分类列的唯一值时,distinct()将派上用场。
df.select("City_Category").distinct().show()
使用Groupby聚合
可以使用groupBy函数对dataframe的列值进行分组,然后对它们应用聚合函数以获得一些有用的见解。在这里,可以将dataframe中的各种城市类别进行分组,并确定每个城市类别的总购买量。为此,必须使用来自Spark SQL函数模块的sum聚合函数。
from pyspark.sql import functions as F
df.groupBy("City_Category").agg(F.sum("Purchase")).show()
计数和移除空值
现在都知道现实世界的数据不会忽视缺失值。因此,总是检查缺失值并在存在时移除它们是明智的。
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()
有一些列包含空值。因此,最好用一些值替换它们。根据数据集,Product Category列中的空值可能意味着用户没有购买该产品。因此,最好用0替换空值。
将使用fillna()函数替换空值。由于Spark dataframes是不可变的,需要将结果存储在一个新的dataframe中。
df = df.fillna({'Product_Category_2':0, 'Product_Category_3':0})
可以再次检查空值以验证更改。
df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()
完美!dataframe中再也没有空值了。
保存到文件
最后,在完成所有分析后,如果想将结果保存到一个新的CSV文件中,可以使用write.csv()函数。
df.write.csv("/content/drive/My Drive/AV articles/PySpark on Colab/preprocessed_data")
但这里有一个问题。不会只保存一个CSV,而是根据dataframe的分区数量保存多个。因此,如果有2个分区,那么每个分区将保存两个CSV文件。
df.rdd.getNumPartitions()
# Spark df to Pandas df
df_pd = df.toPandas()
# Store result
df_pd.to_csv("/content/drive/My Drive/AV articles/PySparkon Colab/pandas_preprocessed_data.csv")