Apache Flink是一个大数据处理框架,它允许程序员以高效且可扩展的方式处理海量数据。本文将介绍Flink Java API中的一些基本概念和标准数据转换,并探讨其API的流畅风格,这使得与Flink的核心结构——分布式数据集合——的协作变得简单。首先,将查看Flink的DataSet API转换,并使用它们来实现一个单词计数程序。然后,将简要了解Flink的DataStream API,它允许实时处理事件流。
要开始使用Flink,需要添加Maven依赖到项目中。以下是链接到java和flink-test-utils库的依赖配置:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.10</artifactId>
<version>1.2.0</version>
<scope>test</scope>
</dependency>
在使用Flink时,需要了解与其API相关的几个概念:提供了多种数据转换函数,包括过滤、映射、连接、分组和聚合。Flink中的sink操作启动流的执行以产生程序的期望结果,例如将结果保存到文件系统或打印到标准输出。Flink转换是惰性的,直到调用sink操作时才执行。
API有两种操作模式,即批处理和实时处理。如果处理的是有限的数据源,可以在批处理模式下使用DataSet API。要处理无限的实时数据流,必须使用DataStream API。
Flink程序的入口点是ExecutionEnvironment类的实例——它定义了程序执行的上下文。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
注意,在本地机器上运行应用程序将在本地JVM上进行处理。如果想在一组机器的集群上开始处理,需要安装Apache Flink并相应地配置ExecutionEnvironment。
需要为程序提供数据以执行数据转换。
DataSet amounts = env.fromElements(1, 29, 40, 50);
可以从多个源创建DataSet,例如Apache Kafka、CSV、文件或几乎任何其他数据源。
假设想要过滤掉高于某个阈值的数字,然后将它们全部求和。可以使用filter()和reduce()转换来实现这一点:
int threshold = 30;
List collect = amounts
.filter(a -> a > threshold)
.reduce((integer, t1) -> integer + t1)
.collect();
collect()方法是sink操作,它启动实际的数据转换。
假设有一个Person对象的DataSet:
private static class Person {
private int age;
private String name;
// 标准构造函数/getter/setter
}
接下来,从这些对象创建一个DataSet:
DataSet personDataSource = env.fromCollection(
Arrays.asList(
new Person(23, "Tom"),
new Person(75, "Michael")
));
假设只想从每个集合对象中提取年龄字段。可以使用map()转换只获取Person类的某些字段:
List ages = personDataSource
.map(p -> p.age)
.collect();
当有两个数据集时,可能想要在某个id字段上将它们连接起来。可以使用join()转换来实现这一点。
Tuple3 address = new Tuple3(1, "5th Avenue", "London");
DataSet<Tuple3> addresses = env.fromElements(address);
Tuple2 firstTransaction = new Tuple2(1, "Transaction_1");
Transaction DataSet<Tuple2> = env.fromElements(firstTransaction, new Tuple2(12, "Transaction_2"));
两个元组中的第一个字段都是Integer类型,是想要连接两个数据集的id字段。
要执行实际的连接逻辑,需要为地址和交易实现KeySelector接口:
private static class IdKeySelectorTransaction implements KeySelector<Tuple2, Integer> {
@Override
public Integer getKey(Tuple2 value) {
return value.f0;
}
}
private static class IdKeySelectorAddress implements KeySelector<Tuple3, Integer> {
@Override
public Integer getKey(Tuple3 value) {
return value.f0;
}
}
每个选择器只返回要执行连接的操作字段。
假设有以下Tuple2集合:
Tuple2 secondPerson = new Tuple2(4, "Tom");
Tuple2 thirdPerson = new Tuple2(5, "Scott");
Tuple2 fourthPerson = new Tuple2(200, "Michael");
Tuple2 firstPerson = new Tuple2(1, "Jack");
DataSet<Tuple2> transactions = env.fromElements(
fourthPerson, secondPerson, thirdPerson, firstPerson);
如果想按元组的第一个字段对这个集合进行排序,可以使用sortPartitions()转换:
List<Tuple2> sorted = transactions
.sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING)
.collect();
单词计数问题通常用来展示大数据框架的处理能力。基本解决方案涉及统计文本输入中单词的出现次数。
作为解决方案的第一步,创建一个LineSplitter类,该类将输入分割成标记(单词),为每个标记收集一个Tuple2的键值对。在这些元组中,键是文本中找到的单词,值是一个整数(1)。
public class LineSplitter implements FlatMapFunction<String, Tuple2> {
@Override
public void flatMap(String value, Collector<Tuple2> out) {
Stream.of(value.toLowerCase().split("\W+"))
.filter(t -> t.length() > 0)
.forEach(token -> out.collect(new Tuple2(token, 1)));
}
}
在Collector类上调用collect()方法,以将数据向前推送到处理过程中。
接下来也是最后一步是按它们的第一元素(单词)对元组进行分组,然后对第二元素执行求和,以产生单词的出现次数:
public static DataSet<Tuple2> startWordCount(ExecutionEnvironment env, List lines) throws Exception {
DataSet text = env.fromCollection(lines);
return text.flatMap(new LineSplitter())
.groupBy(0)
.aggregate(Aggregations.SUM, 1);
}
使用了三种类型的Flink转换:flatMap(),groupBy()和aggregate()。
Apache Flink还通过DataStream API支持事件流处理。如果想要开始消费事件,必须首先使用StreamExecutionEnvironment类:
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
此外,可以使用运行时从各种源创建事件流。它可以是像Apache Kafka这样的消息总线,但在本例中,简单地从几个字符串元素创建一个Feed:
DataStream dataStream = executionEnvironment.fromElements(
"This is the first sentence",
"This is the second one-word sentence");
可以像在正常的DataSet类中一样对DataStream的每个元素应用转换:
SingleOutputStreamOperator uppercase = text.map(String::toUpperCase);
要触发执行,需要调用一个sink操作,如print(),它只是将转换的结果打印到标准输出,然后是StreamExecutionEnvironment类中的execute()方法:
uppercase.print();
executionEnvironment.execute();
它产生以下输出:
1> THIS IS THE FIRST SENTENCE
2> THIS IS THE SECOND SENTENCE WITH ONE WORD
在处理实时事件流时,有时可能需要对事件进行分组,并对这些事件的窗口应用一些计算。
假设有一个事件流,每个事件都是一个由事件编号和事件发送到系统的timestamp组成的配对,可以容忍事件的顺序错误,但只要它们不超过二十秒的延迟。
在这个例子中,首先创建一个模拟两个事件的流,它们相隔几分钟,并定义一个timestamp extractor来确定延迟阈值:
SingleOutputStreamOperator<Tuple2> inWindow = env.fromElements(
new Tuple2(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()),
new Tuple2(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond()))
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<Tuple2>(Time.seconds(20)) {
@Override
public long extractTimestamp(Tuple2 element) {
return element.f1 * 1000;
}
});
接下来,定义一个窗口操作,将事件分组到五秒的窗口中,并对这些事件应用转换:
SingleOutputStreamOperator<Tuple2> reduced = inWindow
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.maxBy(0, true);
reduced.print();
1> (15.1491221519)