Comparing filter, split and side outputs for splitting streams in Flink
2022-07-21 03:43:00 【Lalala001】
Use case
In production we often need to split an input stream according to some criterion, for example splitting user access logs by the visitors' geographic location. How can such a requirement be handled?
Depending on the scenario, there are three common ways to split a stream in Flink:
- filter
- split
- side outputs (SideOutput)
1. Splitting with filter
Scala example
import org.apache.flink.streaming.api.scala._

/**
 * Ways to split a stream in Flink:
 * 1. filter (the original stream is filtered once per output, which wastes resources)
 * 2. split (a split stream cannot be split again)
 * 3. side output (officially recommended)
 */
object filterStreamExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Sample input lines (the first column is the record id):
    //1,2011-01-01,1,0,1,0,6,0,2,0.344167,0.363625,0.805833,0.160446,331,654,985
    //2,2011-01-02,1,0,1,0,0,0,2,0.363478,0.353739,0.696087,0.248539,131,670,801
    val inputStream: DataStream[String] = env.readTextFile("/home/rjxy/zlp/Code/CodePro/GuoSai/Task01/src/main/resources/day.csv")
    // Each filter receives every record and parses the id again
    val littleStream = inputStream.filter(_.split(",")(0).toInt < 500)
    val bigStream = inputStream.filter(_.split(",")(0).toInt >= 500)
    // Print the results
    littleStream.print("little------")
    bigStream.print("big------")
    env.execute()
  }
}
Output (excerpt):
little------> 496,2012-05-10,2,1,5,0,4,1,1,0.505833,0.491783,0.552083,0.314063,1026,5546,6572
little------> 498,2012-05-12,2,1,5,0,6,0,1,0.564167,0.544817,0.480417,0.123133,2622,4807,7429
little------> 499,2012-05-13,2,1,5,0,0,0,1,0.6125,0.585238,0.57625,0.225117,2172,3946,6118
big------> 500,2012-05-14,2,1,5,0,1,1,2,0.573333,0.5499,0.789583,0.212692,342,2501,2843
big------> 501,2012-05-15,2,1,5,0,2,1,2,0.611667,0.576404,0.794583,0.147392,625,4490,5115
Drawbacks of filter:
To obtain each sub-stream, every record of the original stream is sent to, and evaluated by, every filter, so the data is processed once per output stream. With N outputs that means N predicate evaluations per record, which wastes cluster resources.
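The cost described above can be illustrated without Flink. The following is a minimal sketch using plain Scala collections, not Flink's API; the names FilterCostModel, CountingParser, twoFilters and singlePass are hypothetical. It counts how often the id column is parsed when two filters each scan the data, versus a single pass that routes every record exactly once (which is roughly what a side-output ProcessFunction does inside one operator).

```scala
// Plain-Scala model (no Flink): compares two filters with one routing pass
object FilterCostModel {
  // Parses the id (first CSV column) and counts how many times it was called
  final class CountingParser {
    private var calls = 0
    def count: Int = calls
    def id(line: String): Int = {
      calls += 1
      line.split(",")(0).toInt
    }
  }

  // filter style: every record is evaluated by both predicates
  def twoFilters(lines: Seq[String], p: CountingParser): (Seq[String], Seq[String]) =
    (lines.filter(line => p.id(line) < 500), lines.filter(line => p.id(line) >= 500))

  // single-pass style: every record is parsed once and routed to one side
  def singlePass(lines: Seq[String], p: CountingParser): (Seq[String], Seq[String]) =
    lines.partition(line => p.id(line) < 500)
}
```

For four input records, twoFilters calls the parser 8 times and singlePass 4 times while producing the same two groups; in general the filter approach costs one evaluation per record per output stream.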
2. Splitting with split
Scala example
import java.{lang, util}

import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.scala._

object splitStreamExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // env.setParallelism(1)
    //1,2011-01-01,1,0,1,0,6,0,2,0.344167,0.363625,0.805833,0.160446,331,654,985
    //2,2011-01-02,1,0,1,0,0,0,2,0.363478,0.353739,0.696087,0.248539,131,670,801
    val inputStream: DataStream[String] = env.readTextFile("/home/rjxy/zlp/Code/CodePro/GuoSai/Task01/src/main/resources/day.csv")
    // Deprecated API: the selector attaches one or more output names to each record
    val splitStream: SplitStream[String] = inputStream.split(new OutputSelector[String] {
      override def select(value: String): lang.Iterable[String] = {
        val tags = new util.ArrayList[String]()
        if (value.split(",")(0).toInt < 500) tags.add("littleStream")
        else tags.add("bigStream")
        tags
      }
    })
    // select returns the records carrying the given output name
    splitStream.select("littleStream").print("little------")
    splitStream.select("bigStream").print("big------")
    env.execute()
  }
}
Output (excerpt):
little------:13> 36,2011-02-05,1,0,2,0,6,0,2,0.233333,0.243058,0.929167,0.161079,100,905,1005
little------:15> 137,2011-05-17,2,0,5,0,2,1,2,0.561667,0.538529,0.837917,0.277354,678,3445,4123
little------:13> 37,2011-02-06,1,0,2,0,0,0,1,0.285833,0.291671,0.568333,0.1418,354,1269,1623
big------:9> 592,2012-08-14,3,1,8,0,2,1,1,0.726667,0.676779,0.686667,0.169158,1128,5656,6784
little------:13> 38,2011-02-07,1,0,2,0,1,1,1,0.271667,0.303658,0.738333,0.0454083,120,1592,1712
big------:9> 593,2012-08-15,3,1,8,0,3,1,1,0.706667,0.654037,0.619583,0.169771,1198,6149,7347
Drawbacks of split:
Note that a stream produced by split cannot be split a second time. If you call split again on littleStream or bigStream from the example above, the job fails with the following exception:
Exception in thread "main" java.lang.IllegalStateException:
Consecutive multiple splits are not supported. Splits are deprecated.
Please use side-outputs.
The reason is that split is deprecated: the comments in its source code recommend the newer side-output mechanism instead, and the method was removed entirely in Flink 1.12.
3. Splitting with side outputs
Side outputs are the splitting mechanism that Flink currently provides and recommends. To use them:
• Define an OutputTag for each output stream
• Emit records to a tag through the context's output method in one of the following functions:
  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • KeyedCoProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction
• Retrieve each tagged stream with getSideOutput on the operator's result
The following example uses a ProcessFunction:
Scala example
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object sideOutStreamExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // env.setParallelism(1)
    //1,2011-01-01,1,0,1,0,6,0,2,0.344167,0.363625,0.805833,0.160446,331,654,985
    //2,2011-01-02,1,0,1,0,0,0,2,0.363478,0.353739,0.696087,0.248539,131,670,801
    val inputStream: DataStream[String] = env.readTextFile("/home/rjxy/zlp/Code/CodePro/GuoSai/Task01/src/main/resources/day.csv")
    // Define two output tags
    val littleOutTag = new OutputTag[String]("littleStream")
    val bigOutTag = new OutputTag[String]("bigStream")
    val processStream = inputStream.process(new ProcessFunction[String, String] {
      override def processElement(i: String,
                                  context: ProcessFunction[String, String]#Context,
                                  out: Collector[String]): Unit = {
        // Parse the id once and route the record to exactly one side output
        val id = i.split(",")(0).toInt
        if (id < 500) {
          context.output(littleOutTag, i)
        } else {
          context.output(bigOutTag, i)
        }
      }
    })
    val littleStream = processStream.getSideOutput(littleOutTag)
    val bigStream = processStream.getSideOutput(bigOutTag)
    littleStream.print("little------")
    bigStream.print("big------")
    env.execute()
  }
}
Output (excerpt):
big------:15> 682,2012-11-12,4,1,11,1,1,0,1,0.485,0.475383,0.741667,0.173517,1097,5172,6269
little------:4> 184,2011-07-03,3,0,7,0,0,0,2,0.716667,0.668575,0.6825,0.228858,2282,2367,4649
big------:15> 683,2012-11-13,4,1,11,0,2,1,2,0.343333,0.323225,0.662917,0.342046,327,3767,4094
little------:4> 185,2011-07-04,3,0,7,1,1,0,2,0.726667,0.665417,0.637917,0.0814792,3065,2978,6043
As the output shows, the stream is split and both results are printed. Note that, unlike split, side outputs can be applied repeatedly: getSideOutput returns an ordinary DataStream, which can be passed to another ProcessFunction and split again without any exception.
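A second-level split with side outputs can be sketched as follows. This is a minimal example assuming the same Flink Scala DataStream API used above (flink-streaming-scala, in a Flink version that still ships it); the object name NestedSideOutputExample, the tag tinyStream, the threshold 100 and the relative path day.csv are illustrative assumptions. Unlike the example above, the first split sends records with id >= 500 to the main output via out.collect and uses a tag only for the other branch; both styles work.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Sketch: split once, then split one of the resulting streams again
object NestedSideOutputExample {
  // Parse the record id (first CSV column)
  def idOf(line: String): Int = line.split(",")(0).toInt

  // Hypothetical tags for the first- and second-level splits
  val littleTag = new OutputTag[String]("littleStream")
  val tinyTag = new OutputTag[String]("tinyStream")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Hypothetical input path; replace with your own file
    val input: DataStream[String] = env.readTextFile("day.csv")

    // First split: id < 500 goes to a side output, the rest to the main output
    val firstLevel = input.process(new ProcessFunction[String, String] {
      override def processElement(line: String,
                                  ctx: ProcessFunction[String, String]#Context,
                                  out: Collector[String]): Unit =
        if (idOf(line) < 500) ctx.output(littleTag, line) else out.collect(line)
    })
    val bigStream = firstLevel // main output: id >= 500
    val littleStream = firstLevel.getSideOutput(littleTag)

    // Second split on the side-output stream: id < 100 goes to another side output
    val secondLevel = littleStream.process(new ProcessFunction[String, String] {
      override def processElement(line: String,
                                  ctx: ProcessFunction[String, String]#Context,
                                  out: Collector[String]): Unit =
        if (idOf(line) < 100) ctx.output(tinyTag, line) else out.collect(line)
    })

    bigStream.print("big------")
    secondLevel.print("little------") // 100 <= id < 500
    secondLevel.getSideOutput(tinyTag).print("tiny------") // id < 100
    env.execute("nested side outputs")
  }
}
```

Each level is just another process call on an ordinary DataStream, so the same pattern can be repeated as many times as needed.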
Summary
- filter: the original stream is filtered once per output, which wastes resources
- split: a split stream cannot be split again, and the API is deprecated
- side outputs: officially recommended, and can be split repeatedly