在这篇文章中,将探讨如何使用PySpark的MLIB库来预测著名的机器学习问题——泰坦尼克号生存预测。对于这些机器学习爱好者来说,这个数据集并不陌生,将从数据预处理开始,处理分类变量(转换它们),并使用MLIB构建和评估模型。
首先,需要设置一个环境来启动Spark会话,这将使能够使用所有所需的库来进行预测。以下是如何设置Spark会话的代码示例:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Titanic_project').getOrCreate()
在这段代码中,首先导入了SparkSession对象,然后使用builder函数构建会话,并使用getOrCreate()方法创建会话。通过对象名称,可以查看会话的相关信息,如版本、应用名称和主位置。
接下来,将读取泰坦尼克号数据集。在编写代码之前,让先看看数据集包含哪些特征:
data = spark.read.csv('titanic.csv',inferSchema=True,header=True)
在这里,使用PySpark的read.csv方法读取数据集。inferSchema参数设置为True,意味着它将返回原始数据中每列的真实数据类型,这是一个好习惯,可以让更真实地了解数据集。Header参数设置为True,表示将数据集的第一行作为DataFrame的标题,否则原始标题也将被视为记录。
处理空值有多种方法。可以采用中心趋势方法(如均值/中位数/众数)来填充空值,也可以简单地删除所有空值。在这里,选择删除所有空值,因为数据集中空值并不多,因此一次性删除所有空值是更好的选择。
my_final_data = my_cols.na.drop()
使用NA.drop()方法从特征DataFrame中删除所有NA值,然后将其分配给一个新变量,这将是最终的数据。
需要处理当前处于字符串状态的分类列,因为字符串类型不为任何机器学习算法所接受,所以需要对其进行处理。在PySpark中,需要执行一系列操作/步骤来转换必要的特征列。
from pyspark.ml.feature import (VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer)
gender_indexer = StringIndexer(inputCol='Sex',outputCol='SexIndex')
gender_encoder = OneHotEncoder(inputCol='SexIndex',outputCol='SexVec')
embark_indexer = StringIndexer(inputCol='Embarked',outputCol='EmbarkIndex')
embark_encoder = OneHotEncoder(inputCol='EmbarkIndex',outputCol='EmbarkVec')
assembler = VectorAssembler(inputCols=['Pclass', 'SexVec', 'Age', 'SibSp', 'Parch', 'Fare', 'EmbarkVec'],outputCol='features')
如上所述,需要Vector assembler和One Hot编码技术来进行转换,因此从PySpark的ml.feature库中导入了它们。StringIndexer负责将字符串类型转换为分类类型,而One Hot Encoder将每个分类值转换为其二进制值,即0或1。重复对“Embarked”列进行与“Gender”列相同的处理。最后,Vector Assembler将所有预处理的特征列组合在一起,并去除不需要的列。
现在可以进入模型开发阶段,首先需要导入机器学习算法。对于这个问题,需要预测分类数据,因此需要使用分类机器学习算法,即逻辑回归。
from pyspark.ml.classification import LogisticRegression
在参数中传递特征列和标签(独立)列。
有时,处理整个模型开发过程可能会变得复杂。PySpark中的Pipelines可以帮助维护执行周期流程,以便每个步骤都能在其最佳阶段执行,既不提前也不推迟。
from pyspark.ml import Pipeline
log_reg_titanic = LogisticRegression(featuresCol='features',labelCol='Survived')
pipeline = Pipeline(stages=[gender_indexer,embark_indexer,gender_encoder,embark_encoder,assembler,log_reg_titanic])
train_titanic_data, test_titanic_data = my_final_data.randomSplit([0.7,.3])
fit_model = pipeline.fit(train_titanic_data)
results = fit_model.transform(test_titanic_data)
首先,导入了Pipeline模块。然后,使用逻辑回归方法开发模型,并在参数中传递特征列和标签列。接下来,使用Pipeline方法,可以看到在stages部分,所有预处理步骤都按顺序排列。然后,使用randomSplit()方法,将最终数据集分成70%的训练集和30%的测试集。最后,首先使用训练数据拟合Pipeline,然后使用Pipeline模型转换测试数据。
现在进入了模型评估阶段,这意味着开发已经完成,现在应该评估它。希望模型能够根据测试数据表现出良好的结果,即具有较高的准确率。
from pyspark.ml.evaluation import BinaryClassificationEvaluator
my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='Survived')
results.select('Survived','prediction').show()