自慰套教室~女子全员妊娠,精品无码国产自产拍在线观看蜜桃,亚洲国产精品成人精品无码区,久别的草原在线看视频免费

集團站切換校區

驗證碼已發送,請查收短信

復制成功
微信號:togogoi
添加微信好友, 詳細了解課程
已復制成功,如果自動跳轉微信失敗,請前往微信添加好友
打開微信
圖標

學習文章

當前位置:首頁 > >學習文章 > >

{大數據}spark入門

發布時間: 2018-01-09 00:33:38


        

目前,Spark生態系統已經發展成為一個包含多個子項目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子項目,Spark是基于內存計算的大數據并行計算框架。Spark基于內存計算,提高了在大數據環境下數據處理的實時性,同時保證了高容錯性和高可伸縮性,允許用戶將Spark部署在大量廉價硬件之上,形成集群。Spark得到了眾多大數據公司的支持,這些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、騰訊、京東、攜程、優酷土豆。當前百度的Spark已應用于鳳巢、大搜索、直達號、百度大數據等業務;阿里利用GraphX構建了大規模的圖計算和圖挖掘系統,實現了很多生產系統的推薦算法;騰訊Spark集群達到8000臺的規模,是當前已知的世界上較大的Spark集群。?


Spark Core:


  包含Spark的基本功能,包含任務調度,內存管理,容錯機制等


  內部定義了RDDs(彈性分布式數據集)


  提供了很多APIs來創建和操作這些RDDs


  應用場景,為其他組件提供底層的服務


Spark SQL:


  是Spark處理結構化數據的庫,就像Hive SQL,Mysql一樣


  應用場景,企業中用來做報表統計


Spark Streaming:


  是實時數據流處理組件,類似Storm


  Spark Streaming提供了API來操作實時流數據


  應用場景,企業中用來從Kafka接收數據做實時統計


MLlib:


   一個包含通用機器學習功能的包,Machine learning lib


   包含分類,聚類,回歸等,還包括模型評估和數據導入。


  MLlib提供的上面這些方法,都支持集群上的橫向擴展。


  應用場景,機器學習。


Graphx:


  是處理圖的庫(例如,社交網絡圖),并進行圖的并行計算。


  像Spark Streaming,Spark SQL一樣,它也繼承了RDD API。


  它提供了各種圖的操作,和常用的圖算法,例如PangeRank算法。


  應用場景,圖計算。


Cluster Managers:


  就是集群管理,Spark自帶一個集群管理是單獨調度器。


  常見集群管理包括Hadoop YARN,Apache Mesos






基于MapReduce的計算引擎通常會將中間結果輸出到磁盤上,進行存儲和容錯。




出于任務管道承接的,考慮,當一些查詢翻譯到MapReduce任務時,往往會產生多個Stage,而這些串聯的Stage又依賴于底層文件系統(如HDFS)來存儲每一個Stage的輸出結果HadoopSpark  Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生態系統,以彌補MapReduce的不足。?


Spark特點:

     快:與Hadoop的MapReduce相比,Spark基于內存的運算要快100倍以上,基于硬盤的運算也要快10倍以上。Spark實現了高效的DAG執行引擎,可以通過基于內存來高效處理數據流  ??


     易用:

         Spark支持Java、Python和Scala的API,還支持超過80種高級算法,使用戶可以快速構建不同的應用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在這些shell中使用Spark集群來驗證解決問題的方法。?



     通用:

         Spark提供了統一的解決方案。Spark可以用于批處理、交互式查詢(Spark SQL)、實時流處理(Spark Streaming)、機器學習(Spark MLlib)和圖計算(GraphX)。這些不同類型的處理都可以在同一個應用中無縫使用。Spark統一的解決方案非常具有吸引力,畢竟任何公司都想用統一的平臺去處理遇到的問題,減少開發和維護的人力成本和部署平臺的物力成本。?


     兼容性:


         Spark可以非常方便地與其他的開源產品進行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作為它的資源管理和調度器,并且可以處理所有Hadoop支持的數據,包括HDFS、HBase和Cassandra等。這對于已經部署Hadoop集群的用戶特別重要,因為不需要做任何數據遷移就可以使用Spark的強大處理能力。Spark也可以不依賴于第三方的資源管理和調度器,它實現了Standalone作為其內置的資源管理和調度框架,這樣進一步降低了Spark的使用門檻,使得所有人都可以非常容易地部署和使用Spark。此外,Spark還提供了在EC2上部署Standalone的Spark集群的工具。?


Spark集群安裝:   

     機器部署

         準備兩臺以上Linux服務器,安裝好JDK1.8


     下載Spark安裝包


     http://mirrors.tuna.tsinghua.edu.cn/apache/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz

     上傳解壓安裝包

     上傳spark-2.2.0-bin-hadoop2.7.tgz安裝包到Linux上

     解壓安裝包到指定位置

     [hadoop@hdp01 ~]$ tar zxvf spark-2.2.0-bin-hadoop2.7.tgz -C apps

     [hadoop@hdp01 ~]$ cd apps

     [hadoop@hdp01 apps]$ mv spark-2.2.0-bin-hadoop2.7 spark

配置Spark:

     進入到Spark安裝目錄

     [hadoop@hdp01 apps]$ cd spark/

     進入conf目錄并重命名并修改spark-env.sh.template文件

     [hadoop@hdp01 apps]$cd conf/

     [hadoop@hdp01 apps]$mv spark-env.sh.template spark-env.sh

     [hadoop@hdp01 apps]$vi spark-env.sh

     在該配置文件中添加如下配置

     export JAVA_HOME=/opt/jdk1.8.0_121

     export SPARK_MASTER_IP=hdp08

     export SPARK_MASTER_PORT=7077

     保存退出

     重命名并修改slaves.template文件

     [hadoop@hdp01 conf]$ mv slaves.template slaves

     [hadoop@hdp01 conf]$ vi slaves

     在該文件中添加子節點所在的位置(Worker節點)

     hdp05

     hdp06

     hdp07

     保存退出

     將配置好的Spark拷貝到其他節點上

     [hadoop@hdp01 spark]$ scp -r spark hadoop@hdp05:/home/hadoop/apps

     [hadoop@hdp01 spark]$ scp -r spark hadoop@hdp06:/home/hadoop/apps

     [hadoop@hdp01 spark]$ scp -r spark hadoop@hdp06:/home/hadoop/apps

     Spark集群配置完畢,目前是1個Master,3個Work,在hdp08上啟動Spark集群

     [hadoop@hdp01 spark]$ sbin/start-all.sh

     啟動后執行jps命令,主節點上有Master進程,其他子節點上有Work進行,登錄Spark管理界面查看集群狀態(主節點):http://hdp08:8080/        


     到此為止,Spark集群安裝完畢,但是有一個很大的問題,那就是Master節點存在單點故障,要解決此問題,就要借助zookeeper,并且啟動至少兩個Master節點來實現高可靠,配置方式比較簡單:


Spark集群規劃:hdp01,hdp02是Master;hdp05,hdp06,hdp07是Worker

     


     安裝配置zk集群,并啟動zk集群

     停止spark所有服務,修改配置文件spark-env.sh,在該配置文件中刪掉SPARK_MASTER_IP并添加如下配置

     export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hdp05,hdp06,hdp07         - Dspark.deploy.zookeeper.dir=/spark"

     1.在hdp01節點上修改slaves配置文件內容指定worker節點

     2.在hdp01上執行sbin/start-all.sh腳本,然后在hdp02上執行sbin/start-master.sh啟動第二個Master


執行Spark程序:

     spark-shell是Spark自帶的交互式Shell程序,方便用戶進行交互式編程,用戶可以在該命令行下用scala編寫spark程序。


啟動spark shell:       

bin/spark-shell \

--master spark://hdp08:7077 \

--executor-memory 1g \

--total-executor-cores 2


參數說明:

--master spark://node1.togogo.cn:7077 指定Master的地址

--executor-memory 512M 指定每個worker可用內存為512M,也可以指定為2g

--total-executor-cores 2 指定整個集群使用的cup核數為2個


注意:


如果啟動spark shell時沒有指定master地址,但是也可以正常啟動spark shell和執行spark shell中的程序,其實是啟動了spark的local模式,該模式僅在本機啟動一個進程,沒有與集群建立聯系。

Spark Shell中已經默認將SparkContext類初始化為對象sc。用戶代碼如果需要用到,則直接應用sc即可

在spark shell中編寫WordCount程序:       

1.首先啟動hdfs

2.向hdfs上傳一個文件到hdfs://hdp01:9000/ README.md

3.在spark shell中用scala語言編寫spark程序

sc.textFile("hdfs://hdp08:9000/spark/README.md").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://hdp08:9000/spark/out")


4.使用hdfs命令查看結果

[hadoop@hdp01 spark]$ hadoop fs -ls /spark/out

說明:

sc是SparkContext對象,該對象時提交spark程序的入口

textFile(hdfs://hdp01:9000/spark/README.md)是hdfs中讀取數據

flatMap(_.split(" "))先map在壓平

map((_,1))將單詞和1構成元組

reduceByKey(_+_)按照key進行reduce,并將value累加

saveAsTextFile("hdfs://hdp01:9000/spark/out")將結果寫入到hdfs中


注意一:scala中的下劃線含義

1、 作為“通配符”,類似Java中的*。如import scala.math._

2、 _*作為一個整體,告訴編譯器你希望將某個參數當作參數序列處理!例如val s = sum(1 to 5:_*)就是將1 to 5當作參數序列處理。

3、 指代一個集合中的每個元素。例如我們要在一個Array a中篩出偶數,并乘以2,可以用以下辦法:

a.filter(_%2==0).map(2*_)。

又如要對緩沖數組ArrayBuffer b排序,可以這樣:

val bSorted = b.sorted(_

4、 在元組中,可以用方法_1, _2, _3訪問組員。如a._2。其中句點可以用空格替代。

5、 使用模式匹配可以用來獲取元組的組員,例如

val (first, second, third) = t

但如果不是所有的部件都需要,那么可以在不需要的部件位置上使用_。比如上一例中val (first, second, _) = t


6、 還有一點,下劃線_代表的是某一類型的默認值。


對于Int來說,它是0。


對于Double來說,它是0.0


對于引用類型,它是null。


注意二:map()與flatMap()區別


aap()是將函數用于RDD中的每個元素,將返回值構成新的RDD。


flatmap()是將函數應用于RDD中的每個元素,將返回的迭代器的所有內容構成新的RDD,這樣就得到了一個由各列表中的元素組成的RDD,而不是一個列表組成的RDD。


有些拗口,看看例子就明白了。


val rdd = sc.parallelize(List("coffee panda","happy panda","happiest panda party"))


輸入


rdd.map(x=>x).collect


結果


res9: Array[String] = Array(coffee panda, happy panda, happiest panda party)


輸入


rdd.flatMap(x=>x.split(" ")).collect


結果


res8: Array[String] = Array(coffee, panda, happy, panda, happiest, panda, party)


flatMap說明白就是先map然后再flat,再來看個例子


val rdd1 = sc.parallelize(List(1,2,3,3))


scala> rdd1.map(x=>x+1).collect


res10: Array[Int] = Array(2, 3, 4, 4)


scala> rdd1.flatMap(x=>x.to(3)).collect


res11: Array[Int] = Array(1, 2, 3, 2, 3, 3, 3)


---------------------------------------------------------------------------------------------------------------------------


var li=List(1,2,3,4)


var res =li.flatMap(x=> x match {


  case 3 => List(3.1,3.2)


  case _ =>List(x*2)


})


println(res)


li= List(1,2,3,4)


var res2 =li.map(x=> x match {


case 3 =>List(3.1,3.2)


case _ =>x*2


})


println(res2)


//output=>


List(2,4, 3.1,3.2, 8)

List(2,4, List(3.1,3.2), 8)

Program exited.


這個過程就像是先 map, 然后再將 map 出來的這些列表首尾相接 (flatten).


在IDEA中編寫WordCount程序:




     spark shell僅在測試和驗證我們的程序時使用的較多,在生產環境中,通常會在IDE中編制程序,然后打成jar包,然后提交到集群。


2.新建一個scala class,類型為Object


3.編寫spark程序


package net.togogo.scalasparkdemo


import org.apache.spark.SparkConf


import org.apache.spark.SparkContext

object WordCount {


def main(args: Array[String]) {


//創建SparkConf()并設置App名稱


val conf = new SparkConf().setAppName("WC")


//創建SparkContext,該對象是提交spark App的入口


val sc = new SparkContext(conf)


//使用sc創建RDD并執行相應的transformation和action


sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _, 1).sortBy(_._2, false).saveAsTextFile(args(1))


//停止sc,結束該任務


sc.stop()




}


}




?4.導出jar包


5.首先啟動hdfs和Spark集群


啟動hdfs


start-dfs.sh


啟動spark


/sbin/start-all.sh

6.使用spark-submit命令提交Spark應用(注意參數的順序)


/home/hadoop/apps/spark/bin/spark-submit  \


--class net.togogo.sparkscalaproject.WordCountDemo  \


--master spark://hdp08:7077  \


--executor-memory 1G  \


--total-executor-cores 2 \


/home/hadoop/wcdemo.jar  \


hdfs://hdp08:9000/work/README.md \


hdfs://hdp08:9000/work/out3


查看程序執行結果

上一篇: {大數據}RDD分布式數據集

下一篇: {大數據}hdfs的工作機制

十五年老品牌
微信咨詢:togogoi 咨詢電話:18922156670 咨詢網站客服:在線客服

相關課程推薦

在線咨詢 ×

您好,請問有什么可以幫您?我們將竭誠提供最優質服務!

<蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <蜘蛛词>| <文本链> <文本链> <文本链> <文本链> <文本链> <文本链>