Spark RDD operators: RDD partitioning with HashPartitioner, RangePartitioner, and custom partitioners
2022-07-22 19:54:00 【Erha of Xiaowu family】
HashPartitioner
HashPartitioner assigns each record to a partition based on the hash of its key: the partition index is the key's hashCode modulo the number of partitions (adjusted to be non-negative), and null keys go to partition 0. The desired number of partitions is passed to the constructor.
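Conceptually, the rule looks like the following minimal sketch (not Spark's actual source; the helper name hashPartitionFor is made up for illustration):

def hashPartitionFor(key: Any, numPartitions: Int): Int = {
  if (key == null) 0
  else {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw // keep the index non-negative
  }
}

hashPartitionFor(3, 3)  // 0
hashPartitionFor(-1, 3) // 2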
Scala edition
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object HashPartitionScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("HashPartScala")
    val sc = new SparkContext(conf)

    // Tag each element with the index of the partition it lives in
    def mapPartIndex(i: Int, it: Iterator[(Int, Int)]): Iterator[(Int, (Int, Int))] = {
      var list = List[(Int, (Int, Int))]()
      while (it.hasNext) {
        val next = it.next()
        list = (i, next) :: list
      }
      list.iterator
    }

    // Print every element prefixed with its partition index
    def parRdd(rdd: RDD[(Int, Int)]): Unit = {
      val value = rdd.mapPartitionsWithIndex(mapPartIndex)
      value.foreach(println)
    }

    val rdd = sc.parallelize(List((1, 1), (1, 2), (2, 3), (2, 4), (3, 1)))
    // Repartition by key hash into 3 partitions
    val partition = rdd.partitionBy(new HashPartitioner(3))
    parRdd(partition)

    sc.stop()
  }
}
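Since an Int's hashCode is the value itself, with three partitions the keys 1, 2 and 3 above should land in partitions 1, 2 and 0 respectively. A quicker alternative to mapPartitionsWithIndex for eyeballing partition contents is glom, which turns each partition into an array; a small sketch reusing the partition value from the example above:

partition.glom().collect().zipWithIndex.foreach { case (elems, idx) =>
  println(s"partition $idx: ${elems.mkString(", ")}")
}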
Java edition
package Action;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class HashPartition {

    // Print every element of the pair RDD prefixed with its partition index
    // (public static so the custom-partitioner example below can reuse it)
    public static void printPartRdd(JavaPairRDD<Integer, Integer> pairRDD) {
        JavaRDD<Tuple2<Integer, Tuple2<Integer, Integer>>> tuple2JavaRDD = pairRDD.mapPartitionsWithIndex(
                new Function2<Integer, Iterator<Tuple2<Integer, Integer>>, Iterator<Tuple2<Integer, Tuple2<Integer, Integer>>>>() {
                    @Override
                    public Iterator<Tuple2<Integer, Tuple2<Integer, Integer>>> call(Integer i, Iterator<Tuple2<Integer, Integer>> tuple2) throws Exception {
                        List<Tuple2<Integer, Tuple2<Integer, Integer>>> list = new ArrayList<>();
                        while (tuple2.hasNext()) {
                            Tuple2<Integer, Integer> next = tuple2.next();
                            list.add(new Tuple2<Integer, Tuple2<Integer, Integer>>(i, next));
                        }
                        return list.iterator();
                    }
                }, false);
        tuple2JavaRDD.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Integer, Integer>>>() {
            @Override
            public void call(Tuple2<Integer, Tuple2<Integer, Integer>> itTp2) throws Exception {
                System.out.println(itTp2);
            }
        });
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("HashPartition");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<Tuple2<Integer, Integer>> rdd = sc.parallelize(Arrays.asList(
                new Tuple2<Integer, Integer>(1, 2),
                new Tuple2<Integer, Integer>(1, 3),
                new Tuple2<Integer, Integer>(2, 3),
                new Tuple2<Integer, Integer>(2, 4),
                new Tuple2<Integer, Integer>(3, 1),
                new Tuple2<Integer, Integer>(4, 2)
        ));
        JavaPairRDD<Integer, Integer> rdd2 = JavaPairRDD.fromJavaRDD(rdd);
        // Repartition by key hash into 3 partitions
        JavaPairRDD<Integer, Integer> hashPart = rdd2.partitionBy(new HashPartitioner(3));
        printPartRdd(hashPart);

        sc.stop();
    }
}
RangePartitioner
RangePartitioner splits the key space into contiguous, roughly equal-sized ranges, computed by sampling the RDD, and assigns each key to the partition whose range contains it; the resulting partitions are therefore ordered by key. This is the partitioner that sortByKey uses internally.
Scala edition
import org.apache.spark.rdd.RDD
import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}

object RangePartitionScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("RangePartScala")
    val sc = new SparkContext(conf)

    // Tag each element with the index of the partition it lives in
    def mapPartIndex(i: Int, it: Iterator[(Int, Int)]): Iterator[(Int, (Int, Int))] = {
      var list = List[(Int, (Int, Int))]()
      while (it.hasNext) {
        val next = it.next()
        list = (i, next) :: list
      }
      list.iterator
    }

    // Print every element prefixed with its partition index
    def parRdd(rdd: RDD[(Int, Int)]): Unit = {
      val value = rdd.mapPartitionsWithIndex(mapPartIndex)
      value.foreach(println)
    }

    val rdd = sc.parallelize(List((1, 1), (1, 2), (4, 3), (2, 4), (3, 1)))
    // Request 6 range partitions; the keys are sampled to compute the range bounds
    val partition = rdd.partitionBy(new RangePartitioner(6, rdd))
    parRdd(partition)

    sc.stop()
  }
}
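The requested count behaves as an upper bound: RangePartitioner derives its range bounds from a sample of the keys, so with only five distinct keys the resulting RDD may end up with fewer than the six partitions requested. A quick sanity check, reusing partition from the example above:

println(partition.getNumPartitions) // actual number of partitions created
println(partition.partitioner)      // the Partitioner attached to the RDD, if any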
Custom partitioner
To implement a custom partitioner, extend the Partitioner class and implement its two abstract methods, numPartitions and getPartition.
For example:
Scala edition
Be careful to import org.apache.spark.Partitioner rather than a Partitioner class from another library; with the wrong import the methods cannot be overridden and the code will not compile.
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object UDP {

  // Custom partitioning rule: extend Partitioner and implement both methods
  class DefinedPartition(n: Int) extends Partitioner {
    // Total number of partitions
    override def numPartitions: Int = n

    // Keys <= 2 go to partition 0, key 3 to partition 1, everything else to partition 2
    override def getPartition(key: Any): Int = {
      val k = key.toString.toInt
      if (k <= 2) {
        0
      } else if (k < 4) {
        1
      } else {
        2
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("UDPScala")
    val sc = new SparkContext(conf)

    // Tag each element with the index of the partition it lives in
    def mapPartIndex(i: Int, it: Iterator[(Int, Int)]): Iterator[(Int, (Int, Int))] = {
      var list = List[(Int, (Int, Int))]()
      while (it.hasNext) {
        val next = it.next()
        list = (i, next) :: list
      }
      list.iterator
    }

    // Print every element prefixed with its partition index
    def parRdd(rdd: RDD[(Int, Int)]): Unit = {
      val value = rdd.mapPartitionsWithIndex(mapPartIndex)
      value.foreach(println)
    }

    val rdd = sc.parallelize(List((1, 1), (1, 2), (4, 3), (2, 4), (3, 1)))
    val result = rdd.partitionBy(new DefinedPartition(3))
    parRdd(result)

    sc.stop()
  }
}
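For the sample data above, the rule should place keys 1 and 2 in partition 0, key 3 in partition 1, and key 4 in partition 2, so the printed (partition index, element) pairs should look roughly like the following (the print order is not deterministic):

(0,(1,1))
(0,(1,2))
(0,(2,4))
(1,(3,1))
(2,(4,3))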
Java edition
package Action;

import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

// Custom partitioner: extend Partitioner and implement numPartitions and getPartition
public class UDPJava extends Partitioner {

    int i;

    public UDPJava(int i) {
        this.i = i;
    }

    public UDPJava() {
    }

    // Total number of partitions
    @Override
    public int numPartitions() {
        return i;
    }

    // Keys <= 2 go to partition 0, key 3 to partition 1, everything else to partition 2
    @Override
    public int getPartition(Object key) {
        int keyCode = Integer.parseInt(key.toString());
        if (keyCode <= 2) {
            return 0;
        } else if (keyCode < 4) {
            return 1;
        } else {
            return 2;
        }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("UDPJava");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<Tuple2<Integer, Integer>> rdd = sc.parallelize(Arrays.asList(
                new Tuple2<Integer, Integer>(1, 2),
                new Tuple2<Integer, Integer>(1, 3),
                new Tuple2<Integer, Integer>(2, 3),
                new Tuple2<Integer, Integer>(2, 4),
                new Tuple2<Integer, Integer>(3, 1),
                new Tuple2<Integer, Integer>(4, 2)
        ));
        JavaPairRDD<Integer, Integer> rdd2 = JavaPairRDD.fromJavaRDD(rdd);
        JavaPairRDD<Integer, Integer> definePart = rdd2.partitionBy(new UDPJava(3));
        // Reuse the partition-printing method defined in HashPartition above
        HashPartition.printPartRdd(definePart);

        sc.stop();
    }
}
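One refinement the examples above omit: when deciding whether two RDDs are already co-partitioned (and a shuffle can be skipped), Spark compares their partitioners with equals, so a custom Partitioner should normally also override equals and hashCode, as HashPartitioner itself does. A hedged sketch of how the Scala DefinedPartition could be extended (the class name DefinedPartitionWithEquals is made up for illustration):

import org.apache.spark.Partitioner

// Same partitioning rule as DefinedPartition above, plus equality so Spark can
// recognise two RDDs partitioned by the same rule and avoid an extra shuffle.
class DefinedPartitionWithEquals(n: Int) extends Partitioner {
  override def numPartitions: Int = n

  override def getPartition(key: Any): Int = {
    val k = key.toString.toInt
    if (k <= 2) 0 else if (k < 4) 1 else 2
  }

  override def equals(other: Any): Boolean = other match {
    case p: DefinedPartitionWithEquals => p.numPartitions == numPartitions
    case _                             => false
  }

  override def hashCode: Int = numPartitions
}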