Flink Kudu Sink
通过Flink从Kafka订阅数据入库到Kudu
Kudu入库
JAVA API 提供了三种向 kudu 插入数据的刷新策略
AUTO_FLUSH_SYNC
AUTO_FLUSH_BACKGROUND
MANUAL_FLUSH
比较 | AUTO_FLUSH_SYNC |
AUTO_FLUSH_BACKGROUND |
MANUAL_FLUSH |
---|---|---|---|
是否批处理 | 否 | 是 | 是 |
吞吐量 | 低 | 高 | 高 |
灵活性 | 高 | 低 | 高 |
AUTO_FLUSH_SYNC
,apply() 返回之前已经刷新了缓冲区写入kudu tablet。flush方法调用不会产生任何影响。MANUAL_FLUSH
,apply() 立即返回,需要手动调用flush才会写入kudu tablet。AUTO_FLUSH_BACKGROUND
,调用将立即返回,写入将在后台发送。写操作是在后台进行的,后台Flush线程定时刷新,所以错误都将存储在会话本地缓冲区中。
集群配置,3台kudu master 6台kudu server(8核16G)
AUTO_FLUSH_BACKGROUND
,入库5s,40w - 50w row/sMANUAL_FLUSH
,和AUTO_FLUSH_BACKGROUND
同一数量级,稍慢一点AUTO_FLUSH_SYNC
模式,测试慢20倍左右,2-3w row/s
考虑到性能和灵活性,使用MANUAL_FLUSH入库方式。配置间隔5s发送 或 超过500条数据发送。测试6个并发度30w/s以上。
kudu.mode=MANUAL_FLUSH # kudu入库模式
kudu.flushInterval=5000 # MANUAL_FLUSH kudu 手动提交时间间隔
kudu.mutationBufferSpace=1000 # MANUAL_FLUSH kudu buffer数量,超过1000/2 = 500触发手动提交
代码结构
├── README.md
├── pom.xml 依赖库配置
├── src
│ └── main
│ ├── java
│ │ └── fkudu
│ │ ├── BaseStreamJob.java
│ │ ├── SinkKudu.java Kudu sink封装
│ │ ├── SinkKuduFailJob.java 失败日志处理入口
│ │ ├── StreamingJob.java 流计算入口
│ │ ├── constants 常量
│ │ ├── kudu kudu操作方法封装
│ │ ├── models POJO model
│ │ └── utiles
│ └── resources
│ ├── application.properties 项目配置文件
│ └── log4j.properties 日志配置文件
models
POJO,每个字段都有getter和setter方法。
业务模型
StreamingJob
- Environment
- env
- checkpoint
- restartStrategy
- Source
- kafka data source
- kafka config source
- Transform
- filter
- map
- connect
- Sink
- kudu sink -> fail callback
环境变量
checkpoint
Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。
- 设置模式为exactly-once
- 设置checkpoint的周期, 每隔多久进行启动一个检查点
- 检查点必须在一分钟内完成,或者被丢弃
- 同一时间只允许进行的检查点
- Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint。修改配置conf/flink-conf.yaml
state.checkpoints.num-retained: 2
Flink checkpoint目录分别对应的是 jobId,flink提供了在启动之时通过设置 -s 参数指定checkpoint目录。
checkpoint 由元数据文件、数据文件(与 state backend 相关)组成
/user-defined-checkpoint-dir
/{job-id}
|
+ --shared/
+ --taskowned/
+ --chk-1/
|-- _metadata
+ --chk-2/
+ --chk-3/
...
_metadata 元数据
public class CompletedCheckpoint implements Serializable {
private final JobID job;
private final long checkpointID;
private final long timestamp;
private final long duration;
/** 本次 Checkpoint 中每个算子的 ID 及算子对应 State 信息 */
private final Map<OperatorID, OperatorState> operatorStates;
private final CheckpointProperties props;
private final Collection<MasterState> masterHookStates;
// Checkpoint 存储路径
private final CompletedCheckpointStorageLocation storageLocation;
// 元数据句柄
private final StreamStateHandle metadataHandle;
// Checkpoint 目录地址
private final String externalPointer;
private transient volatile CompletedCheckpointStats.DiscardCallback discardCallback;
}
重启策略
- 固定延迟重启策略
- 失败率重启策略
- 无重启策略
restart-strategy.failure-rate.delay: 10 s
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.max-failures-per-interval: 10
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 10 s
restart-strategy: none
华为云默认无重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
10, // 尝试重启次数
Time.of(10, TimeUnit.SECONDS) // 延迟时间间隔
));
// 重启策略,固定时间间隔允许Job重启的最大次数,固定时间间隔,两次重启的延迟时间
env.setRestartStrategy(RestartStrategies.failureRateRestart(
10,
Time.of(5, TimeUnit.MINUTES),
Time.of(10, TimeUnit.SECONDS)
));
重试次数满了还是会挂掉的,还需要作业监控(任务运行状态、服务器指标等)提供异常告警。
Source
广播流和数据流通过log_name关联
Sink
Stream Kudu Sink 调用SinkKudu
,传入配置和异常回调处理,把失败数据写入Kudu的log_fail表。
SinkKudu
继承RichSinkFunction实现自定义sink
public void open(Configuration parameters)
public void invoke(McLog mcLog, SinkFunction.Context context)
public void close()
- 调用
KuduWriter
发送log_fail失败数据存入buffer; - 按配置时间(默认10分钟)定时重发失败数据,超出配置失败数量(默认1000),抛异常按重启策略重启;
- checkpoint,实现snapshotState(把buffer写入checkpoint state保存)和initializeState(重启恢复,把checkpoint state的数据写回buffer)。
KuduWriter
- 配置flushMode(MANUAL_FLUSH)
- 配置mutationBufferSpace(buffer大小)和flushInterval(定时flush时间)
- 缓存table,收到广播流通知重新获取table
kudu-batch 重跑失败日志入库
kudu-connector依赖flink-connector-kudu_2.11_1.1-SNAPSHOT 默认依赖的1.10的版本有bug,使用1.12的版本,手动打包jar放到src/main/resources/lib/目录。
- 查询失败(配置limit)数据
status <= 0
,重新发送kudu,设置status为1 running - 发送成功,设置
status为2
success - 发送失败,设置
status为-1
fail,等待下次重发
打包部署
mvn clean package -Dfile.encoding=UTF-8 -DskipTests=true
打包后文件在target目录生成fkudu-xxx.jar,把jar文件上传到机器目录
配置
app.checkPoints=true # 开启checkPoint
app.checkPoints.fsStateBackend=true # checkPoint后端
app.checkPoints.checkPointDataUrl=obs://hadoop-obs/flink/checkpoints # checkPoint后端存放路径
app.checkPoints.enableCheckpointing=120000 # checkPoint时间间隔 2min
app.checkPoints.checkPointTimeOut=100000 # checkPoint超时时间1min40s
app.retry.period=300000 # 重试发送失败日志周期,每5min
app.retry.size=1000 # 最大重试失败数量,超过抛异常重启
kudu.mode=MANUAL_FLUSH # kudu入库模式
kudu.flushInterval=5000 # MANUAL_FLUSH kudu 手动提交时间间隔
kudu.mutationBufferSpace=1000 # MANUAL_FLUSH kudu buffer数量,超过1半触发手动提交
Flink执行
- Standalone Cluster,独立集群
- Flink on Yarn
- Session-Cluster,多个job共享一个yarn session
Per-Job-Cluster
,一个job对应一个yarn session
- Docker
- Kubernetes
Flink per-job cluster
cd /data/flink/fkudu_test
flink run -p 6 -m yarn-cluster /data/flink/fkudu_test/fkudu-xxx.jar
hadoop fs -ls hdfs://xxx/user/root/.flink
命令行查看任务
yarn app -list
application_1620730305230_0035 Flink per-job cluster Apache Flink
flink list -yid application_1620730305230_0035
------------------ Running/Restarting Jobs -------------------
19.05.2021 15:17:20 : ab7e992bc14da2a67ee01c9735e86ac7 : flink kudu 11 (RUNNING)
常见入库失败问题
- 缺表
- 缺分区
- 缺主键字段
- 字段溢出