kudu

kudu是什么

https://kudu.apache.org/docs/

kudu 是一个针对 Apache Hadoop 平台而开发的列式存储管理器。支持水平扩展,高可用。

  • OLAP 工作的快速处理
  • 与 MapReduce,Spark 和其他 Hadoop 生态系统组件集成
  • 与 Apache Impala(incubating)紧密集成,使其与 Apache Parquet 一起使用 HDFS 成为一个很好的可变的替代方案
  • 强大而灵活的一致性模型,允许您根据每个 per-request(请求选择)一致性要求,包括 strict-serializable(严格可序列化)一致性的选项
  • 针对同时运行顺序和随机工作负载的情况性能很好。
  • High availability(高可用性)Tablet server *和 *Master 使用 Raft Consensus Algorithm 来保证节点的高可用,确保只要有一半以上的副本可用
  • 结构化数据模型

kudu常见的几个应用场景

  • 实时更新的应用,刚刚到达的数据就马上要被终端用户使用访问到。
  • 时间序列相关的应用
    • 根据海量历史数据查询
    • 必须非常快地返回关于单个实体的细粒度查询
  • 实时预测模型的应用,支持根据所有历史数据周期地更新模型

列式存储

Columnar Data Store(列式数据存储)

Kudu 是一个 columnar data store(列式数据存储)。列式数据存储在强类型列中。由于几个原因,通过适当的设计,Kudu 对 analytical(分析)或 warehousing(数据仓库)工作会非常出色。

Read Efficiency(高效读取)

对于分析查询,允许读取单个列或该列的一部分同时忽略其他列,这意味着您可以在磁盘上读取更少块来完成查询。与基于行的存储相比,即使只返回几列的值,仍需要读取整行数据。

Data Compression(数据压缩)

由于给定的列只包含一种类型的数据,基于模式的压缩比压缩混合数据类型(在基于行的解决方案中使用)时更有效几个数量级。结合从列读取数据的效率,压缩允许您在从磁盘读取更少的块时完成查询。

kudu和其他存储工具的对比

kudu的集群架构基本和HBase类似,采用主从结构,Master节点管理元数据,Tablet节点负责分片管理数据。和HBase不同的是,Kudu没有借助于HDFS存储实际数据,而是自己直接在本地磁盘上管理分片数据,包括数据的Replication机制,kudu的Tablet server直接管理Master分片和Slave分片,自己通过raft协议解决一致性问题等,多个Slave可以同时提供数据读取服务,相对于HBase依托HDFS进行Region数据的管理方式,自主性会强一些,不过比如Tablet节点崩溃,数据的迁移拷贝工作等,也需要Kudu自己完成。

kudu 环境搭建

安装kudu

https://kudu.apache.org/docs/quickstart.html

kudu 本地搭建kudu环境,启动docker kudu服务

  1. git clone https://github.com/apache/kudu
  2. cd kudu
  3. export KUDU_QUICKSTART_IP=$(ifconfig | grep "inet " | grep -Fv 127.0.0.1 | awk '{print $2}' | tail -1)
  4. docker-compose -f docker/quickstart.yml up -d
  5. docker exec -it $(docker ps -aqf "name=kudu-master-1") /bin/bash

检查kudu服务的健康情况

  1. kudu cluster ksck kudu-master-1:7051,kudu-master-2:7151,kudu-master-3:7251

kudu-impala docker启动kudu-impala服务,使用impala操作kudu

  1. docker run -d --name kudu-impala --network="docker_default" \
  2. -p 21000:21000 -p 21050:21050 -p 25000:25000 -p 25010:25010 -p 25020:25020 \
  3. --memory=4096m apache/kudu:impala-latest impala

进入impala-shell

  1. docker exec -it kudu-impala impala-shell

kudu操作

impala 操作

Kudu-Impala 集成特性

  • CREATE/ALTER/DROP TABLE
  • UPDATE / DELETE,Impala 支持 UPDATE和 DELETE SQL命令逐行或批处理修改kudu表中的已有的数据。
  • Flexible Partitioning(灵活分区)
  • Parallel Scan(并行扫描),Impala 使用的 Kudu 客户端可以跨多个 tablets 扫描
  • High-efficiency queries(高效查询)
  1. create database db_flink_26;
  2. use db_flink_26;
  3. CREATE TABLE my_first_table
  4. (
  5. id BIGINT,
  6. name STRING,
  7. PRIMARY KEY(id)
  8. )
  9. PARTITION BY HASH PARTITIONS 4
  10. STORED AS KUDU;
  11. DESCRIBE my_first_table;
  12. INSERT INTO my_first_table VALUES (99, "sarah");
  13. SELECT * FROM my_first_table;
  14. INSERT INTO my_first_table VALUES (1, "john"), (2, "jane"), (3, "jim");
  15. SELECT * FROM my_first_table;

kudu-1

web访问

http://localhost:8050/ http://localhost:8051/

kudu-2

注意:

  • kudu里面的表名格式是impala::库名.表名。
  • 查询已经存在表,通过api或者spark创的表,impala看不到,需要创映射
  1. CREATE EXTERNAL TABLE my_mapping_table
  2. STORED AS KUDU
  3. TBLPROPERTIES (
  4. 'kudu.table_name' = 'exist_kudu_table'

修改表的properties

  1. ALTER TABLE my_table RENAME TO my_new_table;--只是修改了impala映射
  2. ALTER TABLE my_internal_table
  3. SET TBLPROPERTIES('kudu.table_name' = 'new_name');--仅限内部表,修改kudu底层表名
  4. ALTER TABLE my_external_table_
  5. SET TBLPROPERTIES('kudu.table_name' = 'some_other_kudu_table');--修改外部表到别的kudu
  6. ALTER TABLE my_table
  7. SET TBLPROPERTIES('kudu.master_addresses' = 'ip:7051,...');--修改master地址
  8. ALTER TABLE my_table SET TBLPROPERTIES('EXTERNAL' = 'TRUE');--内部表修改为外部

kudu java的操作示例

kudu example kudu/examples/java

kudu 命令行操作

kudu操作表

  1. kudu table list kudu-master-1:7051,kudu-master-2:7151,kudu-master-3:7251
  2. kudu table delete kudu-master-1:7051,kudu-master-2:7151,kudu-master-3:7251 表名

kudu 分区

https://kudu.apache.org/docs/schema_design.html

Kudu 提供了两种类型的分区:range partitioning ( 范围分区 ) 和 hash partitioning ( 哈希分区 )。表也可能具有 多级分区 ,其结合范围和哈希分区,或多个散列分区实例。

Range Partitioning ( 范围分区 )

Kudu 允许在运行时从表中动态添加和删除范围分区,而不影响其他分区的可用性。删除分区将删除属于分区的 tablet 以及其中包含的数据。随后插入到丢弃的分区将失败。可以添加新分区,但不能与任何现有的分区分区重叠。 Kudu 允许在单个事务性更改表操作中删除并添加任意数量的范围分区。

动态添加和删除范围分区对于时间序列使用情况尤其有用。随着时间的推移,可以添加范围分区以覆盖即将到来的时间范围。例如,存储事件日志的表可以在每月开始之前添加一个月份的分区,以便保存即将到来的事件。必要时可以删除旧范围分区,以便有效地删除历史数据。

Hash Partitioning ( 哈希分区 )

哈希分区通过哈希值将行分配到许多 buckets ( 存储桶 )之一。哈希分区是一种有效的策略,当不需要对表进行有序访问时。哈希分区对于在 tablet 之间随机散布这些功能是有效的,这有助于减轻热点和 tablet 大小不均匀。

Multilevel Partitioning ( 多级分区 )

Kudu 允许一个表在单个表上组合多级分区。零个或多个哈希分区级别可以与可选的范围分区级别组合。

Flink kudu 入库

Flink Kudu Connector

官方推荐的Connect适合单个表的操作。

https://bahir.apache.org/docs/flink/current/flink-streaming-kudu/

  1. CREATE TABLE TestTable (
  2. first STRING,
  3. second STRING,
  4. third INT NOT NULL
  5. ) WITH (
  6. 'connector.type' = 'kudu',
  7. 'kudu.masters' = '...',
  8. 'kudu.table' = 'TestTable',
  9. 'kudu.hash-columns' = 'first',
  10. 'kudu.primary-key-columns' = 'first,second'
  11. )

自定义kudu sink

t_online、t_log_role_base_data测试表

  1. CREATE TABLE db_flink_26.t_log_online (
  2. pid BIGINT,
  3. agent_id INT,
  4. server_id INT,
  5. time INT,
  6. weekday INT,
  7. year INT,
  8. month INT,
  9. day INT,
  10. hour INT,
  11. min INT,
  12. online INT,
  13. platform INT,
  14. via STRING,
  15. client_version STRING,
  16. server_version STRING,
  17. mtime INT,
  18. pf_online STRING,
  19. PRIMARY KEY (pid,agent_id,server_id)
  20. )
  21. COMMENT 'online table'
  22. STORED AS KUDU
  23. TBLPROPERTIES (
  24. 'kudu.master_addresses' = 'kudu-master-1:7051,kudu-master-2:7151,kudu-master-3:7251',
  25. 'kudu.num_tablet_replicas' = '1'
  26. );
  27. CREATE TABLE db_flink_26.t_log_role_base_data (
  28. pid BIGINT,
  29. agent_id INT,
  30. server_id INT,
  31. role_id BIGINT,
  32. role_name STRING,
  33. account_name STRING,
  34. sex TINYINT,
  35. create_time INT,
  36. category TINYINT,
  37. faction_id TINYINT,
  38. level INT,
  39. last_login_time INT,
  40. ip STRING,
  41. last_login_ip STRING,
  42. fight INT,
  43. money INT,
  44. bind_gold INT,
  45. gold INT,
  46. bind_copper INT,
  47. copper INT,
  48. platform INT,
  49. via STRING,
  50. upf INT,
  51. cid STRING,
  52. last_offline_time INT,
  53. regrow INT,
  54. device_id STRING,
  55. client_version STRING,
  56. server_version STRING,
  57. pay_money FLOAT,
  58. is_vip INT,
  59. mtime INT,
  60. idfa STRING,
  61. imei STRING,
  62. is_retran TINYINT,
  63. mac STRING,
  64. family_id BIGINT,
  65. family_level INT,
  66. PRIMARY KEY (pid,agent_id,server_id)
  67. )
  68. COMMENT 'role_base'
  69. STORED AS KUDU
  70. TBLPROPERTIES (
  71. 'kudu.master_addresses' = 'kudu-master-1:7051,kudu-master-2:7151,kudu-master-3:7251',
  72. 'kudu.num_tablet_replicas' = '1'
  73. );

主键中不支持更改,只能删除后重新添加

  1. 未指定分区字段时,其分区字段默认是主键,若主键有两个列则分区字段为两个,指定分区字段时,需要分区列是主键的子集;否则会报错Only key columns can be used in PARTITION BY
  2. 不指定分区:表依然会创建,但是只有一个分区,会提示Unpartitioned Kudu tables are inefficient for large data sizes.

JAVA API 提供了三种向 kudu 插入数据的刷新策略

  1. AUTO_FLUSH_SYNC
  2. AUTO_FLUSH_BACKGROUND
  3. MANUAL_FLUSH

https://blog.csdn.net/CZ_yjsy_data/article/details/88390696

Flink 入库 Kudu

  1. 自定义Kudu Sink继承RichSinkFunction,实现open、invoke、close方法

  2. 从传入McLog的POJO结构中获取tableName和Logs

kudu-3

  1. DEMO使用AUTO_FLUSH_BACKGROUND的方式,打印出执行的数据

kudu-4

  1. 查看impala,可以看到数据已经入库

kudu-5

问题

kudu分区过多会导致CPU占用高,需要根据业务按需调整

bigdata11