Spark RDD operators: RDD partitioning with HashPartitioner, RangePartitioner, and custom partitioners
2022-07-22 10:09:00 【小五家的二哈】
HashPartitioner
HashPartitioner hash-partitions a pair RDD by key: you set the number of partitions, and each record is routed to the partition given by its key's hash code modulo that number.
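Before the full examples, a minimal sketch of the rule HashPartitioner applies (its getPartition is essentially a non-negative modulo of the key's hashCode; the helper name nonNegativeMod below is only for illustration):

// Minimal sketch of the hash-partitioning rule (illustration only).
// Spark's real HashPartitioner additionally sends null keys to partition 0.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

// With 3 partitions, an Int key k lands in partition k.hashCode % 3:
// key 1 -> 1, key 2 -> 2, key 3 -> 0, key 4 -> 1
Seq(1, 2, 3, 4).foreach(k => println(s"key $k -> partition ${nonNegativeMod(k.hashCode, 3)}"))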
Scala version
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object HashPartitionScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("HashPartScala")
    val sc = new SparkContext(conf)

    // Helper: tag every record with the index of the partition it lives in
    def mapPartIndex(i: Int, it: Iterator[(Int, Int)]): Iterator[(Int, (Int, Int))] = {
      var list = List[(Int, (Int, Int))]()
      while (it.hasNext) {
        val next = it.next()
        list = (i, next) :: list
      }
      list.iterator
    }

    // Helper: print (partitionIndex, record) for every record in the RDD
    def parRdd(rdd: RDD[(Int, Int)]): Unit = {
      val value = rdd.mapPartitionsWithIndex(mapPartIndex)
      value.foreach(println)
    }

    val rdd = sc.parallelize(List((1, 1), (1, 2), (2, 3), (2, 4), (3, 1)))
    // Repartition into 3 partitions by the hash of the key
    val partition = rdd.partitionBy(new HashPartitioner(3))
    parRdd(partition)
  }
}
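To confirm that partitionBy took effect, you can also inspect the result directly (a quick check, reusing the partition value from the example above; not part of the original listing):

println(partition.getNumPartitions)   // prints the partition count (3)
println(partition.partitioner)        // prints the partitioner wrapped in Some
partition.glom().collect().zipWithIndex.foreach {
  case (part, idx) => println(s"partition $idx: ${part.mkString(", ")}")
}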
Java version
package Action;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class HashPartition {

    // Helper: print (partitionIndex, record) for every record in the pair RDD.
    // Declared public static so the custom-partitioner example further below can reuse it.
    public static void printPartRdd(JavaPairRDD<Integer, Integer> pairRDD) {
        JavaRDD<Tuple2<Integer, Tuple2<Integer, Integer>>> tuple2JavaRDD = pairRDD.mapPartitionsWithIndex(
                new Function2<Integer, Iterator<Tuple2<Integer, Integer>>, Iterator<Tuple2<Integer, Tuple2<Integer, Integer>>>>() {
                    @Override
                    public Iterator<Tuple2<Integer, Tuple2<Integer, Integer>>> call(Integer i, Iterator<Tuple2<Integer, Integer>> tuple2) throws Exception {
                        List<Tuple2<Integer, Tuple2<Integer, Integer>>> list = new ArrayList<>();
                        while (tuple2.hasNext()) {
                            Tuple2<Integer, Integer> next = tuple2.next();
                            list.add(new Tuple2<Integer, Tuple2<Integer, Integer>>(i, next));
                        }
                        return list.iterator();
                    }
                }, false);
        tuple2JavaRDD.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Integer, Integer>>>() {
            @Override
            public void call(Tuple2<Integer, Tuple2<Integer, Integer>> itTp2) throws Exception {
                System.out.println(itTp2);
            }
        });
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("HashPartition");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<Tuple2<Integer, Integer>> rdd = sc.parallelize(Arrays.asList(
                new Tuple2<Integer, Integer>(1, 2),
                new Tuple2<Integer, Integer>(1, 3),
                new Tuple2<Integer, Integer>(2, 3),
                new Tuple2<Integer, Integer>(2, 4),
                new Tuple2<Integer, Integer>(3, 1),
                new Tuple2<Integer, Integer>(4, 2)
        ));
        // Convert to a pair RDD, then repartition into 3 partitions by key hash
        JavaPairRDD<Integer, Integer> rdd2 = JavaPairRDD.fromJavaRDD(rdd);
        JavaPairRDD<Integer, Integer> hashPart = rdd2.partitionBy(new HashPartitioner(3));
        printPartRdd(hashPart);
    }
}
RangePartitioner
Range partitioning: the keys of the RDD are split into ordered ranges (the boundaries are chosen by sampling the RDD), and each key is assigned to the partition whose range contains it, so keys are ordered across partitions.
Scala version
import org.apache.spark.rdd.RDD
import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}

object RangePartitionScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("RangePartScala")
    val sc = new SparkContext(conf)

    // Same helpers as in the HashPartitioner example: tag each record with its partition index and print it
    def mapPartIndex(i: Int, it: Iterator[(Int, Int)]): Iterator[(Int, (Int, Int))] = {
      var list = List[(Int, (Int, Int))]()
      while (it.hasNext) {
        val next = it.next()
        list = (i, next) :: list
      }
      list.iterator
    }

    def parRdd(rdd: RDD[(Int, Int)]): Unit = {
      val value = rdd.mapPartitionsWithIndex(mapPartIndex)
      value.foreach(println)
    }

    val rdd = sc.parallelize(List((1, 1), (1, 2), (4, 3), (2, 4), (3, 1)))
    // Ask for 6 partitions; with so few distinct keys, RangePartitioner may actually create fewer
    val partition = rdd.partitionBy(new RangePartitioner(6, rdd))
    parRdd(partition)
  }
}
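RangePartitioner is also what sortByKey relies on internally, so a sorted pair RDD ends up range-partitioned. A small sketch (assuming the rdd and parRdd definitions from the example above are in scope; not part of the original listing):

// sortByKey builds a RangePartitioner under the hood, so the output RDD's
// partitioner is a RangePartitioner with ordered key ranges.
val sorted = rdd.sortByKey(ascending = true, numPartitions = 3)
println(sorted.partitioner)   // Some(...RangePartitioner...)
parRdd(sorted)                // key ranges do not overlap across partition indices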
Custom partitioner
To implement a custom partitioner, extend the Partitioner class and override its numPartitions and getPartition methods.
For example:
Scala version
Note: make sure to import org.apache.spark.Partitioner (not a Partitioner from another package), otherwise the overrides will not compile.
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object UDP {

  // Custom partitioning rule: extend Partitioner and override numPartitions and getPartition
  class DefinedPartition(n: Int) extends Partitioner {
    override def numPartitions: Int = n

    override def getPartition(key: Any): Int = {
      val k = key.toString.toInt
      if (k <= 2) 0
      else if (k < 4) 1
      else 2
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("UDPScala")
    val sc = new SparkContext(conf)

    // Same helpers as before: tag each record with its partition index and print it
    def mapPartIndex(i: Int, it: Iterator[(Int, Int)]): Iterator[(Int, (Int, Int))] = {
      var list = List[(Int, (Int, Int))]()
      while (it.hasNext) {
        val next = it.next()
        list = (i, next) :: list
      }
      list.iterator
    }

    def parRdd(rdd: RDD[(Int, Int)]): Unit = {
      val value = rdd.mapPartitionsWithIndex(mapPartIndex)
      value.foreach(println)
    }

    val rdd = sc.parallelize(List((1, 1), (1, 2), (4, 3), (2, 4), (3, 1)))
    val result = rdd.partitionBy(new DefinedPartition(3))
    parRdd(result)
  }
}
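Because the partitioning rule is ordinary code, you can sanity-check it without running a Spark job (a quick check derived from the listing above, assuming UDP.DefinedPartition is visible where it runs):

// Quick sanity check of the rule in DefinedPartition (not part of the original listing)
val p = new UDP.DefinedPartition(3)
Seq(1, 2, 3, 4).foreach(k => println(s"key $k -> partition ${p.getPartition(k)}"))
// expected: key 1 -> 0, key 2 -> 0, key 3 -> 1, key 4 -> 2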
Java version
package Action;

import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

// Custom partitioner: extend Partitioner and override numPartitions and getPartition
public class UDPJava extends Partitioner {

    int i;

    public UDPJava(int i) {
        this.i = i;
    }

    public UDPJava() {
    }

    @Override
    public int numPartitions() {
        return i;
    }

    @Override
    public int getPartition(Object key) {
        int keyCode = Integer.parseInt(key.toString());
        if (keyCode <= 2) {
            return 0;
        } else if (keyCode < 4) {
            return 1;
        } else {
            return 2;
        }
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("UDPJava");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<Tuple2<Integer, Integer>> rdd = sc.parallelize(Arrays.asList(
                new Tuple2<Integer, Integer>(1, 2),
                new Tuple2<Integer, Integer>(1, 3),
                new Tuple2<Integer, Integer>(2, 3),
                new Tuple2<Integer, Integer>(2, 4),
                new Tuple2<Integer, Integer>(3, 1),
                new Tuple2<Integer, Integer>(4, 2)
        ));
        JavaPairRDD<Integer, Integer> rdd2 = JavaPairRDD.fromJavaRDD(rdd);
        JavaPairRDD<Integer, Integer> definePart = rdd2.partitionBy(new UDPJava(3));
        // Reuse the partition-printing helper defined in the HashPartition class above
        HashPartition.printPartRdd(definePart);
    }
}
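One refinement worth noting, not shown in the listings above: overriding equals and hashCode on a custom partitioner lets Spark recognize when two RDDs already share the same partitioning and skip an unnecessary shuffle. A minimal Scala sketch, using the hypothetical name KeyRangePartitioner:

// Sketch only: same rule as DefinedPartition/UDPJava, plus equals/hashCode so Spark can
// detect that two partitioners are equivalent and avoid re-shuffling already-partitioned data.
class KeyRangePartitioner(n: Int) extends org.apache.spark.Partitioner {
  override def numPartitions: Int = n
  override def getPartition(key: Any): Int = {
    val k = key.toString.toInt
    if (k <= 2) 0 else if (k < 4) 1 else 2
  }
  override def equals(other: Any): Boolean = other match {
    case p: KeyRangePartitioner => p.numPartitions == numPartitions
    case _                      => false
  }
  override def hashCode(): Int = numPartitions
}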