Data Lake (18): Flink and Iceberg Integration with SQL API Operations
2022-07-22 08:43:00 [Lanson]
Flink and Iceberg Integration: SQL API Operations
For Flink SQL operations on Iceberg, the matching versions are Flink 1.11.x and Iceberg 0.11.1. At present, Flink 1.14.2 and Iceberg 0.12.1 have compatibility problems with the SQL API, so this article uses Flink 1.11.6 and Iceberg 0.11.1 to demonstrate operating Iceberg through the Flink SQL API.
I. Creating an Iceberg Table and Writing Data with the SQL API
1. Create a new project and add the following Maven dependencies
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<!-- Flink 1.11.x works with Iceberg 0.11.1 -->
<flink.version>1.11.6</flink.version>
<hadoop.version>3.2.2</hadoop.version>
</properties>
<dependencies>
<!-- Iceberg dependency required for Flink to work with Iceberg -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime</artifactId>
<version>0.11.1</version>
</dependency>
<!-- Dependencies required for developing Flink with Java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Kafka connector dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Jars required for reading files from HDFS -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- Flink SQL & Table-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- log4j and slf4j packages; comment these out if you do not want log output in the console -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.5</version>
</dependency>
</dependencies>
2. Write Flink SQL to create an Iceberg table and insert data
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);
//1. Create the Hadoop catalog pointing at the Iceberg warehouse on HDFS
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
"'type'='iceberg'," +
"'catalog-type'='hadoop'," +
"'warehouse'='hdfs://mycluster/flink_iceberg')");
//2. Switch to the catalog
tblEnv.useCatalog("hadoop_iceberg");
//3. Create the database
tblEnv.executeSql("create database iceberg_db");
//4. Switch to the database
tblEnv.useDatabase("iceberg_db");
//5. Create the Iceberg table flink_iceberg_tbl2, partitioned by loc
tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl2(id int,name string,age int,loc string) partitioned by (loc)");
//6. Insert data into flink_iceberg_tbl2
tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 values (1,'zs',18,'beijing'),(2,'ls',19,'shanghai'),(3,'ww',20,'guangzhou')");
3. Map the Iceberg table in Hive and query it
Run the following statement in Hive to create the corresponding Iceberg table:
# Create the Iceberg table in Hive
CREATE TABLE flink_iceberg_tbl2 (
id int,
name string,
age int,
loc string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/flink_iceberg/iceberg_db/flink_iceberg_tbl2'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');
# Query the Iceberg table data in Hive
hive> select * from flink_iceberg_tbl2;
OK
3 ww 20 guangzhou
1 zs 18 beijing
2 ls 19 shanghai
II. Batch Querying Iceberg Table Data with the SQL API
To query Iceberg table data in batch mode with the Flink SQL API, simply run the query and print the result. The code is as follows:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);
//1. Create the catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
"'type'='iceberg'," +
"'catalog-type'='hadoop'," +
"'warehouse'='hdfs://mycluster/flink_iceberg')");
//2. Read the table data in batch mode
TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 ");
tableResult.print();
The results are as follows:
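Based on the three rows inserted earlier, the printed result should contain content like the sketch below (the exact layout of TableResult.print() depends on the Flink version, and row order is not guaranteed):
id   name   age   loc
1    zs     18    beijing
2    ls     19    shanghai
3    ww     20    guangzhou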
III. Real-Time Streaming Queries of Iceberg Table Data with the SQL API
When querying Iceberg table data in real time with the Flink SQL API, the parameter "table.dynamic-table-options.enabled" must be set to true so that the "OPTIONS" hint is supported in the SQL syntax. The code is as follows:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);
Configuration configuration = tblEnv.getConfig().getConfiguration();
// Enable the OPTIONS hint in SQL syntax
configuration.setBoolean("table.dynamic-table-options.enabled", true);
//1. Create the catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
"'type'='iceberg'," +
"'catalog-type'='hadoop'," +
"'warehouse'='hdfs://mycluster/flink_iceberg')");
//2. Read all data from the table's current snapshot, then keep reading new data incrementally
// streaming=true enables real-time reading; monitor-interval is how often new data is checked for (default 1s)
TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");
tableResult.print();
After starting the code above, the data currently in the Iceberg table is read out first. If you then insert data into the corresponding Iceberg table through Hive, you can watch the new data arrive in the console in real time.
# Before inserting data into the Iceberg table from Hive, add the following two jars:
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;
# Insert two rows into the Iceberg table from Hive
hive> insert into flink_iceberg_tbl2 values (4,'ml',30,'shenzhen'),(5,'tq',31,'beijing');
The newly inserted rows appear in the console in real time.
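Given the two rows inserted from Hive above, the streaming job's output should gain entries like the following shortly after the insert completes (again, the exact print format depends on the Flink version):
4    ml     30    shenzhen
5    tq     31    beijing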
IV. Real-Time Incremental Reads from a Specified Snapshot with the SQL API
The Flink SQL API also supports continuing to consume data in real time starting from a given snapshot-id. The code is as follows:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);
Configuration configuration = tblEnv.getConfig().getConfiguration();
// Enable the OPTIONS hint in SQL syntax
configuration.setBoolean("table.dynamic-table-options.enabled", true);
//1. Create the catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
"'type'='iceberg'," +
"'catalog-type'='hadoop'," +
"'warehouse'='hdfs://mycluster/flink_iceberg')");
//2. Continue reading data in real time starting from the specified Iceberg snapshot; the snapshot ID comes from the table's metadata
//start-snapshot-id: the snapshot ID to start reading from
TableResult tableResult2 = tblEnv.executeSql("SELECT * FROM hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/");
tableResult2.print();
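The snapshot ID used above has to be looked up from the table's metadata. As one possible illustration, the available snapshot IDs can be listed with the Iceberg Java API; the sketch below assumes the Iceberg classes (bundled in iceberg-flink-runtime) and the Hadoop configuration for the mycluster nameservice are on the classpath, and it reuses the table location shown in the Hive section:
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

// Load the table directly from its HDFS location (hadoop catalog layout: <warehouse>/<database>/<table>)
Table table = new HadoopTables(new Configuration()).load("hdfs://mycluster/flink_iceberg/iceberg_db/flink_iceberg_tbl2");
// Print every snapshot ID together with its commit timestamp; pick one as 'start-snapshot-id'
for (Snapshot snapshot : table.snapshots()) {
    System.out.println(snapshot.snapshotId() + " committed at " + snapshot.timestampMillis());
}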