Apache Hive是一个构建在ApacheHadoop之上的数据仓库软件项目,它提供了一个类似于SQL的接口来查询存储在各种数据库和文件系统中的数据,这些数据库和文件系统与Hadoop集成。Hadoop用于以分布式方式存储数据,并将其分割成更小的块以提高访问和处理数据的速度。Hive使得在HDFS(Hadoop分布式文件系统,Hadoop的分布式存储空间)上查询大数据变得更加容易。Hive查询语言(HQL)与SQL非常相似,但它是为处理海量数据而设计的。因此,Hive和关系型数据库在属性上存在差异。
Hive与关系型数据库的关键差异包括:
本文旨在探讨在实现过程中出现的更多差异。将重点介绍在PySpark代码中处理分区表的一个案例,在这个案例中遇到了问题,并且没有从在线内容中获得太多帮助。当在代码中处理表时,通常涉及到批量处理。
在数据库中存储数据的正常处理过程是在第一次写入时“创建”表,然后在连续写入时“插入到”已创建的表中。这两个步骤在Spark的批量作业中进行了解释。
假设想从Spark数据框df中创建一个Hive表。必须指定数据存储的格式。它可以是文本、ORC、parquet等。这里使用了parquet格式(一种列式压缩格式)。还必须提到Hive表的名称。仅提供表名,数据文件将存储在默认的Hive目录中,连同元数据。这些类型的表被称为管理表。当提到目录时,数据单独存储在提到的路径中,而元数据仍然在默认路径中。当提供目录时,那么Hive表被称为外部表。在这种情况下,删除表不会影响数据文件。
df.write.format('parquet').option('path', table_dir).saveAsTable(db_name + '.' + table_name)
上述代码是创建Hive表的命令。创建了一个外部表,并将数据文件存储为parquet格式。
只能对已存在的Hive表进行插入操作。
df.coalesce(10).write.format('parquet').insertInto(db_name + '.' + table_name)
上述代码是向Hive表中插入数据的命令。不需要指定表的路径,只需提供表名即可。
分区是将大量数据分割成多个较小的块,以便于查询和更快的处理。分区通常在具有低基数的列上进行,因为每个分区列的唯一值都会创建一个分区。
例如,在批量处理中,加载日期通常是分区列,因为每天的写入可以写入不同的文件夹,而不会干扰前一天的加载。
假设处理的是零售商店数据。批量处理涉及为零售商的每个分支机构的每日账单详情加载表。
这个表可以创建两个分区列——(1)load_date和(2)branch_id。这样,每个分支机构将为每天有一个分区。加载的表位置将如下所示。在表创建中提到的路径中,将为所有分区列的每个分区值创建单独的文件夹,并且以相同的层次顺序创建。
当分区是在硬编码值上创建时,那么就是静态分区。当分区是在列值上创建时,称为动态分区。
在使用Spark作业的动态阈值功能时,需要设置几个参数。
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
df.coalesce(10).write.mode('overwrite').partitionBy('load_date', 'branch_id').format('parquet').option('path', table_dir).saveAsTable(db_name + '.' + table_name)
上述代码是在创建分区Hive表时使用的命令。在批量处理期间,第一次运行时将创建表。在后续运行中,它仍然能够将数据加载到具有相同表名的新分区中。如果需要重新加载相同日期的数据,使用覆盖模式将仅覆盖相应的重新加载分区。为了避免这种情况,可以使用追加模式。
关键点:
df.coalesce(10).write.mode('overwrite').format('parquet').insertInto(db_name + '.' + table_name)
这是创建插入语句的理想方式。
再次强调三个关键点:
为了避免这种情况,必须在创建过程中交换df中的列顺序,以便在创建时将分区列移动到最后。