Apache Pig 是一个高级平台,用于处理和分析大量数据。它运行在Hadoop上,利用 Hadoop 分布式文件系统(HDFS)和 MapReduce 处理系统。Pig 使用一个引擎来执行数据流,并且包含了一个高级语言 Pig Latin 来表达这些数据流。Pig 的输入是 Pig Latin,它将被转换成 MapReduce 作业。Pig 利用 MapReduce 的技巧来进行所有的数据处理,将Pig Latin脚本组合成一个或多个 MapReduce 作业来执行。
Apache Pig 由 Yahoo 设计,因为它易于学习和使用,使得Hadoop变得相当简单。Pig 的开发是因为 MapReduce 编程变得越来越困难,许多 MapReduce 用户对声明式语言感到不舒服。现在,Pig 是 Apache 下的一个开源项目。
Pig 的特点
Pig 拥有丰富的操作符集合,例如 join、sort 等。它易于编程,因为它类似于 SQL。Apache Pig将任务自动转换为 MapReduce 作业,程序员只需要关注语言的语义,而不需要关注 MapReduce。可以使用 Pig 创建自己的函数。其他编程语言中的函数,如 Java,可以嵌入到 Pig Latin 脚本中。Apache Pig 可以处理各种类型的数据,如结构化、非结构化和半结构化数据,并将结果存储在 HDFS 中。
Pig 与 MapReduce 的对比
Pig 相较于 MapReduce 有多个优势。Apache Pig是一种数据流语言,意味着它允许用户描述如何从一到多个输入中读取数据,进行处理,然后并行地存储到一到多个输出中。而 MapReduce 则是一种编程风格。Apache Pig 是一种高级语言,而 MapReduce 是编译的 Java 代码。Pig 执行 join 和多文件操作的语法非常直观且简单,类似于 SQL。如果需要编写 join 操作,MapReduce 代码会变得复杂。Apache Pig 的学习曲线非常小,运行 MapReduce 代码必须具备 Java 和 MapReduce 库的专业知识。Apache Pig 脚本可以完成多行 MapReduce 代码的操作,而 MapReduce 代码执行相同操作需要更多的代码行。Apache Pig 易于调试和测试,而 MapReduce 程序需要大量的编码和测试时间。Pig Latin的成本比 MapReduce 低。
Pig 架构
Pig 架构如下。Pig 位于Hadoop之上。Pig 脚本可以在 Grunt shell 或 Pig 服务器上运行。Pig 的执行引擎优化并编译脚本,最终将其转换为 MapReduce 作业。它使用 HDFS 存储 MapReduce 作业之间的中间数据,然后将输出写入 HDFS。
Pig 执行选项
Pig Grunt Shell 命令
Pig 数据类型
Pig 操作符
-- 下面是 student.txt 文件的内容
John,23,Hyderabad
James,45,Hyderabad
Sam,33,Chennai
,56,Delhi
,43,Mumbai
-- LOAD 操作符
-- 从给定的文件系统加载数据
A = LOAD 'student.txt' AS (name: chararray, age: int, city: chararray);
-- 将 student 文件中的数据,列名为‘name’,‘age’,‘city’加载到变量 A 中
-- DUMP 操作符
-- 用于显示关系的内容包括 A
DUMP A
//results
(John,23,Hyderabad)
(James,45,Hyderabad)
(Sam,33,Chennai)
(,56,Delhi)
(,43,Mumbai)
-- STORE 操作符
-- 存储函数将结果保存到文件系统
STORE A into ‘myoutput’ using PigStorage(‘*’);
-- 将 A 中的数据存储到 myoutput 中,用‘*’分隔
DUMP myoutput;
//results
John*23*Hyderabad
James*45*Hyderabad
Sam*33*Chennai
*56*Delhi
*43*Mumbai
-- FILTER 操作符
B = FILTER A by name is not null;
-- FILTER 操作符将根据某些条件过滤表格。这里,name 是 A 中的列。非空值的 name 将存储在变量 B 中
DUMP B;
//results
(John,23,Hyderabad)
(James,45,Hyderabad)
(Sam,33,Chennai)
-- FOREACH GENERATE 操作符
C = FOREACH A GENERATE name, city;
-- FOREACH 操作符用于访问单个记录。这里,将从 A 中获取 name 和 city 列的行,并存储到 C 中
DUMP C
//results
(John,Hyderabad)
(James,Hyderabad)
(Sam,Chennai)
(,Delhi)
(,Mumbai)
-- 有一个 people 文件,包含员工 ID、姓名和小时数字段
001,Rajiv,21
002,siddarth,12
003,Rajesh,22
-- 首先,将这些数据加载到变量 employee 中
employee = Load ‘people’ as (empid, name, hours);
-- 按小时数过滤少于 20 的并存储在 parttime 中
parttime = FILTER employee BY Hours < 20;
-- 按小时数降序排列 parttime 并存储在另一个名为 part_time 的文件中
sorted = ORDER parttime by hours DESC;
STORE sorted INTO ‘part_time’;
DUMP sorted;
DESCRIBE sorted;
//results
(003,Rajesh,22)
(001,Rajiv,21)