偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

Spark 在供應(yīng)鏈核算中的應(yīng)用總結(jié)

開(kāi)發(fā) 架構(gòu)
核算業(yè)務(wù)的特征比較偏向數(shù)據(jù)和規(guī)則的處理,大數(shù)據(jù)引擎的引入有助于整體業(yè)務(wù)的交付效率提升和成本降低。我們對(duì)Spark的認(rèn)知主要在完成數(shù)據(jù)處理邏輯開(kāi)發(fā)及日常的調(diào)優(yōu)上,隨著運(yùn)行實(shí)例的增多以及業(yè)務(wù)的不斷發(fā)展,當(dāng)前的技術(shù)方案也會(huì)不斷的迭代演進(jìn)。

一、業(yè)務(wù)背景

(會(huì)計(jì))核算是使用會(huì)計(jì)語(yǔ)言與方法,對(duì)產(chǎn)品業(yè)務(wù)的結(jié)果進(jìn)行登記與反映,從而為利益相關(guān)者提供直觀、準(zhǔn)確、有價(jià)值的信息,主要服務(wù)對(duì)象是財(cái)務(wù)、審計(jì)、外部監(jiān)管、合規(guī)以及管理層,同時(shí)核算也是資金管理風(fēng)險(xiǎn)防范的其中一個(gè)手段。整體流程可以概括為基于核算規(guī)則從業(yè)務(wù)事件(采購(gòu)入庫(kù)、退供、TOC確認(rèn)收貨、開(kāi)票等)關(guān)聯(lián)單據(jù)中提取業(yè)務(wù)要素(采購(gòu)/銷售主體、業(yè)務(wù)時(shí)間、客商、金額等)轉(zhuǎn)換為會(huì)計(jì)語(yǔ)言表達(dá)的數(shù)據(jù)(會(huì)計(jì)分錄,會(huì)計(jì)要素主要包括OU/收益部門/預(yù)算部門/往來(lái)段/明細(xì)段/行業(yè)段/成本中心等),供應(yīng)鏈核算主要鏈路如下圖所示:

從上圖可以看到供應(yīng)鏈核算一腳在業(yè)務(wù)(計(jì)費(fèi)/結(jié)算可以理解為財(cái)務(wù)視角的業(yè)務(wù)),一腳在財(cái)務(wù),職責(zé)上既要滿足核算團(tuán)隊(duì)月結(jié)出賬的訴求,又要提供業(yè)財(cái)對(duì)賬的能力,基于此我們將數(shù)據(jù)處理統(tǒng)一為如下流程:

二、離線 SQL 模式存在的問(wèn)題

從第1章節(jié)圖2可以看到,核算的流程就是ETL的過(guò)程,在早期的方案中通過(guò)離線+在線的實(shí)現(xiàn)方式,其中離線完成原始憑證的加工,業(yè)務(wù)接入的邏輯通過(guò)SQL實(shí)現(xiàn),在線系統(tǒng)完成記賬+拋賬,同時(shí)由于在線系統(tǒng)處理能力有限,在原始憑證加工中進(jìn)行了業(yè)務(wù)單據(jù)的聚合,此種實(shí)現(xiàn)方式主要存在以下問(wèn)題。

1.對(duì)賬問(wèn)題定位困難,核算小二主要通過(guò)下載分錄及對(duì)應(yīng)的業(yè)務(wù)單據(jù)匯總數(shù)據(jù)進(jìn)行對(duì)賬,如果某一分錄和業(yè)務(wù)數(shù)據(jù)有出入,只能逐一業(yè)務(wù)要素分析,由于缺乏通過(guò)分錄精確追溯到關(guān)聯(lián)業(yè)務(wù)單據(jù)的下鉆能力,問(wèn)題定位耗時(shí)較長(zhǎng),造成這一問(wèn)題的主要原因在于通過(guò)離線SQL實(shí)現(xiàn)的原始加工邏輯無(wú)法精確的建立業(yè)務(wù)單據(jù)和原始憑證的關(guān)聯(lián)關(guān)系。

2.日常運(yùn)維困難,隨著業(yè)務(wù)的不斷發(fā)展,業(yè)務(wù)接入離線任務(wù)在不斷的膨脹,最終成為一個(gè)橫跨4個(gè)項(xiàng)目空間,150+離線任務(wù)、100+離線表的工程,任一節(jié)點(diǎn)的錯(cuò)誤都會(huì)造成月結(jié)數(shù)據(jù)出錯(cuò)。

3.行業(yè)實(shí)施效率較低,每次新接入行業(yè)都需要開(kāi)發(fā)小二新建一套離線表+離線任務(wù),相應(yīng)的也造成運(yùn)維問(wèn)題的持續(xù)惡化。

三、為什么選擇Spark

1.核心訴求

在核算主版本的建設(shè)中,我們希望能夠通過(guò)打造穩(wěn)定可復(fù)用的產(chǎn)品能力最大程度的解決上述問(wèn)題,核心訴求如下

1)核算規(guī)則(業(yè)務(wù)接入/記賬/拋賬)可配、可視,不存在黑盒的加工邏輯,加工流程對(duì)核算小二全透明(提升實(shí)施+對(duì)賬效率)

2)建立整個(gè)核算鏈路單據(jù)維度的關(guān)聯(lián)關(guān)系(業(yè)務(wù)單據(jù)<->原始憑證<->記賬憑證<->拋賬憑證),具備雙向的單據(jù)追溯能力(提升對(duì)賬效率)

基于以上訴求,我們抽象了標(biāo)準(zhǔn)的規(guī)則模型,滿足用戶多場(chǎng)景下各個(gè)鏈路(業(yè)務(wù)接入、記賬、拋賬)的加工邏輯配置(規(guī)則相關(guān)設(shè)計(jì)方案不再此文展開(kāi)),與之配套的會(huì)計(jì)引擎完成基于核算規(guī)則的數(shù)據(jù)處理,另外在主版本的設(shè)計(jì)中,原始憑證需要1V1還原業(yè)務(wù)單據(jù),每月原始憑證數(shù)據(jù)量達(dá)到了10億級(jí)別,為了滿足月結(jié)時(shí)效性的要求,我們需要采用高性能、支持大數(shù)據(jù)量、且編程友好(便于建立單據(jù)關(guān)系)的計(jì)算引擎。

2.Spark VS MapReduce

基于上述訴求,我們重點(diǎn)調(diào)研了Spark和MapReduce兩款計(jì)算引擎,差異如下所示:

引擎

MapReduce

Spark

編程友好

一般,支持Map/Reduce兩種算子

較好,支持的算子豐富(map/filter/reduce/aggregate等)

性能

一般,中間態(tài)數(shù)據(jù)需要落盤,計(jì)算邏輯相對(duì)復(fù)雜時(shí),MapReduce會(huì)涉及到多MapReduce任務(wù)執(zhí)行(多次shuffle),每次shuffle也會(huì)涉及到大量的磁盤IO

較好,基于內(nèi)存計(jì)算,基于DAG可以構(gòu)建RDD的血緣關(guān)系,在調(diào)度過(guò)程中可以避免大量無(wú)效的磁盤IO,另外rdd共享機(jī)制可以降低網(wǎng)絡(luò)IO的開(kāi)銷

集團(tuán)生態(tài)

較好,odps提供MapReduce計(jì)算框架支持,可以通過(guò)LogView查看日志

較好,odps提供Spark計(jì)算引擎支持,可以通過(guò)LogView查看日志,目前提供了stand-alone、集群及client三種模式的支持

比較形象的對(duì)比(并不是說(shuō)spark不會(huì)落盤,在基于DAG圖拆分stage時(shí),也會(huì)涉及到shuffle,但整體的磁盤IO消耗比MapReduce要低)。

3.編程模式優(yōu)勢(shì): RDD + DataFrame 的編程模式

如上面和MapReduce的比較中看到 Spark 在編程友好性上比MapReduce好一些,比較適合后端開(kāi)發(fā)人員。

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))
// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

上面是一個(gè)官方的例子,在schema控制,可編程性和 sql 操作等能較好的結(jié)合,邏輯比較類同后端開(kāi)發(fā)。

基于上述spark特點(diǎn)及優(yōu)勢(shì),我們最終選擇spark實(shí)現(xiàn)會(huì)計(jì)引擎邏輯。

四、spark基礎(chǔ)介紹

1.基礎(chǔ)概念

  • Rdd(Resilient distributed dataset):不可變的彈性分布式數(shù)據(jù)集(不可變性似于docker中的只讀鏡像層),只能通過(guò)其他的transformation算子創(chuàng)建新的RDD。
  • Operations:算子,spark包括兩類算子,transformation(轉(zhuǎn)換算子,通過(guò)對(duì)前置rdd的處理生成新的rdd)/action(觸發(fā)spark job的拆分及執(zhí)行,負(fù)責(zé)將rdd輸出)。
  • Task:執(zhí)行器執(zhí)行的任務(wù)單元,一般基于當(dāng)前rdd的分區(qū)數(shù)量拆分。
  • Job:包含多個(gè)task的集合,基于Action算子拆分。
  • Stage:基于當(dāng)前rdd處理邏輯的寬窄依賴拆分,spark中非常重要的概念,stage的切換會(huì)涉及到IO。
  • Narrow/Wide dependencies:參考下圖,區(qū)分的重要依據(jù)在于父節(jié)點(diǎn)是否會(huì)被多個(gè)子節(jié)點(diǎn)使用。

2.Spark on MaxCompute(ODPS)

我們?cè)趯?shí)踐中,主要基于spark on odps提供的client模式實(shí)現(xiàn),client模式的詳細(xì)介紹可以參考相關(guān)文檔。

  • Spark 有很多的后端的 Runtime,例如其商業(yè)化公司的Databricks Runtime, 彈內(nèi)我們使用的是 AliSpark,是集團(tuán)的適配MaxComputer,同時(shí)在離線交互是使用了 Cupid-SDK 的 Client模式,這個(gè)模式不是獨(dú)立集群的模式,類Serveless模式,整體的成本上比獨(dú)立集群要低,當(dāng)然資源保障上沒(méi)有獨(dú)立集群好。

Client模式原理參考相關(guān)文檔,比調(diào)度模式有更好的應(yīng)用交互性。

  • 集團(tuán)client模式將spark session作為服務(wù)提供,可以方便地與在線系統(tǒng)交互,包括任務(wù)的提交、關(guān)閉、實(shí)例的關(guān)閉等;
  • 在使用集團(tuán)提供的spark能力時(shí),比較麻煩的在于如何方便的查看日志,從我們的實(shí)踐看主要有以下2個(gè)路徑。

申請(qǐng)odps對(duì)應(yīng)項(xiàng)目空間的logview權(quán)限,可以直接在https://logview.alibaba-inc.com/中基于sparkInstanceId定位到具體的日志;

借助odps client+提交spark任務(wù)時(shí)返回的實(shí)例ID獲取log地址,代碼參考如下

//instanceIdd對(duì)應(yīng)odps client中的lookupName
Account account = new AliyunAccount(sparkSessionConfig.getAccessId(), sparkSessionConfig.getAccessKey());
Odps odps = new Odps(account);
odps.setEndpoint(sparkSessionConfig.getEndPoint());
odps.setDefaultProject(sparkSessionConfig.getNamespace());
//日志地址目前設(shè)定有效期為7*24小時(shí)
try {
return odps.logview().generateLogView(odps.instances().get(sparkInstanceId), 7 * 24L);
} catch (OdpsException e) {
LOGGER.error("生成logView地址失敗,config:{},instanceId:{},e:{}", sparkSessionConfig, sparkInstanceId, e);
}

五、技術(shù)方案

1.整體方案

spark作為大數(shù)據(jù)處理引擎,在實(shí)例數(shù)量較少的情況下采用odps任務(wù)目前的運(yùn)維方式來(lái)管理的話成本并不高,但是在供應(yīng)鏈核算的場(chǎng)景下,需要支持每天將近600+(行業(yè)*核算場(chǎng)景)數(shù)量的實(shí)例運(yùn)行,且需滿足核算完整性、準(zhǔn)確性、及時(shí)性的要求,另外由于目前我們的spark任務(wù)(cupid)與odps任務(wù)共享項(xiàng)目空間資源,意味著我們需要在有限的資源下支持核算的業(yè)務(wù),基于以上背景及訴求,供應(yīng)鏈核算整體的應(yīng)用架構(gòu)設(shè)計(jì)如下:

其中ascp-finance-accounting負(fù)責(zé)任務(wù)調(diào)度,組件交互如下

  • spark任務(wù)管理:負(fù)責(zé)spark任務(wù)相關(guān)生命周期的管理,承接核算任務(wù)和spark session之間的交互;
  • spark session管理:負(fù)責(zé)spark實(shí)例的創(chuàng)建、銷毀、job提交等,另外針對(duì)不同類型的session,支持自定義所需資源,包括實(shí)例worker數(shù)量、分區(qū)大小等,主要與spark on odps交互;
  • 核算任務(wù)管理:負(fù)責(zé)業(yè)務(wù)接入、記賬、拋賬等核算任務(wù)的生命周期管理;
  • spark job版本管理:spark任務(wù)所需jar包會(huì)不斷的迭代,針對(duì)不同的核算場(chǎng)景可以定制所需的job版本。

ascp-finance-accounting-spark負(fù)責(zé)spark job的開(kāi)發(fā)維護(hù),spark on odps client模式下需要基于服務(wù)上傳jar包,若jar包較大,性能較差,所以基于client模式下提供的resource管理能力,我們將項(xiàng)目module拆分如下:

包名

作用

accounting-spark-client

對(duì)外提供spark任務(wù)的啟動(dòng)、查詢及終止服務(wù)

accounting-spark-common

公共包,包括常量、工具類等

accounting-spark-job

spark任務(wù)包,封裝了任務(wù)接入和記賬兩個(gè)任務(wù)的實(shí)現(xiàn)

accounting-spark-dependency

spark任務(wù)包依賴的二方包,client模式下若job包過(guò)大,會(huì)造成上傳失敗的問(wèn)題,所以部分job依賴的二方包可以放在dependency中,單獨(dú)打包,手工在datawork中上傳,通過(guò)resources傳遞參數(shù)

2.數(shù)據(jù)處理流程

核算接入、記賬、拋賬等主流程的spark處理邏輯如下所示:

六、運(yùn)維及調(diào)優(yōu)

基于spark的特性,完成數(shù)據(jù)處理邏輯的編寫對(duì)我們來(lái)說(shuō)并不困難,問(wèn)題主要集中在如何用盡可能低的成本滿足業(yè)務(wù)需求,特別是在目前控制成本的背景下,在供應(yīng)鏈核算的落地過(guò)程中,我們主要采用了以下優(yōu)化方式。

1.數(shù)據(jù)量評(píng)估

spark任務(wù)的運(yùn)行效率很大程度上受到分區(qū)數(shù)量的影響,spark提供了如下手段來(lái)進(jìn)行分區(qū)數(shù)量的調(diào)整(部分為spark on odps能力),供應(yīng)鏈核算在實(shí)現(xiàn)過(guò)程中主要用到了odps離線表和lindorm兩種數(shù)據(jù)源。

1)spark.hadoop.odps.input.split.size:用于設(shè)置spark讀取odps離線表的分區(qū)大小,默認(rèn)為256M,在實(shí)踐過(guò)程中需要結(jié)合當(dāng)前分區(qū)的大小進(jìn)行調(diào)整,比如當(dāng)前分區(qū)大小為1GB,那么默認(rèn)情況下會(huì)拆分為4個(gè)分區(qū);

2)spark讀寫lindorm(類hbase)的分區(qū)數(shù)主要受到region數(shù)量的影響,在供應(yīng)鏈核算系統(tǒng)的實(shí)踐中,由于初始region數(shù)量較少,導(dǎo)致分區(qū)數(shù)量很少,spark執(zhí)行效率很差,針對(duì)此問(wèn)題我們實(shí)踐了兩種處理策略;

  • 進(jìn)行重分區(qū)(repartition算子):針對(duì)數(shù)據(jù)傾斜進(jìn)行重新分區(qū),但是會(huì)拆分stage,觸發(fā)shuffle,增加額外的IO成本。
  • lindorm進(jìn)行預(yù)分區(qū),比如預(yù)分區(qū)為128個(gè)region,但此種實(shí)現(xiàn)方案需要結(jié)合rowkey的設(shè)計(jì)一起使用,會(huì)影響到scan的效率。

2.代碼邏輯相關(guān)job/stage/task評(píng)估

除了六中所述數(shù)據(jù)量以外,數(shù)據(jù)處理邏輯的實(shí)現(xiàn)方法也會(huì)影響到任務(wù)的執(zhí)行效率,spark比mapreduce執(zhí)行效率高的一個(gè)原因就在于spark會(huì)先基于處理流程構(gòu)建DAG,這樣可以有效評(píng)估每個(gè)stage是否需要落盤(IO成本),在邏輯實(shí)現(xiàn)過(guò)程中我們?cè)诒WC數(shù)據(jù)處理無(wú)誤的情況下需要盡可能得降低IO(減少shuffle),比如可以執(zhí)行以下策略。

  • 慎用效率角度的算子,比如groupBy。
  • 盡量減少stage數(shù)量。

3.計(jì)算存儲(chǔ)資源評(píng)估

計(jì)算存儲(chǔ)資源同樣是spark執(zhí)行效率優(yōu)化的關(guān)鍵,spark也提供了多種手段來(lái)調(diào)整資源的使用情況;

  • spark.executor.instances executor:設(shè)置當(dāng)前實(shí)例的worker數(shù)量;
  • spark.executor.cores:核數(shù),每個(gè)Executor中的可同時(shí)運(yùn)行的task數(shù)目;
  • spark.executor.memory:executor內(nèi)存。

4.其他參數(shù)

odps.cupid.clientmode.heartbeat.timeout 此配置用來(lái)調(diào)節(jié)cupid(spark on odps) client模式下的心跳超時(shí)時(shí)間,默認(rèn)為30分鐘,若任務(wù)執(zhí)行較長(zhǎng),需要進(jìn)行調(diào)整。

hbase.client.write.buffer:用來(lái)調(diào)節(jié)lindorm的flush磁盤的buffer大小,lindorm mput數(shù)量限制為100(經(jīng)咨詢?yōu)槿窒拗?,無(wú)法調(diào)整),所以在spark寫lindorm時(shí)我們主要采用此配置項(xiàng)調(diào)節(jié)批量寫入的數(shù)量,這點(diǎn)比較坑。

spark.hadoop.odps.cupid.job.priority:用于調(diào)節(jié)任務(wù)資源獲取的優(yōu)先級(jí)。

5.Spark UI

spark 本身的 UI 中有整體的job/stage/task的可視化分析數(shù)據(jù),比較方便的查詢到對(duì)應(yīng)的執(zhí)行過(guò)程,如下圖:

通過(guò)SparkUI 可以看到任務(wù)的驅(qū)動(dòng)步驟和對(duì)應(yīng)的執(zhí)行的日志。通過(guò)分析可以針對(duì)性的優(yōu)化提升。

6.交互式開(kāi)發(fā)測(cè)試

ODPS 有一個(gè)非常好的所見(jiàn)所得的 dataworks 平臺(tái),大大提升了開(kāi)發(fā)的效率,spark 當(dāng)前在dataworks沒(méi)有直接的交互的IDE,需要通過(guò) zeppelin 來(lái)實(shí)現(xiàn)。zeppelin在數(shù)據(jù)技術(shù)棧中的定位如下:

Web-based notebook that enables data-driven,interactive data analytics and collaborative documents with SQL, Scala, Python, R and more.

可以在交互中實(shí)現(xiàn)結(jié)果的快速反饋。

支持 scala 的 UDF 驗(yàn)證等,提升了測(cè)試驗(yàn)證效率。

7.效果

經(jīng)過(guò)以上優(yōu)化,在2500萬(wàn)數(shù)據(jù)量60worker數(shù)的場(chǎng)景,接入+記賬+拋賬流程由之前的2小時(shí)提效至10分鐘,同時(shí)在編程模式上更加匹配服務(wù)端技術(shù)的研發(fā)模式,提升了研發(fā)效率。

七、總結(jié)

核算業(yè)務(wù)的特征比較偏向數(shù)據(jù)和規(guī)則的處理,大數(shù)據(jù)引擎的引入有助于整體業(yè)務(wù)的交付效率提升和成本降低。目前我們對(duì)Spark的認(rèn)知主要在完成數(shù)據(jù)處理邏輯開(kāi)發(fā)及日常的調(diào)優(yōu)上,隨著運(yùn)行實(shí)例的增多以及業(yè)務(wù)的不斷發(fā)展,當(dāng)前的技術(shù)方案也會(huì)不斷的迭代演進(jìn)。

參考文檔

通過(guò)spark訪問(wèn)lindorm:https://help.aliyun.com/document_detail/174657.html

責(zé)任編輯:武曉燕 來(lái)源: 阿里技術(shù)
相關(guān)推薦

2023-03-22 11:17:04

ChatGPT供應(yīng)鏈人工智能

2022-01-20 11:12:00

區(qū)塊鏈金融應(yīng)用

2025-04-08 05:00:00

AI人工智能供應(yīng)鏈

2023-02-23 07:52:20

2019-07-08 10:16:30

物聯(lián)網(wǎng)區(qū)塊鏈大數(shù)據(jù)

2024-09-24 20:29:05

2022-01-22 00:29:36

區(qū)塊鏈食品技術(shù)

2023-03-07 16:00:16

自動(dòng)化人工智能

2020-12-02 10:29:41

物聯(lián)網(wǎng)供應(yīng)鏈IOT

2024-09-03 16:55:01

2017-01-23 11:18:16

戴爾

2022-03-02 14:08:35

區(qū)塊鏈供應(yīng)鏈技術(shù)

2022-04-26 10:47:15

智能供應(yīng)鏈供應(yīng)鏈

2023-03-03 14:46:03

物聯(lián)網(wǎng)供應(yīng)鏈

2022-01-10 14:09:47

供應(yīng)鏈安全分析技術(shù)網(wǎng)絡(luò)安全

2023-12-25 16:43:04

區(qū)塊鏈CAGR自動(dòng)化

2018-04-13 17:55:03

區(qū)塊鏈供應(yīng)鏈金融

2022-12-06 14:34:28

2020-04-26 14:34:47

新冠病毒物聯(lián)網(wǎng)技術(shù)供應(yīng)鏈

2024-08-14 15:47:22

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)