Flink Doris connector design
2022-07-21 03:04:00 [Zhangjiafeng]
Flink Doris Connector design scheme

First of all, this design thanks the author of the community's Spark Doris Connector.

From the Doris perspective, bringing its data into Flink gives access to Flink's rich ecosystem, broadens what the product can do, and also makes joint queries between Doris and other data sources possible.

Starting from our business architecture and requirements, we chose Flink as part of our architecture, as the framework for data ETL and real-time computing. The community already supports a Spark Doris Connector, so we developed the Flink Doris Connector with reference to the Spark Doris Connector's design.
Technology selection

At the beginning of our selection, just like the Spark Doris Connector, the first thing we considered was the JDBC approach. But, as the Spark Doris Connector article notes, that approach has advantages, yet its disadvantages are more obvious. We then read and tested the Spark code, and decided to stand on the shoulders of giants (note: directly copy the code and modify it).

The following is taken directly from the Spark Doris Connector blog:

So we developed Spark-Doris-Connector, a new Datasource for Doris. Under this scheme, Doris can expose Doris data to Spark. Spark's Driver accesses Doris's FE to obtain the schema of the Doris table and the underlying data distribution. Then, based on this data distribution, it reasonably assigns the data query tasks to the Executors. Finally, Spark's Executors access different BEs to query. This greatly improves query efficiency.
Usage

Compile under the extension/flink-doris-connector/ directory of the Doris code base to generate doris-flink-1.0.0-SNAPSHOT.jar. Add this jar to Flink's ClassPath, and you can use the Flink-on-Doris functionality.
SQL

CREATE TABLE flink_doris_source (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
) WITH (
    'connector' = 'doris',
    'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT',
    'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
    'username' = '$YOUR_DORIS_USERNAME',
    'password' = '$YOUR_DORIS_PASSWORD'
);

CREATE TABLE flink_doris_sink (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
) WITH (
    'connector' = 'doris',
    'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT',
    'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
    'username' = '$YOUR_DORIS_USERNAME',
    'password' = '$YOUR_DORIS_PASSWORD'
);

INSERT INTO flink_doris_sink SELECT name, age, price, sale FROM flink_doris_source;
DataStream

DorisOptions.Builder options = DorisOptions.builder()
        .setFenodes("$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
        .setUsername("$YOUR_DORIS_USERNAME")
        .setPassword("$YOUR_DORIS_PASSWORD")
        .setTableIdentifier("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME");

env.addSource(new DorisSourceFunction<>(options.build(), new SimpleListDeserializationSchema()))
   .print();
Applicable scenarios

1. Use Flink to analyze Doris together with other data sources

Many business departments place their data on different storage systems. For example, data for online analysis and reports goes into Doris, some structured retrieval data goes into Elasticsearch, some data needs to sit in MySQL, and so on. Businesses often need to analyze across these storage sources. Once the Flink Doris Connector connects Flink and Doris, a business can use Flink directly to run joint query computations over Doris and multiple external data sources.
2. Real-time data ingestion

Before the Flink Doris Connector: for irregular business data, messages usually had to be standardized first, with null values filtered out, and written to a new topic; a Routine Load job would then write them into Doris.

After the Flink Doris Connector: Flink reads Kafka and writes directly to Doris.
Technical implementation

Architecture diagram

Capabilities Doris exposes externally

Doris FE

Opens up, to the outside, interfaces for accessing the metadata of internal tables, single-table query planning, and some statistics.

All REST API interfaces require HTTP Basic authentication. The username and password are those used to log into the database, so pay attention to assigning permissions correctly.
// Get the table schema meta information
GET api/{database}/{table}/_schema

// Get the query planning template for a single table
POST api/{database}/{table}/_query_plan
{"sql": "select k1, k2 from {database}.{table}"}

// Get the table size
GET api/{database}/{table}/_count
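Since all of these REST endpoints require HTTP Basic authentication, a caller has to build an Authorization header from the database username and password. The sketch below shows one way to do that in plain Java; the FE address, database, table, and credentials are illustrative placeholders, and actually sending the request is left out.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Base64;

public class FeSchemaRequest {
    // Build the HTTP Basic Authorization header value from a
    // database username and password, as the FE REST API expects.
    static String basicAuth(String user, String password) {
        String credentials = user + ":" + password;
        return "Basic " + Base64.getEncoder()
                .encodeToString(credentials.getBytes());
    }

    public static void main(String[] args) {
        // Hypothetical FE address and credentials; replace with your own.
        String fe = "http://doris-fe:8030";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(fe + "/api/example_db/example_table/_schema"))
                .header("Authorization", basicAuth("admin", "admin"))
                .GET()
                .build();
        // Sending is omitted; it would be
        // HttpClient.newHttpClient().send(request, BodyHandlers.ofString()).
        System.out.println(request.headers().firstValue("Authorization").orElse(""));
    }
}
```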
Doris BE

Through the Thrift protocol, directly exposes data filtering, scanning, and pruning capabilities to the outside.
service TDorisExternalService {
    // Initialize the query executor
    TScanOpenResult open_scanner(1: TScanOpenParams params);

    // Get data in streaming batches, in Apache Arrow format
    TScanBatchResult get_next(1: TScanNextBatchParams params);

    // End the scan
    TScanCloseResult close_scanner(1: TScanCloseParams params);
}
For the definitions of the related Thrift structures, refer to:
https://github.com/apache/incubator-doris/blob/master/gensrc/thrift/DorisExternalService.thrift
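The Thrift service above implies a simple three-step scan protocol: open a scanner, call get_next repeatedly until the data is exhausted, then close the scanner. The plain-Java sketch below models only that control flow; the ScannerService interface and the in-memory stub are illustrative stand-ins, not the real generated Thrift client.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ScanFlow {
    // Illustrative stand-in for the generated Thrift client.
    interface ScannerService {
        String openScanner();              // ~ open_scanner
        List<String> getNext(String ctx);  // ~ get_next; null means end of scan
        void closeScanner(String ctx);     // ~ close_scanner
    }

    // Drive the open -> get_next loop -> close sequence, collecting rows.
    static List<String> scanAll(ScannerService svc) {
        List<String> rows = new ArrayList<>();
        String ctx = svc.openScanner();
        try {
            List<String> batch;
            while ((batch = svc.getNext(ctx)) != null) {
                rows.addAll(batch);
            }
        } finally {
            svc.closeScanner(ctx); // always release the BE-side scanner
        }
        return rows;
    }

    // In-memory stub standing in for a BE, for demonstration only.
    static ScannerService stub(List<List<String>> batches) {
        Iterator<List<String>> it = batches.iterator();
        return new ScannerService() {
            public String openScanner() { return "ctx-1"; }
            public List<String> getNext(String ctx) { return it.hasNext() ? it.next() : null; }
            public void closeScanner(String ctx) { }
        };
    }

    public static void main(String[] args) {
        List<String> rows = scanAll(stub(Arrays.asList(
                Arrays.asList("r1", "r2"), Arrays.asList("r3"))));
        System.out.println(rows); // [r1, r2, r3]
    }
}
```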
Implementing the DataStream source

Inherit from org.apache.flink.streaming.api.functions.source.RichSourceFunction and customize DorisSourceFunction. During initialization, obtain the execution plan of the relevant table and get the corresponding partitions.

Override the run method to read data from the partitions in a loop.
public void run(SourceContext sourceContext) {
    // Read each partition in a loop
    for (PartitionDefinition partition : dorisPartitions) {
        scalaValueReader = new ScalaValueReader(partition, settings);
        while (scalaValueReader.hasNext()) {
            Object next = scalaValueReader.next();
            sourceContext.collect(next);
        }
    }
}
Implementing Flink SQL on Doris

Referring to Flink's custom Source & Sink mechanism and flink-jdbc-connector, we achieved the following effect: Doris tables can be operated on directly through Flink SQL, including both reading and writing.

Implementation details

1. Implement DynamicTableSourceFactory and DynamicTableSinkFactory to register the doris connector.
2. Customize DynamicTableSource and DynamicTableSink to generate the logical plan.
3. DorisRowDataInputFormat and DorisDynamicOutputFormat receive the logical plan and start execution.

The main implementation is DorisRowDataInputFormat and DorisDynamicOutputFormat, customized on top of RichInputFormat and RichOutputFormat.

In DorisRowDataInputFormat, the dorisPartitions obtained are cut into splits in createInputSplits, for parallel computing.
public DorisTableInputSplit[] createInputSplits(int minNumSplits) {
    List<DorisTableInputSplit> dorisSplits = new ArrayList<>();
    int splitNum = 0;
    for (PartitionDefinition partition : dorisPartitions) {
        dorisSplits.add(new DorisTableInputSplit(splitNum++, partition));
    }
    return dorisSplits.toArray(new DorisTableInputSplit[0]);
}

public RowData nextRecord(RowData reuse) {
    if (!hasNext) {
        // All data has been read; return null
        return null;
    }
    List next = (List) scalaValueReader.next();
    GenericRowData genericRowData = new GenericRowData(next.size());
    for (int i = 0; i < next.size(); i++) {
        genericRowData.setField(i, next.get(i));
    }
    // Check whether there is more data
    hasNext = scalaValueReader.hasNext();
    return genericRowData;
}
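Because one split is created per partition, the achievable read parallelism is bounded by the number of partitions; Flink's runtime then hands the splits out to the parallel instances of the input format. The sketch below illustrates the idea with a simple round-robin assignment; it mirrors the concept only, not Flink's actual split assigner.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitAssignment {
    // Distribute split ids over `parallelism` subtasks round-robin.
    // Returns, for each subtask, the list of split ids it will read.
    static List<List<Integer>> assign(int numSplits, int parallelism) {
        List<List<Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            tasks.add(new ArrayList<>());
        }
        for (int split = 0; split < numSplits; split++) {
            tasks.get(split % parallelism).add(split);
        }
        return tasks;
    }

    public static void main(String[] args) {
        // 5 partitions read by 2 parallel subtasks:
        System.out.println(assign(5, 2)); // [[0, 2, 4], [1, 3]]
    }
}
```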
In DorisDynamicOutputFormat, data is written to Doris via stream load. For the stream load procedure, refer to org.apache.doris.plugin.audit.DorisStreamLoader.
public void writeRecord(RowData row) throws IOException {
    // Stream load's default column separator is \t
    StringJoiner value = new StringJoiner("\t");
    GenericRowData rowData = (GenericRowData) row;
    for (int i = 0; i < row.getArity(); ++i) {
        value.add(rowData.getField(i).toString());
    }
    // Write the data via stream load
    DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value.toString());
    System.out.println(loadResponse);
}
```
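Note that calling toString() on each field directly, as above, fails on null columns. A standalone sketch of the serialization step is shown below, using plain Object[] instead of Flink's GenericRowData for self-containment, and writing nulls as \N, which is assumed here to be the stream load null literal for tab-separated data.

```java
import java.util.StringJoiner;

public class StreamLoadRow {
    // Serialize one row into stream load's tab-separated form:
    // columns joined with '\t', null columns written as "\N"
    // (assumed stream load null literal; verify against your Doris version).
    static String toLine(Object[] fields) {
        StringJoiner line = new StringJoiner("\t");
        for (Object field : fields) {
            line.add(field == null ? "\\N" : field.toString());
        }
        return line.toString();
    }

    public static void main(String[] args) {
        // A row with a null third column:
        System.out.println(toLine(new Object[]{"doris", 3, null, 9.9}));
    }
}
```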
Follow-up plans for the Flink Doris Connector

1. Batch writing for the doris sink
2. JSON data writing support for the doris sink
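The batch-write plan can be pictured as a buffer that accumulates serialized rows and flushes once a threshold is reached, issuing one stream load per batch instead of one per record. This is only an illustrative sketch under that assumption, not the connector's eventual implementation; the loader callback stands in for the actual stream load call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchingSink {
    private final List<String> buffer = new ArrayList<>();
    private final int batchSize;
    private final Consumer<List<String>> loader; // e.g. one stream load call per batch

    BatchingSink(int batchSize, Consumer<List<String>> loader) {
        this.batchSize = batchSize;
        this.loader = loader;
    }

    // Buffer a row; flush when the batch is full.
    void write(String row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Hand the buffered rows to the loader and clear the buffer.
    void flush() {
        if (!buffer.isEmpty()) {
            loader.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BatchingSink sink = new BatchingSink(3, batch -> batchSizes.add(batch.size()));
        for (int i = 0; i < 7; i++) {
            sink.write("row-" + i);
        }
        sink.flush(); // flush the trailing partial batch
        System.out.println(batchSizes); // [3, 3, 1]
    }
}
```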