当前位置:网站首页>Flink DataStream API (十四)Flink 输出到 MySQL(JDBC)
Flink DataStream API (十四)Flink 输出到 MySQL(JDBC)
2022-07-20 05:31:00 【啦啦啦001】
尽管在大数据处理中直接与 MySQL 交互的场景不多,但最终处理的计算结果是要给外部应用消费使用的,而外部应用读取的数据存储往往就是 MySQL。所以我们也需要知道如何将数据输出到 MySQL 这样的传统数据库。
写入数据的 MySQL 的测试步骤如下。
(1)添加依赖
<!--Flink 对接 MySQL 依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.version}</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
(2)启动 MySQL,在 database 库下建表 clicks
CREATE TABLE `writetodb` (
`sensorname` varchar(23) DEFAULT NULL,
`timestamp` bigint(20) DEFAULT NULL,
`wendu` double DEFAULT NULL
)
(3)编写输出到 MySQL 的示例代码
object TestToMySQLEasy {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// env.enableCheckpointing(3000L) //毫秒,写入 Mysql 必须开启检查点
env.setParallelism(1)
val sql = "INSERT INTO writetodb (sensorname, timestamp, wendu) VALUE (?,?,?)"
env.readTextFile("/home/rjxy/zlp/Code/CodePro/GuoSai/Task01/src/main/resources/test.txt")
.map(elem => {
val data = elem.split(",")
(data(0), data(1).toLong, data(2).toDouble)
})
.addSink(JdbcSink.sink(
sql, //
new JdbcStatementBuilder[(String,Long,Double)] {
override def accept(ps: PreparedStatement, u: (String, Long, Double)): Unit = {
ps.setString(1, u._1);
ps.setLong(2, u._2);
ps.setDouble(3, u._3);
}
},
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("com.mysql.jdbc.Driver")
.withUrl("jdbc:mysql://192.168.23.34:3306/zlp")
.withUsername("root")
.withPassword("123456")
.build()
));
env.execute()
}
}
(4)运行代码,用客户端连接 MySQL,查看是否成功写入数据。
边栏推荐
猜你喜欢
随机推荐
antd mobile 表单验证 rc-form 使用
vscode拷贝 同步插件 拓展
ORALCE mapping CLOB
Ctfshow web entry information collection WP (1-20) (detailed explanation)
【学习笔记之菜Dog学C】数据存储
nvm安装使用
Los Angeles: t226229 arithmetic series
TypeScript
B. Making Towers
Introduction à la théorie des micro - services
C语言函数作业
C语言写一个环形广告跑马灯或改为表白系统
蓝桥杯2020年第十一届国赛真题-天干地支
问题 B: 蓝桥杯2020年第十一届省赛真题-回文日期
选择排序/基数排序
mysql_user表_字段含义
【排序】桶式排序与基数排序
2022-7-19 Gu Yujia's learning notes of group 8 (this keyword and packaging)
【学习笔记之菜Dog学C】初识指针
P1913 战斗之伞兵