侧边栏壁纸
博主头像
码途 博主等级

行动起来,活在当下

  • 累计撰写 72 篇文章
  • 累计创建 0 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

Dinky - FlinkSQL Studio & 实时计算平台

htmltoo
2023-12-09 / 0 评论 / 0 点赞 / 13 阅读 / 0 字

1.Dinky介绍

1.1.介绍

Dinky 是一个开箱即用、易扩展,以 Apache Flink 为基础,连接 OLAP 和数据湖等众多框架的一站式实时计算平台,致力于流批一体和湖仓一体的探索与实践。

1.2.功能

沉浸式 FlinkSQL 数据开发:自动提示补全、语法高亮、语句美化、在线调试、语法校验、执行计划、MetaStore、血缘分析、版本对比等
支持 FlinkSQL 多版本开发及多种执行模式:Local、Standalone、Yarn/Kubernetes Session、Yarn Per-Job、Yarn/Kubernetes Application
支持 Apache Flink 生态:Connector、FlinkCDC、Table Store 等
支持 FlinkSQL 语法增强:表值聚合函数、全局变量、执行环境、语句合并、整库同步等
支持 FlinkCDC 整库实时入仓入湖、多库输出、自动建表、模式演变
支持 Flink Java / Scala / Python UDF 开发与自动提交
支持 SQL 作业开发:ClickHouse、Doris、Hive、Mysql、Oracle、Phoenix、PostgreSql、Presto、SqlServer、StarRocks 等
支持实时在线调试预览 Table、 ChangeLog、统计图和 UDF
支持 Flink Catalog、数据源元数据在线查询及管理
支持自动托管的 SavePoint/CheckPoint 恢复及触发机制:最近一次、最早一次、指定一次等
支持实时任务运维:上线下线、作业信息、集群信息、作业快照、异常信息、数据地图、数据探查、历史版本、报警记录等
支持作为多版本 FlinkSQL Server 以及 OpenApi 的能力
支持实时作业报警及报警组:钉钉、微信企业号、飞书、邮箱等
支持多种资源管理:集群实例、集群配置、Jar、数据源、报警组、报警实例、文档、系统配置等
支持企业级管理功能:多租户、用户、角色、命名空间等
更多隐藏功能等待小伙伴们探索

1.3.概述

Dinky 作为 Apache Flink 的 FlinkSQL 的实时计算平台,具有以下核心特点。
支持 Flink 原生语法、连接器、UDF 等: 几乎零成本将 Flink 作业迁移至 Dinky。
增强 FlinkSQL 语法: 表值聚合函数、全局变量、CDC多源合并、执行环境、语句合并等。
支持 Flink 多版本: 支持作为多版本 FlinkSQL Server 的能力以及 OpenApi。
支持外部数据源的 DB SQL 操作: 如 ClickHouse、Doris、Hive、Mysql、Oracle、Phoenix、PostgreSql、Presto、SqlServer、StarRocks 等。
支持实时任务运维: 作业上线下线、作业信息、集群信息、作业快照、异常信息、作业日志、数据地图、即席查询、历史版本、报警记录等。

1.4.管理控制台介绍

Dinky 实时计算平台开发模块包括 数据开发、运维中心、注册中心 和 系统设置 四大模块。

1.5.数据开发

数据开发包括作业管理、作业配置和运维管理等

1.6.运维中心

图片.png

1.7.注册中心

注册中心包括集群管理、Jar管理、数据源管理、报警管理和文档管理

图片.png

1.8.系统设置

系统设置包括用户管理和Flink设置

图片.png

2.快速开始

2.1.环境准备

通过 dinky-mysql-server 和 dinky-standalone-server 镜像快速体验 Flink 实时计算平台。

-启动该镜像提供 Dinky 的 Mysql 业务库能力。
docker run --name dinky-mysql dinkydocker/dinky-mysql-server:0.7.2
-启动该镜像提供 Dinky 实时计算平台。
docker run --restart=always -p 8888:8888 -p 8081:8081 -e MYSQL_ADDR=dinky-mysql:3306 --name dinky --link dinky-mysql:dinky-mysql dinkydocker/dinky-standalone-server:0.7.2-flink14

2.2.入门示例

2.2.1.创建作业

IP:8888 地址打开平台并 admin/admin 登录,创建 功能示例 目录,创建 HelloWorld 的 FlinkSQL 作业。
执行模式选择 Local 并输入以下语句:

CREATE TABLE Orders (
    order_number BIGINT,
    price        DECIMAL(32,2),
    buyer        ROW<first_name STRING, last_name STRING>,
    order_time   TIMESTAMP(3)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'number-of-rows' = '50'
);
select order_number,price,first_name,last_name,order_time from Orders 

图片.png

2.2.2.调试查询

点击 执行按钮(执行当前的SQL),下方切换至 结果 选项卡,点击 获取最新数据 ,即可查看 Select 语句的执行结果。

图片.png

2.3.作业教程

-FlinkSQL 操作步骤
1.进入 Dinky 的 Data Studio
2.在左侧菜单栏,右键 目录
3.新建目录或作业
4.在新建文件的对话框,填写作业信息
参数说明备注作业名称作业名称在当前项目中必须保持唯一作业类型流作业和批作业均支持以下作业类型:FlinkSQL:支持SET、DML、DDL语法FlinkSQLEnv:支持SET、DDL语法FlinkSQLEnv 场景适用于所有作业的SET、DDL语法统一管理的场景,当前FlinkSQLEnv 在SQL编辑器的语句限制在1000行以内
5.在作业开发 SQL 编辑器,编写 DDL 和 DML 代码
示例代码如下:

-创建源表datagen_source
CREATE TABLE datagen_source(
  id  BIGINT,
  name STRING
) WITH (
  'connector' = 'datagen'
);
创建结果表blackhole_sink
CREATE  TABLE blackhole_sink(
   id  BIGINT,
   name STRING
) WITH (
  'connector' = 'blackhole'
);
将源表数据插入到结果表
INSERT INTO blackhole_sink
SELECT
   id  ,
   name 
from datagen_source;

新建作业如下图:

图片.png
6.在作业开发页面右侧 执行配置,填写配置信息
类型配置项备注作业配置执行模式区别详见用户手册数据开发中的作业概述作业配置集群实例Standalone 和 Session 执行模式需要选择集群实例,详见集群实例管理作业配置集群配置Per-Job 和 Application 执行模式需要选择集群配置,详见集群配置管理作业配置FlinkSQL 环境选择已创建的 FlinkSQLEnv,如果没有则不选作业配置任务并行度指定作业级任务并行度,默认为 1作业配置Insert 语句集默认禁用,开启后将 SQL编辑器中编写的多个 Insert 语句合并为一个 JobGraph 进行提交作业配置全局变量默认禁用,开启后可以使用数据源连接配置变量、自定义变量等作业配置批模式默认禁用,开启后启用 Batch Mode作业配置SavePoint 策略默认禁用,策略包括:最近一次最早一次指定一次作业配置报警组报警组配置详见报警管理作业配置其他配置其他的 Flink 作业配置,具体可选参数,请参考 Flink 官网
作业配置如下图:

图片.png

2.4.作业提交

-功能说明
执行当前的 SQL: 提交执行未保存的作业配置,并可以同步获取 SELECT、SHOW 等执行结果,常用于 Local、Standalone、Session 执行模式;
异步提交: 提交执行最近保存的作业配置,可用于所有执行模式;
发布 发布当前作业的最近保存的作业配置,发布后无法修改;
上线 提交已发布的作业配置,可触发报警;
下线 停止已上线的作业,并触发 SavePoint;
停止 只停止已提交的作业;
维护 使已发布的作业进入维护状态,可以被修改;
注销 标记作业为注销不可用状态。
-常用场景
查看 FlinkSQL 执行结果: 执行当前的 SQL。
提交作业: 异步提交。
上线作业: SavePoint 最近一次 + 上线。
下线作业: 下线。
升级作业: 下线后上线。
全新上线作业: SavePoint 禁用 + 上线。
-Flink作业启动操作步骤
1.首先登录Dinky数据开发控制台
2.在左侧菜单栏选择目录 > 作业名称 > 检查当前的FlinkSql > 发布 > 上线
或者选择目录 > 作业名称 > 检查当前的FlinkSql > 异步提交
作业启动异步提交如下图:

图片.png
作业启动发布上线如下图:

图片.png

2.5.作业调试

可以选择使用 Standalone 或 Session 集群在开发测试环境对作业调试,如作业运行、检查结果等。配置 Standalone 或 Session 集群请参考注册中心中集群管理的集群实例管理。
也可以调试普通的 DB SQL 作业。
FlinkSQL作业调试步骤
1.进入 Data Studio
2.点击 目录 > 新建目录 > 新建作业
3.填写完作业信息后,单击 确认,作业类型选择 FlinkSQL
4.编写完整的 FlinkSQL 语句,包含 CREATE TABLE 等
示例代码如下:

-创建源表datagen_source
CREATE TABLE datagen_source(
  id  BIGINT,
  name STRING
) WITH (
  'connector' = 'datagen'
);
将源表数据插入到结果表
SELECT
   id  BIGINT,
   name STRING
from datagen_source

5.单击保存
6.单击语法检查
7.配置执行配置
配置项 说明
预览结果 默认开启,可预览 FlinkSQL 的执行结果
打印流 默认禁用,开启后将展示 ChangeLog
最大行数 默认 100,可预览的执行结果最大的记录数
自动停止 默认禁用,开启后达到最大行数将停止作业
注意: 预览聚合结果如 count 等操作时,关闭打印流可合并最终结果。
8.单击执行当前的SQL
9.结果 或者 历史 > 预览数据 可以手动查询最新的执行结果

2.6.FlinkSQL 预览结果的必要条件

1.执行模式必须是 Local、Standalone、Yarn Session、Kubernetes Session 其中的一种;
2.必须关闭 Insert 语句集;
3.除 SET 和 DDL 外,必须只提交一个 SELECT 或 SHOW 或 DESC 语句;
4.必须开启 预览结果;
5.作业必须是提交成功并且返回 JID,同时在远程集群可以看到作业处于 RUNNING 或 FINISHED 状态;
6.Dinky 重启后,之前的预览结果将失效

2.7.DB SQL 调试

选择对应数据源,并书写其 sql 执行即可。

3.SQL 快速入门

Dinky 是基于 Flink 的流批一体化数据汇聚、数据同步的实时计算平台,通过阅读本文档,您将可以零基础上手实时计算平台 Dinky 。

3.1.1创建集群配置或集群实例

首先,登录 Dlinky,选择注册中心>>集群管理>>集群实例管理或集群配置管理,点击新建 Flink 集群

图片.png

-创建作业
选择数据开发>>目录,首先点击创建目录,点击创建好的目录右键即可创建作业

图片.png

3.1.2.集群实例

Dinky 推荐您在使用 Yarn Session、K8s Session、StandAlone 采用集群实例的方式注册集群。
1.可通过数据开发中的快捷引导注册集群实例。或者通过注册中心中的集群管理注册集群实例。
2.添加 Flink 集群

图片.png
集群配置
Dinky 推荐您在使用 Yarn Per Job、Yarn Application、K8s Application 采用集群配置的方式注册集群。
1.可通过数据开发中的快捷引导注册集群配置。或者通过注册中心中的集群管理注册集群配置。
2.添加集群配置
图片.png

3.1.3.作业开发

-创建集群完成后,就可进一步开发 FlinkSQL 作业
-脚本选用 Flink 官网提供的 SQL 脚本,参考链接如下:
https://github.com/ververica/flink-sql-cookbook
-下载 flink-faker 放入$FLINK_HOME/lib下及Dlinky的plugins下
https://github.com/knaufk/flink-faker/releases

3.1.3.1.FlinkSQL 作业创建

下面创建一个作业名称为"test66"的作业
图片.png
创建完成后,即可在"test66"作业下写 SQL 及 配置作业参数
图片.png

3.1.3.2.FlinkSQL 语句编写

FlinkSQL 作业编写,分为三部分内容,分别是 SET 参数设置、DDL 语句编写、DML 语句编写。下面以Inserting Into Tables 为例。
图片.png

3.1.3.3.作业配置

当 FlinkSQL 编写完成后,即可进行作业的配置。作业配置的详细说明详见用户手册的作业基础配置
在作业配置中,您可以选择作业执行模式、Flink 集群、SavePoint策略等配置,对作业进行提交前的配置。
图片.png

3.1.3.4.SQL查询预览

上述 FlinkSQL 作业配置完成后,可以对 SQL 做查询预览。
点击执行配置,开启打印流,保存。点击执行当前的SQL。即可获取到最新结果。
图片.png

3.1.3.5.发布运行作业

在数据写入 Sink 端时,Dlinky 提供了异步提交 和 上线发布功能,将其作业提交到远程集群
图片.png

3.1.3.6.查看作业运行情况

当作业提交到远程集群后,您可以在运维中心查看作业的运行情况。
图片.png

3.1.3.7.作业开发指引

图片.png

3.2.DB SQL 作业快速入门

3.2.1.创建数据源

选择注册中心>>数据源管理>>新建,假设您连接Doris。
图片.png
图片.png
图片.png
测试创建成功后,显示如下
图片.png

3.2.2.创建作业

点击数据开发>>目录>>右键,出现创建作业菜单。作业类型选择Doris
图片.png

3.2.3.作业配置

作业创建完成后,在最右侧会出现数据源,选择连接的数据源
图片.png

3.2.4.ETL 作业编写

外部数据源可以创建 DDL、DML语句对其进行ETL开发。
图片.png

3.2.5.作业提交运行

当 ETL 开发结束 或者做即席查询时,可以点击保存>>语法检查>>运行当前的SQL 将 SQL 提交。
图片.png

4.数据集成指南

目前通过 FlinkCDC 进行会存在诸多问题,如需要定义大量的 DDL 和编写大量的 INSERT INTO,更为严重的是会占用大量的数据库连接,对 Mysql 和网络造成压力。
Dinky 定义了 CDCSOURCE 整库同步的语法,该语法和 CDAS 作用相似,可以直接自动构建一个整库入仓入湖的实时任务,并且对 source 进行了合并,不会产生额外的 Mysql 及网络压力,支持对任意 sink 的同步,如 kafka、doris、hudi、jdbc 等等
图片.png

4.1.原理

4.1.1.source合并

Dinky 采用的是只构建一个 source,然后根据 schema、database、table 进行分流处理,分别 sink 到对应的表。

4.1.2.元数据映射

Dinky 是通过自身的数据源中心的元数据功能捕获源库的元数据信息,并同步构建 sink 阶段 datastream 或 tableAPI 所使用的 FlinkDDL。
图片.png

4.1.3.多种 sink 方式

Dinky 提供了各式各样的 sink 方式,通过修改语句参数可以实现不同的 sink 方式。Dinky 支持通过 DataStream 来扩展新的 sink,也可以使用 FlinkSQL 无需修改代码直接扩展新的 sink。
图片.png

4.2.环境准备

4.2.1.作业配置

禁用全局变量、禁用语句集、禁用批模式。

目前 dlink-client-1.14 内的整库同步能力最多且主要维护,如果要使用其他 flink 版本的整库同步,如果 SQLSink 不满足需求,需要DataStreamSink 支持,请手动仿照 dlink-client-1.14 扩展相应代码实现,很简单。

4.2.3.其他 FlinkCDC 支持

目前 dlink-client-1.14 内默认实现常用的 Flink CDC,如 MysqlCDC、OracleCDC、PostgresCDC 和 SQLServerCDC,如果要使用其他 FlinkCDC,请在 Dinky 源码中仿照 MysqlCDC 进行扩展,很简单。

4.2.4.依赖上传

由于 CDCSOURCE 是 Dinky 封装的新功能,Apache Flink 源码不包含,非 Application 模式提交需要在远程 Flink 集群所使用的依赖里添加一下依赖:
-将下面 Dinky根目录下 整库同步依赖包放置 $FLINK_HOME/lib下
lib/dlink-client-base-1.8.0.jar
lib/dlink-common-1.8.0.jar
plugins/flink-​{flink-version}/dlink-client-{version}.jar

4.2.5.Application 模式提交

目前已经支持 application ,需提前准备好相关jar包,或者和 add jar语法并用。以 mysqlcdc-2.3.0 和 flink-1.14 为例,需要以下 jar
flink-shaded-guava-18.0-13.0.jar
HikariCP-4.0.3.jar
druid-1.2.8.jar
dlink-metadata-mysql-0.7.2.jar
dlink-metadata-base-0.7.2.jar
jackson-datatype-jsr310-2.13.4.jar
flink-sql-connector-mysql-cdc-2.3.0.jar
dlink-client-1.14-0.7.2.jar
cdcsource_example.png

4.2.6.注意事项

一个 FlinkSQL 任务只能写一个 CDCSOURCE,CDCSOURCE 前可写 set、add jar 和 ddl 语句。
配置项中的英文逗号前不能加空格,需要紧随右单引号。

4.3.配置

配置项	     是否必须	默认值	说明
connector	是	无	指定要使用的连接器
hostname	是	无	数据库服务器的 IP 地址或主机名
port	是	无	数据库服务器的端口号
username	是	无	连接到数据库服务器时要使用的数据库的用户名
password	是	无	连接到数据库服务器时要使用的数据库的密码
scan.startup.mode	否	latest-offset	消费者的可选启动模式,有效枚举为“initial”和“latest-offset”
database-name	否	无	此参数非必填
table-name	否	无	只支持正则,示例:"test\.student,test\.score",所有表示例:"test\..*"
source.*	否	无	指定个性化的 CDC 配置,如 source.server-time-zone 即为 server-time-zone 配置参数。
checkpoint	否	无	单位 ms
parallelism	否	无	任务并行度
sink.connector	是	无	指定 sink 的类型,如 datastream-kafka、datastream-doris、datastream-hudi、kafka、doris、hudi、jdbc 等等,以 datastream- 开头的为 DataStream 的实现方式
sink.sink.db	否	无	目标数据源的库名,不指定时默认使用源数据源的库名
sink.table.prefix	否	无	目标表的表名前缀,如 ODS 即为所有的表名前拼接 ODS
sink.table.suffix	否	无	目标表的表名后缀
sink.table.upper	否	false	目标表的表名全大写
sink.table.lower	否	false	目标表的表名全小写
sink.auto.create	否	false	目标数据源自动建表,目前只支持 Mysql,其他可自行扩展
sink.timezone	否	UTC	指定目标数据源的时区,在数据类型转换时自动生效
sink.column.replace.line-break	否	false	指定是否去除换行符,即在数据转换中进行 REGEXP_REPLACE(column, '\n', '')
sink.*	否	无	目标数据源的配置信息,同 FlinkSQL,使用 ${schemaName} 和 ${tableName} 可注入经过处理的源表名
sink[N].*	否	无	N代表为多数据源写入, 默认从0开始到N, 其他配置参数信息参考sink.*的配置.

4.4.示例

4.4.1.整库同步到 Mysql

该示例为将 mysql 整库同步到另一个 mysql 数据库,写入 test 库,表名前缀 test_,表名全小写,开启自动建表。

EXECUTE CDCSOURCE cdc_mysql WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '127.0.0.1',
 'port' = '3306',
 'username' = 'root',
 'password' = '123456',
 'checkpoint' = '3000',
 'scan.startup.mode' = 'initial',
 'parallelism' = '1',
 'table-name' = 'bigdata\.products,bigdata\.orders',
 'sink.connector' = 'jdbc',
 'sink.url' = 'jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf-8&useSSL=false',
 'sink.username' = 'root',
 'sink.password' = '123456',
 'sink.sink.db' = 'test',
 'sink.table.prefix' = 'test_',
 'sink.table.lower' = 'true',
 'sink.table-name' = '${tableName}',
 'sink.driver' = 'com.mysql.jdbc.Driver',
 'sink.sink.buffer-flush.interval' = '2s',
 'sink.sink.buffer-flush.max-rows' = '100',
 'sink.sink.max-retries' = '5',
 'sink.auto.create' = 'true'

4.4.2.整库同步到 Oracle

该示例将 Oracle 数据库 TEST 下所有表同步到该数据库的 TEST2下。

EXECUTE CDCSOURCE cdc_oracle WITH (
 'connector' = 'oracle-cdc',
 'hostname' = '127.0.0.1',
 'port' = '1521',
 'username'='root',
 'password'='123456',
 'database-name'='ORCL',
 'checkpoint' = '3000',
 'scan.startup.mode' = 'initial',
 'parallelism' = '1',
 'table-name' = 'TEST\..*',
 'connector' = 'jdbc',
 'url' = 'jdbc:oracle:thin:@127.0.0.1:1521:orcl',
 'username' = 'root',
 'password' = '123456',
 'table-name' = 'TEST2.${tableName}'
)

4.4.3.整库同步到 Kafka

汇总到一个 topic
当指定 sink.topic 参数时,所有 Change Log 会被写入这一个 topic。

EXECUTE CDCSOURCE cdc_kafka_one WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '127.0.0.1',
 'port' = '3306',
 'username' = 'root',
 'password' = '123456',
 'checkpoint' = '3000',
 'scan.startup.mode' = 'initial',
 'parallelism' = '1',
 'table-name' = 'bigdata\.products,bigdata\.orders',
 'sink.connector'='datastream-kafka',
 'sink.topic'='cdctest',
 'sink.brokers'='bigdata2:9092,bigdata3:9092,bigdata4:9092'
)

同步到对应 topic
当不指定 sink.topic 参数时,所有 Change Log 会被写入对应库表名的 topic。

EXECUTE CDCSOURCE cdc_kafka_mul WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '127.0.0.1',
 'port' = '3306',
 'username' = 'root',
 'password' = '123456',
 'checkpoint' = '3000',
 'scan.startup.mode' = 'initial',
 'parallelism' = '1',
 'table-name' = 'bigdata\.products,bigdata\.orders',
 'sink.connector'='datastream-kafka',
 'sink.brokers'='bigdata2:9092,bigdata3:9092,bigdata4:9092'
)

使用 FlinkSQL 同步到对应 topic

EXECUTE CDCSOURCE cdc_upsert_kafka WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '127.0.0.1',
 'port' = '3306',
 'username' = 'root',
 'password' = '123456',
 'checkpoint' = '3000',
 'scan.startup.mode' = 'initial',
 'parallelism' = '1',
 'table-name' = 'bigdata\.products,bigdata\.orders',
 'sink.connector' = 'upsert-kafka',
 'sink.topic' = '${tableName}',
 'sink.properties.bootstrap.servers' = 'bigdata2:9092,bigdata3:9092,bigdata4:9092',
 'sink.key.format' = 'avro',
 'sink.value.format' = 'avro'
)

4.4.4.整库同步到 ClickHouse

EXECUTE CDCSOURCE cdc_clickhouse WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '127.0.0.1',
 'port' = '3306',
 'username' = 'root',
 'password' = '123456',
 'checkpoint' = '3000',
 'scan.startup.mode' = 'initial',
 'parallelism' = '1',
 'table-name' = 'bigdata\.products,bigdata\.orders',
  'sink.connector' = 'clickhouse',
  'sink.url' = 'clickhouse://127.0.0.1:8123',
  'sink.username' = 'default',
  'sink.password' = '123456',
  'sink.sink.db' = 'test',
  'sink.table.prefix' = 'test_',
  'sink.table.lower' = 'true',
  'sink.database-name' = 'test',
  'sink.table-name' = '${tableName}',
  'sink.sink.batch-size' = '500',
  'sink.sink.flush-interval' = '1000',
  'sink.sink.max-retries' = '3'
)

4.4.5.整库同步到两个数据源

EXECUTE CDCSOURCE jobname WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '127.0.0.1',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'checkpoint' = '3000',
  'scan.startup.mode' = 'initial',
  'parallelism' = '1',
  'table-name' = 'test\.student,test\.score',
  'sink[0].connector' = 'doris',
  'sink[0].fenodes' = '127.0.0.1:8030',
  'sink[0].username' = 'root',
  'sink[0].password' = 'dw123456',
  'sink[0].sink.batch.size' = '1',
  'sink[0].sink.max-retries' = '1',
  'sink[0].sink.batch.interval' = '60000',
  'sink[0].sink.db' = 'test',
  'sink[0].table.prefix' = 'ODS_',
  'sink[0].table.upper' = 'true',
  'sink[0].table.identifier' = '${schemaName}.${tableName}',
  'sink[0].sink.label-prefix' = '${schemaName}_${tableName}_1',
  'sink[0].sink.enable-delete' = 'true',
  'sink[1].connector'='datastream-kafka',
  'sink[1].topic'='cdc',
  'sink[1].brokers'='127.0.0.1:9092'
)

5.使用手册

图片.png
图片.png
图片.png

6.开发者指南-本地调试

6.1.前置条件

在搭建Dinky开发环境之前请确保你已经安装如下软件
Git:版本控制软件
JDK:后端开发
Maven:Java包管理
Node:前端开发;

6.2.环境要求

环境版本npm7.19.0node.js14.17.0jdk1.8maven3.6.0+lombokIDEA插件安装mysql5.7+

6.3.代码克隆

请通过 git 管理工具从 GitHub 中拉取 Dinky 源码
mkdir workspace
cd workspace
git clone https://github.com/DataLinkDC/dlink.git

6.4.安装 Lombok 插件

IDEA 提供了插件设置来安装 Lombok 插件。如果尚未安装,请在导入 Dlink 之前按照以下说明来进行操作以启用对 Lombok 注解的支持:
转到 IDEA Settings → Plugins 并选择 Marketplace 。
选择并安装 Lombok 插件。
如果出现提示,请重启 IDEA 。

6.5.导入 Dinky

启动 IDEA 并选择 Open。
选择已克隆的 Dlink 存储库的根文件夹。
等待项目加载完成。
设置 JDK 1.8 和 Maven 3.6.0。

6.6.前端环境

安装 node.js, 安装 npm
因 node.js 安装后 npm 版本较高,因此需要可用版本 7.19.0,升级npm命令如下:
npm install npm@7.19.0 -g
初始化依赖
npm install --force

6.7.源码编译

IDEA 里 Build → Build Project

6.8.打包

mvn clean install -Dmaven.test.skip=true
-如若修改版本,按以下指定即可。flink可支持多版本(1.11-1.16)
mvn clean install -Dmaven.test.skip=true -P pord,scala-2.11,flink-1.14,flink-1.15
-如若不需要web编译,-P 后面加: !web
mvn clean install -Dmaven.test.skip=true -P !web,pord,scala-2.11,flink-1.14,flink-1.15

6.9.修改pom文件

需要修改 dlink根目录下的pom文件,下面以本地开发为例,修改如下:

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <!--  `provided` for product environment ,`compile` for dev environment  -->
        <scope.runtime>compile</scope.runtime>
</properties>

6.10.修改配置文件

修改dlink根目录下/dlink-admin/src/main/resources/application.ym文件
配置数据库连接信息:

spring:
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/dlink?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
    username: dlink
    password: dlink
    driver-class-name: com.mysql.cj.jdbc.Driver

6.11.初始化数据库

在MySQL数据库创建 dlink 用户并在 dlink 数据库中执行 script/sql/dinky-mysql.sql 文件。此外 script/sql/upgrade 目录下存放了了各版本的升级 sql 请依次按照版本号执行。

6.12.启动后端服务

启动 dlink-admin 下的 Dlink 启动类,可见 8888 端口。
稍微等待一会,即可访问 127.0.0.1:8888 可见登录页。
登录用户/密码: admin/admin

7.升级指南

7.1.后端编译包升级

下载最新的编译包,对比一下 安装目录/config 下的文件,主要是 application.yml ,如果没用到最新特性,在最新版中修改一下mysql连接配置即可, 如果需要使用新特性,相关配置看相关文档描述即可

7.2.数据库 SQL 升级

-- 注意: 按照版本号依次升级 切不可跨版本升级 1.8.0 代表的是你目前的 dinky版本+1 依次往下执行

-其中 /opt/dinky 是你dinky安装的目录 
mysql> source /opt/dinky/sql/upgrade/1.8.0_schema/mysql/dinky_ddl.sql -- 表的ddl
mysql> source /opt/dinky/sql/upgrade/1.8.0_schema/mysql/dinky_dml.sql  -- 表初始化数据 (部分版本无)

0

评论区