Spark讀取kafka復(fù)雜嵌套json的最佳實(shí)踐,與其在大數(shù)據(jù)分析平臺(tái)中的應(yīng)用
2022-09-16 18:54:00
次
隨著互聯(lián)網(wǎng)的更進(jìn)一步發(fā)展,信息瀏覽、搜索以及電子商務(wù)、互聯(lián)網(wǎng)旅游生活產(chǎn)品等將生活中的流通環(huán)節(jié)在線化,對(duì)于實(shí)時(shí)性的要求進(jìn)一步提升,而信息的交互和溝通正在從點(diǎn)對(duì)點(diǎn)往信息鏈甚至信息網(wǎng)的方向發(fā)展,這樣必然帶來(lái)數(shù)據(jù)各個(gè)維度的交叉關(guān)聯(lián),數(shù)據(jù)爆炸也不可避免,因此流式處理應(yīng)運(yùn)而生,解決實(shí)時(shí)框架問(wèn)題,助力大數(shù)據(jù)分析。
kafka是一個(gè)高性能的流式消息隊(duì)列,適用于大數(shù)據(jù)場(chǎng)景下的消息傳輸、消息處理和消息存儲(chǔ),kafka可靠的傳遞能力讓它成為流式處理系統(tǒng)完美的數(shù)據(jù)來(lái)源,很多基于kafka構(gòu)建的流式處理系統(tǒng)都將kafka作為唯一可靠的數(shù)據(jù)來(lái)源。如Apache Storm、 Apache Spark Streaming 、Apache Flink 、Apache Samza 等。
json是kafka消息中比較常見(jiàn)的格式,對(duì)于單層json數(shù)據(jù)的讀取和解析相對(duì)簡(jiǎn)單,但是在真實(shí)kafka流程處理的業(yè)務(wù)中,很多情況下都是json嵌套復(fù)雜格式消息。Spark1.1以后的版本存在一些實(shí)用的 SparkSQL函數(shù),幫助解決復(fù)雜的json數(shù)據(jù)格式,實(shí)用函數(shù)包括get_json_object、from_json和explode等。
01、Spark框架中的基本概念和內(nèi)置函數(shù)
? RDD:Spark的基本計(jì)算單元,它是一個(gè)彈性可復(fù)原的分布式數(shù)據(jù)集。
? Dataframe:定義為指定到列的數(shù)據(jù)集(Dataset)。DFS類似于關(guān)系型數(shù)據(jù)庫(kù)中的表或者像R/Python 中的Dataframe ,可以說(shuō)是一個(gè)具有良好優(yōu)化技術(shù)的關(guān)系表。
? Spark SQL:它是Spark的其中一個(gè)模塊,用于結(jié)構(gòu)化數(shù)據(jù)處理,Spark SQL提供的接口為Spark提供了有關(guān)數(shù)據(jù)結(jié)構(gòu)和正在執(zhí)行的計(jì)算的更多信息,Spark SQL會(huì)使用這些額外的信息來(lái)執(zhí)行額外的優(yōu)化。
? from_json:Spark SQL內(nèi)置的函數(shù),從一個(gè)json 字符串中按照指定的schema格式抽取出來(lái)作為DataFrame的列,第一個(gè)參數(shù)為列名,以$"column_name"表示,第二個(gè)參數(shù)為定義的數(shù)據(jù)結(jié)構(gòu)

? get_json_object:Spark SQL內(nèi)置的函數(shù),從一個(gè)json字符串中根據(jù)指定的json路徑抽取一個(gè)json對(duì)象,第一個(gè)參數(shù)為column名,用$"column_name"表示,第二個(gè)參數(shù)為要取的json字段名,"$.字段名"表示。
? explode:Spark SQL內(nèi)置的函數(shù),可以從規(guī)定的Array或者M(jìn)ap中使用每一個(gè)元素創(chuàng)建一列,主要用于數(shù)組數(shù)據(jù)的展開(kāi),參數(shù)為column名,用$"column_name"表示。
02、Kafka復(fù)雜嵌套json解析
1)什么是復(fù)雜json?
json是一種輕量級(jí)的數(shù)據(jù)交換標(biāo)準(zhǔn),具體以逗號(hào)分隔的key:value鍵值對(duì)的串形式,主要表現(xiàn)形式包括兩種:{對(duì)象},[數(shù)組],其中key以字符串表達(dá),value包括字符串、數(shù)值、boolean值、對(duì)象和數(shù)組(可嵌套)。在復(fù)雜的json數(shù)據(jù)格式中,通常json數(shù)據(jù)會(huì)有嵌套,每個(gè)層級(jí)的結(jié)構(gòu)不完全相同,value中不同類型進(jìn)行混合使用。
下圖為一份簡(jiǎn)單json格式數(shù)據(jù):
期望處理的結(jié)果為下圖的二維表,json串中的key(id,sepallength,sepalwidth,
petallength,petalwidth,label)作為二維表的列,value作為表的一行數(shù)據(jù)。
下圖為一份復(fù)雜json格式數(shù)據(jù):
期望處理的結(jié)果為下圖的二維表,json串中單層key(id,createTime,deviceCode)和需要展開(kāi)的數(shù)組trajectory中單個(gè)元素key(x,y)作為二維表的列,value是將數(shù)組trajectory中所有的元素展開(kāi)成多行后,與其他列的數(shù)據(jù)進(jìn)行對(duì)齊。
2)整體思路
Kafka消費(fèi)者收到復(fù)雜嵌套json消息后,一共有兩步。
第一步:首先把這批json字符消息轉(zhuǎn)換成分布式數(shù)據(jù)集RDD[String]中,再將RDD[String]轉(zhuǎn)換成列名為`json`的DataFrame,然后通過(guò)Spark SQL內(nèi)置函數(shù)get_json_object將json對(duì)象中的`id`、`createTime`、`deviceCode`、`data.trajectory`分別生成新列,并構(gòu)建一個(gè)包含這些列的新DataFrame;
第二步:獲取需要展開(kāi)的列`data.trajectory`的schema(元數(shù)據(jù)信息),然后由SparkSQL內(nèi)置函數(shù)from_json將列`data.trajectory`的字符內(nèi)容轉(zhuǎn)換成數(shù)組對(duì)象,最后通過(guò)SparkSQL內(nèi)置函數(shù)explode將`data.trajectory`中的數(shù)組中每個(gè)元素展開(kāi)成多行。
基于spark解析復(fù)雜json流程設(shè)計(jì)圖:
3)Spark讀取kafka復(fù)雜json消息解析核心代碼
json格式數(shù)據(jù)如果使用現(xiàn)有的工具,用戶常常需要開(kāi)發(fā)出復(fù)雜的程序來(lái)讀寫(xiě)分析系統(tǒng)中的json數(shù)據(jù),Spark SQL對(duì)json數(shù)據(jù)的支持是從1.1版本開(kāi)始發(fā)布,并且在Spark 1.2版本中進(jìn)行了加強(qiáng)。
下圖的代碼是通過(guò)Spark SQL內(nèi)置的json函數(shù)將復(fù)雜json轉(zhuǎn)換成一張二維表,并支持將json中數(shù)組數(shù)據(jù)進(jìn)行展開(kāi)處理。
4)kafka復(fù)雜json解析在Tempo AI中的應(yīng)用
Tempo AI機(jī)器學(xué)習(xí)平臺(tái)將kafka數(shù)據(jù)作為數(shù)據(jù)挖掘分析標(biāo)準(zhǔn)數(shù)據(jù)源,既支持簡(jiǎn)單的json解析,也支持復(fù)雜json解析,先進(jìn)行基礎(chǔ)配置讀取消息數(shù)據(jù),查看消息內(nèi)容,然后進(jìn)行映射配置,將數(shù)據(jù)內(nèi)容與對(duì)應(yīng)元信息進(jìn)行匹配,最后可以預(yù)覽數(shù)據(jù)內(nèi)容。
基礎(chǔ)配置,包括連接配置和消息信息配置,如下圖所示:
在“消息內(nèi)容”頁(yè)面,查看提取的單條Kafka消息內(nèi)容,如下圖:
在“映射配置”頁(yè)面,根據(jù)左側(cè)預(yù)覽的消息內(nèi)容,通過(guò)點(diǎn)擊選擇左側(cè)的消息到右側(cè),進(jìn)行映射配置,可以設(shè)置需要展開(kāi)的數(shù)組,如下圖:
kafka輸入節(jié)點(diǎn)配置完成后,執(zhí)行AI流程,查看洞察信息,如下圖所示:
綜上,json是一種輕量級(jí)的數(shù)據(jù)交換格式,易于閱讀和編寫(xiě),目前是一種主流的數(shù)據(jù)格式,json字符串作為消息在kafka消息流中傳遞應(yīng)用很廣泛,通過(guò)Tempo 機(jī)器學(xué)習(xí)平臺(tái)封裝的Spark SQL解析復(fù)雜json的能力,極大簡(jiǎn)化了使用json數(shù)據(jù)的終端的相關(guān)工作,使客戶更專注于自己的業(yè)務(wù)。