intersection & union & zip
Published: 2019-06-12


Note: for key-value RDDs whose key is a custom type (for example, Person), the key class must override its hashCode and equals methods so that shuffle-based operations can match equal keys, as sketched below.
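A minimal sketch of such a key class (this Person class is hypothetical, not from the original post):

import java.io.Serializable;
import java.util.Objects;

// Hypothetical custom key type for illustration. It must be Serializable for
// Spark, and equals/hashCode must agree: two equal Persons must produce the
// same hash code, or partitioning and grouping will silently misbehave.
public class Person implements Serializable {
    private final String name;
    private final int age;

    public Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Person)) return false;
        Person p = (Person) o;
        return age == p.age && Objects.equals(name, p.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, age);
    }
}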

1. intersection

Under the hood, intersection is built on groupByKey; subtract is built on subtractByKey. A conceptual sketch of the intersection idea follows.
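As a conceptual sketch only (a simplification, not Spark's actual source): intersection can be expressed by keying every element, cogrouping the two sides, and keeping the keys that occur on both, which also deduplicates the result. Here a and b are illustrative names for two JavaRDD<String> inputs, with Tuple2 imported from scala:

// Key each element, cogroup both sides, keep keys present in both groups.
JavaRDD<String> intersectLike = a.mapToPair(s -> new Tuple2<String, Integer>(s, 1))
        .cogroup(b.mapToPair(s -> new Tuple2<String, Integer>(s, 1)))
        .filter(t -> t._2()._1().iterator().hasNext() && t._2()._2().iterator().hasNext())
        .keys();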

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class IntersectionDemo {
    public static void main(String[] xx) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("WordCounter");
        conf.set("spark.testing.memory", "2147480000");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        List<String> lines1 = new ArrayList<String>();
        lines1.add("Hello");
        lines1.add("How");
        lines1.add("Moon");
        // JavaRDD<String> rd1 = ctx.parallelize(lines1);
        JavaPairRDD<String, Integer> rdd1 = ctx.parallelize(lines1, 2)
            .mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s, 1);
                }
            }).partitionBy(new HashPartitioner(3));
        System.out.println("rdd1:" + rdd1.partitioner());
        // rdd1.foreach(x -> {
        //     int index = x.hashCode() % 2;
        //     System.out.println("current element: " + x + " hash index: " + index);
        // });
        // System.out.println(rdd1.glom().collect());

        List<String> lines2 = new ArrayList<String>();
        lines2.add("Hello");
        lines2.add("How");
        lines2.add("Good");
        JavaRDD<String> rd2 = ctx.parallelize(lines2);
        JavaPairRDD<String, Integer> rdd2 = ctx.parallelize(lines2, 2)
            .mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s, 1);
                }
            }).partitionBy(new HashPartitioner(2));
        System.out.println("rdd2:" + rdd2.partitioner());

        // intersection is backed by groupByKey-style grouping, reusing
        // HashMap/HashSet-based code.
        // JavaPairRDD<String, Integer> rdd3 = rdd1.intersection(rdd2);
        JavaPairRDD<String, Integer> rdd3 = rdd1.subtract(rdd2);
        // JavaPairRDD<String, Integer> rdd3 = rdd1.union(rdd2);
        System.out.println("rdd3:" + rdd3.partitioner());
        System.out.println("rdd3:" + rdd3.getNumPartitions());
        rdd3.foreach(x -> System.out.println(x));
    }
}
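With these inputs, subtract should leave only (Moon,1), since (Hello,1) and (How,1) also occur in rdd2; swapping in the commented intersection line should instead yield (Hello,1) and (How,1), and union should emit all six tuples.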

 

2. union

This example shows how the parent RDDs' partitioning affects the child RDD produced by union.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class UnionDemo {
    public static void main(String[] xx) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.set("spark.testing.memory", "2147480000");
        conf.setAppName("WordCounter");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // Two ways to create an RDD: 1) read external storage (typical on a
        // cluster); 2) parallelize an in-memory collection, as done here.
        List<Tuple2<String, Integer>> urls = new ArrayList<Tuple2<String, Integer>>();
        urls.add(new Tuple2<String, Integer>("http://www.baidu.com/about.html", 3));
        urls.add(new Tuple2<String, Integer>("http://www.ali.com/index.html", 2));
        urls.add(new Tuple2<String, Integer>("http://www.sina.com/first.html", 4));
        urls.add(new Tuple2<String, Integer>("http://www.sohu.com/index.html", 3));
        urls.add(new Tuple2<String, Integer>("http://www.baidu.com/index.jsp", 7));
        urls.add(new Tuple2<String, Integer>("http://www.sina.com/help.html", 1));
        JavaPairRDD<String, Integer> urlRdd1 = ctx.parallelizePairs(urls, 2);
        // JavaPairRDD<String, Integer> urlRdd1 = ctx.parallelizePairs(urls)
        //     .partitionBy(new HashPartitioner(2));
        System.out.println("urlRdd1:" + urlRdd1.partitioner());
        System.out.println("urlRdd1:" + urlRdd1.glom().collect());

        List<Tuple2<String, Integer>> anotherUrls = new ArrayList<Tuple2<String, Integer>>();
        anotherUrls.add(new Tuple2<String, Integer>("http://www.163.com/about.html", 3));
        anotherUrls.add(new Tuple2<String, Integer>("http://www.taobao.com/index.html", 2));
        anotherUrls.add(new Tuple2<String, Integer>("http://www.sina.com/first.html", 4));
        anotherUrls.add(new Tuple2<String, Integer>("http://www.csdn.com/index.html", 3));
        anotherUrls.add(new Tuple2<String, Integer>("http://www.facebook.com/index.jsp", 7));
        anotherUrls.add(new Tuple2<String, Integer>("http://www.sina.com/help.html", 1));
        JavaPairRDD<String, Integer> urlRdd2 = ctx.parallelizePairs(anotherUrls, 2);
        // JavaPairRDD<String, Integer> urlRdd2 = ctx.parallelizePairs(anotherUrls)
        //     .partitionBy(new HashPartitioner(3));
        System.out.println("urlRdd2:" + urlRdd2.partitioner());
        System.out.println("urlRdd2:" + urlRdd2.glom().collect());

        // If both parents share the same partitioner with the same partition
        // count, the union keeps that partitioning; if no partitioner is set,
        // the union's partition count is the sum of the parents', even when
        // the two counts are equal.
        JavaPairRDD<String, Integer> rdd3 = urlRdd1.union(urlRdd2);
        System.out.println("rdd3:" + rdd3.partitioner());
        System.out.println("rdd3:" + rdd3.getNumPartitions());
        System.out.println("urlRdd3:" + rdd3.glom().collect());
        rdd3.foreach(x -> System.out.println(x));
    }
}
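To make the comment's two cases concrete, here is a minimal sketch (reusing ctx, urls, and anotherUrls from the demo above; p1/p2/q1/q2 are illustrative names):

// Case 1: both parents share a HashPartitioner(2), so Spark can use a
// partitioner-aware union and the result keeps 2 partitions.
JavaPairRDD<String, Integer> p1 = ctx.parallelizePairs(urls).partitionBy(new HashPartitioner(2));
JavaPairRDD<String, Integer> p2 = ctx.parallelizePairs(anotherUrls).partitionBy(new HashPartitioner(2));
System.out.println(p1.union(p2).getNumPartitions()); // expected: 2

// Case 2: no partitioner on either parent, so union simply concatenates
// the partitions: 2 + 2 = 4, even though both counts match.
JavaPairRDD<String, Integer> q1 = ctx.parallelizePairs(urls, 2);
JavaPairRDD<String, Integer> q2 = ctx.parallelizePairs(anotherUrls, 2);
System.out.println(q1.union(q2).getNumPartitions()); // expected: 4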

3. zip and zipPartitions

(zip is implemented on top of zipPartitions; a sketch of that idea follows.)
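As a rough sketch of the relationship (assuming two JavaRDD<String>s rdd1 and rdd2 with matching partition and element counts, as in the demo below), zip can be written as a zipPartitions call that pairs elements positionally within each partition:

JavaRDD<Tuple2<String, String>> zipped = rdd1.zipPartitions(rdd2, (x, y) -> {
    List<Tuple2<String, String>> out = new ArrayList<Tuple2<String, String>>();
    // Pair up the two partitions' elements positionally, just as zip does;
    // a real zip additionally errors out if one iterator has leftovers.
    while (x.hasNext() && y.hasNext()) {
        out.add(new Tuple2<String, String>(x.next(), y.next()));
    }
    return out.iterator();
});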

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction2;

import scala.Tuple2;

public class ZipDemo {
    public static void main(String[] xx) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.set("spark.testing.memory", "2147480000");
        conf.setAppName("WordCounter");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        List<String> lines1 = new ArrayList<String>();
        lines1.add("Hello");
        lines1.add("How");
        lines1.add("Moon");
        // lines1.add("Hope");
        // lines1.add("Dog");
        // lines1.add("House");
        JavaRDD<String> rdd1 = ctx.parallelize(lines1, 2);
        System.out.println(rdd1.glom().collect());

        List<String> lines2 = new ArrayList<String>();
        lines2.add("1");
        lines2.add("2");
        lines2.add("3");
        JavaRDD<String> rdd2 = ctx.parallelize(lines2, 2);
        System.out.println(rdd2.glom().collect());

        // zip requires both RDDs to have the same number of elements and the
        // same number of partitions, otherwise it throws an error.
        // JavaPairRDD<String, String> rdd3 = rdd1.zip(rdd2);
        // rdd3.foreach(x -> System.out.println(x));
        // (Hello,1)
        // (How,2)
        // (Moon,3)

        JavaRDD<Tuple2<String, String>> rdd3 = rdd1.zipPartitions(rdd2, (x, y) -> {
            System.out.println("*****************");
            List<Tuple2<String, String>> lines = new ArrayList<Tuple2<String, String>>();
            // List<String> list1 = new ArrayList<String>();
            while (x.hasNext()) {
                // list1.add(x.next());
                System.out.println(x.next());
            }
            // List<String> list2 = new ArrayList<String>();
            while (y.hasNext()) {
                // list2.add(y.next());
                System.out.println(y.next());
            }
            return lines.iterator();
        });
        rdd3.foreach(x -> System.out.println(x));
        // *****************
        // Hello
        // 1
        // *****************
        // How
        // Moon
        // 2
        // 3

        // Equivalent anonymous-class form:
        // JavaRDD<Tuple2<String, String>> rdd3 = rdd1.zipPartitions(rdd2,
        //     new FlatMapFunction2<Iterator<String>, Iterator<String>, Tuple2<String, String>>() {
        //         @Override
        //         public Iterator<Tuple2<String, String>> call(Iterator<String> x, Iterator<String> y)
        //                 throws Exception {
        //             return null;
        //         }
        //     });
        // System.out.println(rdd3.collect());
        // rdd3.foreach(x -> System.out.println(x));
    }
}
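Note the interleaving in the printed output above: zipPartitions hands the function the two RDDs' partition iterators side by side (partition 0: Hello vs 1; partition 1: How, Moon vs 2, 3). That per-partition pairing is exactly why zip demands matching partition counts and matching element counts per partition.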

 

 

 

 

Reprinted from: https://www.cnblogs.com/apppointint/p/8885283.html
