發布時間: 2018-01-09 00:33:38
目前,Spark生態系統已經發展成為一個包含多個子項目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子項目,Spark是基于內存計算的大數據并行計算框架。Spark基于內存計算,提高了在大數據環境下數據處理的實時性,同時保證了高容錯性和高可伸縮性,允許用戶將Spark部署在大量廉價硬件之上,形成集群。Spark得到了眾多大數據公司的支持,這些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、騰訊、京東、攜程、優酷土豆。當前百度的Spark已應用于鳳巢、大搜索、直達號、百度大數據等業務;阿里利用GraphX構建了大規模的圖計算和圖挖掘系統,實現了很多生產系統的推薦算法;騰訊Spark集群達到8000臺的規模,是當前已知的世界上較大的Spark集群。?
Spark Core:
包含Spark的基本功能,包含任務調度,內存管理,容錯機制等
內部定義了RDDs(彈性分布式數據集)
提供了很多APIs來創建和操作這些RDDs
應用場景,為其他組件提供底層的服務
Spark SQL:
是Spark處理結構化數據的庫,就像Hive SQL,Mysql一樣
應用場景,企業中用來做報表統計
Spark Streaming:
是實時數據流處理組件,類似Storm
Spark Streaming提供了API來操作實時流數據
應用場景,企業中用來從Kafka接收數據做實時統計
MLlib:
一個包含通用機器學習功能的包,Machine learning lib
包含分類,聚類,回歸等,還包括模型評估和數據導入。
MLlib提供的上面這些方法,都支持集群上的橫向擴展。
應用場景,機器學習。
Graphx:
是處理圖的庫(例如,社交網絡圖),并進行圖的并行計算。
像Spark Streaming,Spark SQL一樣,它也繼承了RDD API。
它提供了各種圖的操作,和常用的圖算法,例如PangeRank算法。
應用場景,圖計算。
Cluster Managers:
就是集群管理,Spark自帶一個集群管理是單獨調度器。
常見集群管理包括Hadoop YARN,Apache Mesos
基于MapReduce的計算引擎通常會將中間結果輸出到磁盤上,進行存儲和容錯。
出于任務管道承接的,考慮,當一些查詢翻譯到MapReduce任務時,往往會產生多個Stage,而這些串聯的Stage又依賴于底層文件系統(如HDFS)來存儲每一個Stage的輸出結果HadoopSpark Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生態系統,以彌補MapReduce的不足。?
Spark特點:
快:與Hadoop的MapReduce相比,Spark基于內存的運算要快100倍以上,基于硬盤的運算也要快10倍以上。Spark實現了高效的DAG執行引擎,可以通過基于內存來高效處理數據流 ??
易用:
Spark支持Java、Python和Scala的API,還支持超過80種高級算法,使用戶可以快速構建不同的應用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在這些shell中使用Spark集群來驗證解決問題的方法。?
通用:
Spark提供了統一的解決方案。Spark可以用于批處理、交互式查詢(Spark SQL)、實時流處理(Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。這些不同類型的處理都可以在同一個應用中無縫使用。Spark統一的解決方案非常具有吸引力,畢竟任何公司都想用統一的平臺去處理遇到的問題,減少開發和維護的人力成本和部署平臺的物力成本。?
兼容性:
Spark可以非常方便地與其他的開源產品進行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作為它的資源管理和調度器,并且可以處理所有Hadoop支持的數據,包括HDFS、HBase和Cassandra等。這對于已經部署Hadoop集群的用戶特別重要,因為不需要做任何數據遷移就可以使用Spark的強大處理能力。Spark也可以不依賴于第三方的資源管理和調度器,它實現了Standalone作為其內置的資源管理和調度框架,這樣進一步降低了Spark的使用門檻,使得所有人都可以非常容易地部署和使用Spark。此外,Spark還提供了在EC2上部署Standalone的Spark集群的工具。?
Spark集群安裝:
機器部署
準備兩臺以上Linux服務器,安裝好JDK1.8
下載Spark安裝包
http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
上傳解壓安裝包
上傳spark-2.2.0-bin-hadoop2.7.tgz安裝包到Linux上
解壓安裝包到指定位置
[hadoop@hdp01 ~]$ tar zxvf spark-2.2.0-bin-hadoop2.7.tgz -C apps
[hadoop@hdp01 ~]$ cd apps
[hadoop@hdp01 apps]$ mv spark-2.2.0-bin-hadoop2.7 spark
配置Spark:
進入到Spark安裝目錄
[hadoop@hdp01 apps]$ cd spark/
進入conf目錄并重命名并修改spark-env.sh.template文件
[hadoop@hdp01 apps]$cd conf/
[hadoop@hdp01 apps]$mv spark-env.sh.template spark-env.sh
[hadoop@hdp01 apps]$vi spark-env.sh
在該配置文件中添加如下配置
export JAVA_HOME=/opt/jdk1.8.0_121
export SPARK_MASTER_IP=hdp08
export SPARK_MASTER_PORT=7077
保存退出
重命名并修改slaves.template文件
[hadoop@hdp01 conf]$ mv slaves.template slaves
[hadoop@hdp01 conf]$ vi slaves
在該文件中添加子節點所在的位置(Worker節點)
hdp05
hdp06
hdp07
保存退出
將配置好的Spark拷貝到其他節點上
[hadoop@hdp01 spark]$ scp -r spark hadoop@hdp05:/home/hadoop/apps
[hadoop@hdp01 spark]$ scp -r spark hadoop@hdp06:/home/hadoop/apps
[hadoop@hdp01 spark]$ scp -r spark hadoop@hdp06:/home/hadoop/apps
Spark集群配置完畢,目前是1個Master,3個Work,在hdp08上啟動Spark集群
[hadoop@hdp01 spark]$ sbin/start-all.sh
啟動后執行jps命令,主節點上有Master進程,其他子節點上有Work進行,登錄Spark管理界面查看集群狀態(主節點):http://hdp08:8080/
到此為止,Spark集群安裝完畢,但是有一個很大的問題,那就是Master節點存在單點故障,要解決此問題,就要借助zookeeper,并且啟動至少兩個Master節點來實現高可靠,配置方式比較簡單:
Spark集群規劃:hdp01,hdp02是Master;hdp05,hdp06,hdp07是Worker
安裝配置zk集群,并啟動zk集群
停止spark所有服務,修改配置文件spark-env.sh,在該配置文件中刪掉SPARK_MASTER_IP并添加如下配置
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hdp05,hdp06,hdp07 - Dspark.deploy.zookeeper.dir=/spark"
1.在hdp01節點上修改slaves配置文件內容指定worker節點
2.在hdp01上執行sbin/start-all.sh腳本,然后在hdp02上執行sbin/start-master.sh啟動第二個Master
執行Spark程序:
spark-shell是Spark自帶的交互式Shell程序,方便用戶進行交互式編程,用戶可以在該命令行下用scala編寫spark程序。
啟動spark shell:
bin/spark-shell \
--master spark://hdp08:7077 \
--executor-memory 1g \
--total-executor-cores 2
參數說明:
--master spark://node1.togogo.cn:7077 指定Master的地址
--executor-memory 512M 指定每個worker可用內存為512M,也可以指定為2g
--total-executor-cores 2 指定整個集群使用的cup核數為2個
注意:
如果啟動spark shell時沒有指定master地址,但是也可以正常啟動spark shell和執行spark shell中的程序,其實是啟動了spark的local模式,該模式僅在本機啟動一個進程,沒有與集群建立聯系。
Spark Shell中已經默認將SparkContext類初始化為對象sc。用戶代碼如果需要用到,則直接應用sc即可
在spark shell中編寫WordCount程序:
1.首先啟動hdfs
2.向hdfs上傳一個文件到hdfs://hdp01:9000/ README.md
3.在spark shell中用scala語言編寫spark程序
sc.textFile("hdfs://hdp08:9000/spark/README.md").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://hdp08:9000/spark/out")
4.使用hdfs命令查看結果
[hadoop@hdp01 spark]$ hadoop fs -ls /spark/out
說明:
sc是SparkContext對象,該對象時提交spark程序的入口
textFile(hdfs://hdp01:9000/spark/README.md)是hdfs中讀取數據
flatMap(_.split(" "))先map在壓平
map((_,1))將單詞和1構成元組
reduceByKey(_+_)按照key進行reduce,并將value累加
saveAsTextFile("hdfs://hdp01:9000/spark/out")將結果寫入到hdfs中
注意一:scala中的下劃線含義
1、 作為“通配符”,類似Java中的*。如import scala.math._
2、 _*作為一個整體,告訴編譯器你希望將某個參數當作參數序列處理!例如val s = sum(1 to 5:_*)就是將1 to 5當作參數序列處理。
3、 指代一個集合中的每個元素。例如我們要在一個Array a中篩出偶數,并乘以2,可以用以下辦法:
a.filter(_%2==0).map(2*_)。
又如要對緩沖數組ArrayBuffer b排序,可以這樣:
val bSorted = b.sorted(_
4、 在元組中,可以用方法_1, _2, _3訪問組員。如a._2。其中句點可以用空格替代。
5、 使用模式匹配可以用來獲取元組的組員,例如
val (first, second, third) = t
但如果不是所有的部件都需要,那么可以在不需要的部件位置上使用_。比如上一例中val (first, second, _) = t
6、 還有一點,下劃線_代表的是某一類型的默認值。
對于Int來說,它是0。
對于Double來說,它是0.0
對于引用類型,它是null。
注意二:map()與flatMap()區別
aap()是將函數用于RDD中的每個元素,將返回值構成新的RDD。
flatmap()是將函數應用于RDD中的每個元素,將返回的迭代器的所有內容構成新的RDD,這樣就得到了一個由各列表中的元素組成的RDD,而不是一個列表組成的RDD。
有些拗口,看看例子就明白了。
val rdd = sc.parallelize(List("coffee panda","happy panda","happiest panda party"))
輸入
rdd.map(x=>x).collect
結果
res9: Array[String] = Array(coffee panda, happy panda, happiest panda party)
輸入
rdd.flatMap(x=>x.split(" ")).collect
結果
res8: Array[String] = Array(coffee, panda, happy, panda, happiest, panda, party)
flatMap說明白就是先map然后再flat,再來看個例子
val rdd1 = sc.parallelize(List(1,2,3,3))
scala> rdd1.map(x=>x+1).collect
res10: Array[Int] = Array(2, 3, 4, 4)
scala> rdd1.flatMap(x=>x.to(3)).collect
res11: Array[Int] = Array(1, 2, 3, 2, 3, 3, 3)
---------------------------------------------------------------------------------------------------------------------------
var li=List(1,2,3,4)
var res =li.flatMap(x=> x match {
case 3 => List(3.1,3.2)
case _ =>List(x*2)
})
println(res)
li= List(1,2,3,4)
var res2 =li.map(x=> x match {
case 3 =>List(3.1,3.2)
case _ =>x*2
})
println(res2)
//output=>
List(2,4, 3.1,3.2, 8)
List(2,4, List(3.1,3.2), 8)
Program exited.
這個過程就像是先 map, 然后再將 map 出來的這些列表首尾相接 (flatten).
在IDEA中編寫WordCount程序:
spark shell僅在測試和驗證我們的程序時使用的較多,在生產環境中,通常會在IDE中編制程序,然后打成jar包,然后提交到集群。
2.新建一個scala class,類型為Object
3.編寫spark程序
package net.togogo.scalasparkdemo
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object WordCount {
def main(args: Array[String]) {
//創建SparkConf()并設置App名稱
val conf = new SparkConf().setAppName("WC")
//創建SparkContext,該對象是提交spark App的入口
val sc = new SparkContext(conf)
//使用sc創建RDD并執行相應的transformation和action
sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _, 1).sortBy(_._2, false).saveAsTextFile(args(1))
//停止sc,結束該任務
sc.stop()
}
}
?4.導出jar包
5.首先啟動hdfs和Spark集群
啟動hdfs
start-dfs.sh
啟動spark
/sbin/start-all.sh
6.使用spark-submit命令提交Spark應用(注意參數的順序)
/home/hadoop/apps/spark/bin/spark-submit \
--class net.togogo.sparkscalaproject.WordCountDemo \
--master spark://hdp08:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
/home/hadoop/wcdemo.jar \
hdfs://hdp08:9000/work/README.md \
hdfs://hdp08:9000/work/out3
查看程序執行結果
上一篇: {大數據}RDD分布式數據集
下一篇: {大數據}hdfs的工作機制