使用PySpark MLIB实现泰坦尼克号生存预测

在这篇文章中,将探讨如何使用PySparkMLIB库来预测著名的机器学习问题——泰坦尼克号生存预测。对于这些机器学习爱好者来说,这个数据集并不陌生,将从数据预处理开始,处理分类变量(转换它们),并使用MLIB构建和评估模型。

环境设置

首先,需要设置一个环境来启动Spark会话,这将使能够使用所有所需的库来进行预测。以下是如何设置Spark会话的代码示例:

from pyspark.sql import SparkSession spark = SparkSession.builder.appName('Titanic_project').getOrCreate()

在这段代码中,首先导入了SparkSession对象,然后使用builder函数构建会话,并使用getOrCreate()方法创建会话。通过对象名称,可以查看会话的相关信息,如版本、应用名称和主位置。

读取数据集

接下来,将读取泰坦尼克号数据集。在编写代码之前,让先看看数据集包含哪些特征:

  • 乘客ID:分配给每位乘客的唯一ID。
  • 生存:模型将预测的目标列。
  • 乘客等级:记录旅行乘客的不同等级。
  • 姓名:乘客的姓名。
  • 性别:乘客的性别。
  • 年龄:乘客的年龄。
  • 兄弟姐妹和配偶数:乘客的兄弟姐妹和配偶数量。
  • 父母和子女数:乘客的父母和子女数量。
  • 票号:分配给票的唯一编号。
  • 票价:根据不同标准(如等级和设施)确定的泰坦尼克号票的价格。
  • 舱位:分配给每位乘客的舱位编号。
  • 登船港口:乘客将从哪个港口登船?
data = spark.read.csv('titanic.csv',inferSchema=True,header=True)

在这里,使用PySpark的read.csv方法读取数据集。inferSchema参数设置为True,意味着它将返回原始数据中每列的真实数据类型,这是一个好习惯,可以让更真实地了解数据集。Header参数设置为True,表示将数据集的第一行作为DataFrame的标题,否则原始标题也将被视为记录。

处理空值

处理空值有多种方法。可以采用中心趋势方法(如均值/中位数/众数)来填充空值,也可以简单地删除所有空值。在这里,选择删除所有空值,因为数据集中空值并不多,因此一次性删除所有空值是更好的选择。

my_final_data = my_cols.na.drop()

使用NA.drop()方法从特征DataFrame中删除所有NA值,然后将其分配给一个新变量,这将是最终的数据。

处理分类列

需要处理当前处于字符串状态的分类列,因为字符串类型不为任何机器学习算法所接受,所以需要对其进行处理。在PySpark中,需要执行一系列操作/步骤来转换必要的特征列。

from pyspark.ml.feature import (VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer) gender_indexer = StringIndexer(inputCol='Sex',outputCol='SexIndex') gender_encoder = OneHotEncoder(inputCol='SexIndex',outputCol='SexVec') embark_indexer = StringIndexer(inputCol='Embarked',outputCol='EmbarkIndex') embark_encoder = OneHotEncoder(inputCol='EmbarkIndex',outputCol='EmbarkVec') assembler = VectorAssembler(inputCols=['Pclass', 'SexVec', 'Age', 'SibSp', 'Parch', 'Fare', 'EmbarkVec'],outputCol='features')

如上所述,需要Vector assembler和One Hot编码技术来进行转换,因此从PySpark的ml.feature库中导入了它们。StringIndexer负责将字符串类型转换为分类类型,而One Hot Encoder将每个分类值转换为其二进制值,即0或1。重复对“Embarked”列进行与“Gender”列相同的处理。最后,Vector Assembler将所有预处理的特征列组合在一起,并去除不需要的列。

模型开发

现在可以进入模型开发阶段,首先需要导入机器学习算法。对于这个问题,需要预测分类数据,因此需要使用分类机器学习算法,即逻辑回归。

from pyspark.ml.classification import LogisticRegression

在参数中传递特征列和标签(独立)列。

PySpark中的Pipelines

有时,处理整个模型开发过程可能会变得复杂。PySpark中的Pipelines可以帮助维护执行周期流程,以便每个步骤都能在其最佳阶段执行,既不提前也不推迟。

from pyspark.ml import Pipeline log_reg_titanic = LogisticRegression(featuresCol='features',labelCol='Survived') pipeline = Pipeline(stages=[gender_indexer,embark_indexer,gender_encoder,embark_encoder,assembler,log_reg_titanic]) train_titanic_data, test_titanic_data = my_final_data.randomSplit([0.7,.3]) fit_model = pipeline.fit(train_titanic_data) results = fit_model.transform(test_titanic_data)

首先,导入了Pipeline模块。然后,使用逻辑回归方法开发模型,并在参数中传递特征列和标签列。接下来,使用Pipeline方法,可以看到在stages部分,所有预处理步骤都按顺序排列。然后,使用randomSplit()方法,将最终数据集分成70%的训练集和30%的测试集。最后,首先使用训练数据拟合Pipeline,然后使用Pipeline模型转换测试数据。

现在进入了模型评估阶段,这意味着开发已经完成,现在应该评估它。希望模型能够根据测试数据表现出良好的结果,即具有较高的准确率。

from pyspark.ml.evaluation import BinaryClassificationEvaluator my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='Survived') results.select('Survived','prediction').show()
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485