Hive与Hadoop的集成及PySpark中的分区表处理

Apache Hive是一个构建在ApacheHadoop之上的数据仓库软件项目,它提供了一个类似于SQL的接口来查询存储在各种数据库和文件系统中的数据,这些数据库和文件系统与Hadoop集成。Hadoop用于以分布式方式存储数据,并将其分割成更小的块以提高访问和处理数据的速度。Hive使得在HDFS(Hadoop分布式文件系统,Hadoop的分布式存储空间)上查询大数据变得更加容易。Hive查询语言(HQL)与SQL非常相似,但它是为处理海量数据而设计的。因此,Hive和关系型数据库在属性上存在差异。

Hive与关系型数据库的关键差异包括:

  • 模式可以变化;
  • 分区存在,它是将数据存储到更小块文件中以快速访问和检索的关键方法;
  • 完整性约束,如主键和外键,不存在;
  • 仅支持对ORC格式的Hive表进行更新和删除操作。否则,操作是在分区级别进行的——创建新分区、追加到分区、覆盖分区、删除分区。

本文旨在探讨在实现过程中出现的更多差异。将重点介绍在PySpark代码中处理分区表的一个案例,在这个案例中遇到了问题,并且没有从在线内容中获得太多帮助。当在代码中处理表时,通常涉及到批量处理。

在数据库中存储数据的正常处理过程是在第一次写入时“创建”表,然后在连续写入时“插入到”已创建的表中。这两个步骤在Spark的批量作业中进行了解释。

PySpark脚本中创建Hive表

假设想从Spark数据框df中创建一个Hive表。必须指定数据存储的格式。它可以是文本、ORC、parquet等。这里使用了parquet格式(一种列式压缩格式)。还必须提到Hive表的名称。仅提供表名,数据文件将存储在默认的Hive目录中,连同元数据。这些类型的表被称为管理表。当提到目录时,数据单独存储在提到的路径中,而元数据仍然在默认路径中。当提供目录时,那么Hive表被称为外部表。在这种情况下,删除表不会影响数据文件。

df.write.format('parquet').option('path', table_dir).saveAsTable(db_name + '.' + table_name)

上述代码是创建Hive表的命令。创建了一个外部表,并将数据文件存储为parquet格式。

向Hive表中插入数据

只能对已存在的Hive表进行插入操作。

df.coalesce(10).write.format('parquet').insertInto(db_name + '.' + table_name)

上述代码是向Hive表中插入数据的命令。不需要指定表的路径,只需提供表名即可。

分区表

分区是将大量数据分割成多个较小的块,以便于查询和更快的处理。分区通常在具有低基数的列上进行,因为每个分区列的唯一值都会创建一个分区。

例如,在批量处理中,加载日期通常是分区列,因为每天的写入可以写入不同的文件夹,而不会干扰前一天的加载。

假设处理的是零售商店数据。批量处理涉及为零售商的每个分支机构的每日账单详情加载表。

这个表可以创建两个分区列——(1)load_date和(2)branch_id。这样,每个分支机构将为每天有一个分区。加载的表位置将如下所示。在表创建中提到的路径中,将为所有分区列的每个分区值创建单独的文件夹,并且以相同的层次顺序创建。

当分区是在硬编码值上创建时,那么就是静态分区。当分区是在列值上创建时,称为动态分区。

在使用Spark作业的动态阈值功能时,需要设置几个参数。

spark.conf.set("hive.exec.dynamic.partition", "true") spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict") df.coalesce(10).write.mode('overwrite').partitionBy('load_date', 'branch_id').format('parquet').option('path', table_dir).saveAsTable(db_name + '.' + table_name)

上述代码是在创建分区Hive表时使用的命令。在批量处理期间,第一次运行时将创建表。在后续运行中,它仍然能够将数据加载到具有相同表名的新分区中。如果需要重新加载相同日期的数据,使用覆盖模式将仅覆盖相应的重新加载分区。为了避免这种情况,可以使用追加模式。

关键点:

  • 如果在并行运行多个实例的批量处理,并尝试同时写入表,将会出现错误。因为saveAsTable命令用于创建表,即使尝试访问不同的分区(例如,每个作业实例都是针对不同的branch_id),多个并行执行创建相同表也会出错。
  • 当查看insertInto的帮助时,这是纯粹的插入命令,只要它们处理不同的分区,平行运行就不会影响表。
  • 由于表已经创建,不需要指定表路径和分区列。
df.coalesce(10).write.mode('overwrite').format('parquet').insertInto(db_name + '.' + table_name)

这是创建插入语句的理想方式。

再次强调三个关键点:

  • 在表创建期间,数据框中的分区列将作为表中的最后列创建。这与数据框中的列顺序无关。
  • 插入不允许指定分区列。自然地,在插入过程中,列顺序不会被更改,并将按原样输入到表中。创建的parquet文件将具有与数据框df相同的列顺序。如果分区列不是最后几个,将注意到加载到表中的列值发生了交换。
  • 即使在交换中存在数据类型冲突,也很少看到错误。parquet文件将在插入过程中以错误的顺序加载。在显示Hive表时,冲突将只是空的。

为了避免这种情况,必须在创建过程中交换df中的列顺序,以便在创建时将分区列移动到最后。

沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485