zepplin
zepplin
Apache Zeppelin 是一个可以进行大数据可视化分析的交互式开发系统,可以承担数据接入、数据发现、数据分析、数据可视化、数据协作等任务,其前端提供丰富的可视化图形库,不限于SparkSQL,后端支持HBase、Flink 等大数据系统以插件扩展的方式,并支持Spark、Python、JDBC、Markdown、Shell 等各种常用Interpreter,这使得开发者可以方便地使用SQL 在 Zeppelin 中做数据开发。
为什么使用Zepplin
在不熟悉java、Flink的情况能否快速调试Flink Sql。使用纯SQl跑通Flink任务。
- 打通多种语言,Java、Scala、Python、SQL
- 支持SQL Comment,运行多条SQL语句
- 更容易提交和管理 Flink Job,并行执行多个 Flink Job
- 前端可视化支持
Zeppelin 架构
Zeppelin 主要分3层
- Web前端
- Zeppelin Server
- Interpreter
Zeppelin前端负责前端页面的交互,通过Rest API 和WebSocket的方式与Zeppelin Server进行交互
Zeppelin Server是一个Web server,负责管理所有的note,interpreter 等等,Zeppelin Server不做具体的代码执行,会交给Interpreter来执行代码
Interpreter 是一个独立的进程,负责具体前端用户提交的代码的执行(比如Spark Scala代码或者SQL代码等等)。Zeppelin Server与 Interpreter 之间是通过thrift 来进行通信,而且是双向通信。Zeppelin支持目前大部分流行的大数据引擎
Zeppelin Server是独立的进程,进程日志在logs目录下的 zeppelin-{user}-{host}.log
, 每个Interpreter也是一个独立的进程,进程日志是 logs目录下的 zeppelin-interpreter-{interpreter}-*.log
Flink on Zeppelin 架构
左边的Flink Interpreter可以理解成Flink的客户端,负责Flink Job的编译,提交和管理(比如Cancel Job,记录Checkpoint等等)。右边的Flink Cluster就是真正运行Flink Job的地方,可以是一个MiniCluster(local 模式),Standalone Cluster (remote模式),Yarn Session Cluster (yarn 模式)
Flink Interpreter内部会有2个重要的组件:Scala Shell,Python Shell
- Scala Shell 是整个Flink Interpreter的入口,ExecutionEnvironment,StreamExecutionEnvironment 以及所有类型的TableEnvironment都是在Scala Shell里创建,Scala shell负责Scala代码的编译和执行
- Python Shell 是PyFlink的入口,负责Python代码的编译和执行
zepplin搭建
wget https://mirrors.bfsu.edu.cn/apache/zeppelin/zeppelin-0.9.0/zeppelin-0.9.0-bin-all.tgz
tar -zvxf zeppelin-0.9.0-bin-all.tgz
修改端口号:默认是8080,为避免冲突,修改为其他端口号
cp zeppelin-site.xml.template zeppelin-site.xml vi zeppelin-site.xml
<property>
<name>zeppelin.server.port</name>
<value>8080</value>
<description>Server port.</description>
</property>
进入bin目录下执行,访问 (主机ip:设置的端口号)
./zeppelin-daemon.sh start
关闭zepplin
./zeppelin-daemon.sh stop
配置nginx访问
server
{
listen 80 ;
server_name zeppelin-debug-webapp.xx.com;
access_log off;
location / {
add_header Access-Control-Allow-Origin *;
add_header Cache-Control no-cache;
add_header Cache-Control private;
expires -1s;
default_type 'application/json; charset=UTF-8';
proxy_http_version 1.1;
# nginx,当它想要使用WebSocket时,响应http升级请求
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_pass http://127.0.0.1:8080;
}
}
Flink Hive
整合Hive,让Flink使用Hive Catalog存储Flink SQL 元数据。才能在不同的Flink Task(进程)使用Table。(表只能由Flink读写使用,不要用Hive去读写)。在Hive命令行中使用DESCRIBE FORMATTED命令查看表的元数据,如果是is_generic=true代表是Flink专用表),也可以直接使用Flink读写Hive表数据。
- Flink能够读取Hive的元数据 (HiveCatalog)
- Flink能够读取Hive表 (Flink hive connector)
-
Interpreters搜索Flink,配置HIVE_CONF_DIR = /etc/hive/conf(HIVE_CONF_DIR 为 含有hive-site.xml的目录)
-
配置
zeppelin.flink.enableHive = true
zeppelin.flink.hive.version = 1.1.0 (配置为对应Hive版本)
- 依赖
Copy 下面这些 dependencies 到flink的 lib 目录
- flink-connector-hive_2.11-{flink.version}.jar
- flink-hadoop-compatibility_2.11-{flink.version}.jar
- Hive 2.x的话只要拷贝 hive-exec-2.x.jar 到flink下的lib目录 ,如果是 Hive 1.x, 需要拷贝 hive-exec-1.x.jar, hive-metastore-1.x.jar, libfb303-0.9.2.jar and libthrift-0.9.2.jar (如果是其他版本的Hive可能会略有不同,需要自己尝试,特别是CDH或者HDP里的Hive版本)
- 启动Hive metastore
Impala 整合
下载依赖包
wget https://downloads.cloudera.com/connectors/impala_jdbc_2.5.41.1061.zip
unzip impala_jdbc_2.5.41.1061.zip
cd 2.5.41.1061\ GA/
unzip Cloudera_ImpalaJDBC41_2.5.41.zip
cd zeppelin/interpreter/jdbc
mkdir -p impala/lib
把依赖的jar移到到 zeppelin/interpreter/jdbc/impala/lib
在zeppelin中配置依赖包
impala
首行必须是
%impala
Flink On Zepplin
Flink on Zeppelin三种模式
- Local 模式
- Remote 模式
- Yarn 模式
Local 模式会在本地启动一个MiniCluster(会以线程的方式跑在Flink Interpreter进程中)。MiniCluster的JobManager需要使用8081端口作为Rest API的端口。
Remote模式会连接一个已经创建好的Flink集群。配置FLINK_HOME 和 flink.execution.mode 为 remote 外,还需要配置 flink.execution.remote.host 和 flink.execution.remote.port 来指定JobManager的Rest API 地址。
Yarn模式会在Yarn集群中动态创建一个Flink Cluster,可以往这个Flink Session Cluster提交Flink Job了。
除了配置 FLINK_HOME 和 flink.execution.mode 外还需要配置 HADOOP_CONF_DIR,并且要确保Zeppelin这台机器可以访问你的hadoop集群, 你可以运行hadoop fs 命令来验证是否能够连接hadoop集群。
SQL
Zeppelin中有2种不同模式的SQL
%flink.ssql
(Streaming SQL,用StreamTableEnvironment 来执行 SQL)%flink.bsql
(Batch SQL, 用BatchTableEnvironment 来执行SQL)
支持的SQL 语句类型
- DDL(Data Definition Language)
- DML(Data Manipulation Language)
- DQL(Data Query Language)
- Flink 定制语句(比如set,help等等)
多语句支持,在Zeppelin 的每一个Paragraph中写多条SQL语句,每条SQL语句以分号间隔
Comment支持
- 以
--
开头的单行comment /* */
包围的多行comment
Job并行度支持
设置每个Paragraph的local properties:parallelism 来控制Flink SQL Job的并行度
%flink.ssql(parallelism=2)
JobName的设置,通过设置 jobName 的方式来指定 job name。只有单条insert 才支持设置jobName,select语句不支持。
%flink.ssql(jobName="test job")
Streaming SQL 结果可视化
- Single 模式,适合当输出结果是一行的情况
- Update 模式,适合多行输出的情况,会定期更新这多行数据(group by,limit 等)
- Append 模式,适合不断有新数据输出,但不会覆盖原有数据,只会不断append的情况
第三方依赖
- flink.execution.packages,Zeppelin会下载该选项指定的package以及该package的依赖放到flink interpreter的classpath
- flink.execution.jars
不要放到flink/lib
- 放在flink/lib 下会影响所有的flink job,即使你的job不依赖这些jar
- 不要把第三方jar放到flink/lib 下,除非万不得已(比如hive integration的hive相关jar)
权限
zeppelin可采用LDAP做身份认证+shiro做权限控制
数据脱敏-Credentials
ddl中定义了数据库连接信息,这些信息十分敏感不想暴露给其他人,这个时候我们可以用Credentials。
-
Credentials配置 先在interpreter配置injectCredentials true,也可在notebook界面做配置,比如执行时使用%flink(injectCredential=true)
-
Credentials打开
-
Credentials定义
- Entity就相当于是你的Credentials的Key,在访问时使用
- Credentials使用
- 每个人只能使用自己的Credentials。格式为user.EntityName和password.EntityName
Flink Sql 例子
%flink.ssql
CREATE DATABASE IF NOT EXISTS flink_source_test;
DROP TABLE IF EXISTS flink_source_test.s_log_online;
CREATE TABLE flink_source_test.s_log_online (
pid bigint,
agent_id int,
server_id int,
online int,
platform int,
via varchar,
mtime int,
pf_online varchar,
rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(mtime,'yyyy-MM-dd HH:mm:ss')),
WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = '0.9',
'connector.topic' = 'flink_dsp_log_online',
'connector.properties.zookeeper.connect' = 'hd2.xx.com/kafka',
'connector.properties.bootstrap.servers' = 'hd2.xx.com:9092',
'connector.properties.group.id' = 'flink_stream_consumer_26',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
);
%flink.ssql
DROP TABLE IF EXISTS c_log_online;
CREATE TABLE c_log_online (
`agent_id` INT,
`server_id` INT,
`online` INT,
`mdate` TIMESTAMP(3)
)
WITH (
'connector.type' = 'jdbc',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.url' = 'jdbc:mysql://xxx:3306/test_log_flink?useSSL=false&characterEncoding=utf8',
'connector.table' = 'c_log_online',
'connector.username' = 'test_log_flink',
'connector.password' = 'xxx',
'connector.write.flush.max-rows' = '1',
'connector.write.flush.interval' = '5s',
'connector.write.max-retries' = '3'
);
%flink.ssql(type=update)
select * from flink_source_test.s_log_online order by mtime desc limit 10;
%flink.ssql
INSERT INTO c_log_online SELECT agent_id,server_id, max(online) AS online, TUMBLE_END(rowtime, INTERVAL '10' MINUTE) as mdate from flink_source_test.s_log_online GROUP BY agent_id,server_id,TUMBLE(rowtime, INTERVAL '10' MINUTE);