Flink

bigdata

1

Flink——流式计算框架

# 一、Flink是什么

# Flink概念

Flink的官网地址:https://flink.apache.org/ (opens new window)

项目Logo是一个非常具有Apache特色的小松鼠。

Flink是Apache基金会的一个顶级项目,是目前业界公认最好的大数据的流式计算框架。其核心是用JavaScala编写的分布式流数据引擎。

关于Flink,最早起源于Stratosphere项目。在2010-2014年期间,这是一个由德国柏林工业大学联合几所欧洲大学共同参与的研究性项目。2014年开始改名为Flink,并捐献给了Apache基金会进行孵化,最终在2014年12月,Flink成为了Apache软件基金会的顶级项目。并迅速超越了传统的Apache Storm,坐上了流式计算的头把交椅。

在Flink的发展过程中,阿里扮演了非常重要的角色。在2015年,阿里就开始参与了Flink的建设,并且建立了自己的一个分支版本Blink,在阿里体系内得到广泛的使用。到2019年,阿里将自己的Blink版本合并到了Flink的主分支内,为Flink做出了极大的贡献。在官网上同时也提供了中文版的主页。

Flink是一个框架和分布式处理引擎,用于在无边界有边界数据流上进行有状态的计算。

Flink能在所有常见集群环境中运行,并能以内存速度任意规模进行计算。

从官网介绍上可以看到,Flink擅长于处理无界流有界流数据。

  1. 无界流(unbounded stream)有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。

    无界流就是我们通常所说的流式数据。流式数据最大的特点是无界实时

  2. 有界流(bounded stream)有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理

    批处理最大的特点是有界和持久。批处理需要关注的是系统中的全套数据,这些数据大部分都是静态的。批处理一般用于进行大型的离线计算。例如我们写的SQL语句,都是针对一个表中包含的大小固定的数据进行处理。只不过在Flink中,将这些批量数据也当成是一种流来看待,因此才有了有界流这样一个概念。

2

部署应用到任意地方

Apache Flink 是一个分布式系统,它需要计算资源来执行应用程序。Flink 集成了所有常见的集群资源管理器,例如 Hadoop YARN (opens new window)Apache Mesos (opens new window)Kubernetes (opens new window),但同时也可以作为独立集群运行。

Flink 被设计为能够很好地工作在上述每个资源管理器中,这是通过资源管理器特定(resource-manager-specific)的部署模式实现的。Flink 可以采用与当前资源管理器相适应的方式进行交互。

部署 Flink 应用程序时,Flink 会根据应用程序配置的并行性自动标识所需的资源,并从资源管理器请求这些资源。在发生故障的情况下,Flink 通过请求新资源来替换发生故障的容器。提交或控制应用程序的所有通信都是通过 REST 调用进行的,这可以简化 Flink 与各种环境中的集成。

运行任意规模应用

Flink 旨在任意规模上运行有状态流式应用。因此,应用程序被并行化为可能数千个任务,这些任务分布在集群中并发执行。所以应用程序能够充分利用无尽的 CPU、内存、磁盘和网络 IO。而且 Flink 很容易维护非常大的应用程序状态。其异步增量检查点算法对处理延迟产生最小的影响,同时保证精确一次状态的一致性。

  • 每天处理数万亿的事件,
  • 应用维护TB级别大小的状态,
  • 应用在数千个内核上运行

利用内存性能

有状态的 Flink 程序针对本地状态访问进行了优化。任务的状态始终保留在内存中,如果状态大小超过可用内存,则会保存在能高效访问的磁盘数据结构中。任务通过访问本地(通常在内存中)状态来进行所有的计算,从而产生非常低的处理延迟。Flink 通过定期异步对本地状态进行持久化存储,来保证故障场景下精确一次的状态一致性。

# Flink适用场景

实际上,流式处理批处理就是我们平常处理数据的常用方式。那为什么还要产生Flink这样专门的计算框架呢?在这里,我们还需要给Flink的适用场景加上一个特点,大,也就是数据量非常大。 在面临海量数据处理时,现有的很多开源计算方案,会把流处理和批处理作为两种不同的应用类型,因为他们要实现的目标是完全不相同的。

  • 对于流处理,最需要关注的是低延迟和Exactly-once保证。
  • 对于批处理,更为关注的是高吞吐、高效处理。

所以在实现时通常会给出两套不同的实现方法。例如MapReduce就是专门进行批处理,而Storm专注于处理流式数据。

而Flink则是一个真正意义上的流批统一的计算框架。

Flink在实现流处理和批处理时,采用了一些与传统计算不一样的方案,他从另一个视角来看点流处理和批处理,将二者统一到一起进行处理。Flink是完全支持流处理,也就是说作为流处理看待时,输入的数据流是无界的。而批处理则被作为一种特殊的流处理,只是他的输入数据流被定义为有界的。

基于同一个Flink运行环境,分别提供了流处理和批处理API,而这两种API也实现上层面向流处理、批处理类型应用框架的基础。

典型的一些应用场景包括实时监控系统推荐系统日志分析系统等。

# 流式计算方案

Flink虽然实现了流批统一的计算模式,但是本质上来说,他还是更偏向于流式计算。所以,在开始Flink学习之前,我们有必要来理解一下,一个典型的流式计算框架是什么样的,需要处理哪些问题。

实际上,在处理源源不断的流式数据时,通常会有两种不同的思路。

  • 一种思路是典型的来一条数据处理一条数据。

Kafka Streams就是一个最为典型的这种处理方式的流式计算框架。

老牌的流式计算框架Storm其实也是采用的类似的方案。

  • 第二种思路是将流式数据看成一个一个小的批量数据。

典型的是在Spark Streaming。在处理流式数据时,是把流式数据划分成一个一个小的批量数据,称为窗口

对这两种方案的比较很容易看出:

第一种方案的时效性是比较高的,但是需要进行大量的并行计算,对计算资源的要求也会非常高。

第二种方案,减少了并行计算,相对节省计算资源,但是时效性就没这么高。

这两种方案都是典型的大数据场景下的流式计算处理方案,Flink处理流式数据的方案也无出其右。我们可以先了解一下这两种方案,然后在后续学习Flink的过程中,逐步验证这些方案,理解Flink是如何在流式计算领域中傲视群雄的。

# 二、Flink安装部署

# Flink的部署方式

Flink的部署方式非常多,Local、Standalone、Yarn、Mesos、Docker、Kubernetes、AWS都可以支持。

其中,我们主要关注LocalStandaloneYarn三种。

  • LOCAL:不单独部署运行环境,在代码中直接调试。后续一些简单的代码样例就会以Local方式调试。

  • Standalone:独立运行环境。 这种模式下,Flink将自己完全管理运行资源。这种方式,其实资源利用率是比较低的。

  • Yarn:以Hadoop提供的Yarn作为资源管理服务。这样可以更高效的使用集群的机器资源。

获取Flink:Flink下载地址 (opens new window)

当前最新版本是1.14.2,我们这次采用次新的1.13.5版本。选择Binaries下载运行时版本。在下载时会需要选择对应的scala版本。本次选择flink-1.13.5-bin-scala_2.11.tgz (opens new window)

Scala是基于JVM执行的一种语言,可以理解为跟Java很类似的。但是他的版本兼容性并没有java好,所以经常需要区分不同的版本,但是编译后都是提交到JVM上运行的。

而我们下载的运行版本都是scala编译后的执行文件,所以跟scala语言没有太多关系。

# 前置安装

  • 安装JDK(JAVA/Scala都是基于JVM运行)
  • 安装Zookeeper(Flink虽然内置了zookeeper,在部署时通常还是采用外置的zookeeper,便于优化)
  • Hadoop(只有yarn集群时安装)

# Standalone模式启动

本次部署使用Standalone模式,部署3台机器搭建flink集群。

  1. 上传flink安装包,解压。

81

  • lib目录下是flink的核心jar包,其中就包含了阿里贡献的blink的相关jar包。
  • opt目录下是flink的一些扩展jar包。
  • example下包含的是flink的示例。
  • conf配置目录。
  • bin脚本目录。
  1. 修改flink核心配置文件,conf/flink-conf.yaml

82

  • jobmanager.rpc.address必须指定自己的机器名。
  • 而Hign Avaibility部分是配置使用zookeeper搭建HA高可用集群的,如果没有安装hadoop和zookeeper,可以不用配置。
  1. 修改conf目录下的mastersworkers文件,指定集群中的节点。

83

  1. 启动,执行master节点所在机器的bin目录下的start-cluster.sh脚本。

正常启动后,使用jps指令查看,

  • 在master机器上可以看到一个StandaloneSessionClusterEntrypoint进程,
  • 而在workers机器上,可以看到一个TaskManagerRunner指令。

如果启动有问题,可以到flink的log目录下去查看启动日志。

启动完成后,可以查看flink提供的管理页面,地址: http://hadoop01:8081

84

在这个页面可以查看集群的相关状态。这里面比较关键的就是这个Available Task Slots。这个Slots插槽就是Flink执行具体任务的单元。具体的作用会在后面再讲解,但是在这里你只需要知道,如果slots不够,Flink就无法执行任务。

而最后的submit New Job页面,可以提交任务。现在,可以尝试下将flink的example下的jar包提交到集群上执行一下。例如可以用flink的example/streaming/WordCount.jar提交上去执行,就可以看到他的执行过程。

至此,本地Standalone部署成功。

这种本地部署的方式,资源利用率不够高效。通常不会用在生产环境中,更多的是作为开发调试集群。

# 三、Flink运行原理

# Flink运行流程图

Flink官方提供的集群结构图

15

# 客户端

对于Flink,可以通过执行一个Java/Scala程序,或者通过./bin/flink run ... 指令启动一个客户端。

客户端的主要作用其实就是构建好一个Dataflow graph或者也称为JobGraph,然后提交给JobManager

  • 这个JobGraph如果在客户端本地构建,这就是Per-job模式
  • 如果是提交到JobManager由Flink集群来构建,这就是Application模式

然后将提交完成后,客户端可以选择立即结束,这就是detached模式

也可以选择继续执行,来不断跟踪JobManager反馈的任务执行情况,这就是默认的attached模式

# JobManager和TaskManager

Flink中的节点可以分为JobManager和TaskManager。

  • JobManager处理器也称为Master,用于协调分布式任务执行;以及用来调度Task进行具体的任务。
  • TaskManager处理器也称为Worker,用于实际执行任务。

4

一个有效的Flink集群中可以包含多个JobManager组成高可用集群,也可以有多个TaskManager进行并行计算。他们可以直接在物理机上启动,也可以通过像Yarn这样的资源调度框架启动。

每一个处理器都是一个单独的JVM进程,也可以通过配置的方式管理他们占用的内存资源。在flink-conf.yaml配置文件中,可以通过参数进行配置。

  • jobmanager.memory.process.size属性,配置JobManager占用的内存大小;
  • taskmanager.memory.process.size属性,配置每个TaskManager占用的内存大小。

这个内存大小包含了JVM占用的堆内存以及堆外的元数据区和堆外直接内存的大小。这些参数也可以在提交任务的时候进行干预。

而JobManager在接收到任务时,整体执行的流程会是这样(该图片为yarn模式的流程,其余集群模式也都差不多)。

5

客户端会往JobManager提交任务,JobManager会往ResouceManager申请资源,当资源足够时,再将任务分配给集群中的TaskManager去执行。

  • 只不过在Standalone模式下,这个ResourceManager是由Flink自己担任的。
  • 而在Yarn模式下,则是转为由Yarn来担任ResourceManager角色。ResourceManager会在由Yarn的NodeManager管理的机器上动态分配运行容器。

# JobManager

JobManager作用:

JobManager会首先接收到客户端提交的应用程序,这些资源都将分发给所有的TaskManager去真正执行任务。

这个应用程序整体会包含几个部分:作业图JobGraph数据流图logic dataflow graph以及打包了所有类库以及资源的jar包

JobManager运行过程:

JobManger会把JobGraph转换成一个物理层面的数据流图作业图JobGraph相当于是一个设计图,也被叫做执行图ExecutionGraph,这其中包含了所有可以并发执行的任务,相当于是一个执行计划

接下来JobGraph会向资源管理器 (例如Yarn的ResourceManager)请求执行任务必要的资源,这些资源会表现为TaskManager上的slot插槽

一旦获得了足够多的资源,就会将执行图分发到真正运行任务的TaskManager上。

而在运行过程中,JobManager还会负责所有需要中央协调的操作,例如反馈任务执行结果,协调检查点备份,协调故障恢复等。

JobManager整体上由三个功能模块组成:

  • ResourceManager

ResourceManager在Flink集群中负责申请、提供和注销集群资源,并且管理task slots。Flink中提供了非常多的ResourceManager实现,比如Yarn,Mesos,K8s和standalone模式。

在standalone模式下,ResourceManager只负责在TaskManager之间协调slot的分配,而TaskManager的启动只能由TaskManager自己管理。

  • Dispatcher

Dispatcher模块提供了一系列的REST接口来提交任务,Flink的控制台也是由这个模块来提供。并且对于每一个执行的任务,Dispatcher会启动一个新的JobMaster,来对任务进行协调。

  • JobMaster

一个JobMaster负责管理一个单独的JobGraph。Flink集群中,同一时间可以运行多个任务,每个任务都由一个对应的JobMaster来管理。

一个集群中最少有一个JobManager。而在高可用部署时,也可以有多个JobManager。这些JobManager会选举出一个作为Leader,而其他的节点就处于备用的状态。

# TaskManager

TaskManager也成为Worker。每个TaskManager上可以有一个或多个Slot,这些Slot就是程序运行的最小单元。

在flink.conf.yaml文件中通过taskmanager.numberOfTaskSlots属性进行配置。

14

每一个TaskManager就是一个独立的JVM进程,而每个Slot就会以这个进程中的一个线程执行。这些Slot在同一个任务中是共享的,一个Slot就足以贯穿应用的整个处理流程。

# 插槽Slots

Task Slot是一个静态的概念,代表的是TaskManager具有的并发执行能力

每个slot表示的是TaskManager上拥有资源的一个固定大小的子集。每一个TaskManager是一个独立的JVM进程,可以在独立的线程上执行一个或多个任务task。

为了控制一个TaskManager能接收多少个task,TaskManager上就会划分出多个slot来进行控制。每个Slot就会以这个进程中的一个线程执行,这些Slot在同一个任务中是共享的,一个Slot就足以贯穿应用的整个处理流程。

这些slot之间的内存管理、数据都是相互隔离的;而这些slot其实都是在同一个JVM进程中,所以这里的隔离并不涉及到CPU等其他资源的隔离。

flink-conf.yaml配置文件中的taskmanager.numberOfTaskSlots属性,就配置了配个taskManager上有多少个slot,默认值是1。所以如果集群有3个TaskManager,每个taskManager上有1个slot,那么集群内就有3个slot(3*1)。

Flink集群只需要关注一个任务内的最大并行数(并行度),能提供满足最大并行数的slot即可,而不用关注整个任务需要多少Slot。

# 并行度 parallelism

并行度是一个动态的概念,表示的是运行程序时,实际需要使用的并发能力

程序运行时的parallelism管理有三个地方可以配置(优先级从低到高):

  • flink-conf.yaml文件中的parallelism.default这个属性,默认值是1。

  • 提交任务时可以指定任务整体的并行度要求。这个并行度可以在提交任务的管理页面和命令行中添加。

  • 程序代码中指定的并行度。在flink的应用程序中,几乎每一个分布式操作都可以定制单独的并行度。

总结:

简单说,slots配置的是集群的并发执行能力;而parallelism则是任务需要的并行度。

如果集群提供的slot资源不够,那程序就无法正常执行下去,会表现为任务阻塞或者超时异常。

6

# Flink时间语义

时间语义是Flink中非常精妙的一部分设计,也可以说是Flink最为重要的一个设计。

对于流式数据处理,顺序是非常重要的。而顺序是通过时间来表示的

但是,数据在网络传输的过程中,会产生各种中断或者延迟。很可能后发生的消息,经过网络传输后,反而先到达Flink进行计算。或者某些连续的数据由于网络不稳定产生了终端。最终处理的顺序就乱了。因此,就有必要定义不同的时间语义,用来管理消息的顺序

# Flink的三种自然时间语义

在Flink中定义了三种基本的时间语义:

  • Event Time: 事件真实发生的时间。
  • Ingestion Time: 事件进入Flink的时间。也就是由Data Source读入的时间。
  • Process Time: 事件进入Processor真正开始计算的时间。

22

在这三种时间语义当中,通常情况下,我们关注最多的是Event Time,因为那才是计算过程中真正需要关心的时间,但是Flink是无法直接知道Event的发生时间的。

Ingestion Time没有太多业务价值,通常不会太过关心。

Processing Time是Flink能够自行知道的时间,在EventTime不确定的情况下,Flink就只能根据ProcessingTime来进行计算了。

# 四、Flink开发

# Function顶级接口

Function是一个顶级处理函数接口,之前用到的各种Source、Sink、Transform都是这个接口的子实现类。

Function代表一个普通的函数接口,只对数据进行计算

Function接口本身没有提供任何方法。

# RichFunction接口

RichFunction则是Function的一个直接子接口,包含了对任务的生命周期管理。例如:

  • open方法,是在Slot任务执行之前触发,可以用来做很多一次性的初始化工作。
  • close方法,是在Slot任务执行之后触发,同样可以用来做很多一次性的收尾工作。
  • getRuntimeContext方法可以拿到方法执行的上下文,可以拿到很多任务执行时的信息,例如当前子任务的ID、当前任务的状态后端等。

# JAVA依赖包

Flink提供了java和scala两套客户端API,我们采用java依赖进行开发。

       
    <properties>
        <flink.version>1.12.5</flink.version>
    </properties>

    <dependencies>
       <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

Flink的计算功能非常强大,提供的应用API也非常丰富。Flink为流式/批量处理应用程序提供了

不同级别的抽象。

7

这四层API是一个依次向上支撑的关系。

  • 最底层的抽象就是有状态实时流处理 Stateful Stream Processing,是最底层的Low-Level API。实际上就是基于ProcessFunction提供的一整套API。在上面侧输出流部分,已经接触到了一个示例。这是最灵活,功能最全面的一层客户端API,允许应用程序可以定制复杂的计算过程。但是这一层大部分的常用的功能都已经封装在了上层的Core API当中。

    大部分的应用都不会需要使用到这一层API。

  • Core APIs主要是DataStream API以及针对批处理的DataSet API。这是最为常用的一套API。其中,又以DataStream API为主。他们其实就是基于一系列ProcessFunction做的一些高层次的封装,可以极大的简化客户端应用程序的开发。

    • 其中DataStream API是Flink中主要进行流计算的模块。

    • DateSet API是Flink中主要进行批量计算的模块。

  • Table API主要是表(Table)为中心的声明式编程API。他允许应用程序像操作关系型数据库一样对数据进行一些select\join\groupby等典型的逻辑操作,并且也可以通过用户自定义函数进行功能扩展,而不用确切地指定程序指定的代码。当然,Table API的表达能力还是不如Core API灵活。大部分情况下,用户程序应该将Table API和DataStream API混合使用。

  • SQL是Flink API中最顶层的抽象。功能类似于Table API,只是程序实现的是直接的SQL语句支持。本质上还是基于Table API的一层抽象。


常用的有DataStream APIDataSet APITable与SQL API三大部分:

DataSet API处理批量数据,但是批量数据在Flink中是被当做有界流来处理的。DataSet API中的大部分基础概念和功能也都是包含在Flink的DataStream API中的。

Table APISQL 是Flink主要针对Java和Scala语言提供的一套查询API。在Python语言客户端中也可以使用。他们是集成在一起的一整套API。通过Table API,可以用来对Flink数据集提供类似于关系型数据的数据查询过滤等功能。

在这三个部分中,DateStream API是Flink最为重要的部分。之前介绍过,Flink是以流的方式来进行流批统一的,所以这一部分API基本上包含了Flink的所有精华。

Flink 中的 DataStream 程序是对数据流进行转换的常规程序(例如过滤、更新状态、定义窗口、聚合)。数据流的起始是从各种源创建的(例如消息队列、套接字流、文件)。结果通过 sink 返回,例如可以将数据写入文件或标准输出(例如命令行终端、消息队列、套接字流、文件)。

官方文档——Flink DataStream API 编程指南 (opens new window)

# DataStream 是什么?

DataStream在Flink的应用程序中被认为是一个不可更改的数据集,这个数据集可以是无界的,也可以是有界的,Flink对他们的处理方式是一致的,这也就是所谓的流批统一

DataStream 在用法上类似于常规的 Java 集合,但在某些关键方面却大不相同。它们是不可变的,这意味着一旦它们被创建,你就不能添加或删除元素。你也不能简单地察看内部元素,而只能使用 DataStream API 操作来处理它们,DataStream API 操作也叫作转换(transformation)

你可以通过在 Flink 程序中添加 source 创建一个初始的 DataStream。然后,你可以基于 DataStream 派生新的流,并使用 map、filter 等 API 方法把 DataStream 和派生的流连接在一起。

21

其实大数据场景下的流式计算是很复杂的,但是经过Flink封装后,确实就简单很多了。封装后Flink 程序看起来像一个转换 DataStream 的常规程序。每个程序由相同的基本部分组成:

  1. 获取一个执行环境(execution environment)
  2. 通过Source,加载/读取初始数据(获取数据源);
  3. 指定数据相关的转换Transformations
  4. 通过Sink,指定计算结果的存储位置(输出);
  5. 最后提交并启动任务,触发程序执行

# Environment 运行环境

StreamExecutionEnvironment是所有Flink中流式计算程序的基础。创建环境的方式有三种方式:

StreamExecutionEnvironment.getExecutionEnvironment()

StreamExecutionEnvironment.createLocalEnvironment()

StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles)
1
2
3
4
5

通常情况下,你只需要使用getExecutionEnvironment()这一种方式就可以了。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
1

这个API会根据运行环境创建正确的StreamExecutionEnvironment对象:

  • 如果你在 IDE 中执行你的程序或将其作为一般的 Java 程序执行,那么它将创建一个本地环境,该环境将在你的本地机器上执行你的程序。
  • 如果你基于程序创建了一个 JAR 文件,并通过命令行运行它,Flink 集群管理器将执行程序的 main 方法,同时 getExecutionEnvironment() 方法会返回一个执行环境,在集群上执行你的程序。

这样就不需要区分应用是在IDE本地执行或者是在某一个Flink Cluster上执行。

# Source数据源

数据源Source,Flink应用程序的数据来源。Flink中提供了非常丰富的Source实现,目前主流的数据源都可以对接。为了指定 data sources,执行环境提供了一些方法,支持使用各种方法从文件中读取数据。

# 基于文件File的Source

从一个文本文件种读取数据。

// 从文件获取数据
DataStream<String> text = env.readTextFile("file:///path/to/file");
1
2

# 基于Socket的数据源

对接一个Socket通道,读取数据。

// 从socket发送端获取数据
DataStreamSource<String> stream = env.socketTextStream("localhost", 11111);
1
2

# 基于集合的数据源

fromCollection方法,从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型。

final List<Integer> list = Arrays.asList(1, 2, 3, 4, 5); 
final DataStreamSource<Integer> stream = env.fromCollection(list);
1
2

# RabbitMQ读取数据

官方说明文档 (opens new window)

引入RabbitMQ 的连接器。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-rabbitmq_2.11</artifactId>
    <version>1.14.2</version>
</dependency>
1
2
3
4
5

RMQSource 负责从 RabbitMQ 中消费数据,可以配置三种不同级别的保证:

  1. 正好一次(exactly-once): 保证正好一次需要以下条件 -
  • 开启 checkpointing: 开启 checkpointing 之后,消息在 checkpoints 完成之后才会被确认(然后从 RabbitMQ 队列中删除).
  • 使用关联标识(Correlation ids): 关联标识是 RabbitMQ 的一个特性,消息写入 RabbitMQ 时在消息属性中设置。 从 checkpoint 恢复时有些消息可能会被重复处理,source 可以利用关联标识对消息进行去重。
  • 非并发 source: 为了保证精确一次的数据投递,source 必须是非并发的(并行度设置为1)。 这主要是由于 RabbitMQ 分发数据时是从单队列向多个消费者投递消息的。
  1. 至少一次(At-least-once): 在 checkpointing 开启的条件下,如果没有使用关联标识或者 source 是并发的, 那么 source 就只能提供至少一次的保证。

  2. 无任何保证(No guarantee): 如果没有开启 checkpointing,source 就不能提供任何的数据投递保证。 使用这种设置时,source 一旦接收到并处理消息,消息就会被自动确认

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpointing is required for exactly-once or at-least-once guarantees
env.enableCheckpointing(...);

final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    ...
    .build();
    
final DataStream<String> stream = env
    .addSource(new RMQSource<String>(
        connectionConfig,            // config for the RabbitMQ connection
        "queueName",                 // name of the RabbitMQ queue to consume
        true,                        // 使用相关ID;如果至少需要一次,则可以为false
        new SimpleStringSchema()))   // 将消息转换为Java对象的反序列化模式
    .setParallelism(1);              // 非并行源仅需要一次
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 从Kafka读取数据

在通常情况下,流式数据最大的数据来源还是kafka。而Flink已经提供了针对kafka的Source。

需要添加maven依赖,引入kafka的连接器

<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-connector-kafka_2.12</artifactId> 
    <version>1.12.3</version> 
</dependency> 
1
2
3
4
5

然后使用FlinkKafkaConsumer创建一个Source。

Flink 的 Kafka consumer 称为 FlinkKafkaConsumer。它提供对一个或多个 Kafka topics 的访问。构造函数接收以下参数:

  • Topic 名称或者名称列表;
  • 用于反序列化 Kafka 数据的 DeserializationSchema 或者 KafkaDeserializationSchema;
  • Kafka 消费者的属性。需要以下属性:
    • “bootstrap.servers”(以逗号分隔的 Kafka broker 列表)
    • “group.id” 消费组 ID
Properties properties = new Properties(); 
properties.setProperty("bootstrap.servers", "localhost:9092"); 
properties.setProperty("group.id", "test"); 
// 从Apache Kafka获取数据
final FlinkKafkaConsumer<String> mysource = new FlinkKafkaConsumer<> ("flinktopic", new SimpleStringSchema(), properties); 
DataStream<String> stream = env.addSource(mysource); 
1
2
3
4
5
6

自定义KafkaDeserializationSchema配置序列化对象。

创建 Schema实现KafkaDeserializationSchema,重写其T deserialize(ConsumerRecord<byte[], byte[]> record)方法,Kafka处理的每条消息都会调用该方法(deserialize)进行反序列化,生成所需自定义对象。

public class MyDeserializationSchema implements KafkaDeserializationSchema<MyPerson> {

    @Override
    public boolean isEndOfStream(MonitorGunMsg o) {
        return false;
    }

    @Override
    public MonitorGunMsg deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
        String kafkaMsg = new String(consumerRecord.value(), StandardCharsets.UTF_8);
        try{
            JSONObject msgObj = JSON.parseObject(kafkaMsg);
            String name = msgObj.getString("name");
            Integer age = msgObj.getInteger("age");
            MyPerson info = new MyResultInfo();
            info.setTime(consumerRecord.timestamp());
            info.setName(name);
            info.setAge(age);
            return monitorGunMsg;
        }catch (Exception e){
            logger.error("kafka_source message prase [{}] error:", kafkaMsg, e);
        }
        return null;

    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

配置 Kafka Consumer 开始消费的位置有4种方式:

myConsumer.setStartFromEarliest();     // 尽可能从最早的记录开始
myConsumer.setStartFromLatest();       // 从最新的记录开始
myConsumer.setStartFromTimestamp(...); // 从指定的时间开始(毫秒)
myConsumer.setStartFromGroupOffsets(); // 默认的方法
1
2
3
4

# 自定义Source

用户程序也可以基于Flink提供的SourceFunction,配置自定义的Source数据源。

public class UDFSource {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        final DataStreamSource<Stock> orderDataStreamSource = env.addSource(new MyOrderSource());
        orderDataStreamSource.print();
        env.execute("UDFOrderSOurce");
    }

    public static class MyOrderSource implements SourceFunction<Stock> {
        private boolean running = true;
        @Override
        public void run(SourceContext<Stock> ctx) throws Exception {
            final Random random = new Random();
            while(running){
                Stock stock = new Stock();
                stock.setId("stock_"+System.currentTimeMillis()%700);
                stock.setPrice(random.nextDouble()*100);
                stock.setStockName("UDFStock");
                stock.setTimestamp(System.currentTimeMillis());

                ctx.collect(stock);
                Thread.sleep(1000);
            }
        }
        @Override
        public void cancel() {
            running=false;
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

另外,Flink非常多常用组件的Connector。例如Hadoop,HBase,ES,JDBC等。 具体参见官方网站的Connectors模块 (opens new window)

23

# Transformation

官方文档——数据流转换 (opens new window)

Transformation是对DataStream进行数据转换的操作。通过方法能将一个或多个 DataStream 转换成新的 DataStream,在应用程序中可以将多个数据转换方法合并成一个复杂的数据流拓扑(方法链)。

# Transformation数据流转换

数据流转换 DataStream Transformations

# Map

DataStream -> DataStream,处理一个元素生成另一个元素。

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});
1
2
3
4
5
6
7
# FlatMap

DataStream -> DataStream,他与Map的区别在于会将多层嵌套的数据结构压缩成一个扁平的Map结构。

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});
1
2
3
4
5
6
7
8
9
# Filter

DataStream -> DataStream,根据一个判断条件对数据进行过滤,不满足要求的数据将被剔除。

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});
1
2
3
4
5
6
# KeyBy

DataStream -> KeyedStream, 对于(key,value)类型的数据,按照key进行分组,并按照给定的计算方法将key相同的那些value聚合成一个新的value。

dataStream.keyBy(value -> value.getSomeKey()) // Key by field "someKey"
dataStream.keyBy(value -> value.f0) // Key by the first element of a Tuple
1
2

这里只需要注意下,对Key的类型是有一点要求的:

  1. key可以是任何类型的数组。;

  2. key如果是一个POJO类型的对象,那么他需要重写HashCode()方法。

# Reduce

KeyedStream -> DataStream,将KeyedStream中的每一个Value数组进行两两相邻的循环操作。最终计算出一个值。

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});
1
2
3
4
5
6
7
# Aggregations

KeyedStream -> DataStream,对KeyedStream中的数组进行一些统计计算

可以通过元素的序号直接选择统计的列,也可以指定元素的列名。

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
1
2
3
4
5
6
7
8
9
10

这其中min()和minBy()的区别是:min()返回当前这一列的最小值,而minBy()返回最小值所在的这一个数据元祖。

# union

DataStream,DataStream -> DataStream,将两个DataSteam的数据集合到一起,产生一个包含了所有元素的新DataStream。注意下这个union操作是不去重的。

dataStream.union(otherStream1, otherStream2, ...);
1
# Connect

DataStream,DataStream -> ConnectedStream连接两个保持原有类型的数据流。两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的,数据和形式不发生任何变化,两个流相互独立。通常只作为一个中间状态,进行后续的统计。

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
1
2
3
4
# CoMap,CoFlatMap

ConnectedStream -> DataStream,和之前的Map,FlatMap相似,只是这是作用在ConnectedStream的版本。

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(value.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 物理分区

物理分区 Physical partitioning。Flink 也提供以下方法让用户根据需要在数据转换完成后对数据分区进行更细粒度的配置。

# broadcast

Broadcasting DataStream → DataStream,将数据广播到每个分区。

dataStream.broadcast();
1

# Task chaining(任务链/方法链)

将两个方法链接在一起能使得它们在同一个线程中执行,从而提升性能。

Flink 默认会将能链接的方法尽可能地进行链接(例如, 两个 map 转换操作)。

如果想对整个作业禁用方法链,可以调用 StreamExecutionEnvironment.disableOperatorChaining()

此外, Flink 还提供了对链接更细粒度控制的 API 以满足更多需求。

# Sink输出

Sink是Flink中的输出组件,当使用 DataStream转换数据后, 由Sink负责将数据输出到文件Socket外部系统等。

Flink 自带了多种内置的输出格式,这些格式相关的实现封装在 DataStreams 的方法中。

# Sink至Text(输出为字符串)

// 将元素按行写成字符串。通过调用每个元素的 toString() 方法获得字符串。
writeAsText() / TextOutputFormat
1
2

# 打印

DataStream可以通过print()和printToErr()将结果输出到标准控制台。

  • 本地开发输出至Console面板;
  • 部署job可以在TaskManager菜单的控制台中查看。
// 控制台输出
wordcounts.print();
1
2

# Sink输出至文件

对于DataStream,有两个方法writeAsTextwriteAsCsv,可以直接将结果输出到文本文件中。但是在当前版本下,这两个方法已经被标记为过时。

当前推荐使用StreamingFileSink,调用withOutputFileConfig方法。例如:

public class FileSinkDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(100);
        final URL resource = FileRead.class.getResource("/test.txt");
        final String filePath = resource.getFile();
        final DataStreamSource<String> stream = env.readTextFile(filePath);

        // 配置输出文件
        OutputFileConfig outputFileConfig = OutputFileConfig
                .builder()
                .withPartPrefix("prefix")
                .withPartSuffix(".txt")
                .build();
        // 将结果输出到文本文件
        final StreamingFileSink<String> streamingfileSink = StreamingFileSink
                .forRowFormat(new Path("D:/ft"), new SimpleStringEncoder<String>("UTF-8"))
                .withOutputFileConfig(outputFileConfig)
                .build();
        stream.addSink(streamingfileSink);

        env.execute("FileSink");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

流式计算场景下的文件输出,不能直接往一个文件里不停的写。StreamingFileSink提供了流式数据的分区读写以及滚动更新功能。Flink另外提供了多种文件格式的Sink类型。具体参见streamfile-sink (opens new window)

然后,针对流批统一场景,Flink还另外提供了一个StreamingFileSink的升级版实现——FileSink

当然使用FileSink时需要增加一个maven依赖。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>1.12.5</version>
</dependency>
1
2
3
4
5

这样就可以使用FileSink进行流批统一的文件输出。如下:

final FileSink<String> fileSink = FileSink
          .forRowFormat(new Path("D:/ft"), new SimpleStringEncoder<String>("UTF-8"))
          .withOutputFileConfig(outputFileConfig)
          .build();
stream.sinkTo(fileSink);
1
2
3
4
5

通常情况下,流式数据很少会要求输出到文件当中,更多的场景还是会直接输出到其他下游组件当中,例如kafka、ES等。

# 输出Socket

使用writeToSocket方法,例如我们可以将之前从Socket读到的wordcount结果输出回Socket。

public class SocketSinkDemo {
    public static void main(String[] args) throws Exception {
        // 获取环境信息
        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
//        environment.setParallelism(1);
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host");
        final int port = parameterTool.getInt("port");

        // 从socket发送端获取数据
        final DataStreamSource<String> inputDataStream = environment.socketTextStream(host, port);

        // 对数据进行转换处理
        final DataStream<Tuple2<String, Integer>> wordcounts = inputDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                final String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        })
                .keyBy(value -> value.f0)
                .sum(1);

        // 控制台输出
        wordcounts.print();
        // 再将数据输出至Socket
        wordcounts.writeToSocket(host, port, new SerializationSchema<Tuple2<String, Integer>>() {
            @Override
            public byte[] serialize(Tuple2<String, Integer> element) {
                return (element.f0 + "-" + element.f1).getBytes(StandardCharsets.UTF_8);
            }
        });

        // 触发执行
        environment.execute("SocketSinkDemo");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

# 输出到kafka

在Source中Flink提供的这个kafka的connector依赖包(flink-connector-kafka_2.12),即提供了FlinkKafkaConsumer作为Source消费消息,也提供了FlinkKafkaProducer作为Sink生产消息。

public class KafkaSinkDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 接收kafka消息
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "worker1:9092,worker2:9092,worker3:9092");
        properties.setProperty("group.id", "test");
        final FlinkKafkaConsumer<String> mysource = new FlinkKafkaConsumer<>("flinktopic", new SimpleStringSchema(), properties);
//        mysource.setStartFromLatest();
//        mysource.setStartFromTimestamp();
        DataStream<String> stream = env.addSource(mysource);

        // 控制台打印
        stream.print();

        // 再将消息转存到另一个Topic
        properties = new Properties();
        properties.setProperty("bootstrap.servers", "worker1:9092,worker2:9092,worker3:9092");
        final FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>("flinktopic2"
                , new SimpleStringSchema()
                , properties
                , new FlinkFixedPartitioner<>()
                , FlinkKafkaProducer.Semantic.EXACTLY_ONCE
                , 5);
        stream.addSink(myProducer);
        
        env.execute("KafkaConsumer");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# RabbitMQ Sink

该连接器提供了一个 RMQSink 类,用来向 RabbitMQ 队列发送数据。

final DataStream<String> stream = ...

final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5000)
    ...
    .build();
    
stream.addSink(new RMQSink<String>(
    connectionConfig,            // config for the RabbitMQ connection
    "queueName",                 // name of the RabbitMQ queue to send messages to
    new SimpleStringSchema()));  // serialization schema to turn Java objects to messages
1
2
3
4
5
6
7
8
9
10
11
12

# 自定义Sink function

可以通过不带生命周期的SinkFunction以及带生命周期的 RickSinkFunction来自定义自己的Sink实现。

  • SinkFunction接口只有一个invoke方法。
  • RichSinkFunction继承了RichFunction接口,另外增加了open\close等生命周期管理的方法。
public class UDFJDBCSinkDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        final DataStreamSource<Stock> source = env.addSource(new UDFSource.MyOrderSource());
        source.addSink(new MyJDBCSink());

        env.execute("UDFJDBCSinkDemo");
    }
    public static class MyJDBCSink extends RichSinkFunction<Stock> {
        Connection connection = null;
        PreparedStatement insertStmt = null;
        PreparedStatement updateStmt = null;
        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/testdb", "root", "root");
            insertStmt = connection.prepareStatement("insert into flink_stock (id, price,stockname) values (?, ?, ?)");
            updateStmt = connection.prepareStatement("update flink_stock set price = ?,stockname = ? where id = ?");
        }

        @Override
        public void close() throws Exception {
            insertStmt.close();
            updateStmt.close();
            connection.close();
        }

        @Override
        public void invoke(Stock value, Context context) throws Exception {
            System.out.println("更新记录 : "+value);
            updateStmt.setDouble(1, value.getPrice());
            updateStmt.setString(2, value.getStockName());
            updateStmt.setString(3, value.getId());
            updateStmt.execute();
            if( updateStmt.getUpdateCount() == 0 ){
                insertStmt.setString(1, value.getId());
                insertStmt.setDouble(2, value.getPrice());
                insertStmt.setString(3, value.getStockName());
                insertStmt.execute();
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

运行这个示例,需要引入mysql的jdbc驱动包。

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.25</version>
</dependency>
1
2
3
4
5

# SideOutputStream

对于那些超过了最长等待时间的事件,Flink的处理思路是不再提供统一的处理,而是将这些事件单独放到另一个侧输出流SideOutputStream中。

由用户决定到底要如何处理这些数据,到底是将这些数据抛弃掉,还是进行一些补偿的计算行为。

# 侧输出流的作用

其实还不只是在于处理乱序数据,他是完全交由用户自行完成的一个补偿机制。

从一个主要的DataStream数据流中,可以产生任意数量的侧输出结果流

  • 并且这些结果流的数据类型也不需要完全与主要的数据里中的数据类型一致。
  • 并且不同的侧输出流,他们的类型也不必要完全相同。

总之,这个侧数据流完全由用户自行把控。

# 侧输出流的使用

使用输出流,首先需要进行明确的定义。

OutputTag<String> outputTag = new OutputTag<String>("side-output") {}; 1
1

接下来可以通过用户自定义的一些Funciton算子来实现侧输出流的数据收录。包括:

  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • KeyedCoProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

ProcessFunction

关于ProcessFunction,是Flink提供的一套底层基础API。

我们之前了解的各种DataStreamAPI,都是基于ProcessFunction这一套API构建起来的,具体可以参见:process_function.html (opens new window)

你可以使用在上述方法中向用户暴露的context参数,将数据发送到outputtag标识的侧输出流。

DataStream<Integer> input = ...;

final OutputTag<String> outputTag = new OutputTag<String>("side-output") {
};

SingleOutputStreamOperator<Integer> mainDataStream = input
    .process(new ProcessFunction<Integer, Integer>() {
        @Override
        public void processElement(
            Integer value,
            Context ctx,
            Collector<Integer> out) throws Exception {
            // 发送数据到主要的输出 
            out.collect(value);
            // 发送数据到旁路输出 
            ctx.output(outputTag, "sideout-" + String.valueOf(value));
        }
    });
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

接下来,可以在DataStream的运算结果上使用getSideOutput(OutputTag)方式获取侧输出流,进行后续的侧输出流处理。

final OutputTag<String> outputTag = new OutputTag<String>("side-output"){}; 
SingleOutputStreamOperator<Integer> mainDataStream = ...; 
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
1
2
3

整个侧输出流相当于是对所有异常数据的一个兜底操作,不光对于超时的事件可以用侧输出流进行最后的补偿处理,对于一些不正确的噪点事件,也可以用侧输出流的方式进行最后的操作。而对于侧输出流中没有捕获的事件, Flink就爱莫能助,只能放弃了。

完整代码示例:

public class SideOutputDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final URL resource = FileRead.class.getResource("/stock.txt");
        final String filePath = resource.getFile();
//        final DataStreamSource<String> stream = env.readTextFile(filePath);
        final DataStreamSource<String> dataStream = env.readFile(new TextInputFormat(new Path(filePath)), filePath);
        final SingleOutputStreamOperator<Stock> stockStream = dataStream
                .map((MapFunction<String, Stock>) value -> {
                    final String[] split = value.split(",");
                    return new Stock(split[0], Double.parseDouble(split[1]), split[2], Long.parseLong(split[3]));
                });

        OutputTag<Stock> cheapStock = new OutputTag<Stock>("cheapStock"){};
        OutputTag<Stock> expensiveStock = new OutputTag<Stock>("expensiveStock"){};

        final SingleOutputStreamOperator<Stock> stockPriceStream = stockStream.process(new ProcessFunction<Stock, Stock>() {
            @Override
            public void processElement(Stock value, Context ctx, Collector<Stock> out) throws Exception {
                if (value.getPrice() < 10.00) {
                    ctx.output(cheapStock, value);
                } else if (value.getPrice() > 80.00) {
                    ctx.output(expensiveStock, value);
                } else {
                    out.collect(value);
                }

            }
        });

        stockPriceStream.print("stockPriceStream");
        stockPriceStream.getSideOutput(cheapStock).print("cheapStock");
        stockPriceStream.getSideOutput(expensiveStock).print("expensiveStock");

        env.execute("SideOutputDemo");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

# 六、Flink部署

# 提交任务

Standalone模式的管理控制台:

访问控制台,打开 Submit New Job页面,选择 +Add New 按钮,提交jar包。

11

点击提交,就可以开启一个任务。

running job页面就可以看到正在执行的任务stream word count

选择这个任务,就能看到任务的执行情况。

12

  1. 这个数据流图展示了整个这个应用的具体执行的步骤,整个数据流转的执行计划
  • 其中每一个蓝色方块中表示每一步操作的描述,并展示每一步的并行度Parallelism;

  • 连线上的标识,表示Task分片的类型,按照什么规则去分片。

  1. 下方列表数据,展示了整个应用的数据流量。包括每一步操作的名称、状态、接受以及发送的数据大小以及记录数量等。

然后,我们回到Overview页面,查看下整体的slot情况。

13

可以看到我们这个job总共需要7个slot,但是集群中只有3个slot,程序也正常执行起来了。

这也体现了slot复用的效果。也就是说slot可以在不同的执行步骤中处理不同的任务。只要集群资源能够支撑应用最大的并行度要求,整个应用就可以运行起来。

实际上,Flink对于这个数据流图还会有一些自己的优化,例如某些相邻的操作,他们的并行度相同,任务也不是很复杂时,Flink会将这些相邻的步骤进行合并。

这些slot在同一个任务内部是可以不断复用的,但是在不同的任务之间,是不能共用的。所以,这时可以看到,集群中仅有的3个slot已经全部被这个应用job给占满了。如果需要再启动其他的应用job,就无法执行了。这时JobManager会不断的尝试重新申请slot。

  • 如果集群中有空出来的slot,那就可以分配给新启动的应用job。
  • 如果一 直申请不下来,JobManager会不断重试,默认每重试10次就会休息一点时间,过后再继续申请。

# 七、Flink相关资料

Flink官网 (opens new window)

Flink下载地址 (opens new window)