尋夢新聞LINE@每日推播熱門推薦文章,趣聞不漏接❤️
導讀隨著大數據時代的到來,Hadoop在過去幾年以接近統治性的方式包攬的ETL和數據分析查詢的工作,大家也無意間的想往大數據方向靠攏,即使每天數據也就幾十、幾百M也要放到Hadoop上作分析,只會適得其反,但是當面對真正的Big Data的時候,Hadoop就會暴露出它對於數據分析查詢支持的弱點。
背景
甚至出現《MapReduce: 一個巨大的倒退》此類極端的吐槽,這也怪不得Hadoop,畢竟它的設計就是為了批處理,使用用MR的編程模型來做到SQL查詢,性能肯定不如意。所以通常我也只是把Hive當做能夠提供將SQL語義轉換成MR任務的工具,尤其在做ETL的時候。
在Dremel論文發表之後,開源社區湧現出了一批基於MPP架構的SQL-on-Hadoop(HDFS)查詢引擎,典型代表有Apache Impala、Presto、Apache Drill、Apache HAWQ等,看上去這些查詢引擎提供的功能和做到方式也都大同小異,本文將基於Impala的使用和做到介紹日益發展的基於HDFS的MPP數據查詢引擎。
Impala介紹
Apache Impala是由Cloudera開發並開源的一款基於HDFS/Hbase的MPP SQL引擎,它擁有和Hadoop一樣的可擴展性、它提供了類SQL(類Hsql)語法,在多用戶場景下也能擁有較高的響應速度和吞吐量。它是由Java和C++做到的,Java提供的查詢交互的接口和做到,C++做到了查詢引擎部分,除此之外,Impala還能夠共享Hive Metastore(這逐漸變成一種標準),甚至可以直接使用Hive的JDBC jar和beeline等直接對Impala進行查詢、支持豐富的數據存儲格式(Parquet、Avro等),當然除了有比較明確的理由,Parquet總是使用Impala的第一選擇。
從用戶視角
可以將Impala這類系統的用戶分為兩類,一類是負責數據導入和管理的數據開發同學,另一類則是執行查詢的數據分析師同學,前者通常需要將數據存儲到HDFS,通過CREATE TABLE的方式創建與數據match的schema,然後通過load data或者add partition的方式將表和數據關聯起來,這一些流程串起來還是挺麻煩的,但是多虧了Hive,由於Impala可以共享Hive的MetaStore,這樣就可以使用Hive完成此類ETL工作,然後將數據查詢的工作交給Impala,大大簡化工作流程(據我所知畢竟大部分數據開發同學還是比較熟悉Hive)。接下來對於數據分析師而言就是如何編寫正確的SQ以表達他們的查詢、分析需求,這也是它們最拿手的了,Impala通常可以在TB級別的數據上提供秒級的查詢速度,所以使用起來可能讓你從Hive的龜速響應一下提升到期望的速度。
Impala除了支持簡單類型之外,還支持String、timestamp、decimal等多種類型,用戶還可以對於特殊的邏輯做到自定義函數(UDF)和自定義聚合函數(UDAF),前者可以使用Java和C++做到,後者目前僅支持C++做到,除此之外的schema操作都可以在Hive上做到,由於Impala的存儲由HDFS做到,因此不能夠做到update、delete語句,如果有此類需求,還是需要重新計算整個分區的數據並且覆蓋老數據,這點對於修改的實時性要求比較高的需求還是不能滿足的,如果有此類需求還是期待Kudu的支持吧,或者嘗試一下傳統的MPP數據庫,例如GreenPlum。
當完成數據導入之後,用戶需要執行COMPUTE STATS
以收集和更新表的統計信息,這些統計信息對於CBO優化器提供數據支持,用於生成更優的物理執行計劃。測試發現這個操作的速度還是比較快的,可以將其看做數據導入的一部分,另外需要注意的是這個語句不會自動執行,因此建議用戶在load完數據之後手動的執行一次該命令。 系統架構
從用戶的使用方式上來看,Impala和Hive還是很相似的,並且可以共享一份元數據,這也大大簡化了接入流程,下面我們從做到的角度來看一下Impala是如何工作的。下圖展示了Impala的系統架構和查詢的執行流程。
從上圖可以看出,Impala自身包含三個模塊:Impalad、Statestore和Catalog,除此之外它還依賴Hive Metastore和HDFS,其中Imapalad負責接受用戶的查詢請求,也意味著用戶的可以將請求發送給任意一個Impalad進程,該進程在本次查詢充當協調者(coordinator)的作用,生成執行計劃並且分發到其它的Impalad進程執行,最終匯集結果返回給用戶,並且對於當前Impalad和其它Impalad進程而言,他們同時也是本次查詢的執行者,完成數據讀取、物理算子的執行並將結果返回給協調者Impalad。這種無中心查詢節點的設計能夠最大程度的保證容錯性並且很容易做負載均衡。正如圖中展示的一樣,通常每一個HDFS的DataNode上部署一個Impalad進程,由於HDFS存儲數據通常是多副本的,所以這樣的部署可以保證數據的本地性,查詢盡可能的從本地磁盤讀取數據而非網路,從這點可以推斷出Impalad對於本地數據的讀取應該是通過直接讀本地文件的方式,而非調用HDFS的接口。為了做到查詢分割的子任務可以做到盡可能的本地數據讀取,Impalad需要從Metastore中獲取表的數據存儲路徑,並且從NameNode中獲取每一個文件的數據塊分布。
Catalog服務提供了元數據的服務,它以單點的形式存在,它既可以從外部系統(例如HDFS NameNode和Hive Metastore)拉取元數據,也負責在Impala中執行的DDL語句提交到Metatstore,由於Impala沒有update/delete操作,所以它不需要對HDFS做任何修改。之前我們介紹過有兩種方式向Impala中導入數據(DDL)——通過hive或者impala,如果通過hive則改變的是Hive metastore的狀態,此時需要通過在Impala中執行REFRESH以通知元數據的更新,而如果在impala中操作則Impalad會將該更新操作通知Catalog,後者通過廣播的方式通知其它的Impalad進程。默認情況下Catalog是異步加載元數據的,因此查詢可能需要等待元數據加載完成之後才能進行(第一次加載)。該服務的存在將元數據從Impalad進程中獨立出來,可以簡化Impalad的做到,降低Impalad之間的耦合。
除了Catalog服務,Impala還提供了StateStore服務完成兩個工作:消息訂閱服務和狀態監測功能。Catalog中的元數據就是通過StateStore服務進行廣播分發的,它做到了一個Pub-Sub服務,Impalad可以註冊它們希望獲得的事件類型,Statestore會周期性的發送兩種類型的消息給Impalad進程,一種為該Impalad註冊監聽的事件的更新,基於版本的增量更新(只通知上次成功更新之後的變化)可以減小每次通信的消息大小;另一種消息為心跳信息,StateStore負責統計每一個Impalad進程的狀態,Impalad可以據此了解其餘Impalad進程的狀態,用於判斷分配查詢任務到哪些節點。由於周期性的推送並且每一個節點的推送頻率不一致可能會導致每一個Impalad進程獲得的狀態不一致,由於每一次查詢只依賴於協調者Impalad進程獲取的狀態進行任務的分配,而不需要多個進程進行再次的協調,因此並不需要保證所有的Impalad狀態是一致的。另外,StateStore進程是單點的,並且不會持久化任何數據到磁盤,如果服務掛掉,Impalad則依賴於上一次獲得元數據狀態進行任務分配,官方並沒有提供可靠性部署的方案,通常可以使用DNS方式綁定多個服務以應對單個服務掛掉的情況。
Impalad模塊
從Impalad的各個模塊可以看出,主要查詢處理都是在Impalad進程中完成,StateStore和Catalog幫助Impalad完成元數據的管理和負載監控等工作,其實更進一步可以將Query Planner和Query Coordinator模塊從Impalad移出單獨的作為一個入口服務存在,而Impalad僅負責數據讀寫和子任務的執行。
在Impalad進行執行優化的時候根本原則是盡可能的數據本地讀取,減少網路通信,畢竟在不考慮內存緩存數據的情況下,從遠端讀取數據需要磁盤->內存->網卡->本地網卡->本地內存的過程,而從本地讀取數據僅需要本地磁盤->本地內存的過程,可以看出,在相同的硬件結構下,讀取其他節點數據始終本地磁盤的數據讀取速度。
Impalad服務由三個模塊組成:Query Planner、Query Coordinator和Query Executor,前兩個模塊組成前端,負責接收SQL查詢請求,解析SQL並轉換成執行計劃,交由後端執行,語法方面它既支持基本的操作(select、project、join、group by、filter、order by、limit等),也支持關聯子查詢和非關聯子查詢,支持各種outer-join和窗口函數,這部分按照通用的解析流程分為查詢解析->語法分析->查詢優化,最終生成物理執行計劃。對於Query Planner而言,它生成物理執行計劃的過程分成兩步,首先生成單節點執行計劃,然後再根據它得到分區可並行的執行計劃。前者是根據類似於RDBMS進行執行優化的過程,決定join順序,對join執行謂詞下推,根據關係運算公式進行一些轉換等,這個執行計劃的生成過程依賴於Impala表和分區的統計信息。第二步是根據上一步生成的單節點執行計劃得到分布式執行計劃,可參照Dremel的執行過程。在上一步已經決定了join的順序,這一步需要決定join的策略:使用hash join還是broadcast join,前者一般針對兩個大表,根據join鍵進行hash分區以使得相同的id散列到相同的節點上進行join,後者通過廣播整個小表到所有節點,Impala選擇的策略是依賴於網路通信的最小化。對於聚合操作,通常需要首先在每個節點上執行預聚合,然後再根據聚合鍵的值進行hash將結果散列到多個節點再進行一次merge,最終在coordinator節點上進行最終的合併(只需要合併就可以了),當然對於非group by的聚合運算,則可以將每一個節點預聚合的結果交給一個節點進行merge。sort和top-n的運算和這個類似。
下圖展示了執行select t1.n1, t2.n2, count(1) as c from t1 join t2 on t1.id = t2.id join t3 on t1.id = t3.id where t3.n3 between ‘a’ and ‘f’ group by t1.n1, t2.n2 order by c desc limit 100;查詢的執行邏輯,首先Query Planner生成單機的物理執行計劃,如下圖所示:
和大多數數據庫做到一樣,第一步生成了一個單節點的執行計劃,利用Parquet等列式存儲,可以在SCAN操作的時候只讀取需要的列,並且可以將謂詞下推到SCAN中,大大降低數據讀取。然後執行join、aggregation、sort和limit等操作,這樣的執行計劃需要再轉換成分布式執行計劃,如下圖。
這類的查詢執行流程類似於Dremel,首先根據三個表的大小權衡使用的join方式,這里T1和T2使用hash join,此時需要按照id的值分別將T1和T2分散到不同的Impalad進程,但是相同的id會散列到相同的Impalad進程,這樣每一個join之後是全部數據的一部分。對於T3的join使用boardcast的方式,每一個節點都會收到T3的全部數據(只需要id列),在執行完join之後可以根據group by執行本地的預聚合,每一個節點的預聚合結果只是最終結果的一部分(不同的節點可能存在相同的group by的值),需要再進行一次全局的聚合,而全局的聚合同樣需要並行,則根據聚合列進行hash分散到不同的節點執行merge運算(其實仍然是一次聚合運算),一般情況下為了較少數據的網路傳輸, intermediate節點同樣也是worker節點。通過本次的聚合,相同的key只存在於一個節點,然後對於每一個節點進行排序和TopN計算,最終將每一個Worker的結果返回給coordinator進行合併、排序、limit計算,返回結果給用戶。
Impalad優化
上面介紹了整個查詢大致的執行流程,Impalad的後端使用的是C++做到的,這使得它可以針對硬件做一些特殊的優化,並且可以比使用JAVA做到的SQL引擎有更好的資源使用率。另外,後端的做到使用了LLVM,它是一個編譯器框架,可以在執行器生成並編譯代碼。官方測試發現使用動態生成代碼機制可以使得後端執行性能提高1—5倍。
在數據訪問方面,Impalad並沒有使用通用的HDFS讀取數據那一套流程,畢竟Impalad一般部署在DataNode上,訪問數據完全不需要再走NameNode了,因此它使用了HDFS提供的Short-Circuit Local Reads機制,它提供了直接訪問DataNode的方案,可以參考Hadoop官方文檔和HDFS-347了解詳情。
最後Impalad後端支持對中文件格式和壓縮數據的讀取,包括Avro、RC、Sequence、Parquet,支持snappy、gzip、bz2等壓縮,看來Impala不支持可能也不打算支持ORC格式啦,畢竟有自家主推的Parquet,而ORC則在Presto中廣泛使用。
部署方式
通常情況下,我們會考慮兩種方式的集群部署:混合部署和獨立部署,下圖分別展示了混合部署與獨立部署時的各節點結構。混合部署意味著將Impala集群部署在Hadoop集群之上,共享整個Hadoop集群的資源;獨立部署則是單獨使用部分機器只部署HDFS和Impala,前者的優勢是Impala可以和Hadoop集群共享數據,不需要進行數據的拷貝,但是存在Impala和Hadoop集群搶占資源的情況,進而可能影響Impala的查詢性能(MR任務也可能被Impala影響),而後者可以提供穩定的高性能,但是需要持續的從Hadoop集群拷貝數據到Impala集群上,增加了ETL的複雜度。兩種方式各有優劣,但是針對前一種部署方案,需要考慮如何分配資源的問題,首先在混合部署的情況下不可能再讓Impalad進程常駐(這樣相當於把每一個NodeManager的資源分出去了一部分,並且不能充分利用集群資源),但是YARN的資源分配機制延遲太大,對於Impala的查詢速度有很大的影響,於是Impala很早就設計了一種在YARN上完成Impala資源調度的方案——Llama(Low Latency Application MAster),它其實是一個AM的角色,對於Impala而言。它的要求是在查詢執行之前必須確保需要的資源可用,否則可能出現一個Impalad的阻塞而影響整個查詢的響應速度(木桶原理),Llama會在Impala查詢之前申請足夠的資源,並且在查詢完成之後盡可能的緩存資源,只有當YARN需要將該部分資源用於其它工作時,Llama才會將資源釋放。雖然Llama盡可能的保持資源,但是當混合部署的情況下,還是可能存在Impala查詢獲取不到資源的情況,所以為了保證高性能,還是建議獨立部署。
測試
我們小組的同事對Impala做了一次基於TPCDS數據集的性能測試,分別基於1TB和10TB的數據集,可以看出,它的查詢性能較之於Hive有數量級級別的提升,對比Spark SQL也有幾倍的提升,Compute stat操作可以給Impala帶來一定的查詢優化,但是偶爾反而誤導查詢優化器以至於性能下降,最後我們還測試了Impala on Kudu,發現它並沒有達到意料中的性能(幾倍的差別)。唯一的缺憾是我們並沒有對多用戶並發場景下進行測試,不過從單個查詢的資源消耗來看,C++做到的Impala對資源的消耗也是最少的,可以推斷出在多用戶下它仍然能滿足快速響應的需求,最後是官方給出的多用戶場景下的對比結果(有點故意黑Presto的感覺)。
總結
本文主要介紹了Impala這個高性能的ad-hoc查詢引擎,分別從使用、原理和部署等方面做了詳細的分析,最終基於我們的測試結果也證實了它的高性能,區別於傳統DBMS的MPP解決方案,例如Greenplum、Vertica、Teradata等,Impala更好的融入大數據(Hadoop/Spark)生態圈,更好的做到數據之間的流通,而傳統MPP數據庫,更傾向於數據自制。當然基於HDFS的做到導致Impala無法做到單條數據的實時更新,而只能批量的追加或者覆蓋數據,雖然Cloudera也提供了Impala對於Kudu的支持,但是從性能測試結果看,目前查詢性能還是不理想,而傳統MPP數據庫不僅可以支持單條數據的實時更新,甚至能夠在保證查詢性能的情況下支持較複雜的事務,這也是SQL-on-Hadoop查詢引擎所望塵莫及的。但是無論如何,這類的查詢引擎畢竟支持SQL引擎而不是一個完整的數據庫系統,它提供給用戶在大數據圈中高性能的查詢服務,這也能夠滿足了大部分用戶的需求。