zepplin

zepplin

zepplin

zeppelin

Apache Zeppelin

Flink on Zeppelin

Apache Zeppelin 是一个可以进行大数据可视化分析的交互式开发系统,可以承担数据接入、数据发现、数据分析、数据可视化、数据协作等任务,其前端提供丰富的可视化图形库,不限于SparkSQL,后端支持HBase、Flink 等大数据系统以插件扩展的方式,并支持Spark、Python、JDBC、Markdown、Shell 等各种常用Interpreter,这使得开发者可以方便地使用SQL 在 Zeppelin 中做数据开发。

为什么使用Zepplin

在不熟悉java、Flink的情况能否快速调试Flink Sql。使用纯SQl跑通Flink任务。

  1. 打通多种语言,Java、Scala、Python、SQL
  2. 支持SQL Comment,运行多条SQL语句
  3. 更容易提交和管理 Flink Job,并行执行多个 Flink Job
  4. 前端可视化支持

Zeppelin 架构

zepplin-framework-01.png

Zeppelin 主要分3层

  • Web前端
  • Zeppelin Server
  • Interpreter

Zeppelin前端负责前端页面的交互,通过Rest API 和WebSocket的方式与Zeppelin Server进行交互

Zeppelin Server是一个Web server,负责管理所有的note,interpreter 等等,Zeppelin Server不做具体的代码执行,会交给Interpreter来执行代码

Interpreter 是一个独立的进程,负责具体前端用户提交的代码的执行(比如Spark Scala代码或者SQL代码等等)。Zeppelin Server与 Interpreter 之间是通过thrift 来进行通信,而且是双向通信。Zeppelin支持目前大部分流行的大数据引擎

Zeppelin Server是独立的进程,进程日志在logs目录下的 zeppelin-{user}-{host}.log, 每个Interpreter也是一个独立的进程,进程日志是 logs目录下的 zeppelin-interpreter-{interpreter}-*.log

Flink on Zeppelin 架构

zepplin-framework-02.png

左边的Flink Interpreter可以理解成Flink的客户端,负责Flink Job的编译,提交和管理(比如Cancel Job,记录Checkpoint等等)。右边的Flink Cluster就是真正运行Flink Job的地方,可以是一个MiniCluster(local 模式),Standalone Cluster (remote模式),Yarn Session Cluster (yarn 模式)

Flink Interpreter内部会有2个重要的组件:Scala Shell,Python Shell

  • Scala Shell 是整个Flink Interpreter的入口,ExecutionEnvironment,StreamExecutionEnvironment 以及所有类型的TableEnvironment都是在Scala Shell里创建,Scala shell负责Scala代码的编译和执行
  • Python Shell 是PyFlink的入口,负责Python代码的编译和执行

zepplin搭建

wget https://mirrors.bfsu.edu.cn/apache/zeppelin/zeppelin-0.9.0/zeppelin-0.9.0-bin-all.tgz
tar -zvxf zeppelin-0.9.0-bin-all.tgz

修改端口号:默认是8080,为避免冲突,修改为其他端口号

cp zeppelin-site.xml.template zeppelin-site.xml vi zeppelin-site.xml

<property>
  <name>zeppelin.server.port</name>
  <value>8080</value>
  <description>Server port.</description>
</property>

进入bin目录下执行,访问 (主机ip:设置的端口号)

./zeppelin-daemon.sh start

关闭zepplin

./zeppelin-daemon.sh stop

配置nginx访问

server
{
    listen 80 ;
    server_name zeppelin-debug-webapp.xx.com;
    access_log off;

    location / {
        add_header Access-Control-Allow-Origin *;
        add_header Cache-Control no-cache;
        add_header Cache-Control private;
        expires -1s;
        default_type 'application/json; charset=UTF-8';
        proxy_http_version 1.1;
        # nginx,当它想要使用WebSocket时,响应http升级请求
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_pass http://127.0.0.1:8080;
    }
}

nginx访问连接websocket

整合Hive,让Flink使用Hive Catalog存储Flink SQL 元数据。才能在不同的Flink Task(进程)使用Table。(表只能由Flink读写使用,不要用Hive去读写)。在Hive命令行中使用DESCRIBE FORMATTED命令查看表的元数据,如果是is_generic=true代表是Flink专用表),也可以直接使用Flink读写Hive表数据。

hive_catalog

Hive 整合

  • Flink能够读取Hive的元数据 (HiveCatalog)
  • Flink能够读取Hive表 (Flink hive connector)
  1. Interpreters搜索Flink,配置HIVE_CONF_DIR = /etc/hive/conf(HIVE_CONF_DIR 为 含有hive-site.xml的目录)

  2. 配置

zeppelin.flink.enableHive = true
zeppelin.flink.hive.version = 1.1.0 (配置为对应Hive版本)
  1. 依赖

Copy 下面这些 dependencies 到flink的 lib 目录

  • flink-connector-hive_2.11-{flink.version}.jar
  • flink-hadoop-compatibility_2.11-{flink.version}.jar
  • Hive 2.x的话只要拷贝 hive-exec-2.x.jar 到flink下的lib目录 ,如果是 Hive 1.x, 需要拷贝 hive-exec-1.x.jar, hive-metastore-1.x.jar, libfb303-0.9.2.jar and libthrift-0.9.2.jar (如果是其他版本的Hive可能会略有不同,需要自己尝试,特别是CDH或者HDP里的Hive版本)
  1. 启动Hive metastore

Impala 整合

Impala 整合

下载依赖包

wget https://downloads.cloudera.com/connectors/impala_jdbc_2.5.41.1061.zip
unzip impala_jdbc_2.5.41.1061.zip
cd 2.5.41.1061\ GA/
unzip Cloudera_ImpalaJDBC41_2.5.41.zip

cd zeppelin/interpreter/jdbc
mkdir -p impala/lib

把依赖的jar移到到 zeppelin/interpreter/jdbc/impala/lib

在zeppelin中配置依赖包

zepplin-impala-01.png

zepplin-impala-02.png

zepplin-impala-03.png

impala

首行必须是

%impala

zepplin-impala-04.png

Flink on Zeppelin三种模式

  • Local 模式
  • Remote 模式
  • Yarn 模式

Local 模式会在本地启动一个MiniCluster(会以线程的方式跑在Flink Interpreter进程中)。MiniCluster的JobManager需要使用8081端口作为Rest API的端口。

Remote模式会连接一个已经创建好的Flink集群。配置FLINK_HOME 和 flink.execution.mode 为 remote 外,还需要配置 flink.execution.remote.host 和 flink.execution.remote.port 来指定JobManager的Rest API 地址。

Yarn模式会在Yarn集群中动态创建一个Flink Cluster,可以往这个Flink Session Cluster提交Flink Job了。

除了配置 FLINK_HOME 和 flink.execution.mode 外还需要配置 HADOOP_CONF_DIR,并且要确保Zeppelin这台机器可以访问你的hadoop集群, 你可以运行hadoop fs 命令来验证是否能够连接hadoop集群。

SQL

Zeppelin中有2种不同模式的SQL

  • %flink.ssql (Streaming SQL,用StreamTableEnvironment 来执行 SQL)
  • %flink.bsql (Batch SQL, 用BatchTableEnvironment 来执行SQL)

支持的SQL 语句类型

  • DDL(Data Definition Language)
  • DML(Data Manipulation Language)
  • DQL(Data Query Language)
  • Flink 定制语句(比如set,help等等)

多语句支持,在Zeppelin 的每一个Paragraph中写多条SQL语句,每条SQL语句以分号间隔

Comment支持

  • -- 开头的单行comment
  • /* */ 包围的多行comment

Job并行度支持

设置每个Paragraph的local properties:parallelism 来控制Flink SQL Job的并行度

%flink.ssql(parallelism=2)

JobName的设置,通过设置 jobName 的方式来指定 job name。只有单条insert 才支持设置jobName,select语句不支持。

%flink.ssql(jobName="test job")

Streaming SQL 结果可视化

  • Single 模式,适合当输出结果是一行的情况
  • Update 模式,适合多行输出的情况,会定期更新这多行数据(group by,limit 等)
  • Append 模式,适合不断有新数据输出,但不会覆盖原有数据,只会不断append的情况

第三方依赖

  • flink.execution.packages,Zeppelin会下载该选项指定的package以及该package的依赖放到flink interpreter的classpath
  • flink.execution.jars

不要放到flink/lib

  • 放在flink/lib 下会影响所有的flink job,即使你的job不依赖这些jar
  • 不要把第三方jar放到flink/lib 下,除非万不得已(比如hive integration的hive相关jar)

权限

zeppelin可采用LDAP做身份认证+shiro做权限控制

数据脱敏-Credentials

Zeppelin Credentials机制

ddl中定义了数据库连接信息,这些信息十分敏感不想暴露给其他人,这个时候我们可以用Credentials。

  1. Credentials配置 先在interpreter配置injectCredentials true,也可在notebook界面做配置,比如执行时使用%flink(injectCredential=true)

  2. Credentials打开

  3. Credentials定义

  • Entity就相当于是你的Credentials的Key,在访问时使用
  1. Credentials使用
  • 每个人只能使用自己的Credentials。格式为user.EntityName和password.EntityName
%flink.ssql

CREATE DATABASE IF NOT EXISTS flink_source_test;
DROP TABLE IF EXISTS flink_source_test.s_log_online;
CREATE TABLE flink_source_test.s_log_online  (
    pid bigint,
    agent_id int,
    server_id int,
    online int,
    platform int,
    via varchar,
    mtime int,
    pf_online varchar,
    rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(mtime,'yyyy-MM-dd HH:mm:ss')),
    WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = '0.9',
    'connector.topic' = 'flink_dsp_log_online',
    'connector.properties.zookeeper.connect' = 'hd2.xx.com/kafka',
    'connector.properties.bootstrap.servers' = 'hd2.xx.com:9092',
    'connector.properties.group.id' = 'flink_stream_consumer_26',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);

%flink.ssql

DROP TABLE IF EXISTS c_log_online;
CREATE TABLE c_log_online (
    `agent_id` INT,
    `server_id` INT,
    `online` INT,
    `mdate` TIMESTAMP(3)
)
WITH (
    'connector.type' = 'jdbc',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.url' = 'jdbc:mysql://xxx:3306/test_log_flink?useSSL=false&characterEncoding=utf8',
    'connector.table' = 'c_log_online',
    'connector.username' = 'test_log_flink',
    'connector.password' = 'xxx',
    'connector.write.flush.max-rows' = '1',
    'connector.write.flush.interval' = '5s',
    'connector.write.max-retries' = '3'
);

%flink.ssql(type=update)

select * from flink_source_test.s_log_online order by mtime desc limit 10;

%flink.ssql

INSERT INTO c_log_online SELECT agent_id,server_id, max(online) AS online, TUMBLE_END(rowtime, INTERVAL '10' MINUTE) as mdate from flink_source_test.s_log_online GROUP BY agent_id,server_id,TUMBLE(rowtime, INTERVAL '10' MINUTE);
bigdata11