Apache Flink 大数据处理框架介绍

Apache Flink是一个大数据处理框架,它允许程序员以高效且可扩展的方式处理海量数据。本文将介绍Flink Java API中的一些基本概念和标准数据转换,并探讨其API的流畅风格,这使得与Flink的核心结构——分布式数据集合——的协作变得简单。首先,将查看Flink的DataSet API转换,并使用它们来实现一个单词计数程序。然后,将简要了解Flink的DataStream API,它允许实时处理事件流。

Maven依赖

要开始使用Flink,需要添加Maven依赖到项目中。以下是链接到java和flink-test-utils库的依赖配置:

<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.2.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-test-utils_2.10</artifactId> <version>1.2.0</version> <scope>test</scope> </dependency>

基本API概念

在使用Flink时,需要了解与其API相关的几个概念:提供了多种数据转换函数,包括过滤、映射、连接、分组和聚合。Flink中的sink操作启动流的执行以产生程序的期望结果,例如将结果保存到文件系统或打印到标准输出。Flink转换是惰性的,直到调用sink操作时才执行。

API有两种操作模式,即批处理和实时处理。如果处理的是有限的数据源,可以在批处理模式下使用DataSet API。要处理无限的实时数据流,必须使用DataStream API。

DataSet API转换

Flink程序的入口点是ExecutionEnvironment类的实例——它定义了程序执行的上下文。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

注意,在本地机器上运行应用程序将在本地JVM上进行处理。如果想在一组机器的集群上开始处理,需要安装Apache Flink并相应地配置ExecutionEnvironment。

创建数据集

需要为程序提供数据以执行数据转换。

DataSet amounts = env.fromElements(1, 29, 40, 50);

可以从多个源创建DataSet,例如Apache Kafka、CSV、文件或几乎任何其他数据源。

过滤和规约

假设想要过滤掉高于某个阈值的数字,然后将它们全部求和。可以使用filter()和reduce()转换来实现这一点:

int threshold = 30; List collect = amounts .filter(a -> a > threshold) .reduce((integer, t1) -> integer + t1) .collect();

collect()方法是sink操作,它启动实际的数据转换。

映射

假设有一个Person对象的DataSet:

private static class Person { private int age; private String name; // 标准构造函数/getter/setter }

接下来,从这些对象创建一个DataSet:

DataSet personDataSource = env.fromCollection( Arrays.asList( new Person(23, "Tom"), new Person(75, "Michael") ));

假设只想从每个集合对象中提取年龄字段。可以使用map()转换只获取Person类的某些字段:

List ages = personDataSource .map(p -> p.age) .collect();

连接

当有两个数据集时,可能想要在某个id字段上将它们连接起来。可以使用join()转换来实现这一点。

Tuple3 address = new Tuple3(1, "5th Avenue", "London"); DataSet<Tuple3> addresses = env.fromElements(address); Tuple2 firstTransaction = new Tuple2(1, "Transaction_1"); Transaction DataSet<Tuple2> = env.fromElements(firstTransaction, new Tuple2(12, "Transaction_2"));

两个元组中的第一个字段都是Integer类型,是想要连接两个数据集的id字段。

要执行实际的连接逻辑,需要为地址和交易实现KeySelector接口:

private static class IdKeySelectorTransaction implements KeySelector<Tuple2, Integer> { @Override public Integer getKey(Tuple2 value) { return value.f0; } } private static class IdKeySelectorAddress implements KeySelector<Tuple3, Integer> { @Override public Integer getKey(Tuple3 value) { return value.f0; } }

每个选择器只返回要执行连接的操作字段。

排序

假设有以下Tuple2集合:

Tuple2 secondPerson = new Tuple2(4, "Tom"); Tuple2 thirdPerson = new Tuple2(5, "Scott"); Tuple2 fourthPerson = new Tuple2(200, "Michael"); Tuple2 firstPerson = new Tuple2(1, "Jack"); DataSet<Tuple2> transactions = env.fromElements( fourthPerson, secondPerson, thirdPerson, firstPerson);

如果想按元组的第一个字段对这个集合进行排序,可以使用sortPartitions()转换:

List<Tuple2> sorted = transactions .sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING) .collect();

单词计数

单词计数问题通常用来展示大数据框架的处理能力。基本解决方案涉及统计文本输入中单词的出现次数。

作为解决方案的第一步,创建一个LineSplitter类,该类将输入分割成标记(单词),为每个标记收集一个Tuple2的键值对。在这些元组中,键是文本中找到的单词,值是一个整数(1)。

public class LineSplitter implements FlatMapFunction<String, Tuple2> { @Override public void flatMap(String value, Collector<Tuple2> out) { Stream.of(value.toLowerCase().split("\W+")) .filter(t -> t.length() > 0) .forEach(token -> out.collect(new Tuple2(token, 1))); } }

在Collector类上调用collect()方法,以将数据向前推送到处理过程中。

接下来也是最后一步是按它们的第一元素(单词)对元组进行分组,然后对第二元素执行求和,以产生单词的出现次数:

public static DataSet<Tuple2> startWordCount(ExecutionEnvironment env, List lines) throws Exception { DataSet text = env.fromCollection(lines); return text.flatMap(new LineSplitter()) .groupBy(0) .aggregate(Aggregations.SUM, 1); }

使用了三种类型的Flink转换:flatMap(),groupBy()和aggregate()。

DataStream API

Apache Flink还通过DataStream API支持事件流处理。如果想要开始消费事件,必须首先使用StreamExecutionEnvironment类:

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

此外,可以使用运行时从各种源创建事件流。它可以是像Apache Kafka这样的消息总线,但在本例中,简单地从几个字符串元素创建一个Feed:

DataStream dataStream = executionEnvironment.fromElements( "This is the first sentence", "This is the second one-word sentence");

可以像在正常的DataSet类中一样对DataStream的每个元素应用转换:

SingleOutputStreamOperator uppercase = text.map(String::toUpperCase);

要触发执行,需要调用一个sink操作,如print(),它只是将转换的结果打印到标准输出,然后是StreamExecutionEnvironment类中的execute()方法:

uppercase.print(); executionEnvironment.execute();

它产生以下输出:

1> THIS IS THE FIRST SENTENCE 2> THIS IS THE SECOND SENTENCE WITH ONE WORD

在处理实时事件流时,有时可能需要对事件进行分组,并对这些事件的窗口应用一些计算。

假设有一个事件流,每个事件都是一个由事件编号和事件发送到系统的timestamp组成的配对,可以容忍事件的顺序错误,但只要它们不超过二十秒的延迟。

在这个例子中,首先创建一个模拟两个事件的流,它们相隔几分钟,并定义一个timestamp extractor来确定延迟阈值:

SingleOutputStreamOperator<Tuple2> inWindow = env.fromElements( new Tuple2(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()), new Tuple2(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond())) .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor<Tuple2>(Time.seconds(20)) { @Override public long extractTimestamp(Tuple2 element) { return element.f1 * 1000; } });

接下来,定义一个窗口操作,将事件分组到五秒的窗口中,并对这些事件应用转换:

SingleOutputStreamOperator<Tuple2> reduced = inWindow .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) .maxBy(0, true); reduced.print(); 1> (15.1491221519)
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485