spark 安裝入門(一)scala spark單詞統計 ; java spark單詞統計spark反轉排序

spark 安裝入門

這篇博客 可以讓你學習到 三點 知識:
1.熟悉spark的相關概念。
2.搭建一個spark集羣。
3.編寫簡單的spark應用程序。

spark是一個針對於大規模數據處理的統一分析引擎。

為什麼要學spark?
一句話:spark處理速度比mapreduce快很多。
具體快的原因:
Spark是一個開源的類似於Hadoop MapReduce的通用的並行計算框架,Spark基於map reduce算法實現的分佈式計算,擁有Hadoop MapReduce所具有的優點;但不同於MapReduce的是Spark中的Job中間輸出和結果可以保存在內存中,從而不再需要讀寫HDFS,因此Spark能更好地適用於數據挖掘與機器學習等需要迭代的map reduce的算法。
Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生態系統,以彌補MapReduce的不足

spark 的四大特性:
1.速度快:
spark比mapreduce在內存中快100x,比mapreduce在磁盤中快10x

  • spark比mapreduce快的主要2個原因

    (1)sparkjob中間結果數據可以保存在內存中mapreduce的job中間結果數據只能夠保存在磁盤。這樣依賴,後續又有其他的job需要依賴於前面job的輸出結果,對於spark來説,直接可以從內存獲取得到,大大減少磁盤io操作,對於mapreduce來説就需要進行大量磁盤io操作,性能來説肯定是降低了。
    select name,age from (select * from user where age >30)

    (2) mapreduce以進程的方式運行在整合yarn中,比如一個job有100個mapTask,這個時候在運行這100個map task就需要啓動100個進程spark以線程的方式運行的進程中。這個在運行這個100個map task可以只啓動1個進程,在一個進程中運行100個線程。啓動一個進程和啓動一個線程時間代價肯定不一樣,啓動進程需要的時間和調度大大增加。
    2.易用性
    可以快速寫一個spark應用程序通過 java/scala/python/R/SQL不同的語言去進行代碼開發
    3.通用性
    spark框架是一個生態系統,可以通過有很多不同的模(sparksqlsparkStreaming、Mlib、Graphx)應用到不同的業務場景中。
    4.兼容性
    spark程序就是一個計算任務的程序,哪裏可以給當前這個任務提供對應的資源,我們就可以把這個任務提交到哪裏去運行。

    • standAlone
      • spark自帶的集羣模式,整個任務的資源分配由Master負責。
    • yarn
      • spark可以把任務提交到yarn中去運行,整個任務的資源分配由resourceManager負責
    • mesos
      • 是一個apache開源的類似於yarn的資源管理平台

spark 官網:http://spark.apache.org

在這裏插入圖片描述
一 spark 安裝 開始:
上傳 spark 包到linux
在這裏插入圖片描述
1.安裝目錄:/export/servers
2.解壓安裝包到指定的規劃目錄:tar -zxvf spark-2.1.3-bin-hadoop2.7.tgz -C /export/servers
3.重命名解壓目錄:mv spark-2.1.3-bin-hadoop2.7 spark
4.修改配置文件:進入到spark安裝目錄下conf文件夾: vim spark-env.sh (mv spark-env.sh.template spark-env.sh)【此處到時加zookepper配置 實現集羣的高可用 考慮主機宕機情況 zk有選舉機制 主機宕機和選舉其他機器作為主機 過程可能要1-2分鐘】

#指定java環境變量
export JAVA_HOME=/export/servers/jdk
#指定spark集羣中老大地址
export SPARK_MASTER_HOST=node1
#指定spark集羣中老大端口
export SPARK_MASTER_PORT=7077
JAVA_HOME=/usr/jdk

  1. vim slaves (mv slaves.template slaves)
    指定哪些節點是worker
    node2
    node3
    6.添加spark的環境變量
    vim /etc/profile
    export SPARK_HOME=/export/servers/spark
    export PATH= P A T H : PATH: PATH:SPARK_HOME/bin:$SPARK_HOME/sbin
    7.分發spark的安裝目錄和spark變量
    scp -r spark note2:/export/servers
    scp -r spark note3:/export/servers

scp /etc/profile note2:/etc
scp /etc/profile note3:/etc

7讓所有節點的spark環境變量生效:source /etc/profile

8.啓動spark 集羣 :sbin 下 : start-all.sh
9.停止spark集羣:sbin 下 :stop-all.sh
10.啓動好spark集羣之後可以訪問地址: master 主機ip +8080端口

可以看到整個spark集羣相關信息,包括spark集羣健康狀態信息、spark集羣整個資源信息、spark集羣已經使用的資源信息、spark集羣還剩的資源信息、整個任務運行的相關信息、已經完成的任務相關信息。
Standby:休眠狀態。

在這裏插入圖片描述

考慮到主機有宕機的可能性,我們需要用到zookeeper 【因為zookeeper 有自己的選舉機制,當主機宕機 其他機器發現主機宕機 就會在其他機器上投票選舉主機,也就是出於standby:休眠狀態的機子 可能被選為主機。】

其基本原理是通過zookeeper來選舉一個Master,其他的Master處於Standby狀態
下面我們就來通過 zookeeper來實現 spark 集羣的高可用(HA).

HA方案 用起來非常簡單,
1.首先需要搭建一個 zookeeper 集羣,
2.然後啓動zookeeper集羣,
3.最後在不同的節點上啓動Master.

具體配置如下:
做配置之前 我們先改一下 上面 spark 集羣的配置,spark-env.sh中開始我們是 手動指定master 所在主機為 note1( export SPARK_MASTER_HOST=node1) 現在使用zookeeper 來實現投票選舉機制【防止 手動指定的note1 宕機後 沒有機器頂上】。
配置為:
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=note1:2181,note2:2181,note3:2181 -Dspark.deploy.zookeeper.dir=/spark"

參數説明:
1.spark.deploy.recoveryMode恢復模式(Master重新啓動的模式)
有三種:(1)ZooKeeper (2) FileSystem (3)NONE
2.spark.deploy.zookeeper.url:ZooKeeper的Server地址
3.spark.deploy.zookeeper.dir:保存集羣元數據信息的文件、目錄。
包括Worker,Driver和Application。【保存元數據的信心 位置路徑

啓動:
1.在普通模式下啓動spark集羣,只需要在主機上面執行start-all.sh 就可以了。
2.在高可用模式下啓動spark集羣,先需要在任意一台節點上啓動start-all.sh命令。然後在另外一台節點上單獨啓動master。命令start-master.sh

搭建zookeeper 集羣開始:

  # 清理掉以往安裝記錄 在每台機器上都需要執行
  rm -rf /export/servers/zk349/ && rm -rf /export/data/zk/ && rm -rf /export/logs/zk
  # 在note1上解壓zookeeper
  tar -zxvf zookeeper-3.4.9.tar.gz -C /export/servers/
  # 對安裝目錄進行修改
  cd /export/servers/
  mv zookeeper-3.4.9/ zk
  # 修改配置文件
  cd /export/servers/zk/conf
  mv zoo_sample.cfg  zoo.cfg
  cat zoo.cfg  |grep -v "#"
  # 得到以下結果
  ---
  tickTime=2000
  initLimit=10
  syncLimit=5
  dataDir=/export/data/zk
  logDataDir=/export/logs/zk
  clientPort=2181
  ---
  # 新增服務器節點信息,參見完整配置文件。
  ---
  server.1=note1:2888:3888
  server.2=note2:2888:3888
  server.3=note3:2888:3888
  ---
  # 分發到其他機器上
  scp -r /export/servers/zk/ note2:/export/servers/
  scp -r /export/servers/zk/ note3:/export/servers/
  
  # 在每台機器上創建一個myid文件,並寫入編號
  # note1 上執行
  mkdir -p /export/data/zk
  touch /export/data/zk/myid
  echo "1" > /export/data/zk/myid
  
  # note2上執行
  mkdir -p /export/data/zk
  touch /export/data/zk/myid
  echo "2" > /export/data/zk/myid
  
  # note3上執行
  mkdir -p /export/data/zk
  touch /export/data/zk/myid
  echo "3" > /export/data/zk/myid
  
  # 啓動服務,在每台機器上都執行一遍
  cd /export/servers/zk/bin/
  ./zkServer.sh start
  # 啓動完之後確定狀態
  ./zkServer.sh status

HA(高可用) :通過zookeeper 實現spark 集羣的高可用 啓動步驟:
1. 在三台機器上 cd /export/servers/zk/bin 執行 ./zkServer.sh start 【也可以寫個zookeeper 一鍵啓動腳本】
2. 在note1上 cd/export/servers/spark/sbin 執行 ./start-all.sh 【注意:不加點槓 我的裏面啓動的是yarn 集羣 不知道為什麼。】
3. 我們在其他一台機器上(note2)上 cd /export/servers/spark/sbin 執行 ./start-master.sh 【就是啓動第二台 master 作為備用 防止 note1 master 掛了note2頂上】
4. 驗證HA :為了驗證 note1 master 掛了,note2會頂上 kill -9 (note1 master 進程號) ,然後我們訪問網頁 note2:8080 【注意 需要一點時間 note2 的狀態才會被激活 狀態 過程: standby --recoveing—alive】
5. 補充 :在啓動完123後 我們可以在note1上 cd /export/servers/zk/bin 執行 ./zkCli.sh 輸入 :ls / 【查看 啓動的節點】 如圖
在這裏插入圖片描述
圖中的spark 節點 是我們在 spark-env.sh 中 配置的Dspark.deploy.zookeeper.dir=/spark"

二 spark 的角色
在這裏插入圖片描述
Spark是基於內存計算的大數據並行計算框架。因為其基於內存計算,比Hadoop中MapReduce計算框架具有更高的實時性,同時保證了高效容錯性和可伸縮性。
Spark架構使用了分佈式計算中master-slave模型,master是集羣中含有master進程的節點,slave是集羣中含有worker進程的節點。
1.Driver Program :運⾏main函數並且新建SparkContext的程序。
2.Application:基於Spark的應用程序,包含了driver程序和集羣上的executor。
3.Cluster Manager:指的是在集羣上獲取資源的外部服務。目前有三種類型
(1)Standalone: spark原生的資源管理,由Master負責資源的分配
(2)Apache Mesos:與hadoop MR兼容性良好的一種資源調度框架
(3)Hadoop Yarn: 主要是指Yarn中的ResourceManager
4.Worker Node: 集羣中任何可以運行Application代碼的節點,在Standalone模式中指的是通過slaves文件配置的Worker節點,在Spark on Yarn模式下就是NodeManager節點
5.Executor:是在一個worker node上為某應用啓動的⼀個進程,該進程負責運行任務,並且負責將數據存在內存或者磁盤上。每個應用都有各自獨立的executor。
6.Task :被送到某個executor上的工作單元。

三 spark 小程序
1.普通模式提交任務:

需求:該算法是利用蒙特·卡羅算法求圓周率PI,通過計算機模擬大量的隨機數,最終會計算出比較精確的π。
到spark安裝目錄下(cd /export/servers/spark) 執行:

bin/spark-submit
–class org.apache.spark.examples.SparkPi
–master spark://note1:7077
–executor-memory 1G
–total-executor-cores 2
examples/jars/spark-examples_2.11-2.1.3.jar
10

結果報錯:原因 1.note1 寫錯 2.spark-examples_2.11-2.1.3.jar 這個jar 有問題
成功結果如下:
在這裏插入圖片描述

1.2.高可用模式提交:不同點就是指定多個master ,因為在高可用模式下,涉及到多個Master,所以對於應用程序的提交就有了一點變化。【我們只需要在SparkContext指向一個Master列表就可以了 】【執行下面之前:在note1上 ./start-all.sh note2 上 ./start-master.sh 在note3上 ./start-master.sh】

bin/spark-submit
–class org.apache.spark.examples.SparkPi
–master spark://note1:7077,note2:7077,note3:7077
–executor-memory 1G
–total-executor-cores 2
examples/jars/spark-examples_2.11-2.1.3.jar
10

在這裏插入圖片描述

2.啓動 spark-shell【注意 spark-shell 跟spark 集羣沒有任何關係 不啓動spark集羣也沒關係 但是 在 linux 系統需要安裝 scala 因為運行scala 就是 相當於執行scala腳本,scala 安裝簡單 上傳 scala 2.11.8 解壓 配置 vim /etc/profile path 即可如::/export/servers/scala-2.11.8/bin】
spark-shell是Spark自帶的交互式Shell程序,方便用户進行交互式編程,用户可以在該命令行下用scala編寫spark程序。

	   2.1:運行spark-shell --master local[N] 讀取本地文件  **【local 是指本地運行 跟spark集羣沒有關係  n 代表設置幾個進程給當前這個進程跑】**
	需求:讀取本地文件,實現文件內的單詞計數。本地文件words.txt 內容如下:
	hello me 
	hello you 
	hello her

步驟:到 cd/export/servers/spark下 執行 【注意 ./spark-shell --master local[2] 沒有。/可能會報 command not found】
spark-shell --master local[2]
然後執行:
sc.textFile(“file:///root///words.txt”).flatMap(.split(" ")).map((,1)).reduceByKey(+).collect
在這裏插入圖片描述
代碼説明:
代碼説明:
sc:Spark-Shell中已經默認將SparkContext類初始化為對象sc。用户代碼如果需要用到,則直接應用sc即可。
textFile:讀取數據文件
flatMap:對文件中的每一行數據進行壓平切分,這裏按照空格分隔。
map:對出現的每一個單詞記為1(word,1)
reduceByKey:對相同的單詞出現的次數進行累加
collect:觸發任務執行,收集結果數據。

2.2 通過spark-shell  --master local[N]  讀取hdfs 上的文件 並統計單詞個數。
具體需求:
spark-shell運行時指定具體的master地址,讀取HDFS上的數據,做單詞計數,然後將結果保存在HDFS上。

1.在spark-env.sh ,添加HADOOP_CONF_DIR配置,指明瞭hadoop的配置文件後,默認它就是使用的hdfs上的文件
export HADOOP_CONF_DIR=/export/servers/hadoop/etc/hadoop
2.再啓動啓動hdfs,然後重啓spark集羣
3.向hdfs上傳一個文件到hdfs://node1:9000/words.txt 【hadoop fs -put words.txt / 或者hdfs dfs -put words.txt】
4.在spark shell中用scala語言編寫spark程序
sc.textFile("/words.txt").flatMap(.split(" ")).map((,1)).reduceByKey(+).collect

執行:spark-shell
–master spark://node1:7077
–executor-memory 1g
–total-executor-cores 2

參數説明
–master spark://node1:7077 指定Master的地址
–executor-memory 1g 指定每個worker可用內存為1g
–total-executor-cores 2 指定整個集羣使用的cup核數為2個

注意:
如果啓動spark shell時沒有指定master地址,但是也可以正常啓動spark shell和執行spark shell中的程序,其實是啓動了spark的local模式,該模式僅在本機啓動一個進程,沒有與集羣建立聯繫。
執行這個:
sc.textFile("/words.txt").flatMap(.split(" ")).map((,1)).reduceByKey(+).saveAsTextFile("/wc")

saveAsTextFile:保存結果數據到文件中

執行後 在hdfs 上查看:
1.hdfs dfs -cat /wc/part*
在這裏插入圖片描述
2.訪問note1:50070 也可以查看
在這裏插入圖片描述
**spark-shell僅在測試和驗證我們的程序時使用的較多,在生產環境中,通常會在IDEA中編寫程序,然後打成jar包,最後提交到集羣。**最常用的是創建一個Maven項目,利用Maven來管理jar包的依賴。

四 在idea 中 用scala 編寫 wordcount程序
1.創建一個maven項目
2.pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>cn.test</groupId>
  <artifactId>spark</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>spark</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <scala.version>2.11.8</scala.version>
    <hadoop.version>2.7.4</hadoop.version>
    <spark.version>2.1.3</spark.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
    </dependency>

  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass></mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

3.創建src/main/scala和src/test/scala,與pom.xml中的配置保持一致 並將目錄mask一下
在這裏插入圖片描述

在這裏插入圖片描述
4.創建 scala object
代碼:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val master: SparkConf = new SparkConf().setAppName("WrodCount").setMaster("local[2]")

    val sc: SparkContext = new SparkContext(master)
    //讀取文件
    val file: RDD[String] = sc.textFile("f://words.txt")

    //對文件中每一行單詞進行壓平切分
    val words: RDD[String] = file.flatMap(_.split(" "))
    //對每一個單詞計數為1 轉化為(單詞,1)
    val wordAndOne: RDD[(String, Int)] = words.map(x=>(x,1))
    //相同的單詞進行彙總 前一個下劃線表示累加數據,後一個下劃線表示新數據
    val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
    val finalResult: Array[(String, Int)] = result.collect()
    finalResult.foreach(println)

    //保存數據到HDFS
   // result.saveAsTextFile(args(1))
    sc.stop()
  }

}

可以本地運行 看輸出結果 如:
在這裏插入圖片描述
2.上面代碼有倆種形式,1.上面是將參數直接寫在代碼中 2.可以將參數動態 傳遞
就是將 文本文件地址 改成 arg(0) 和保存的路徑 改為 arg(1)
代碼如下:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val master: SparkConf = new SparkConf().setAppName("WrodCount").setMaster("local[2]")

    val sc: SparkContext = new SparkContext(master)
    //讀取文件
    val file: RDD[String] = sc.textFile(args(0))

    //對文件中每一行單詞進行壓平切分
    val words: RDD[String] = file.flatMap(_.split(" "))
    //對每一個單詞計數為1 轉化為(單詞,1)
    val wordAndOne: RDD[(String, Int)] = words.map(x=>(x,1))
    //相同的單詞進行彙總 前一個下劃線表示累加數據,後一個下劃線表示新數據
    val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
    //val finalResult: Array[(String, Int)] = result.collect()
   // finalResult.foreach(println)

    //保存數據到HDFS
    result.saveAsTextFile(args(1))
    sc.stop()
  }

}

我們將上面的2是將運行後的數據 保存到hdfs 中 我們打包 在linux系統中運行
【前提:開啓 hdfs ./start-dfs.sh ,開啓zk 一鍵啓動 ./start-all-zk.sh ,開始spark ./start-all.sh 】
執行代碼:

 ./spark-submit \
 --class WordCount \
 --master spark://note1:7077 \
 --executor-memory 1g \
 --total-executor-cores 2 \
 /root/sparkjar/spark-1.0-SNAPSHOT.jar \
/words.txt \
/spark_out_2

結果在note1:50070 中成功顯示:
在這裏插入圖片描述
4.3 上面是用scala語言完成的 單詞統計。我們現在有java spark 來完成單詞的統計
4.3.1
創建一個java class

package cn.test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Int;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class javaword {
    public static void main(String[] args) {
        //構建sparkconf,設置配置信息
        SparkConf sp = new SparkConf().setAppName("javaword").setMaster("local[2]");
        //構建java版的sparkContext
        JavaSparkContext javaSparkContext = new JavaSparkContext(sp);
        //構建java版的sparkContext
        JavaRDD<String> file = javaSparkContext.textFile("F://words.txt");
        //對每一行單詞進行切分
        JavaRDD<String> wordsRDD = file.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) throws Exception {
                String[] words = s.split(" ");

                return Arrays.asList(words).iterator();
            }
        });
        //給每個單詞計為 1
         Spark為包含鍵值對類型的RDD提供了一些專有的操作。這些RDD被稱為PairRDD。
        //mapToPair函數會對一個RDD中的每個元素調用f函數,其中原來RDD中的每一個元素都是T類型的,
        // 調用f函數後會進行一定的操作把每個元素都轉換成一個<K2,V2>類型的對象,其中Tuple2為多元組
        JavaPairRDD<String, Integer> wordAndOnePairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        //相同單詞出現的次數累加
        JavaPairRDD<String, Integer> resultJavaPairRDD = wordAndOnePairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        List<Tuple2<String, Integer>> list = resultJavaPairRDD.collect();
        for (Tuple2<String,Integer> tuple:list
             ) {
            System.out.println("單詞:"+tuple._1+" 出現的次數"+tuple._2);

        }
    }
}

運行結果如下:
在這裏插入圖片描述
上面java spark 單詞統計 只是簡單單詞統計,但如果有其他需求 該怎麼辦呢?如 反轉 ,排序等需求 ?
上面我們已經提到:spark 為包含鍵值對類型的rdd提供了一些專用的操作,這些rdd被稱為PairRDD ,maptopair函數會對一個rdd中的每個元素調用f函數。
反轉和排序實現代碼如下:

package cn.test;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Int;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class javaword {
    public static void main(String[] args) {
        //構建sparkconf,設置配置信息
        SparkConf sp = new SparkConf().setAppName("javaword").setMaster("local[2]");
        //構建java版的sparkContext
        JavaSparkContext javaSparkContext = new JavaSparkContext(sp);
        //構建java版的sparkContext
        JavaRDD<String> file = javaSparkContext.textFile("F://words.txt");
        //對每一行單詞進行切分
        JavaRDD<String> wordsRDD = file.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) throws Exception {
                String[] words = s.split(" ");

                return Arrays.asList(words).iterator();
            }
        });
        //給每個單詞計為 1
         Spark為包含鍵值對類型的RDD提供了一些專有的操作。這些RDD被稱為PairRDD。
        //mapToPair函數會對一個RDD中的每個元素調用f函數,其中原來RDD中的每一個元素都是T類型的,
        // 調用f函數後會進行一定的操作把每個元素都轉換成一個<K2,V2>類型的對象,其中Tuple2為多元組
        JavaPairRDD<String, Integer> wordAndOnePairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        //相同單詞出現的次數累加
        JavaPairRDD<String, Integer> resultJavaPairRDD = wordAndOnePairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        List<Tuple2<String, Integer>> list = resultJavaPairRDD.collect();
        for (Tuple2<String,Integer> tuple:list
             ) {
            System.out.println("單詞:"+tuple._1+" 出現的次數"+tuple._2);

        }

        //=================================
        //總需求按照單詞出現的次數降序
        //思路:將次數作為key 用maptopair 將(單詞,次數)-->(次數,單詞) 然後調用sortbyKey進行(false)降序排序,排序完成後 我們可以再調用maptopair 將(次數,單詞)-->(單詞,次數)

        //需求反轉
        JavaPairRDD<Integer, String> reverseJavaPairRDD = resultJavaPairRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
                return new Tuple2<Integer, String>(tuple._2,tuple._1);
            }
        });
        JavaPairRDD<String, Integer> sortJavaRdd = reverseJavaPairRDD.sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception {
                return new Tuple2<String, Integer>(tuple._2, tuple._1);
            }
        });
        List<Tuple2<String, Integer>> list1 = sortJavaRdd.collect();
        for (Tuple2<String,Integer> tuple:list1  
             ) {
            System.out.println("單詞:"+tuple._1+" 出現的次數"+tuple._2);
            
        }

    }
}

效果:
在這裏插入圖片描述

在這裏插入圖片描述