kudu-sink

通过Flink从Kafka订阅数据入库到Kudu

Kudu入库

JAVA API 提供了三种向 kudu 插入数据的刷新策略

  • AUTO_FLUSH_SYNC
  • AUTO_FLUSH_BACKGROUND
  • MANUAL_FLUSH
比较 AUTO_FLUSH_SYNC AUTO_FLUSH_BACKGROUND MANUAL_FLUSH
是否批处理
吞吐量
灵活性
  • AUTO_FLUSH_SYNC,apply() 返回之前已经刷新了缓冲区写入kudu tablet。flush方法调用不会产生任何影响。
  • MANUAL_FLUSH,apply() 立即返回,需要手动调用flush才会写入kudu tablet。
  • AUTO_FLUSH_BACKGROUND,调用将立即返回,写入将在后台发送。写操作是在后台进行的,后台Flush线程定时刷新,所以错误都将存储在会话本地缓冲区中。

Kudu三种FlushMode对比分析

集群配置,3台kudu master 6台kudu server(8核16G)

  • AUTO_FLUSH_BACKGROUND,入库5s,40w - 50w row/s
  • MANUAL_FLUSH,和AUTO_FLUSH_BACKGROUND同一数量级,稍慢一点
  • AUTO_FLUSH_SYNC模式,测试慢20倍左右,2-3w row/s

考虑到性能和灵活性,使用MANUAL_FLUSH入库方式。配置间隔5s发送 或 超过500条数据发送。测试6个并发度30w/s以上。

kudu.mode=MANUAL_FLUSH # kudu入库模式
kudu.flushInterval=5000 # MANUAL_FLUSH kudu 手动提交时间间隔
kudu.mutationBufferSpace=1000 # MANUAL_FLUSH kudu buffer数量,超过1000/2 = 500触发手动提交

代码结构

├── README.md
├── pom.xml 依赖库配置
├── src
│   └── main
│       ├── java
│       │   └── fkudu
│       │       ├── BaseStreamJob.java
│       │       ├── SinkKudu.java Kudu sink封装
│       │       ├── SinkKuduFailJob.java 失败日志处理入口
│       │       ├── StreamingJob.java 流计算入口
│       │       ├── constants 常量
│       │       ├── kudu kudu操作方法封装
│       │       ├── models POJO model
│       │       └── utiles
│       └── resources
│           ├── application.properties 项目配置文件
│           └── log4j.properties 日志配置文件

models

POJO,每个字段都有getter和setter方法。

业务模型

StreamingJob

  1. Environment
    • env
    • checkpoint
    • restartStrategy
  2. Source
    • kafka data source
    • kafka config source
  3. Transform
    • filter
    • map
    • connect
  4. Sink
    • kudu sink -> fail callback

环境变量

code-04

checkpoint

Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。

  • 设置模式为exactly-once
  • 设置checkpoint的周期, 每隔多久进行启动一个检查点
  • 检查点必须在一分钟内完成,或者被丢弃
  • 同一时间只允许进行的检查点
  • Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint

默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint。修改配置conf/flink-conf.yaml

state.checkpoints.num-retained: 2

Flink checkpoint目录分别对应的是 jobId,flink提供了在启动之时通过设置 -s 参数指定checkpoint目录。

checkpoint 由元数据文件、数据文件(与 state backend 相关)组成

/user-defined-checkpoint-dir
    /{job-id}
        |
        + --shared/
        + --taskowned/
        + --chk-1/
            |-- _metadata
        + --chk-2/
        + --chk-3/
        ...

_metadata 元数据

public class CompletedCheckpoint implements Serializable {
    private final JobID job;
    private final long checkpointID;
    private final long timestamp;
    private final long duration;

    /** 本次 Checkpoint 中每个算子的 ID 及算子对应 State 信息 */
    private final Map<OperatorID, OperatorState> operatorStates;
    private final CheckpointProperties props;
    private final Collection<MasterState> masterHookStates;
    // Checkpoint 存储路径
    private final CompletedCheckpointStorageLocation storageLocation;
    // 元数据句柄
    private final StreamStateHandle metadataHandle;
    // Checkpoint 目录地址
    private final String externalPointer;
    private transient volatile CompletedCheckpointStats.DiscardCallback discardCallback;
}

flink-web-02.png

kudu-sink-checkpoint-02.png

kudu-sink-checkpoint-01.png

重启策略

  1. 固定延迟重启策略
  2. 失败率重启策略
  3. 无重启策略
restart-strategy.failure-rate.delay: 10 s
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.max-failures-per-interval: 10
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 10 s
restart-strategy: none

restart-strategy-01.png

华为云默认无重启策略

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  10, // 尝试重启次数
  Time.of(10, TimeUnit.SECONDS) // 延迟时间间隔
));

// 重启策略,固定时间间隔允许Job重启的最大次数,固定时间间隔,两次重启的延迟时间
env.setRestartStrategy(RestartStrategies.failureRateRestart(
    10,
    Time.of(5, TimeUnit.MINUTES),
    Time.of(10, TimeUnit.SECONDS)
));

重试次数满了还是会挂掉的,还需要作业监控(任务运行状态、服务器指标等)提供异常告警。

Source

广播流和数据流通过log_name关联

code-03

Sink

Stream Kudu Sink 调用SinkKudu,传入配置和异常回调处理,把失败数据写入Kudu的log_fail表。

code-05

SinkKudu

继承RichSinkFunction实现自定义sink

public void open(Configuration parameters)
public void invoke(McLog mcLog, SinkFunction.Context context) 
public void close()

code-02

  1. 调用KuduWriter发送log_fail失败数据存入buffer;
  2. 按配置时间(默认10分钟)定时重发失败数据,超出配置失败数量(默认1000),抛异常按重启策略重启;
  3. checkpoint,实现snapshotState(把buffer写入checkpoint state保存)和initializeState(重启恢复,把checkpoint state的数据写回buffer)。

code-06

code-07

KuduWriter

  1. 配置flushMode(MANUAL_FLUSH)
  2. 配置mutationBufferSpace(buffer大小)和flushInterval(定时flush时间)
  3. 缓存table,收到广播流通知重新获取table

kudu-batch 重跑失败日志入库

kudu-connector依赖flink-connector-kudu_2.11_1.1-SNAPSHOT 默认依赖的1.10的版本有bug,使用1.12的版本,手动打包jar放到src/main/resources/lib/目录。

  1. 查询失败(配置limit)数据status <= 0,重新发送kudu,设置status为1 running
  2. 发送成功,设置status为2 success
  3. 发送失败,设置status为-1 fail,等待下次重发

打包部署

mvn clean package -Dfile.encoding=UTF-8 -DskipTests=true

打包后文件在target目录生成fkudu-xxx.jar,把jar文件上传到机器目录

配置

app.checkPoints=true # 开启checkPoint
app.checkPoints.fsStateBackend=true # checkPoint后端
app.checkPoints.checkPointDataUrl=obs://hadoop-obs/flink/checkpoints # checkPoint后端存放路径
app.checkPoints.enableCheckpointing=120000 # checkPoint时间间隔 2min
app.checkPoints.checkPointTimeOut=100000 #  checkPoint超时时间1min40s
app.retry.period=300000 # 重试发送失败日志周期,每5min
app.retry.size=1000 # 最大重试失败数量,超过抛异常重启
kudu.mode=MANUAL_FLUSH # kudu入库模式
kudu.flushInterval=5000 # MANUAL_FLUSH kudu 手动提交时间间隔
kudu.mutationBufferSpace=1000 # MANUAL_FLUSH kudu buffer数量,超过1半触发手动提交

Flink执行

  1. Standalone Cluster,独立集群
  2. Flink on Yarn
    • Session-Cluster,多个job共享一个yarn session
    • Per-Job-Cluster,一个job对应一个yarn session
  3. Docker
  4. Kubernetes

Flink per-job cluster

cd  /data/flink/fkudu_test
flink run -p 6 -m yarn-cluster /data/flink/fkudu_test/fkudu-xxx.jar

kudu-sink-yarn-02.png kudu-sink-yarn-03.png kudu-sink-yarn-04.png kudu-sink-yarn-06.png

hadoop fs -ls hdfs://xxx/user/root/.flink

命令行查看任务

yarn app -list
application_1620730305230_0035	Flink per-job cluster	        Apache Flink

flink list -yid application_1620730305230_0035
------------------ Running/Restarting Jobs -------------------
19.05.2021 15:17:20 : ab7e992bc14da2a67ee01c9735e86ac7 : flink kudu 11 (RUNNING)

flink-web-01.png

常见入库失败问题

  • 缺表
  • 缺分区
  • 缺主键字段
  • 字段溢出
bigdata11