flink

Apache Flink

Flink是什么?

官网flink.apache.org

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.

flink-home-graphic

flink

Flink和其他流计算的比较

Flink、Spark、mstream

  • spark中,批处理用RDD(Resiliennt Distributed Datasets,抽象弹性分布式数据集),流式用DStream,本质还是RDD,小批量来模拟流式。spark把流看成是更快的批处理。
  • flink把批处理看成流的特殊例子,批处理建立在流式基础之上。

Flink

  • 流和批处理的支持
  • 复杂的状态管理
  • event-time处理语义
  • exactly-once保证
  • 高可用性

Flink应用场景

  • 事件驱动型应用,是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作

usecases-eventdrivenapps

  • 数据分析应用,批处理和流计算

usecases-analytics

  • 数据管道应用,数据管道是以持续流模式运行,而非周期性触发。因此它支持从一个不断生成数据的源头读取记录,并将它们以低延迟移动到终点。

usecases-datapipelines

Flink的一些概念

流(数据)是流处理的基本要素

无界和有界数据

bounded-unbounded

  • 无界流 有定义流的开始,但没有定义流的结束。通常是数据生成时进行实时的处理。
  • 有界流 有定义流的开始,也有定义流的结束。有界流处理通常被称为批处理。

状态

任何运行基本业务逻辑的流处理应用都需要在一定时间内存储所接收的事件中间结果,以供后续的某个时间点(例如收到下一个事件或者经过一段特定时间)进行访问并进行后续处理。

  • 多种状态基础类型,例如原子值(value),列表(list)以及映射(map)
  • 插件化的State Backend:State Backend 负责管理应用程序状态,并在需要的时候进行 checkpoint。Flink 支持多种 state backend,可以将状态存在内存或者 RocksDB。也支持插件式的自定义 state backend 进行状态存储。
  • 精确一次语义:Flink 的 checkpoint 和故障恢复算法保证了故障发生后应用状态的一致性。
  • 超大数据量状态,Flink 能够利用其异步以及增量式的 checkpoint 算法,存储数 TB 级别的应用状态
  • 可弹性伸缩的应用:Flink 能够通过在更多或更少的工作节点上对状态进行重新分布,支持有状态应用的分布式的横向伸缩。

local-state

check-point

Flink的CheckPoint机制背后的实现算法,Chandy-Lamport算法。

时间

大多数的事件流都拥有事件本身所固有的时间语义。许多常见的流计算都基于时间语义,例如窗口聚合、基于时间的 join(比如,某个时间的汇率和某个时间的交易)。

时间属性

  • Processing Time,处理时间,指的是执行具体操作时的机器时间(也称作挂钟时间)。
  • Event Time,事件时间,指的是数据本身携带的时间。这个时间是在事件产生时的时间。无论处理的是历史记录的事件还是实时的事件,事件时间模式的处理总能保证结果的准确性和一致性。
  • Ingestion Time,进入时间,指的是数据进入Flink的时间;在系统内部,会把它当做事件时间来处理。

PROCTIME() – 声明一个额外的列作为处理时间属性

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

事件时间属性可以用 WATERMARK 语句在CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段。

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- 声明 user_action_time 是事件时间属性,并且用延迟5秒的策略来生成 watermark
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

Watermark

什么是乱序呢?可以理解为数据到达的顺序和其实际产生时间的排序不一致。导致这的原因有很多,比如延迟,消息积压,重试等等。

  • Watermark 支持:Flink 引入了 watermark 的概念,用以衡量事件时间进展。Watermark 也是一种平衡处理延时和完整性的灵活机制。
  • 迟到数据处理:当以带有 watermark 的事件时间模式处理数据流时,在计算完成之后仍会有相关数据到达。这样的事件被称为迟到事件。Flink 提供了多种处理迟到数据的选项,例如将这些数据重定向到旁路输出(side output)或者更新之前完成计算的结果。

Watermark就是一种衡量Event Time进展的有效机制。随着时间的推移,最早流入实时计算的数据会被处理完成,之后流入的数据处于正在处理状态。处于正在处理部分的和已处理部分的交界的时间戳,可以被定义为Watermark,代表在此之前的事件已经被处理完成并输出。Watermark是一种告诉Flink一个消息延迟多少的方式。它定义了什么时候不再等待更早的数据。Watermarks理解为一个水位线,这个Watermarks在不断的变化。只有水位线越过窗口对应的结束时间,窗口才会关闭和进行计算。

某些极端情况下数据延迟会非常严重,即便通过WaterMark机制也无法等到数据全部进入窗口再进行处理。默认情况下,Flink会将这些严重迟到的数据丢弃掉;如果用户希望即使数据延迟到达,也能够按照流程处理并输出结果,此时可以借助Allowed Lateness机制来对迟到的数据进行额外的处理。allowedLateness就是针对event time而言,对于watermark超过end-of-window之后,还允许有一段时间(也是以event time来衡量)来等待之前的数据到达,以便再次处理这些数据。对于trigger是默认的EventTimeTrigger的情况下,allowedLateness会再次触发窗口的计算,而之前触发的数据,会buffer起来,直到watermark超过end-of-window + allowedLateness()的时间,窗口的数据及元数据信息才会被删除。

Windows

把无界数据流分解成有界数据流聚合分析

场景

  • 每分钟的浏览量
  • 每周的活跃用户
  • 每天的最高充值记录

窗口区分

  • 滚动时间窗口,每分钟页面浏览量,TumblingEventTimeWindows.of(Time.minutes(1))
  • 滑动时间窗口,每10秒钟计算前1分钟的页面浏览量,SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))
  • 会话窗口,每个会话的网页浏览量,其中会话之间的间隔至少为30分钟 ,EventTimeSessionWindows.withGap(Time.minutes(30))

window-assigners

Flink架构

Flink 运行时由两种类型的进程组成:一个 JobManager 和一个或者多个 TaskManager。

processes

JobManager 具有许多与协调 Flink 应用程序的分布式执行有关的职责:它决定何时调度下一个 task(或一组 task)、对完成的 task 或执行失败做出反应、协调 checkpoint、并且协调从失败中恢复等等。

TaskManager(也称为 worker)执行作业流的 task,并且缓存和交换数据流。必须始终至少有一个 TaskManager。在 TaskManager 中资源调度的最小单位是 task slot。TaskManager 中 task slot 的数量表示并发处理 task 的数量。请注意一个 task slot 中可以执行多个算子。

Flink应用开发

Flink分层

  • 数据流编程模型

抽象层次

  • Flink API 最底层的抽象为有状态实时流处理,其抽象实现是 Process Function,并且 Process Function 被 Flink 框架集成到了 DataStream API 中来为我们使用。允许用户在应用程序中自由地处理来自单流或多流的事件(数据),并提供具有全局一致性和容错保障的状态。用户可以在此层抽象中注册事件时间(event time)和处理时间(processing time)回调方法,从而允许程序可以实现复杂计算。

  • Flink API 第二层抽象是 Core APIs。使用 Core APIs 进行编程:其中包含 DataStream API(应用于有界/无界数据流场景)和 DataSet API(应用于有界数据集场景)两部分。Core APIs 提供的流式 API(Fluent API)为数据处理提供了通用的模块组件,例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等。

  • Flink API 第三层抽象是 Table API。Table API 是以表(Table)为中心的声明式编程(DSL)API,例如在流式数据场景下,它可以表示一张正在动态改变的表。Table API 遵循(扩展)关系模型:即表拥有 schema(类似于关系型数据库中的 schema),并且 Table API 也提供了类似于关系模型中的操作,比如 select、project、join、group-by 和 aggregate 等。

  • Flink API 最顶层抽象是 SQL。这层抽象在语义和程序表达式上都类似于 Table API,但是其程序实现都是 SQL 查询表达式。SQL 抽象与 Table API 抽象之间的关联是非常紧密的,并且 SQL 查询语句可以在 Table API 中定义的表上执行。

Flink程序的基本构建块是流和转换,是(可能永无止境的)数据记录流,而转换是将一个或多个流作为一个或多个流的算子操作。输入,并产生一个或多个输出流。

Flink程序由用户自定义算子转换而来的流式 dataflows 所组成。这些流式 dataflows 形成了有向图,以一个或多个源(source)开始,并以一个或多个汇(sink)结束。

program_dataflow

Flink程序本质上是分布式并行程序。在程序执行期间,一个流有一个或多个流分区(Stream Partition),每个算子有一个或多个算子子任务(Operator Subtask)。每个子任务彼此独立,并在不同的线程中运行,或在不同的计算机或容器中运行。

parallel_dataflow.svg

Flink应用程序是从其 main() 方法产生的一个或多个 Flink 作业的任何用户程序。这些作业的执行可以在本地 JVM(LocalEnvironment)中进行,或具有多台机器的集群的远程设置(RemoteEnvironment)中进行。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// source
DataStream<String> text = env.readTextFile("file:///path/to/file");

DataStream<String> input = ...;

// transform
DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) {
        return Integer.parseInt(value);
    }
});

// sink
writeAsText(String path);

print();

Flink中的DataStream程序是实现数据流转换的常规程序(例如,Filter,更新状态,定义窗口,聚合)。Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。

  • 初始化execution environment,
  • 加载/创建初始数据,
  • 指定此数据的转换,
  • 指定放置计算结果的位置,
  • 触发程序执行

word-count-1.png

word-count-2.png

demo_1.png

demo_2.png

demo_3.png

上传jar,然后选择入口类和参数,执行测试

Flink 的 Table API 和 SQL 是流批统一的 API。Table API & SQL 在无论有限的批式输入还是无限的流式输入下,都具有相同的语义。

传统的关系代数和流处理与输入数据、执行和输出结果的关系

关系代数/SQL

  • 关系(或表)是有界(多)元组集合
  • 对批数据(例如关系数据库中的表)执行的查询可以访问完整的输入数据
  • 批处理查询在产生固定大小的结果后终止

流处理

  • 流是一个无限元组序列
  • 流式查询在启动时不能访问所有数据,必须等待数据流入
  • 流查询不断地根据接收到的记录更新其结果,并且始终不会结束

动态表 & 连续查询

动态表是Flink 的支持流数据的 Table API 和 SQL 的核心概念。与表示批处理数据的静态表不同,动态表是随时间变化的。可以像查询静态批处理表一样查询它们。查询动态表将生成一个连续查询 。一个连续查询永远不会终止,结果会生成一个动态表。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。

stream-query-stream

  • 将流转换为动态表。
  • 在动态表上计算一个连续查询,生成一个新的动态表。
  • 生成的动态表被转换回流

在流上定义表

append-mode

在动态表上计算一个连续查询,并生成一个新的动态表。与批处理查询不同,连续查询从不终止,并根据其输入表上的更新更新其结果表。在任何时候,连续查询的结果在语义上与以批处理模式在输入表快照上执行的相同查询的结果相同。

query-groupBy-cnt

clicks 分组至每小时滚动窗口中,然后计算 url 数量

query-groupBy-window-cnt

表到流的转换

  • Append-only流
  • Retract流
  • Upsert流

flink sql client

sudo ./sql-client.sh embedded

Flink SQL> help;
The following commands are available:

CLEAR       Clears the current terminal.
CREATE TABLE        Create table under current catalog and database.
DROP TABLE      Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'
CREATE VIEW     Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'
DESCRIBE        Describes the schema of a table with the given name.
DROP VIEW       Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'
EXPLAIN     Describes the execution plan of a query or table with the given name.
HELP        Prints the available commands.
INSERT INTO     Inserts the results of a SQL SELECT query into a declared table sink.
INSERT OVERWRITE        Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.
QUIT        Quits the SQL CLI client.
RESET       Resets all session configuration properties.
SELECT      Executes a SQL SELECT query on the Flink cluster.
SET     Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties.
SHOW FUNCTIONS      Shows all user-defined and built-in functions.
SHOW TABLES     Shows all registered tables.
SOURCE      Reads a SQL SELECT query from a file and executes it on the Flink cluster.
USE CATALOG     Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'
USE     Sets the current default database. Experimental! Syntax: 'USE <name>;'

Hint: Make sure that a statement ends with ';' for finalizing (multi-line) statements.
SELECT name, COUNT(*) AS cnt
FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS nameTable(name)
GROUP BY name;

Flink SQL的实现原理是什么?是如何实现 SQL 解析的呢?

Flink将SQL校验、SQL解析以及SQL优化交给了Apache Calcite。

  • SQL query会经过Calcite解析器转变成SQL节点树,通过验证后构建成Calcite的抽象语法树(也就是图中的LogicalPlan)
  • Table API上的调用会构建成Table API的抽象语法树,并通过Calcite提供的RelBuilder转变成Calcite的抽象语法树
  • 依次被转换成逻辑执行计划和物理执行计划。在提交任务后会分发到各个TaskManager中运行,在运行时会使用Janino编译器编译代码后运行。

电商场景实战之实时PV和UV曲线

bigdata11