偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

基于Spark的數(shù)據(jù)分析實(shí)踐

大數(shù)據(jù) Spark
本文主要分析了 Spark RDD 以及 RDD 作為開發(fā)的不足之處,介紹了 SparkSQL 對(duì)已有的常見數(shù)據(jù)系統(tǒng)的操作方法,以及重點(diǎn)介紹了普元在眾多數(shù)據(jù)開發(fā)項(xiàng)目中總結(jié)的基于 SparkSQL Flow 開發(fā)框架。

引言:

Spark是在借鑒了MapReduce之上發(fā)展而來的,繼承了其分布式并行計(jì)算的優(yōu)點(diǎn)并改進(jìn)了MapReduce明顯的缺陷。Spark主要包含了Spark Core、Spark SQL、Spark Streaming、MLLib和GraphX等組件。

本文主要分析了 Spark RDD 以及 RDD 作為開發(fā)的不足之處,介紹了 SparkSQL 對(duì)已有的常見數(shù)據(jù)系統(tǒng)的操作方法,以及重點(diǎn)介紹了普元在眾多數(shù)據(jù)開發(fā)項(xiàng)目中總結(jié)的基于 SparkSQL Flow 開發(fā)框架。

目錄:

  1. Spark RDD
  2. 基于Spark RDD數(shù)據(jù)開發(fā)的不足
  3. SparkSQL
  4. SparkSQL Flow

一、Spark RDD

RDD(Resilient Distributed Dataset)叫做彈性分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象,它代表一個(gè)不可變、可分區(qū)、元素可并行計(jì)算的集合。

RDD具有數(shù)據(jù)流模型的特點(diǎn):自動(dòng)容錯(cuò)、位置感知性調(diào)度和可伸縮性。

//Scala 在內(nèi)存中使用列表創(chuàng)建

  1. val lines = List(“A”, “B”, “C”, “D” …) 
  2. val rdd:RDD = sc.parallelize(lines); 

//以文本文件創(chuàng)建

  1. val rdd:RDD[String] = sc.textFile(“hdfs://path/filename”) 

Spark RDD Partition 分區(qū)劃分 

基于 Spark 的數(shù)據(jù)分析實(shí)踐

新版本的 Hadoop 已經(jīng)把 BlockSize 改為 128M,也就是說每個(gè)分區(qū)處理的數(shù)據(jù)量更大。

Spark 讀取文件分區(qū)的核心原理

本質(zhì)上,Spark 是利用了 Hadoop 的底層對(duì)數(shù)據(jù)進(jìn)行分區(qū)的 API(InputFormat):

  1. public abstract class InputFormat<K,V>{ 
  2.  public abstract List<InputSplit> getSplits(JobContextcontext 
  3.  ) throwsIOException,InterruptedException; 
  4.   
  5.  public abstract RecordReader<K,V> createRecordReader(InputSplitsplit, 
  6.  TaskAttemptContextcontext 
  7.  )throwsIOException,InterruptedException; 

Spark 任務(wù)提交后通過對(duì)輸入進(jìn)行 Split,在 RDD 構(gòu)造階段,只是判斷是否可 Split(如果參數(shù)異常一定在此階段報(bào)出異常),并且 Split 后每個(gè) InputSplit 都是一個(gè)分區(qū)。只有在Action 算子提交后,才真正用 getSplits 返回的 InputSplit 通過 createRecordReader 獲得每個(gè) Partition 的連接。

然后通過 RecordReader 的 next() 遍歷分區(qū)內(nèi)的數(shù)據(jù)。

Spark RDD 轉(zhuǎn)換函數(shù)和提交函數(shù) 

基于 Spark 的數(shù)據(jù)分析實(shí)踐

Spark RDD 的眾多函數(shù)可分為兩大類Transformation 與 Action。Transformation 與 Action 的區(qū)別在于,對(duì) RDD 進(jìn)行 Transformation 并不會(huì)觸發(fā)計(jì)算:Transformation 方法所產(chǎn)生的 RDD 對(duì)象只會(huì)記錄住該 RDD 所依賴的 RDD 以及計(jì)算產(chǎn)生該 RDD 的數(shù)據(jù)的方式;只有在用戶進(jìn)行 Action 操作時(shí),Spark 才會(huì)調(diào)度 RDD 計(jì)算任務(wù),依次為各個(gè) RDD 計(jì)算數(shù)據(jù)。這就是 Spark RDD 內(nèi)函數(shù)的“懶加載”特性。

二、基于Spark RDD數(shù)據(jù)開發(fā)的不足

由于MapReduce的shuffle過程需寫磁盤,比較影響性能;而Spark利用RDD技術(shù),計(jì)算在內(nèi)存中流式進(jìn)行。另外 MapReduce計(jì)算框架(API)比較局限, 使用需要關(guān)注的參數(shù)眾多,而Spark則是中間結(jié)果自動(dòng)推斷,通過對(duì)數(shù)據(jù)集上鏈?zhǔn)綀?zhí)行函數(shù)具備一定的靈活性。

即使 SparkRDD 相對(duì)于 MapReduce 提高很大的便利性,但在使用上仍然有許多問題。體現(xiàn)在一下幾個(gè)方面:

  1. RDD 函數(shù)眾多,開發(fā)者不容易掌握,部分函數(shù)使用不當(dāng) shuffle時(shí)造成數(shù)據(jù)傾斜影響性能;
  2. RDD 關(guān)注點(diǎn)仍然是Spark太底層的 API,基于 Spark RDD的開發(fā)是基于特定語言(Scala,Python,Java)的函數(shù)開發(fā),無法以數(shù)據(jù)的視界來開發(fā)數(shù)據(jù);
  3. 對(duì) RDD 轉(zhuǎn)換算子函數(shù)內(nèi)部分常量、變量、廣播變量使用不當(dāng),會(huì)造成不可控的異常;
  4. 對(duì)多種數(shù)據(jù)開發(fā),需各自開發(fā)RDD的轉(zhuǎn)換,樣板代碼較多,無法有效重利用;
  5. 其它在運(yùn)行期可能發(fā)生的異常。如:對(duì)象無法序列化等運(yùn)行期才能發(fā)現(xiàn)的異常。

三、SparkSQL

Spark 從 1.3 版本開始原有 SchemaRDD 的基礎(chǔ)上提供了類似Pandas DataFrame API。新的DataFrame API不僅可以大幅度降低普通開發(fā)者的學(xué)習(xí)門檻,同時(shí)還支持Scala、Java與Python三種語言。更重要的是,由于脫胎自SchemaRDD,DataFrame天然適用于分布式大數(shù)據(jù)場(chǎng)景。 

基于 Spark 的數(shù)據(jù)分析實(shí)踐

一般的數(shù)據(jù)處理步驟:讀入數(shù)據(jù) -> 對(duì)數(shù)據(jù)進(jìn)行處理 -> 分析結(jié)果 -> 寫入結(jié)果

SparkSQL 結(jié)構(gòu)化數(shù)據(jù)

  • 處理結(jié)構(gòu)化數(shù)據(jù)(如 CSV,JSON,Parquet 等);
  • 把已經(jīng)結(jié)構(gòu)化數(shù)據(jù)抽象成 DataFrame (HiveTable);
  • 非結(jié)構(gòu)化數(shù)據(jù)通過 RDD.map.filter 轉(zhuǎn)換成結(jié)構(gòu)化進(jìn)行處理;
  • 按照列式數(shù)據(jù)庫,只加載非結(jié)構(gòu)化中可結(jié)構(gòu)化的部分列(Hbase,MongoDB);

處理非結(jié)構(gòu)化數(shù)據(jù),不能簡(jiǎn)單的用 DataFrame 裝載。而是要用 SparkRDD 把數(shù)據(jù)讀入,在通過一系列的 Transformer Method 把非結(jié)構(gòu)化的數(shù)據(jù)加工為結(jié)構(gòu)化,或者過濾到不合法的數(shù)據(jù)。

SparkSQL DataFrame 

基于 Spark 的數(shù)據(jù)分析實(shí)踐

SparkSQL 中一切都是 DataFrame,all in DataFrame. DataFrame是一種以RDD為基礎(chǔ)的分布式數(shù)據(jù)集,類似于傳統(tǒng)數(shù)據(jù)庫中的二維表格。DataFrame與RDD的主要區(qū)別在于,前者帶有schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱和類型。如果熟悉 Python Pandas 庫中的 DataFrame 結(jié)構(gòu),則會(huì)對(duì) SparkSQL DataFrame 概念非常熟悉。

TextFile DataFrame

  1. import.org.apache.spark.sql._ 
  2. //定義數(shù)據(jù)的列名稱和類型 
  3. valdt=StructType(List(id:String,name:String,gender:String,age:Int)) 
  4. ​ 
  5. //導(dǎo)入user_info.csv文件并指定分隔符 
  6. vallines = sc.textFile("/path/user_info.csv").map(_.split(",")) 
  7. ​ 
  8. //將表結(jié)構(gòu)和數(shù)據(jù)關(guān)聯(lián)起來,把讀入的數(shù)據(jù)user.csv映射成行,構(gòu)成數(shù)據(jù)集 
  9. valrowRDD = lines.map(x=>Row(x(0),x(1),x(2),x(3).toInt)) 
  10. ​ 
  11. //通過SparkSession.createDataFrame()創(chuàng)建表,并且數(shù)據(jù)表表頭 
  12. val df= spark.createDataFrame(rowRDD, dt) 

讀取規(guī)則數(shù)據(jù)文件作為DataFrame

  1. SparkSession.Builder builder = SparkSession.builder() 
  2. Builder.setMaster("local").setAppName("TestSparkSQLApp"
  3. SparkSession spark = builder.getOrCreate(); 
  4. SQLContext sqlContext = spark.sqlContext(); 
  5. ​ 
  6. # 讀取 JSON 數(shù)據(jù),path 可為文件或者目錄 
  7. valdf=sqlContext.read().json(path); 
  8. ​ 
  9. # 讀取 HadoopParquet 文件 
  10. vardf=sqlContext.read().parquet(path); 
  11. ​ 
  12. # 讀取 HadoopORC 文件 
  13. vardf=sqlContext.read().orc(path); 

JSON 文件為每行一個(gè) JSON 對(duì)象的文件類型,行尾無須逗號(hào)。文件頭也無須[]指定為數(shù)組;SparkSQL 讀取是只是按照每行一條 JSON Record序列化;

Parquet文件

  1. Configurationconfig = new Configuration(); 
  2. ParquetFileReaderreader = ParquetFileReader.open
  3.  HadoopInputFile.fromPath(new Path("hdfs:///path/file.parquet"),conf)); 
  4. Map<String, String>schema = reader.getFileMetaData().getKeyValueMetaData(); 
  5. String allFields= schema.get("org.apache.spark.sql.parquet.row.metadata"); 

allFiedls 的值就是各字段的名稱和具體的類型,整體是一個(gè)json格式進(jìn)行展示。

讀取 Hive 表作為 DataFrame

Spark2 API 推薦通過 SparkSession.Builder 的 Builder 模式創(chuàng)建 SparkContext。 Builder.getOrCreate() 用于創(chuàng)建 SparkSession,SparkSession 是 SparkContext 的封裝。

在Spark1.6中有兩個(gè)核心組件SQLcontext和HiveContext。SQLContext 用于處理在 SparkSQL 中動(dòng)態(tài)注冊(cè)的表,HiveContext 用于處理 Hive 中的表。

從Spark2.0以上的版本開始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext。SQLContext.sql 即可執(zhí)行 Hive 中的表,也可執(zhí)行內(nèi)部注冊(cè)的表;

在需要執(zhí)行 Hive 表時(shí),只需要在 SparkSession.Builder 中開啟 Hive 支持即可(enableHiveSupport())。

  1. SparkSession.Builder builder = SparkSession.builder().enableHiveSupport(); 
  2. SparkSession spark = builder.getOrCreate(); 
  3. SQLContext sqlContext = spark.sqlContext(); 

// db 指 Hive 庫中的數(shù)據(jù)庫名,如果不寫默認(rèn)為 default

// tableName 指 hive 庫的數(shù)據(jù)表名

  1. sqlContext.sql(“select * from db.tableName”) 

SparkSQL ThriftServer

//首先打開 Hive 的 Metastore服務(wù)

  1. hive$bin/hive –-service metastore –p 8093 

//把 Spark 的相關(guān) jar 上傳到hadoophdfs指定目錄,用于指定sparkonyarn的依賴 jar

  1. spark$hadoop fs –put jars/*.jar /lib/spark2 

// 啟動(dòng) spark thriftserver 服務(wù)

  1. spark$ sbin/start-thriftserver.sh --master yarn-client --driver-memory 1G --conf  
  2. spark.yarn.jars=hdfs:///lib/spark2/*.jar 

當(dāng)hdfs 上傳了spark 依賴 jar 時(shí),通過spark.yarn.jars 可看到日志 spark 無須每個(gè)job 都上傳jar,可節(jié)省啟動(dòng)時(shí)間

  1. 19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.0.5.jar 
  2. 19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.1.7.3.jar 

//通過 spark bin 下的 beeline 工具,可以連接到 spark ThriftServer(SparkOnHive)

  1. bin/beeline -u jdbc:hive2://ip:10000/default -n hadoop 
  • -u 是指定 beeline 的執(zhí)行驅(qū)動(dòng)地址;
  • -n 是指定登陸到 spark Session 上的用戶名稱;

Beeline 還支持傳入-e 可傳入一行 SQL,

  • -e query that should be executed

也可通過 –f 指定一個(gè) SQL File,內(nèi)部可用逗號(hào)分隔的多個(gè) SQL(存儲(chǔ)過程)

  • -f script file that should be executed

SparkSQL Beeline 的執(zhí)行效果展示 

基于 Spark 的數(shù)據(jù)分析實(shí)踐

SparkSQL ThriftServer 

基于 Spark 的數(shù)據(jù)分析實(shí)踐

對(duì)于 SparkSQL ThriftServer 服務(wù),每個(gè)登陸的用戶都有創(chuàng)建的 SparkSession,并且執(zhí)行的對(duì)個(gè) SQL 會(huì)通過時(shí)間順序列表展示。

SparkSQL ThriftServer 服務(wù)可用于其他支持的數(shù)據(jù)庫工具創(chuàng)建查詢,也用于第三方的 BI 工具,如 tableau。

四、SparkSQL Flow

SparkSQL Flow 是以 SparkSQL 為基礎(chǔ),開發(fā)的統(tǒng)一的基于 XML 配置化的可執(zhí)行一連串的 SQL 操作,這一連串的 SQL 操作定義為一個(gè) Flow。下文開始 SparkSQL Flow 的介紹:

SparkSQL Flow 是基于 SparkSQL 開發(fā)的一種基于 XML 配置化的 SQL 數(shù)據(jù)流轉(zhuǎn)處理模型。該模型簡(jiǎn)化了 SparkSQL 、Spark RDD的開發(fā),并且降低開發(fā)了難度,適合了解數(shù)據(jù)業(yè)務(wù)但無法駕馭大數(shù)據(jù)以及 Spark 技術(shù)的開發(fā)者。

  • 一個(gè)由普元技術(shù)部提供的基于 SparkSQL 的開發(fā)模型;
  • 一個(gè)可二次定制開發(fā)的大數(shù)據(jù)開發(fā)框架,提供了靈活的可擴(kuò)展 API;
  • 一個(gè)提供了 對(duì)文件,數(shù)據(jù)庫,NoSQL 等統(tǒng)一的數(shù)據(jù)開發(fā)視界語義;
  • 基于 SQL 的開發(fā)語言和 XML 的模板配置,支持 Spark UDF 的擴(kuò)展管理;
  • 支持基于 Spark Standlone,Yarn,Mesos 資源管理平臺(tái);
  • 支持開源、華為、星環(huán)等平臺(tái)統(tǒng)一認(rèn)證。

SparkSQL Flow 適合的場(chǎng)景:

  1. 批量 ETL;
  2. 非實(shí)時(shí)分析服務(wù);

SparkSQL Flow XML 概覽 

基于 Spark 的數(shù)據(jù)分析實(shí)踐
  1. Properties 內(nèi)定義一組變量,可用于宏替換;
  2. Methods 內(nèi)可注冊(cè) udf 和 udaf 兩種函數(shù);
  3. Prepare 內(nèi)可定義前置 SQL,用于執(zhí)行 source 前的 sql 操作;
  4. Sources 內(nèi)定義一個(gè)到多個(gè)數(shù)據(jù)表視圖;
  5. Transformer 內(nèi)可定義 0 到多個(gè)基于 SQL 的數(shù)據(jù)轉(zhuǎn)換操作(支持 join);
  6. Targets 用于定義 1 到多個(gè)數(shù)據(jù)輸出;
  7. After 可定義 0到多個(gè)任務(wù)日志;

如你所見,source 的 type 參數(shù)用于區(qū)分 source 的類型,source 支持的種類直接決定SparkSQL Flow 的數(shù)據(jù)源加載廣度;并且,根據(jù) type 不同,source 也需要配置不同的參數(shù),如數(shù)據(jù)庫還需要 driver,url,user和 password 參數(shù)。

Transformer 是基于 source 定的數(shù)據(jù)視圖可執(zhí)行的一組轉(zhuǎn)換 SQL,該 SQL 符合 SparkSQL 的語法(SQL99)。Transform 的 SQL 的執(zhí)行結(jié)果被作為中間表命名為 table_name 指定的值。

Targets 為定義輸出,table_name 的值需在 source 或者 Transformer 中定義。

SparkSQL Flow 支持的Sourse 

基于 Spark 的數(shù)據(jù)分析實(shí)踐
  • 支持從 Hive 獲得數(shù)據(jù);
  • 支持文件:JSON,TextFile(CSV),ParquetFile,AvroFile
  • 支持RDBMS數(shù)據(jù)庫:PostgreSQL, MySQL,Oracle
  • 支持 NOSQL 數(shù)據(jù)庫:Hbase,MongoDB

SparkSQL Flow TextFile Source

textfile 為讀取文本文件,把文本文件每行按照 delimiter 指定的字符進(jìn)行切分,切分不夠的列使用 null 填充。

  1. <source type="textfile" table_name="et_rel_pty_cong" 
  2.  fields="cust_id,name1,gender1,age1:int"  
  3.  delimiter="," 
  4.  path="file:///Users/zhenqin/software/hive/user.txt"/> 
  1. Tablename 為該文件映射的數(shù)據(jù)表名,可理解為數(shù)據(jù)的視圖;
  2. Fields 為切分后的字段,使用逗號(hào)分隔,字段后可緊跟該字段的類型,使用冒號(hào)分隔;
  3. Delimiter 為每行的分隔符;
  4. Path 用于指定文件地址,可以是文件,也可是文件夾;
  5. Path 指定地址需要使用協(xié)議,如:file:// 、 hdfs://,否則跟 core-site.xml 配置密切相關(guān);

SparkSQL Flow DB Source

  1. <source type="mysql" table_name="et_rel_pty_cong" 
  2.  table="user" 
  3.  url="jdbc:mysql://localhost:3306/tdb?characterEncoding=UTF-8" 
  4.  driver="com.mysql.jdbc.Driver" 
  5.  user="root" password="123456"/> 

RDBMS 是從數(shù)據(jù)庫使用 JDBC讀取 數(shù)據(jù)集。支持 type 為:db、mysql、oracle、postgres、mssql;

  1. tablename 為該數(shù)據(jù)表的抽象 table 名稱(視圖);
  2. url、driver、user,password 為數(shù)據(jù)庫 JDBC 驅(qū)動(dòng)信息,為必須字段;
  3. SparkSQL 會(huì)加載該表的全表數(shù)據(jù),無法使用 where 條件。

SparkSQL Flow Transformer

  1. <transform type="sql" table_name="cust_id_agmt_id_t" cached="true"
  2.  SELECT c_phone,c_type,c_num, CONCAT_VAL(cust_id) as cust_ids 
  3.  FROM user_concat_testx 
  4.  group by c_phone,c_type,c_num 
  5. </transform> 

Transform 支持 cached 屬性,默認(rèn)為 false;如果設(shè)置為 true,相當(dāng)于把該結(jié)果緩存到內(nèi)存中,緩存到內(nèi)存中的數(shù)據(jù)在后續(xù)其它 Transform 中使用能提高計(jì)算效率。但是需使用大量?jī)?nèi)存,開發(fā)者需要評(píng)估該數(shù)據(jù)集能否放到內(nèi)存中,防止出現(xiàn) OutofMemory 的異常。

SparkSQL Flow Targets

SparkSQL Flow Targets 支持輸出數(shù)據(jù)到一個(gè)或者多個(gè)目標(biāo)。這些目標(biāo),基本覆蓋了 Source 包含的外部系統(tǒng)。下面以 Hive 舉例說明:

  1. <target type="hive" 
  2.  table_name="cust_id_agmt_id_t"  
  3.  savemode=”append” 
  4. target_table_name="cust_id_agmt_id_h"/> 
  1. table_name 為 source 或者 Transform 定義的表名稱;
  2. target_table_name 為 hive 中的表結(jié)果,Hive 表可不存在也可存在,sparksql 會(huì)根據(jù) DataFrame 的數(shù)據(jù)類型自動(dòng)創(chuàng)建表;
  3. savemode 默認(rèn)為 overwrite 覆蓋寫入,當(dāng)寫入目標(biāo)已存在時(shí)刪除源表再寫入;支持 append 模式, 可增量寫入。

Target 有一個(gè)特殊的 show 類型的 target。用于直接在控制臺(tái)輸出一個(gè) DataFrame 的結(jié)果到控制臺(tái)(print),該 target 用于開發(fā)和測(cè)試。

  1. <target type="show" table_name="cust_id_agmt_id_t" rows=”10000”/> 

Rows 用于控制輸出多少行數(shù)據(jù)。

SparkSQL Around

After 用于 Flow 在運(yùn)行結(jié)束后執(zhí)行的一個(gè)環(huán)繞,用于記錄日志和寫入狀態(tài)。類似 Java 的 try {} finally{ round.execute() }

多個(gè) round 一定會(huì)執(zhí)行,round 異常不會(huì)導(dǎo)致任務(wù)失敗。

  1. <prepare
  2.  <round type="mysql" 
  3.  sql="insert into cpic_task_history(id, task_type, catalog_model, start_time, retry_count, final_status, created_at) 
  4.  values(${uuid}, ${task.type}, ${catalog.model}, ${starttime}, 0, ${status}, now())" 
  5.  url="${jdbc.url}" .../> 
  6. </prepare
  7. <after
  8.  <round type="mysql" 
  9.  sql="update cpic_task_history set 
  10.  end_time = ${endtime}, final_status = ${status}, error_text = ${error} where id = ${uuid}" 
  11.  url="${jdbc.url}”…/> 
  12. </after

Prepare round 和 after round 配合使用可用于記錄 SparkSQL Flow 任務(wù)的運(yùn)行日志。

SparkSQL Around的執(zhí)行效果 

基于 Spark 的數(shù)據(jù)分析實(shí)踐

Prepare round 可做插入(insert)動(dòng)作,after round 可做更新 (update)動(dòng)作,相當(dāng)于在數(shù)據(jù)庫表中從執(zhí)行開始到結(jié)束有了完整的日志記錄。SparkSQL Flow 會(huì)保證round 一定能被執(zhí)行,而且 round 的執(zhí)行不影響任務(wù)的狀態(tài)。

SparkSQL Flow 提交

  1. bin/spark-submit --master yarn-client --driver-memory 1G  
  2. --num-executors 10 --executor-memory 2G  
  3. --jars /lib/jsoup-1.11.3.jarlib/jsqlparser-0.9.6.jar,/lib/mysql-connector-java-5.1.46.jar  
  4. --conf spark.yarn.jars=hdfs:///lib/spark2/*.jar  
  5. --queue default --name FlowTest  
  6. etl-flow-0.2.0.jar -f hive-flow-test.xml 

 基于 Spark 的數(shù)據(jù)分析實(shí)踐

接收必須的參數(shù) –f,可選的參數(shù)為支持 Kerberos 認(rèn)證的租戶名稱principal,和其認(rèn)證需要的密鑰文件。

  1. usage: spark-submit --jars etl-flow.jar --class 
  2.  com.yiidata.etl.flow.source.FlowRunner 
  3.  -f,--xml-file <arg> Flow XML File Path 
  4.  --keytabFile <arg> keytab File Path(Huawei) 
  5.  --krb5File <arg> krb5 File Path(Huawei) 
  6.  --principal <arg> principal for hadoop(Huawei) 

SparkSQL Execution Plan 

基于 Spark 的數(shù)據(jù)分析實(shí)踐

每個(gè)Spark Flow 任務(wù)本質(zhì)上是一連串的 SparkSQL 操作,在 SparkUI SQL tab 里可以看到 flow 中重要的數(shù)據(jù)表操作。

regiserDataFrameAsTable 是每個(gè) source 和 Transform 的數(shù)據(jù)在 SparkSQL 中的數(shù)據(jù)視圖,每個(gè)視圖都會(huì)在 SparkContex 中注冊(cè)一次。

對(duì)RegisterDataFrameAsTable的分析 

基于 Spark 的數(shù)據(jù)分析實(shí)踐

通過單個(gè) regiserDataFrameAsTable 項(xiàng)進(jìn)行分析,SparkSQL 并不是把source 的數(shù)據(jù)立即計(jì)算把數(shù)據(jù)放到內(nèi)存,而是每次執(zhí)行 source 時(shí)只是生成了一個(gè) Logical Plan,只有遇到需要提交的算子(Action),SparkSQL 才會(huì)觸發(fā)前面所依賴的的 plan 執(zhí)行。

總結(jié)

這是一個(gè)開發(fā)框架,不是一個(gè)成熟的產(chǎn)品,也不是一種架構(gòu)。他只是基于 SparkSQL 整合了大多數(shù)的外部系統(tǒng),能通過 XML 的模板配置完成數(shù)據(jù)開發(fā)。面向的是理解數(shù)據(jù)業(yè)務(wù)但不了解 Spark 的數(shù)據(jù)開發(fā)人員。整個(gè)框架完成了大多數(shù)的外部系統(tǒng)對(duì)接,開發(fā)者只需要使用 type 獲得數(shù)據(jù),完成數(shù)據(jù)開發(fā)后通過 target 回寫到目標(biāo)系統(tǒng)中。整個(gè)過程基本無須程序開發(fā),除非當(dāng)前的 SQL 函數(shù)無法滿足使用的情況下,需要自行開發(fā)一下特定的 UDF。因此本框架在對(duì) SparkSQL 做了二次開發(fā)基礎(chǔ)上,大大簡(jiǎn)化了 Spark 的開發(fā),可降低了開發(fā)者使用難度。

關(guān)于作者:震秦,普元資深開發(fā)工程師,專注于大數(shù)據(jù)開發(fā) 8 年,擅長(zhǎng) Hadoop 生態(tài)內(nèi)各工具的使用和優(yōu)化。參與某公關(guān)廣告(上市)公司DMP 建設(shè),負(fù)責(zé)數(shù)據(jù)分層設(shè)計(jì)和批處理,調(diào)度實(shí)現(xiàn),完成交付使用;參與國(guó)內(nèi)多省市公安社交網(wǎng)絡(luò)項(xiàng)目部署,負(fù)責(zé)產(chǎn)品開發(fā)(Spark 分析應(yīng)用);參與數(shù)據(jù)清洗加工為我方主題庫并部署上層應(yīng)用。

關(guān)于EAWorld:微服務(wù),DevOps,數(shù)據(jù)治理,移動(dòng)架構(gòu)原創(chuàng)技術(shù)分享。

責(zé)任編輯:未麗燕 來源: 今日頭條
相關(guān)推薦

2015-10-12 17:40:12

數(shù)據(jù)分析實(shí)踐

2024-11-01 08:16:54

2018-06-15 20:44:40

Hadoop數(shù)據(jù)分析數(shù)據(jù)

2020-10-21 10:51:43

數(shù)據(jù)分析

2015-09-23 09:24:56

spark數(shù)據(jù)分析

2021-01-25 20:20:35

數(shù)據(jù)分析SparkHadoop

2014-06-30 10:59:21

2016-12-01 19:07:46

大數(shù)據(jù)數(shù)據(jù)分析

2015-10-16 09:21:13

SparkMySQL數(shù)據(jù)分析

2023-03-01 18:32:16

系統(tǒng)監(jiān)控數(shù)據(jù)

2018-02-26 08:44:35

Python微信數(shù)據(jù)分析

2012-03-21 09:31:51

ibmdw

2017-01-04 10:29:37

Spark運(yùn)維技術(shù)

2017-10-11 11:10:02

Spark Strea大數(shù)據(jù)流式處理

2016-10-19 18:31:11

2021-06-06 19:03:25

SQL大數(shù)據(jù)Spark

2024-03-19 09:24:00

大數(shù)據(jù)數(shù)據(jù)分析性能優(yōu)化

2013-04-27 10:52:09

大數(shù)據(jù)全球技術(shù)峰會(huì)

2016-12-07 15:40:42

谷歌數(shù)據(jù)分析Airbnb

2015-07-01 13:51:12

HadoopMapReduce數(shù)據(jù)分析
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)