偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

基于Spark的數(shù)據(jù)分析實(shí)踐

大數(shù)據(jù) Spark
本文主要分析了 Spark RDD 以及 RDD 作為開發(fā)的不足之處,介紹了 SparkSQL 對(duì)已有的常見數(shù)據(jù)系統(tǒng)的操作方法,以及重點(diǎn)介紹了普元在眾多數(shù)據(jù)開發(fā)項(xiàng)目中總結(jié)的基于 SparkSQL Flow 開發(fā)框架。

引言:

Spark是在借鑒了MapReduce之上發(fā)展而來的,繼承了其分布式并行計(jì)算的優(yōu)點(diǎn)并改進(jìn)了MapReduce明顯的缺陷。Spark主要包含了Spark Core、Spark SQL、Spark Streaming、MLLib和GraphX等組件。

本文主要分析了 Spark RDD 以及 RDD 作為開發(fā)的不足之處,介紹了 SparkSQL 對(duì)已有的常見數(shù)據(jù)系統(tǒng)的操作方法,以及重點(diǎn)介紹了普元在眾多數(shù)據(jù)開發(fā)項(xiàng)目中總結(jié)的基于 SparkSQL Flow 開發(fā)框架。

目錄:

  1. Spark RDD
  2. 基于Spark RDD數(shù)據(jù)開發(fā)的不足
  3. SparkSQL
  4. SparkSQL Flow

一、Spark RDD

RDD(Resilient Distributed Dataset)叫做彈性分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象,它代表一個(gè)不可變、可分區(qū)、元素可并行計(jì)算的集合。

RDD具有數(shù)據(jù)流模型的特點(diǎn):自動(dòng)容錯(cuò)、位置感知性調(diào)度和可伸縮性。

//Scala 在內(nèi)存中使用列表創(chuàng)建

  1. val lines = List(“A”, “B”, “C”, “D” …) 
  2. val rdd:RDD = sc.parallelize(lines); 

//以文本文件創(chuàng)建

  1. val rdd:RDD[String] = sc.textFile(“hdfs://path/filename”) 

Spark RDD Partition 分區(qū)劃分 

基于 Spark 的數(shù)據(jù)分析實(shí)踐

新版本的 Hadoop 已經(jīng)把 BlockSize 改為 128M,也就是說每個(gè)分區(qū)處理的數(shù)據(jù)量更大。

Spark 讀取文件分區(qū)的核心原理

本質(zhì)上,Spark 是利用了 Hadoop 的底層對(duì)數(shù)據(jù)進(jìn)行分區(qū)的 API(InputFormat):

  1. public abstract class InputFormat<K,V>{ 
  2.  public abstract List<InputSplit> getSplits(JobContextcontext 
  3.  ) throwsIOException,InterruptedException; 
  4.   
  5.  public abstract RecordReader<K,V> createRecordReader(InputSplitsplit, 
  6.  TaskAttemptContextcontext 
  7.  )throwsIOException,InterruptedException; 

Spark 任務(wù)提交后通過對(duì)輸入進(jìn)行 Split,在 RDD 構(gòu)造階段,只是判斷是否可 Split(如果參數(shù)異常一定在此階段報(bào)出異常),并且 Split 后每個(gè) InputSplit 都是一個(gè)分區(qū)。只有在Action 算子提交后,才真正用 getSplits 返回的 InputSplit 通過 createRecordReader 獲得每個(gè) Partition 的連接。

然后通過 RecordReader 的 next() 遍歷分區(qū)內(nèi)的數(shù)據(jù)。

Spark RDD 轉(zhuǎn)換函數(shù)和提交函數(shù) 

基于 Spark 的數(shù)據(jù)分析實(shí)踐

Spark RDD 的眾多函數(shù)可分為兩大類Transformation 與 Action。Transformation 與 Action 的區(qū)別在于,對(duì) RDD 進(jìn)行 Transformation 并不會(huì)觸發(fā)計(jì)算:Transformation 方法所產(chǎn)生的 RDD 對(duì)象只會(huì)記錄住該 RDD 所依賴的 RDD 以及計(jì)算產(chǎn)生該 RDD 的數(shù)據(jù)的方式;只有在用戶進(jìn)行 Action 操作時(shí),Spark 才會(huì)調(diào)度 RDD 計(jì)算任務(wù),依次為各個(gè) RDD 計(jì)算數(shù)據(jù)。這就是 Spark RDD 內(nèi)函數(shù)的“懶加載”特性。

二、基于Spark RDD數(shù)據(jù)開發(fā)的不足

由于MapReduce的shuffle過程需寫磁盤,比較影響性能;而Spark利用RDD技術(shù),計(jì)算在內(nèi)存中流式進(jìn)行。另外 MapReduce計(jì)算框架(API)比較局限, 使用需要關(guān)注的參數(shù)眾多,而Spark則是中間結(jié)果自動(dòng)推斷,通過對(duì)數(shù)據(jù)集上鏈?zhǔn)綀?zhí)行函數(shù)具備一定的靈活性。

即使 SparkRDD 相對(duì)于 MapReduce 提高很大的便利性,但在使用上仍然有許多問題。體現(xiàn)在一下幾個(gè)方面:

  1. RDD 函數(shù)眾多,開發(fā)者不容易掌握,部分函數(shù)使用不當(dāng) shuffle時(shí)造成數(shù)據(jù)傾斜影響性能;
  2. RDD 關(guān)注點(diǎn)仍然是Spark太底層的 API,基于 Spark RDD的開發(fā)是基于特定語言(Scala,Python,Java)的函數(shù)開發(fā),無法以數(shù)據(jù)的視界來開發(fā)數(shù)據(jù);
  3. 對(duì) RDD 轉(zhuǎn)換算子函數(shù)內(nèi)部分常量、變量、廣播變量使用不當(dāng),會(huì)造成不可控的異常;
  4. 對(duì)多種數(shù)據(jù)開發(fā),需各自開發(fā)RDD的轉(zhuǎn)換,樣板代碼較多,無法有效重利用;
  5. 其它在運(yùn)行期可能發(fā)生的異常。如:對(duì)象無法序列化等運(yùn)行期才能發(fā)現(xiàn)的異常。

三、SparkSQL

Spark 從 1.3 版本開始原有 SchemaRDD 的基礎(chǔ)上提供了類似Pandas DataFrame API。新的DataFrame API不僅可以大幅度降低普通開發(fā)者的學(xué)習(xí)門檻,同時(shí)還支持Scala、Java與Python三種語言。更重要的是,由于脫胎自SchemaRDD,DataFrame天然適用于分布式大數(shù)據(jù)場(chǎng)景。 

基于 Spark 的數(shù)據(jù)分析實(shí)踐

一般的數(shù)據(jù)處理步驟:讀入數(shù)據(jù) -> 對(duì)數(shù)據(jù)進(jìn)行處理 -> 分析結(jié)果 -> 寫入結(jié)果

SparkSQL 結(jié)構(gòu)化數(shù)據(jù)

  • 處理結(jié)構(gòu)化數(shù)據(jù)(如 CSV,JSON,Parquet 等);
  • 把已經(jīng)結(jié)構(gòu)化數(shù)據(jù)抽象成 DataFrame (HiveTable);
  • 非結(jié)構(gòu)化數(shù)據(jù)通過 RDD.map.filter 轉(zhuǎn)換成結(jié)構(gòu)化進(jìn)行處理;
  • 按照列式數(shù)據(jù)庫(kù),只加載非結(jié)構(gòu)化中可結(jié)構(gòu)化的部分列(Hbase,MongoDB);

處理非結(jié)構(gòu)化數(shù)據(jù),不能簡(jiǎn)單的用 DataFrame 裝載。而是要用 SparkRDD 把數(shù)據(jù)讀入,在通過一系列的 Transformer Method 把非結(jié)構(gòu)化的數(shù)據(jù)加工為結(jié)構(gòu)化,或者過濾到不合法的數(shù)據(jù)。

SparkSQL DataFrame 

基于 Spark 的數(shù)據(jù)分析實(shí)踐

SparkSQL 中一切都是 DataFrame,all in DataFrame. DataFrame是一種以RDD為基礎(chǔ)的分布式數(shù)據(jù)集,類似于傳統(tǒng)數(shù)據(jù)庫(kù)中的二維表格。DataFrame與RDD的主要區(qū)別在于,前者帶有schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱和類型。如果熟悉 Python Pandas 庫(kù)中的 DataFrame 結(jié)構(gòu),則會(huì)對(duì) SparkSQL DataFrame 概念非常熟悉。

TextFile DataFrame

  1. import.org.apache.spark.sql._ 
  2. //定義數(shù)據(jù)的列名稱和類型 
  3. valdt=StructType(List(id:String,name:String,gender:String,age:Int)) 
  4. ​ 
  5. //導(dǎo)入user_info.csv文件并指定分隔符 
  6. vallines = sc.textFile("/path/user_info.csv").map(_.split(",")) 
  7. ​ 
  8. //將表結(jié)構(gòu)和數(shù)據(jù)關(guān)聯(lián)起來,把讀入的數(shù)據(jù)user.csv映射成行,構(gòu)成數(shù)據(jù)集 
  9. valrowRDD = lines.map(x=>Row(x(0),x(1),x(2),x(3).toInt)) 
  10. ​ 
  11. //通過SparkSession.createDataFrame()創(chuàng)建表,并且數(shù)據(jù)表表頭 
  12. val df= spark.createDataFrame(rowRDD, dt) 

讀取規(guī)則數(shù)據(jù)文件作為DataFrame

  1. SparkSession.Builder builder = SparkSession.builder() 
  2. Builder.setMaster("local").setAppName("TestSparkSQLApp"
  3. SparkSession spark = builder.getOrCreate(); 
  4. SQLContext sqlContext = spark.sqlContext(); 
  5. ​ 
  6. # 讀取 JSON 數(shù)據(jù),path 可為文件或者目錄 
  7. valdf=sqlContext.read().json(path); 
  8. ​ 
  9. # 讀取 HadoopParquet 文件 
  10. vardf=sqlContext.read().parquet(path); 
  11. ​ 
  12. # 讀取 HadoopORC 文件 
  13. vardf=sqlContext.read().orc(path); 

JSON 文件為每行一個(gè) JSON 對(duì)象的文件類型,行尾無須逗號(hào)。文件頭也無須[]指定為數(shù)組;SparkSQL 讀取是只是按照每行一條 JSON Record序列化;

Parquet文件

  1. Configurationconfig = new Configuration(); 
  2. ParquetFileReaderreader = ParquetFileReader.open
  3.  HadoopInputFile.fromPath(new Path("hdfs:///path/file.parquet"),conf)); 
  4. Map<String, String>schema = reader.getFileMetaData().getKeyValueMetaData(); 
  5. String allFields= schema.get("org.apache.spark.sql.parquet.row.metadata"); 

allFiedls 的值就是各字段的名稱和具體的類型,整體是一個(gè)json格式進(jìn)行展示。

讀取 Hive 表作為 DataFrame

Spark2 API 推薦通過 SparkSession.Builder 的 Builder 模式創(chuàng)建 SparkContext。 Builder.getOrCreate() 用于創(chuàng)建 SparkSession,SparkSession 是 SparkContext 的封裝。

在Spark1.6中有兩個(gè)核心組件SQLcontext和HiveContext。SQLContext 用于處理在 SparkSQL 中動(dòng)態(tài)注冊(cè)的表,HiveContext 用于處理 Hive 中的表。

從Spark2.0以上的版本開始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext。SQLContext.sql 即可執(zhí)行 Hive 中的表,也可執(zhí)行內(nèi)部注冊(cè)的表;

在需要執(zhí)行 Hive 表時(shí),只需要在 SparkSession.Builder 中開啟 Hive 支持即可(enableHiveSupport())。

  1. SparkSession.Builder builder = SparkSession.builder().enableHiveSupport(); 
  2. SparkSession spark = builder.getOrCreate(); 
  3. SQLContext sqlContext = spark.sqlContext(); 

// db 指 Hive 庫(kù)中的數(shù)據(jù)庫(kù)名,如果不寫默認(rèn)為 default

// tableName 指 hive 庫(kù)的數(shù)據(jù)表名

  1. sqlContext.sql(“select * from db.tableName”) 

SparkSQL ThriftServer

//首先打開 Hive 的 Metastore服務(wù)

  1. hive$bin/hive –-service metastore –p 8093 

//把 Spark 的相關(guān) jar 上傳到hadoophdfs指定目錄,用于指定sparkonyarn的依賴 jar

  1. spark$hadoop fs –put jars/*.jar /lib/spark2 

// 啟動(dòng) spark thriftserver 服務(wù)

  1. spark$ sbin/start-thriftserver.sh --master yarn-client --driver-memory 1G --conf  
  2. spark.yarn.jars=hdfs:///lib/spark2/*.jar 

當(dāng)hdfs 上傳了spark 依賴 jar 時(shí),通過spark.yarn.jars 可看到日志 spark 無須每個(gè)job 都上傳jar,可節(jié)省啟動(dòng)時(shí)間

  1. 19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.0.5.jar 
  2. 19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.1.7.3.jar 

//通過 spark bin 下的 beeline 工具,可以連接到 spark ThriftServer(SparkOnHive)

  1. bin/beeline -u jdbc:hive2://ip:10000/default -n hadoop 
  • -u 是指定 beeline 的執(zhí)行驅(qū)動(dòng)地址;
  • -n 是指定登陸到 spark Session 上的用戶名稱;

Beeline 還支持傳入-e 可傳入一行 SQL,

  • -e query that should be executed

也可通過 –f 指定一個(gè) SQL File,內(nèi)部可用逗號(hào)分隔的多個(gè) SQL(存儲(chǔ)過程)

  • -f script file that should be executed

SparkSQL Beeline 的執(zhí)行效果展示 

基于 Spark 的數(shù)據(jù)分析實(shí)踐

SparkSQL ThriftServer 

基于 Spark 的數(shù)據(jù)分析實(shí)踐

對(duì)于 SparkSQL ThriftServer 服務(wù),每個(gè)登陸的用戶都有創(chuàng)建的 SparkSession,并且執(zhí)行的對(duì)個(gè) SQL 會(huì)通過時(shí)間順序列表展示。

SparkSQL ThriftServer 服務(wù)可用于其他支持的數(shù)據(jù)庫(kù)工具創(chuàng)建查詢,也用于第三方的 BI 工具,如 tableau。

四、SparkSQL Flow

SparkSQL Flow 是以 SparkSQL 為基礎(chǔ),開發(fā)的統(tǒng)一的基于 XML 配置化的可執(zhí)行一連串的 SQL 操作,這一連串的 SQL 操作定義為一個(gè) Flow。下文開始 SparkSQL Flow 的介紹:

SparkSQL Flow 是基于 SparkSQL 開發(fā)的一種基于 XML 配置化的 SQL 數(shù)據(jù)流轉(zhuǎn)處理模型。該模型簡(jiǎn)化了 SparkSQL 、Spark RDD的開發(fā),并且降低開發(fā)了難度,適合了解數(shù)據(jù)業(yè)務(wù)但無法駕馭大數(shù)據(jù)以及 Spark 技術(shù)的開發(fā)者。

  • 一個(gè)由普元技術(shù)部提供的基于 SparkSQL 的開發(fā)模型;
  • 一個(gè)可二次定制開發(fā)的大數(shù)據(jù)開發(fā)框架,提供了靈活的可擴(kuò)展 API;
  • 一個(gè)提供了 對(duì)文件,數(shù)據(jù)庫(kù),NoSQL 等統(tǒng)一的數(shù)據(jù)開發(fā)視界語義;
  • 基于 SQL 的開發(fā)語言和 XML 的模板配置,支持 Spark UDF 的擴(kuò)展管理;
  • 支持基于 Spark Standlone,Yarn,Mesos 資源管理平臺(tái);
  • 支持開源、華為、星環(huán)等平臺(tái)統(tǒng)一認(rèn)證。

SparkSQL Flow 適合的場(chǎng)景:

  1. 批量 ETL;
  2. 非實(shí)時(shí)分析服務(wù);

SparkSQL Flow XML 概覽 

基于 Spark 的數(shù)據(jù)分析實(shí)踐
  1. Properties 內(nèi)定義一組變量,可用于宏替換;
  2. Methods 內(nèi)可注冊(cè) udf 和 udaf 兩種函數(shù);
  3. Prepare 內(nèi)可定義前置 SQL,用于執(zhí)行 source 前的 sql 操作;
  4. Sources 內(nèi)定義一個(gè)到多個(gè)數(shù)據(jù)表視圖;
  5. Transformer 內(nèi)可定義 0 到多個(gè)基于 SQL 的數(shù)據(jù)轉(zhuǎn)換操作(支持 join);
  6. Targets 用于定義 1 到多個(gè)數(shù)據(jù)輸出;
  7. After 可定義 0到多個(gè)任務(wù)日志;

如你所見,source 的 type 參數(shù)用于區(qū)分 source 的類型,source 支持的種類直接決定SparkSQL Flow 的數(shù)據(jù)源加載廣度;并且,根據(jù) type 不同,source 也需要配置不同的參數(shù),如數(shù)據(jù)庫(kù)還需要 driver,url,user和 password 參數(shù)。

Transformer 是基于 source 定的數(shù)據(jù)視圖可執(zhí)行的一組轉(zhuǎn)換 SQL,該 SQL 符合 SparkSQL 的語法(SQL99)。Transform 的 SQL 的執(zhí)行結(jié)果被作為中間表命名為 table_name 指定的值。

Targets 為定義輸出,table_name 的值需在 source 或者 Transformer 中定義。

SparkSQL Flow 支持的Sourse 

基于 Spark 的數(shù)據(jù)分析實(shí)踐
  • 支持從 Hive 獲得數(shù)據(jù);
  • 支持文件:JSON,TextFile(CSV),ParquetFile,AvroFile
  • 支持RDBMS數(shù)據(jù)庫(kù):PostgreSQL, MySQL,Oracle
  • 支持 NOSQL 數(shù)據(jù)庫(kù):Hbase,MongoDB

SparkSQL Flow TextFile Source

textfile 為讀取文本文件,把文本文件每行按照 delimiter 指定的字符進(jìn)行切分,切分不夠的列使用 null 填充。

  1. <source type="textfile" table_name="et_rel_pty_cong" 
  2.  fields="cust_id,name1,gender1,age1:int"  
  3.  delimiter="," 
  4.  path="file:///Users/zhenqin/software/hive/user.txt"/> 
  1. Tablename 為該文件映射的數(shù)據(jù)表名,可理解為數(shù)據(jù)的視圖;
  2. Fields 為切分后的字段,使用逗號(hào)分隔,字段后可緊跟該字段的類型,使用冒號(hào)分隔;
  3. Delimiter 為每行的分隔符;
  4. Path 用于指定文件地址,可以是文件,也可是文件夾;
  5. Path 指定地址需要使用協(xié)議,如:file:// 、 hdfs://,否則跟 core-site.xml 配置密切相關(guān);

SparkSQL Flow DB Source

  1. <source type="mysql" table_name="et_rel_pty_cong" 
  2.  table="user" 
  3.  url="jdbc:mysql://localhost:3306/tdb?characterEncoding=UTF-8" 
  4.  driver="com.mysql.jdbc.Driver" 
  5.  user="root" password="123456"/> 

RDBMS 是從數(shù)據(jù)庫(kù)使用 JDBC讀取 數(shù)據(jù)集。支持 type 為:db、mysql、oracle、postgres、mssql;

  1. tablename 為該數(shù)據(jù)表的抽象 table 名稱(視圖);
  2. url、driver、user,password 為數(shù)據(jù)庫(kù) JDBC 驅(qū)動(dòng)信息,為必須字段;
  3. SparkSQL 會(huì)加載該表的全表數(shù)據(jù),無法使用 where 條件。

SparkSQL Flow Transformer

  1. <transform type="sql" table_name="cust_id_agmt_id_t" cached="true"
  2.  SELECT c_phone,c_type,c_num, CONCAT_VAL(cust_id) as cust_ids 
  3.  FROM user_concat_testx 
  4.  group by c_phone,c_type,c_num 
  5. </transform> 

Transform 支持 cached 屬性,默認(rèn)為 false;如果設(shè)置為 true,相當(dāng)于把該結(jié)果緩存到內(nèi)存中,緩存到內(nèi)存中的數(shù)據(jù)在后續(xù)其它 Transform 中使用能提高計(jì)算效率。但是需使用大量?jī)?nèi)存,開發(fā)者需要評(píng)估該數(shù)據(jù)集能否放到內(nèi)存中,防止出現(xiàn) OutofMemory 的異常。

SparkSQL Flow Targets

SparkSQL Flow Targets 支持輸出數(shù)據(jù)到一個(gè)或者多個(gè)目標(biāo)。這些目標(biāo),基本覆蓋了 Source 包含的外部系統(tǒng)。下面以 Hive 舉例說明:

  1. <target type="hive" 
  2.  table_name="cust_id_agmt_id_t"  
  3.  savemode=”append” 
  4. target_table_name="cust_id_agmt_id_h"/> 
  1. table_name 為 source 或者 Transform 定義的表名稱;
  2. target_table_name 為 hive 中的表結(jié)果,Hive 表可不存在也可存在,sparksql 會(huì)根據(jù) DataFrame 的數(shù)據(jù)類型自動(dòng)創(chuàng)建表;
  3. savemode 默認(rèn)為 overwrite 覆蓋寫入,當(dāng)寫入目標(biāo)已存在時(shí)刪除源表再寫入;支持 append 模式, 可增量寫入。

Target 有一個(gè)特殊的 show 類型的 target。用于直接在控制臺(tái)輸出一個(gè) DataFrame 的結(jié)果到控制臺(tái)(print),該 target 用于開發(fā)和測(cè)試。

  1. <target type="show" table_name="cust_id_agmt_id_t" rows=”10000”/> 

Rows 用于控制輸出多少行數(shù)據(jù)。

SparkSQL Around

After 用于 Flow 在運(yùn)行結(jié)束后執(zhí)行的一個(gè)環(huán)繞,用于記錄日志和寫入狀態(tài)。類似 Java 的 try {} finally{ round.execute() }

多個(gè) round 一定會(huì)執(zhí)行,round 異常不會(huì)導(dǎo)致任務(wù)失敗。

  1. <prepare
  2.  <round type="mysql" 
  3.  sql="insert into cpic_task_history(id, task_type, catalog_model, start_time, retry_count, final_status, created_at) 
  4.  values(${uuid}, ${task.type}, ${catalog.model}, ${starttime}, 0, ${status}, now())" 
  5.  url="${jdbc.url}" .../> 
  6. </prepare
  7. <after
  8.  <round type="mysql" 
  9.  sql="update cpic_task_history set 
  10.  end_time = ${endtime}, final_status = ${status}, error_text = ${error} where id = ${uuid}" 
  11.  url="${jdbc.url}”…/> 
  12. </after

Prepare round 和 after round 配合使用可用于記錄 SparkSQL Flow 任務(wù)的運(yùn)行日志。

SparkSQL Around的執(zhí)行效果 

基于 Spark 的數(shù)據(jù)分析實(shí)踐

Prepare round 可做插入(insert)動(dòng)作,after round 可做更新 (update)動(dòng)作,相當(dāng)于在數(shù)據(jù)庫(kù)表中從執(zhí)行開始到結(jié)束有了完整的日志記錄。SparkSQL Flow 會(huì)保證round 一定能被執(zhí)行,而且 round 的執(zhí)行不影響任務(wù)的狀態(tài)。

SparkSQL Flow 提交

  1. bin/spark-submit --master yarn-client --driver-memory 1G  
  2. --num-executors 10 --executor-memory 2G  
  3. --jars /lib/jsoup-1.11.3.jarlib/jsqlparser-0.9.6.jar,/lib/mysql-connector-java-5.1.46.jar  
  4. --conf spark.yarn.jars=hdfs:///lib/spark2/*.jar  
  5. --queue default --name FlowTest  
  6. etl-flow-0.2.0.jar -f hive-flow-test.xml 

 基于 Spark 的數(shù)據(jù)分析實(shí)踐

接收必須的參數(shù) –f,可選的參數(shù)為支持 Kerberos 認(rèn)證的租戶名稱principal,和其認(rèn)證需要的密鑰文件。

  1. usage: spark-submit --jars etl-flow.jar --class 
  2.  com.yiidata.etl.flow.source.FlowRunner 
  3.  -f,--xml-file <arg> Flow XML File Path 
  4.  --keytabFile <arg> keytab File Path(Huawei) 
  5.  --krb5File <arg> krb5 File Path(Huawei) 
  6.  --principal <arg> principal for hadoop(Huawei) 

SparkSQL Execution Plan 

基于 Spark 的數(shù)據(jù)分析實(shí)踐

每個(gè)Spark Flow 任務(wù)本質(zhì)上是一連串的 SparkSQL 操作,在 SparkUI SQL tab 里可以看到 flow 中重要的數(shù)據(jù)表操作。

regiserDataFrameAsTable 是每個(gè) source 和 Transform 的數(shù)據(jù)在 SparkSQL 中的數(shù)據(jù)視圖,每個(gè)視圖都會(huì)在 SparkContex 中注冊(cè)一次。

對(duì)RegisterDataFrameAsTable的分析 

基于 Spark 的數(shù)據(jù)分析實(shí)踐

通過單個(gè) regiserDataFrameAsTable 項(xiàng)進(jìn)行分析,SparkSQL 并不是把source 的數(shù)據(jù)立即計(jì)算把數(shù)據(jù)放到內(nèi)存,而是每次執(zhí)行 source 時(shí)只是生成了一個(gè) Logical Plan,只有遇到需要提交的算子(Action),SparkSQL 才會(huì)觸發(fā)前面所依賴的的 plan 執(zhí)行。

總結(jié)

這是一個(gè)開發(fā)框架,不是一個(gè)成熟的產(chǎn)品,也不是一種架構(gòu)。他只是基于 SparkSQL 整合了大多數(shù)的外部系統(tǒng),能通過 XML 的模板配置完成數(shù)據(jù)開發(fā)。面向的是理解數(shù)據(jù)業(yè)務(wù)但不了解 Spark 的數(shù)據(jù)開發(fā)人員。整個(gè)框架完成了大多數(shù)的外部系統(tǒng)對(duì)接,開發(fā)者只需要使用 type 獲得數(shù)據(jù),完成數(shù)據(jù)開發(fā)后通過 target 回寫到目標(biāo)系統(tǒng)中。整個(gè)過程基本無須程序開發(fā),除非當(dāng)前的 SQL 函數(shù)無法滿足使用的情況下,需要自行開發(fā)一下特定的 UDF。因此本框架在對(duì) SparkSQL 做了二次開發(fā)基礎(chǔ)上,大大簡(jiǎn)化了 Spark 的開發(fā),可降低了開發(fā)者使用難度。

關(guān)于作者:震秦,普元資深開發(fā)工程師,專注于大數(shù)據(jù)開發(fā) 8 年,擅長(zhǎng) Hadoop 生態(tài)內(nèi)各工具的使用和優(yōu)化。參與某公關(guān)廣告(上市)公司DMP 建設(shè),負(fù)責(zé)數(shù)據(jù)分層設(shè)計(jì)和批處理,調(diào)度實(shí)現(xiàn),完成交付使用;參與國(guó)內(nèi)多省市公安社交網(wǎng)絡(luò)項(xiàng)目部署,負(fù)責(zé)產(chǎn)品開發(fā)(Spark 分析應(yīng)用);參與數(shù)據(jù)清洗加工為我方主題庫(kù)并部署上層應(yīng)用。

關(guān)于EAWorld:微服務(wù),DevOps,數(shù)據(jù)治理,移動(dòng)架構(gòu)原創(chuàng)技術(shù)分享。

責(zé)任編輯:未麗燕 來源: 今日頭條
相關(guān)推薦

2015-10-12 17:40:12

數(shù)據(jù)分析實(shí)踐

2024-11-01 08:16:54

2018-06-15 20:44:40

Hadoop數(shù)據(jù)分析數(shù)據(jù)

2020-10-21 10:51:43

數(shù)據(jù)分析

2015-09-23 09:24:56

spark數(shù)據(jù)分析

2021-01-25 20:20:35

數(shù)據(jù)分析SparkHadoop

2014-06-30 10:59:21

2016-12-01 19:07:46

大數(shù)據(jù)數(shù)據(jù)分析

2023-03-01 18:32:16

系統(tǒng)監(jiān)控數(shù)據(jù)

2015-10-16 09:21:13

SparkMySQL數(shù)據(jù)分析

2018-02-26 08:44:35

Python微信數(shù)據(jù)分析

2017-01-04 10:29:37

Spark運(yùn)維技術(shù)

2017-10-11 11:10:02

Spark Strea大數(shù)據(jù)流式處理

2012-03-21 09:31:51

ibmdw

2024-03-19 09:24:00

大數(shù)據(jù)數(shù)據(jù)分析性能優(yōu)化

2016-10-19 18:31:11

2021-06-06 19:03:25

SQL大數(shù)據(jù)Spark

2016-12-07 15:40:42

谷歌數(shù)據(jù)分析Airbnb

2013-04-27 10:52:09

大數(shù)據(jù)全球技術(shù)峰會(huì)

2016-10-24 11:24:22

電商數(shù)據(jù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)