For key-value RDDs, if the key is a custom type (e.g., Person), you must override its hashCode and equals methods.
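As a minimal sketch of such a key type (this Person class is hypothetical and not used by the demos below):

import java.io.Serializable;
import java.util.Objects;

// Without consistent hashCode/equals, HashPartitioner may send equal keys
// to different partitions, and key-based operators will fail to match them.
// Serializable is also required so Spark can ship the keys across the cluster.
public class Person implements Serializable {
    private final String name;
    private final int age;

    public Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Person)) return false;
        Person p = (Person) o;
        return age == p.age && Objects.equals(name, p.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, age);
    }
}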
1. intersection and subtract
Under the hood, intersection is implemented with a cogroup/groupByKey-style shuffle; subtract is built on subtractByKey.
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class IntersectionDemo {
    public static void main(String[] xx) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("WordCounter");
        conf.set("spark.testing.memory", "2147480000");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        List<String> lines1 = new ArrayList<>();
        lines1.add("Hello");
        lines1.add("How");
        lines1.add("Moon");
        JavaPairRDD<String, Integer> rdd1 = ctx.parallelize(lines1, 2)
            .mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<>(s, 1);
                }
            })
            .partitionBy(new HashPartitioner(3));
        System.out.println("rdd1:" + rdd1.partitioner());
        // System.out.println(rdd1.glom().collect());

        List<String> lines2 = new ArrayList<>();
        lines2.add("Hello");
        lines2.add("How");
        lines2.add("Good");
        JavaPairRDD<String, Integer> rdd2 = ctx.parallelize(lines2, 2)
            .mapToPair(new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<>(s, 1);
                }
            })
            .partitionBy(new HashPartitioner(2));
        System.out.println("rdd2:" + rdd2.partitioner());

        // intersection reuses a cogroup/groupByKey-style shuffle together
        // with hash-based lookups; subtract reuses subtractByKey.
        // JavaPairRDD<String, Integer> rdd3 = rdd1.intersection(rdd2);
        // JavaPairRDD<String, Integer> rdd3 = rdd1.union(rdd2);
        JavaPairRDD<String, Integer> rdd3 = rdd1.subtract(rdd2);
        System.out.println("rdd3:" + rdd3.partitioner());
        System.out.println("rdd3:" + rdd3.getNumPartitions());
        rdd3.foreach(x -> System.out.println(x));
    }
}
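To make the "built on a cogroup-style shuffle" comment concrete, here is a hedged sketch that rebuilds intersection by hand; it mirrors the idea, not Spark's exact source. It drops into IntersectionDemo's main method right after rdd1 and rdd2 are built (manualIntersection is an illustrative name, not Spark API):

// Keep a key only if it occurs on both sides. cogroup brings both sides'
// values for each key together and yields each key exactly once, so no
// extra distinct() step is needed.
JavaRDD<String> manualIntersection = rdd1.keys()
        .mapToPair(s -> new Tuple2<>(s, 1))
        .cogroup(rdd2.keys().mapToPair(s -> new Tuple2<>(s, 1)))
        .filter(t -> t._2()._1().iterator().hasNext()
                  && t._2()._2().iterator().hasNext())
        .keys();
manualIntersection.foreach(x -> System.out.println(x)); // Hello, How (order may vary)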
2. union
This example looks at how the parent RDDs' partitioning affects the child RDD.
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class UnionDemo {
    public static void main(String[] xx) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.set("spark.testing.memory", "2147480000");
        conf.setAppName("WordCounter");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        // Two ways to create an RDD: 1) read external storage (the usual
        // choice on a cluster), 2) parallelize an in-memory collection.
        List<Tuple2<String, Integer>> urls = new ArrayList<>();
        urls.add(new Tuple2<>("http://www.baidu.com/about.html", 3));
        urls.add(new Tuple2<>("http://www.ali.com/index.html", 2));
        urls.add(new Tuple2<>("http://www.sina.com/first.html", 4));
        urls.add(new Tuple2<>("http://www.sohu.com/index.html", 3));
        urls.add(new Tuple2<>("http://www.baidu.com/index.jsp", 7));
        urls.add(new Tuple2<>("http://www.sina.com/help.html", 1));
        JavaPairRDD<String, Integer> urlRdd1 = ctx.parallelizePairs(urls, 2);
        // JavaPairRDD<String, Integer> urlRdd1 = ctx.parallelizePairs(urls)
        //     .partitionBy(new HashPartitioner(2));
        System.out.println("urlRdd1:" + urlRdd1.partitioner());
        System.out.println("urlRdd1:" + urlRdd1.glom().collect());

        List<Tuple2<String, Integer>> anotherUrls = new ArrayList<>();
        anotherUrls.add(new Tuple2<>("http://www.163.com/about.html", 3));
        anotherUrls.add(new Tuple2<>("http://www.taobao.com/index.html", 2));
        anotherUrls.add(new Tuple2<>("http://www.sina.com/first.html", 4));
        anotherUrls.add(new Tuple2<>("http://www.csdn.com/index.html", 3));
        anotherUrls.add(new Tuple2<>("http://www.facebook.com/index.jsp", 7));
        anotherUrls.add(new Tuple2<>("http://www.sina.com/help.html", 1));
        JavaPairRDD<String, Integer> urlRdd2 = ctx.parallelizePairs(anotherUrls, 2);
        // JavaPairRDD<String, Integer> urlRdd2 = ctx.parallelizePairs(anotherUrls)
        //     .partitionBy(new HashPartitioner(3));
        System.out.println("urlRdd2:" + urlRdd2.partitioner());
        System.out.println("urlRdd2:" + urlRdd2.glom().collect());

        // If both parents have the same partitioner and partition count,
        // the union keeps that partitioning. If no partitioner is set,
        // the result has the sum of the parents' partition counts, even
        // when those counts are equal.
        JavaPairRDD<String, Integer> rdd3 = urlRdd1.union(urlRdd2);
        System.out.println("rdd3:" + rdd3.partitioner());
        System.out.println("rdd3:" + rdd3.getNumPartitions());
        System.out.println("rdd3:" + rdd3.glom().collect());
        rdd3.foreach(x -> System.out.println(x));
    }
}
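A short sketch of the two cases described in the comments, droppable into UnionDemo's main method (the variable names a through d are illustrative; the output comments are what a local run is expected to print):

// Case 1: both parents share HashPartitioner(2), so Spark can use a
// partitioner-aware union and the result keeps 2 partitions.
JavaPairRDD<String, Integer> a =
        ctx.parallelizePairs(urls).partitionBy(new HashPartitioner(2));
JavaPairRDD<String, Integer> b =
        ctx.parallelizePairs(anotherUrls).partitionBy(new HashPartitioner(2));
System.out.println(a.union(b).getNumPartitions()); // 2

// Case 2: no partitioner set; union simply concatenates the parents'
// partitions: 2 + 2 = 4.
JavaPairRDD<String, Integer> c = ctx.parallelizePairs(urls, 2);
JavaPairRDD<String, Integer> d = ctx.parallelizePairs(anotherUrls, 2);
System.out.println(c.union(d).getNumPartitions()); // 4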
3. zip and zipPartitions
(Under the hood, zip is implemented in terms of zipPartitions.)
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class ZipDemo {
    public static void main(String[] xx) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.set("spark.testing.memory", "2147480000");
        conf.setAppName("WordCounter");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        List<String> lines1 = new ArrayList<>();
        lines1.add("Hello");
        lines1.add("How");
        lines1.add("Moon");
        // Extra elements make zip fail: the element counts must match.
        // lines1.add("Hope");
        // lines1.add("Dog");
        // lines1.add("House");
        JavaRDD<String> rdd1 = ctx.parallelize(lines1, 2);
        System.out.println(rdd1.glom().collect());

        List<String> lines2 = new ArrayList<>();
        lines2.add("1");
        lines2.add("2");
        lines2.add("3");
        JavaRDD<String> rdd2 = ctx.parallelize(lines2, 2);
        System.out.println(rdd2.glom().collect());

        // zip requires both RDDs to have the same element count and the
        // same number of partitions; otherwise it throws.
        // JavaPairRDD<String, String> zipped = rdd1.zip(rdd2);
        // zipped.foreach(x -> System.out.println(x));
        // (Hello,1)
        // (How,2)
        // (Moon,3)

        // zipPartitions hands you one iterator per parent for each pair of
        // partitions; here we only print what each side holds and return
        // an empty result.
        JavaRDD<Tuple2<String, String>> rdd3 = rdd1.zipPartitions(rdd2, (x, y) -> {
            System.out.println("*****************");
            List<Tuple2<String, String>> lines = new ArrayList<>();
            while (x.hasNext()) {
                System.out.println(x.next());
            }
            while (y.hasNext()) {
                System.out.println(y.next());
            }
            return lines.iterator();
        });
        rdd3.foreach(x -> System.out.println(x));
        // *****************
        // Hello
        // 1
        // *****************
        // How
        // Moon
        // 2
        // 3
    }
}
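Because zip is just the element-wise special case of zipPartitions, the empty-result lambda above can be turned into a working re-implementation of zip. A hedged sketch, droppable into ZipDemo's main method and assuming (as zip itself does) that matching partitions hold the same number of elements:

JavaRDD<Tuple2<String, String>> zipped = rdd1.zipPartitions(rdd2, (x, y) -> {
    List<Tuple2<String, String>> pairs = new ArrayList<>();
    // Walk the two partition iterators in lockstep, which is exactly
    // what zip does under the hood.
    while (x.hasNext() && y.hasNext()) {
        pairs.add(new Tuple2<>(x.next(), y.next()));
    }
    // zip's contract: both sides must run out at the same time.
    if (x.hasNext() || y.hasNext()) {
        throw new IllegalStateException("partitions differ in element count");
    }
    return pairs.iterator();
});
zipped.foreach(t -> System.out.println(t)); // (Hello,1) (How,2) (Moon,3)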