Spark RDD operators: partition operations with mapPartitions and mapPartitionsWithIndex
2022-07-22 19:54:00 【Erha of Xiaowu family】
mapPartitions
mapPartitions works partition by partition: the function receives an iterator over all elements of a partition and processes them together. If the mapping would otherwise create extra objects for every element, mapPartitions is more efficient than map, because such objects can be created once per partition instead of once per element.
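A minimal sketch of that difference, assuming a costly per-record setup (the DecimalFormat below merely stands in for any expensive object, such as a database connection or a parser):

import org.apache.spark.{SparkConf, SparkContext}

object MapVsMapPartitions {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("demo"))
    val rdd = sc.parallelize(1 to 6)

    // map: the formatter is constructed once per element (6 times here)
    val perElement = rdd.map { x =>
      val fmt = new java.text.DecimalFormat("#,###")
      fmt.format(x * 1000)
    }

    // mapPartitions: the formatter is constructed once per partition (2 times here)
    val perPartition = rdd.mapPartitions { it =>
      val fmt = new java.text.DecimalFormat("#,###")
      it.map(x => fmt.format(x * 1000))
    }

    perPartition.collect().foreach(println)
    sc.stop()
  }
}

Both RDDs produce the same strings; only the number of DecimalFormat constructions differs.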
For example: multiply each element by 2.
Java version
package Action;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.*;

public class Partition {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("partition");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));

        // Multiply each element by 2
        JavaRDD<Integer> mapPartitions = rdd.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() {
            @Override
            public Iterator<Integer> call(Iterator<Integer> i) throws Exception {
                List<Integer> list = new ArrayList<>();
                while (i.hasNext()) {
                    Integer num = i.next();
                    list.add(num * 2);
                }
                return list.iterator();
            }
        });
        mapPartitions.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });

        System.out.println("==================");

        // Output in key-value pair form
        JavaRDD<Tuple2<Integer, Integer>> tuple2JavaRDD = rdd.mapPartitions(new FlatMapFunction<Iterator<Integer>, Tuple2<Integer, Integer>>() {
            @Override
            public Iterator<Tuple2<Integer, Integer>> call(Iterator<Integer> it) throws Exception {
                List<Tuple2<Integer, Integer>> list = new ArrayList<>();
                while (it.hasNext()) {
                    Integer next = it.next();
                    list.add(new Tuple2<Integer, Integer>(next, next * 2));
                }
                return list.iterator();
            }
        });
        tuple2JavaRDD.foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
            @Override
            public void call(Tuple2<Integer, Integer> tuple2) throws Exception {
                System.out.println(tuple2);
            }
        });
    }
}
Scala version
import org.apache.spark.{SparkConf, SparkContext}

object partitions {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("partition")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(List(1, 2, 3, 4, 5))

    def mapPart(it: Iterator[Int]): Iterator[(Int, Int)] = {
      var list = List[(Int, Int)]()
      while (it.hasNext) {
        val i = it.next()
        list = (i, i * 2) :: list // prepend the pair (element, element * 2)
      }
      list.iterator
    }

    val result = rdd.mapPartitions(mapPart)
    result.foreach(x => println(x))
  }
}
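For reference, the same transformation can be written without hand-building a list, since mapPartitions only needs a function from one iterator to another; a sketch of the shorter form:

val result = rdd.mapPartitions(it => it.map(i => (i, i * 2)))
result.foreach(println)

This version also stays lazy: no intermediate List is materialized per partition.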
Turn each pair (i, j) into (i, j*j).
Java version:
JavaRDD<Tuple2<Integer, Integer>> rdd1 = sc.parallelize(Arrays.asList(
        new Tuple2<Integer, Integer>(1, 2),
        new Tuple2<Integer, Integer>(2, 2),
        new Tuple2<Integer, Integer>(2, 3),
        new Tuple2<Integer, Integer>(3, 4)
));
JavaPairRDD<Integer, Integer> partPair = JavaPairRDD.fromJavaRDD(rdd1);
JavaRDD<Tuple2<Integer, Integer>> result = partPair.mapPartitions(new FlatMapFunction<Iterator<Tuple2<Integer, Integer>>, Tuple2<Integer, Integer>>() {
    @Override
    public Iterator<Tuple2<Integer, Integer>> call(Iterator<Tuple2<Integer, Integer>> tp2) throws Exception {
        List<Tuple2<Integer, Integer>> list = new ArrayList<>();
        while (tp2.hasNext()) {
            Tuple2<Integer, Integer> next = tp2.next();
            list.add(new Tuple2<Integer, Integer>(next._1, next._2 * next._2));
        }
        return list.iterator();
    }
});
result.foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
    @Override
    public void call(Tuple2<Integer, Integer> tuple2) throws Exception {
        System.out.println(tuple2);
    }
});
Scala version
val rdd2 = sc.parallelize(List((1, 2), (2, 2), (2, 3), (3, 4)))

def mapPart2(it: Iterator[(Int, Int)]): Iterator[(Int, Int)] = {
  var list = List[(Int, Int)]()
  while (it.hasNext) {
    val tuple = it.next()
    list = (tuple._1, tuple._2 * tuple._2) :: list // square the value, keep the key
  }
  list.iterator
}

val result2 = rdd2.mapPartitions(mapPart2)
result2.foreach(x => println(x))
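Because the keys are left untouched here, a pair RDD gets the same result from mapValues, which has the extra benefit of preserving the partitioner; a sketch:

val squared = rdd2.mapValues(j => j * j)
squared.foreach(println) // (1,4), (2,4), (2,9), (3,16) in some order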
mapPartitionsWithIndex
Like mapPartitions, this operates partition by partition, but the function receives an extra parameter: the partition index. For example, tag each element with the index of the partition it belongs to:
Scala version
val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8))

def mapPairIndex(i: Int, it: Iterator[Int]): Iterator[(Int, Int)] = {
  var list = List[(Int, Int)]()
  while (it.hasNext) {
    val next = it.next()
    list = (i, next) :: list // pair each element with its partition index
  }
  list.iterator
}

val result = rdd.mapPartitionsWithIndex(mapPairIndex)
result.foreach(println)
Result (foreach prints on the executors, so output from different partitions can interleave in any order):
(1,8)
(0,4)
(0,3)
(0,2)
(0,1)
(1,7)
(1,6)
(1,5)
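If what you actually want is a count of elements per partition rather than the tagged elements, one line suffices; a sketch (note that it.size consumes the iterator):

rdd.mapPartitionsWithIndex((i, it) => Iterator((i, it.size))).foreach(println)
// prints (0,4) and (1,4) for this 8-element RDD in 2 partitions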
Java version
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
JavaRDD<Tuple2<Integer, Integer>> tuple2 = rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<Tuple2<Integer, Integer>>>() {
    @Override
    public Iterator<Tuple2<Integer, Integer>> call(Integer i, Iterator<Integer> it) throws Exception {
        List<Tuple2<Integer, Integer>> list = new ArrayList<>();
        while (it.hasNext()) {
            Integer next = it.next();
            // i is the partition index, next is an element of that partition
            list.add(new Tuple2<Integer, Integer>(i, next));
        }
        return list.iterator();
    }
}, false);
tuple2.foreach(new VoidFunction<Tuple2<Integer, Integer>>() {
    @Override
    public void call(Tuple2<Integer, Integer> tuple2) throws Exception {
        System.out.println(tuple2);
    }
});
Result:
(1,5)
(0,1)
(0,2)
(0,3)
(0,4)
(1,6)
(1,7)
(1,8)
The final false is the preservesPartitioning flag: it tells Spark whether to keep the parent RDD's partitioner. Here we pass false. A look at the (decompiled) source:
def mapPartitionsWithIndex[R](
    f: Function2[Integer, Iterator[T], Iterator[R]],
    preservesPartitioning: scala.Boolean = { /* compiled code */ })
  : org.apache.spark.api.java.JavaRDD[R] = { /* compiled code */ }
Function2 takes the partition index and an iterator over the partition's elements of type T, and returns an iterator of type R; the trailing boolean decides whether the partitioner is preserved.
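When would true be appropriate? Only when the RDD is partitioned by key and the function is known to leave every key unchanged, so the existing partitioner stays valid. A sketch using the analogous Scala-side API:

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq((1, 2), (2, 3), (3, 4))).partitionBy(new HashPartitioner(2))
// Values change but keys do not, so the hash partitioning remains correct:
val doubled = pairs.mapPartitionsWithIndex(
  (i, it) => it.map { case (k, v) => (k, v * 2) },
  preservesPartitioning = true)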
Tag the key-value pair elements in each partition
Scala version
val rdd2 = sc.parallelize(List((1, 2), (1, 3), (2, 3), (3, 5), (4, 5), (4, 7)))

def mapPart2(i: Int, it: Iterator[(Int, Int)]): Iterator[(Int, (Int, Int))] = {
  var list = List[(Int, (Int, Int))]()
  while (it.hasNext) {
    val next = it.next()
    list = (i, next) :: list // pair each key-value element with its partition index
  }
  list.iterator
}

val result2 = rdd2.mapPartitionsWithIndex(mapPart2)
result2.foreach(x => println(x))
Result:
(0,(2,3))
(0,(1,3))
(0,(1,2))
(1,(4,7))
(1,(4,5))
(1,(3,5))
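The same tagging can again be written inline; a sketch:

val result2 = rdd2.mapPartitionsWithIndex((i, it) => it.map(t => (i, t)))
result2.foreach(println)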
Java version
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("partition2");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
JavaRDD<Tuple2<Integer, Integer>> rdd2 = sc.parallelize(Arrays.asList(
        new Tuple2<Integer, Integer>(1, 2),
        new Tuple2<Integer, Integer>(1, 3),
        new Tuple2<Integer, Integer>(2, 3),
        new Tuple2<Integer, Integer>(3, 5),
        new Tuple2<Integer, Integer>(4, 5),
        new Tuple2<Integer, Integer>(4, 7)
));
JavaRDD<Tuple2<Integer, Tuple2<Integer, Integer>>> tuple2 = rdd2.mapPartitionsWithIndex(
        new Function2<Integer, Iterator<Tuple2<Integer, Integer>>, Iterator<Tuple2<Integer, Tuple2<Integer, Integer>>>>() {
            @Override
            public Iterator<Tuple2<Integer, Tuple2<Integer, Integer>>> call(Integer i, Iterator<Tuple2<Integer, Integer>> it) throws Exception {
                List<Tuple2<Integer, Tuple2<Integer, Integer>>> list = new ArrayList<>();
                while (it.hasNext()) {
                    // i is the partition index, the second component is the key-value pair
                    list.add(new Tuple2<Integer, Tuple2<Integer, Integer>>(i, it.next()));
                }
                return list.iterator();
            }
        }, false);
tuple2.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Integer, Integer>>>() {
    @Override
    public void call(Tuple2<Integer, Tuple2<Integer, Integer>> t) throws Exception {
        System.out.println(t);
    }
});
Use glom to print each partition
Scala version
val result3 = rdd2.glom()
result3.foreach(x => println("[" + x.mkString(",") + "]"))
Result:
[(1,2),(1,3),(2,3)]
[(3,5),(4,5),(4,7)]
Java version
JavaRDD<List<Integer>> glom = rdd.glom();
glom.foreach(new VoidFunction<List<Integer>>() {
    @Override
    public void call(List<Integer> integers) throws Exception {
        System.out.println(integers);
    }
});
Result:
[5, 6, 7, 8]
[1, 2, 3, 4]
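Because foreach runs on the executors, the two partition lists above may print in either order. Collecting the glommed RDD back to the driver gives a deterministic, ordered view; a sketch using the Scala rdd from earlier (fine here because the data is tiny, but collect pulls everything to the driver):

rdd.glom().collect().foreach(a => println(a.mkString("[", ",", "]")))
// [1,2,3,4]
// [5,6,7,8]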
For more detail, you can refer to this article:
RDD operator partition operations