• <fieldset id="2mqau"></fieldset>
    <tbody id="2mqau"><tfoot id="2mqau"></tfoot></tbody>
    <option id="2mqau"><noframes id="2mqau"></noframes></option>
  • <input id="2mqau"></input>
  • 第十周,大數(shù)據(jù)建模學(xué)習(xí)應(yīng)用于模具行業(yè)的培訓(xùn)班

    2019-11-03 16:09:50

    早上, 很早到書城, 今天最后一天培訓(xùn), 還是在  SPARK 復(fù)習(xí)。


            開發(fā) SPARK 使用  scala 、java 、 python 、 api 以及    shell 語言開發(fā)的  搜索引擎。


           缺點(diǎn):    不適合  web服務(wù)  ,   dao層    、web爬蟲 


           優(yōu)點(diǎn):    伯克利數(shù)據(jù)分析 生態(tài)圈  , 機(jī)器學(xué)習(xí) , 數(shù)據(jù)挖掘, 數(shù)據(jù)庫, 信息檢索, 自然語言處理, 語音識別

                          以  spark core 為核心 , 從  hdfs  , amazon  s3  hbase 等持久讀取數(shù)據(jù)


     image.png


    SPARK SQL  基于內(nèi)存的, 



    image.png

    查看數(shù)據(jù)庫

    深圳塑膠模具廠,深圳市模具廠,深圳模具廠,深圳模具,深圳塑膠模具

    image.png

    image.png

    image.png

    image.png


    image.png



    這個(gè)在前端也能看到,點(diǎn)擊進(jìn)去, 詳細(xì)說明。


    image.png

           image.png

    image.png

    image.png



    image.png

    image.png

    image.pngimage.png


    HIVE 不支持事物的, 增刪改查。 ACRD 使用 


    image.png



    在那一臺機(jī)器完成的, 可以看看

    image.png


    提交的人物, 由系統(tǒng)分配節(jié)點(diǎn)去跑。


    image.png

    image.png



    image.png





    HIVE 主要做 數(shù)據(jù)倉庫  DW   , 

    image.png



    插入數(shù)據(jù)完成, 做一下  查詢數(shù)據(jù)

    image.png



    將  HIVE 的內(nèi)核, 改成  SPARK 以后, 提升速度 80倍

    image.png



    image.png



    image.png




    上面是   HOVE 的插入和查詢的過程。


    接著跑  SPARK SQL 的過程


    image.png


    image.png


    所有的  數(shù)據(jù)庫和表, 都是存儲在  HDFS 上的。


    現(xiàn)在要用  SPARK SQL 來執(zhí)行。


    image.png


    image.png


    image.png


    show database  

    use  

    select 

    image.png


    image.png




    這個(gè)是 SPARK的管理頁面


    image.png



    image.png

    image.png


    image.png

    image.png




    image.png

    image.png

    image.png

    image.png

    image.png

    image.png

    image.png


    image.png

    image.png

    image.png


    image.png


    DAG 的有向性的結(jié)合圖



    DAG Scheduler 





    image.png


    RDD 經(jīng)過 4個(gè) 聚合步驟后, 形成一個(gè)  DAG , 多個(gè) DAG 組合編程  DAG計(jì)劃表


    下面是  scala 寫的


    image.png



    要統(tǒng)計(jì)下面文件中的個(gè)數(shù)


    image.png


    image.png

    上述雖然是  一行代碼, 但是也顯示的淋漓盡致


    image.png


    image.png




    運(yùn)行的狀況


    image.png



    image.png


    image.png




    image.png


    image.png


    image.png



    管理頁面的結(jié)果image.png


    image.png


    環(huán)境配置image.png


    image.png



    image.png


    image.png


    SPARK   單詞統(tǒng)計(jì)的 DEMO  , 接下來再看看


    image.png

    image.png


    image.png



    SPARK STREM 消費(fèi)kaFka 的數(shù)據(jù),  管理者確定是   ZooKeeper


    image.png



    image.png



    image.png


    下午講解     Klin       做一些報(bào)表的功能, SPARK 




    image.png

    要搭建不同的集群, 由原來的 store 的程序, 遷移到  spark 上來,    Flink 是把交互式查詢和實(shí)時(shí)計(jì)算合在一起了。

    Flink 也是用 Scala 來寫。

    image.png


    1、客戶端提交作業(yè)后,啟動Driver,Driver是Spark作業(yè)的Master(也就是通過Driver來啟動Receiver,定時(shí)去啟動任務(wù)的處理,

    注意的是,驅(qū)動啟動任務(wù)會受前一個(gè)任務(wù)執(zhí)行的影響。也就是前一個(gè)任務(wù)沒有執(zhí)行完成后,是不會啟動后邊的任務(wù)的。 

    所以,注意你的streaming的執(zhí)行時(shí)間,絕對不要超過Recive數(shù)據(jù)的時(shí)間)


    2、每個(gè)作業(yè)包含多個(gè)Executor,每個(gè)Executor以線程的方式運(yùn)行task,Spark Streaming至少包含一個(gè)Receiver task。

    (一個(gè)Executor就是一個(gè)spark進(jìn)程,在yarn中就是一個(gè)container,這個(gè)大家應(yīng)該知道。

    然后Receiver task是在driver中創(chuàng)建的,我理解一個(gè)Receiver是運(yùn)行在一個(gè)Executor中的。

    然后如果想要創(chuàng)建多個(gè)Receiver,那么需要大概這樣做(1 to 10).map(_.createStream…),這樣就能創(chuàng)建10個(gè)receiver task啦。 

    注意這個(gè)數(shù)量當(dāng)然不能超過你的結(jié)點(diǎn)數(shù)量啦。 

    還有個(gè)問題,通常使用kafka比較合適,因?yàn)閗afka是stream向kafka來poll數(shù)據(jù)。

    而他媽的flume默認(rèn)只支持pull,如果想支持poll,那需要定制sink,那真是太惡心了。)


    3、Receiver接收數(shù)據(jù)后生成Block,并把BlockId匯報(bào)給Driver,然后備份到另外一個(gè)Executor上。

    (默認(rèn)情況下接受數(shù)據(jù)是200毫秒生成一個(gè)block,我理解一個(gè)block應(yīng)該是一個(gè)partition?

    這個(gè)還不確定,需要對照源代碼看一下;

    然后會把生成的Block隨機(jī)扔到不同的Executor,同時(shí),driver去派發(fā)任務(wù)時(shí),也會找到就近的Executor。

    我理解,節(jié)點(diǎn)中的所有executor都應(yīng)該會有數(shù)據(jù)才對)


    4、ReceiverTracker維護(hù)Receiver匯報(bào)的BlockId。

        (這個(gè)ReceiverTracker應(yīng)該是維護(hù)在Driver中,Driver會根據(jù)維護(hù)的這些數(shù)據(jù)塊進(jìn)行任務(wù)的派發(fā))


    5、Driver定時(shí)生成JobGenerator,根據(jù)DStream的關(guān)系生成邏輯RDD,然后創(chuàng)建Jobset,交給JobScheduler。


    6、JobScheduler負(fù)責(zé)調(diào)度Jobset,交給DAGScheduler,DAGScheduler根據(jù)邏輯RDD,生成相應(yīng)的Stages,

          每個(gè)stage包含一到多個(gè)task。(我記得DAGScheduler會對任務(wù)做一層優(yōu)化)


    7、TaskScheduler負(fù)責(zé)把task調(diào)度到Executor上,并維護(hù)task的運(yùn)行狀態(tài)。


    8、當(dāng)tasks,stages,jobset完成后,單個(gè)batch(批處理)才算完成。


    image.png

    具體流程:

    1. 客戶端提交作業(yè)后啟動Driver,Driver是spark作業(yè)的Master

    2. 每個(gè)作業(yè)包含多個(gè)Executor,每個(gè)Executor以線程的方式運(yùn)行task,Spark Streaming至少包含一個(gè)receiver task

    3. Receiver接收數(shù)據(jù)后生成Block,并把BlockId匯報(bào)給Driver,然后備份到另外一個(gè)Executor上。

    4. ReceiverTracker維護(hù)Reciver匯報(bào)的BlockId。

    5. Driver定時(shí)啟動JobGenerator,根據(jù)Dstream的關(guān)系生成邏輯RDD,然后創(chuàng)建Jobset,交給JobScheduler。

    6. JobScheduler負(fù)責(zé)調(diào)度Jobset,交給DAGScheduler,DAGScheduler根據(jù)邏輯RDD,生成相應(yīng)的Stages,每個(gè)stage包含一到多個(gè)task。

    7. TaskScheduler負(fù)責(zé)把task調(diào)度到Executor上,并維護(hù)task的運(yùn)行狀態(tài)。

    8. 當(dāng)tasks,stages,jobset完成后,單個(gè)batch才算完成。



    image.png



    ---------------------------------------------------   第十周  盧老師 講課筆記


    Spark Streaming原理:


    Spark Streaming是基于spark的流式批處理引擎,其基本原理是把輸入的數(shù)據(jù)以某一時(shí)間間隔批量的處理,當(dāng)批處理縮短到秒級時(shí),便可以用于處理實(shí)時(shí)數(shù)據(jù)流。


    kafka


    flume                                        HDFS


    hdfs      -------> spark streaming --------> DataBase


    zeromq                   |                   Dashboards

                                   | 

    twitter                    |

                                  |

    input data streaming -->sparkstreaming-->batches of input data--->spark engine --->batches of processed data




    live input stream --> spark Streaming --->t t t --->spark job --->results



    sparkstreaming架構(gòu)


    spark streaming作業(yè)流程:


    Driver:

    receivertracker

    jobGenerator

    JobScheduler

    DagScheduler

    TaskScheduler


    BlockManagerMaster


    Executor:

    Receiver

    BlockManagerSlave


    Executor:

    Task

    BlockManagerSlave



    運(yùn)行機(jī)制:

    1、客戶端提交作業(yè)后啟動一個(gè)Driver,Driver是spark作業(yè)的Master。


    2、每個(gè)作業(yè)包含多個(gè)Executor,每個(gè)Executor以線程的方式運(yùn)行Task,Spark Streaing至少包含一個(gè)receiver task。


    3、Receiver接收數(shù)據(jù)后生成Block,并把Blockid匯報(bào)給Driver,然后備份到另外一個(gè)Executor上


    4、ReceiverTracker維護(hù)Recivver匯報(bào)的BlockId。


    5、Driver定時(shí)啟動JobGenerator,根據(jù)Dstream的關(guān)系生成邏輯RDD,然后創(chuàng)建JobSet,交給JobScheduler.


    6、JobScheduler負(fù)責(zé)調(diào)度JobSet,交給DAGScheduler,DAGScheduler根據(jù)邏輯RDD,生成相應(yīng)的Stages,每個(gè)Stages包含一個(gè)到多個(gè)Task。


    7、TaskScheduler負(fù)責(zé)把task調(diào)度到Executor上,并維護(hù)task的運(yùn)行狀態(tài)。


    8、當(dāng)tasks、stages、jobset完成后,單個(gè)batch才算完成。





    Spark Streaming消費(fèi)Kafka數(shù)據(jù):


    import org.apache.kafka.clients.consumer.ConsumerRecord

    import org.apache.kafka.common.serialization.StringDeserializer

    import org.apache.spark.streaming.kafka010._

    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe


    val kafkaParams = Map[String, Object](

      "bootstrap.servers" -> "localhost:9092,anotherhost:9092",

      "key.deserializer" -> classOf[StringDeserializer],

      "value.deserializer" -> classOf[StringDeserializer],

      "group.id" -> "use_a_separate_group_id_for_each_stream",

      "auto.offset.reset" -> "latest",

      "enable.auto.commit" -> (false: java.lang.Boolean)

    )


    val topics = Array("topicA", "topicB")

    val stream = KafkaUtils.createDirectStream[String, String](

      streamingContext,

      PreferConsistent,

      Subscribe[String, String](topics, kafkaParams)

    )


    stream.map(record => (record.key, record.value))





    Spark統(tǒng)計(jì)單詞次數(shù):


    text_file = sc.textFile("hdfs://...")

    counts = text_file.flatMap(lambda line: line.split(" ")) \

                 .map(lambda word: (word, 1)) \

                 .reduceByKey(lambda a, b: a + b)

    counts.saveAsTextFile("hdfs://...")




    SparkSQL數(shù)據(jù)查詢:


    // Creates a DataFrame based on a table named "people"

    // stored in a MySQL database.

    val url =

      "jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword"

    val df = sqlContext

      .read

      .format("jdbc")

      .option("url", url)

      .option("dbtable", "people")

      .load()


    // Looks the schema of this DataFrame.

    df.printSchema()


    // Counts people by age

    val countsByAge = df.groupBy("age").count()

    countsByAge.show()


    // Saves countsByAge to S3 in the JSON format.

    countsByAge.write.format("json").save("s3a://...")




    Spark機(jī)器學(xué)習(xí)實(shí)現(xiàn)邏輯回歸算法:


    // Every record of this DataFrame contains the label and

    // features represented by a vector.

    val df = sqlContext.createDataFrame(data).toDF("label", "features")


    // Set parameters for the algorithm.

    // Here, we limit the number of iterations to 10.

    val lr = new LogisticRegression().setMaxIter(10)


    // Fit the model to the data.

    val model = lr.fit(df)


    // Inspect the model: get the feature weights.

    val weights = model.weights


    // Given a dataset, predict each point's label, and show the results.

    model.transform(df).show()









    -------------------------------------------------------------------------------------------------------


    最后做一次復(fù)習(xí) ,  有始有終。


    image.png


    image.png

    -------在最下面增加 2行代碼。


    image.png

    如果有 幾千臺服務(wù)器, 必須配置免登錄,  生成的密鑰, 追加到這個(gè)文件中去。


    image.png


    image.png

    image.png


    hadoop 的官方網(wǎng)站, 有非常詳細(xì)的內(nèi)容。


    image.png


    image.png


    多種方式都可以查看是不是啟動成功


    image.png


    image.png


    image.png


    image.png

    50070端口, HDFS的端口


    搭建完成后,     


    image.png



    下面是  Yarn 的配置信息

    image.png


    需要 shuffle  的, 需要增加 各類  shuffle .


    FIFO     FAIR    先進(jìn)先出    公平調(diào)度  


    上面是 Yarn 的配置文件, 這是第三周講的


    第四周講   Hbase , 是 google  的大表實(shí)現(xiàn)

    image.png

    image.png

    image.png

    image.png

    image.png

    image.png


    image.png


    image.png


    image.png


    image.png


    image.png



    image.png

    image.png


    image.png


    image.png


    image.pngimage.png

    image.png


    image.png


    單機(jī)版和集群版的不同

    image.png


    image.png

    image.png


    image.png

    image.png


    image.png

    image.png

    image.png

    image.png



    image.pngimage.png


    image.png


    第5周 、 第六周 


    image.png


    image.png

    image.png


    image.png

    image.png

    一切階是對像


    image.png

    image.png


    image.png

    image.png

    image.png



    image.png


    image.png


    以上是   5周   Scala  



    image.png




    image.png


    image.png


    image.png



    image.png



    image.png


    image.png


    image.png



    image.png

    image.png


    image.png


    image.png




















    首頁
    產(chǎn)品
    新聞
    聯(lián)系
    国产精品美女久久久久AV爽,俺去俺来也在线WWW色官网,成年美女黄的视频网站,正在播放无套少妇出租屋 吉木乃县| 泗阳县| 大石桥市| 新干县| 镶黄旗| 普格县| 方城县| 栾城县| 阿拉善盟| 青岛市| 塔河县| 剑阁县| 龙口市| 信阳市| 荥经县| 隆化县| 鄯善县| 蒙城县| 石首市| 江津市| 治多县| 新巴尔虎右旗| 邵阳市| 河间市| 岑巩县| 乐都县| 翼城县| 涞水县| 清水县| 和田市| 宜春市| 大城县| 平罗县| 广宗县| 平顶山市| 黔西县| 扶风县| 财经| 太白县| 三明市| 九寨沟县|