Impala總共分為3個組件:impalad, statestored, client/impala-shell。關于這三個組件的基本功能在這篇文章中已經介紹過了。 Client?: 可以是Python CLI(官方提供的impala_shell.py),JDBC/ODBC或者Hue。無論哪個其實就是一個Thrift的client,連接到impala
Impala總共分為3個組件:impalad, statestored, client/impala-shell。關于這三個組件的基本功能在這篇文章中已經介紹過了。
Client?: 可以是Python CLI(官方提供的impala_shell.py),JDBC/ODBC或者Hue。無論哪個其實就是一個Thrift的client,連接到impalad的21000端口。
Impalad: 分為frontend和backend兩部分,這個進程有三個ThriftServer(beeswax_server, hs2_server, be_server)對系統外和系統內提供服務。
Statestored: 集群內各個backend service的數據交換中心,每個backend會在statestored注冊,以后statestored會與所有注冊過的backend交換update消息。
Component | Service | Port | Access Requirement | Comment |
ImpalaDaemon | Impala Daemon Backend Port | 22000 | Internal | ImpalaBackendService export |
Impala Daemon Frontend Port | 21000 | External | ImpalaService export | |
Impala Daemon HTTP Server Port | 25000 | External | Impala debug web server | |
StateStoreSubscriber Service Port | 23000 | Internal | StateStoreSubscriberService | |
?ImpalaStateStore Daemon | StateStore HTTP Server Port | 25010 | External | StateStore debug web server |
StateStore Service Port | 24000 | Internal | StateStoreService export |
下面介紹三個組件之間的Thrift RPC(“<->”前面的表示RPC client,“<->”后面的表示RPC server)
BeeswaxService(beeswax.thrift): client通過query()提交SQL請求,然后異步調用get_state()監聽該SQL的查詢進度,一旦完成,調用fetch()取回結果。
TCLIService(cli_service.thrift): client提交SQL請求,功能和上面類似,更豐富的就是對DDL操作的支持,例如GetTables()返回指定table的元數據。
ImpalaService和ImpalaHiveServer2Service(ImpalaService.thrift)分別是上面兩個類的子類,各自豐富了點功能而已,核心功能沒啥大變化。
StateStoreService(StateStoreService.thrift): statestored保存整個系統所有backend service狀態的全局數據庫,這里是個單節點中央數據交換中心(該節點保存的狀態是soft state,一旦宕機,保存的狀態信息就沒了)。例如每個impala backend啟動的時候會調用StateStoreService.RegisterService()向statestored注冊自己(其實是通過跟這個backend service捆綁在一起的StateStoreSubscriber標識的),然后再調用StateStoreService.RegisterSubscription()表明這個StateStoreSubscriber接收來自statestored的update。
StateStoreSubscriberService(StateStoreSubscriberService.thrift): backend向statestored調用RegisterSubscription之后,statestored就會定期向backend這邊捆綁的StateStoreSubscriber發送該backend的狀態更新信息。然后backend這邊調用StateStoreSubscriberService.UpdateState()更新相關狀態。同時這個UpdateState()調用在impalad backend/StateStoreSubscriber這端還會返回該backend的一些update信息給statestored。
ImpalaInternalService(ImpalaInternalService.thrift):某個backend的coordinator要向其他backend的execute engine發送執行某個plan fragment的請求(提交ExecPlanFragment并要求返回ReportExecStatus)。這部分功能會在backend分析中詳細討論。
ImpalaPlanService(ImpalaPlanService.thrift):可以由其他形式的frontend生成TExecRequest然后交給backend執行。
另外,Impala frontend是用Java寫的,而backend使用C++寫的。Frontend負責把輸入的SQL解析,然后生成執行計劃,之后通過Thrift的序列化/反序列化的方式傳給backend。TExecRequest(frontend.thrift)是中間傳輸的數據結構,表示了一個Query/DML/DDL的查詢請求,也是SQL執行過程中在frontend和backend之間的數據接口。所以我們可以把impala-frontend換掉,用其他的形式拼湊出這個TExecRequest就可以傳給backend執行,這也就是前面說的ImpalaPlanService干的事。
client就可以通過Beeswax和HiveServer2的Thrift API向Impala提交query。這兩種訪問接口的作用是一樣的(都是用于client提交query,返回query result)。
Impala_shell.py是通過Beeswax方式訪問impala的,下面我們看看impala_shell.py是怎么向impalad提交query的。
(1)通過OptionParser()解析命令行參數。如果參數中有—query或者—query_file,則執行execute_queries_non_interactive_mode(options),這是非交互查詢(也就是就查詢一個SQL或者一個寫滿SQL的文件);否則進入ImpalaShell.cmdloop (intro)循環。
(2)進入命令行循環后,一般是先connect某一個impalad,輸入”connect localhost:21000”,進入do_connect(self, args)函數。這個函數根據用戶指定的host和port,生成與相應的impalad的socket連接。最重要的就是這行代碼:
self.imp_service = ImpalaService.Client(protocol)
至此imp_service就是client端的代理了,所有請求都通過它提交。
(3)下面以select命令為例說明,如果client輸入這樣的命令”select col1, col2 from tbl”,則進入do_select(self, args)函數。在這個函數里首先生成BeeswaxService.Query對象,向這個對象填充query statement和configuration。然后進入__query_with_result()函數通過imp_service.query(query)提交query。注意ImpalaService都是異步的,提交之后返回一個QueryHandle,然后就是在一個while循環里不斷__get_query_state()查詢狀態。如果發現這個SQL的狀態是FINISHED,那么就通過fetch() RPC獲取結果。
Statestored進程對外提供StateStoreService RPC服務,而StateStoreSubscriberService RPC服務是在impalad進程中提供的。StateStoreService這個RPC的邏輯實現是在StateStore這個類里面實現的。
Statestored收到backend發送的RegisterService RPC請求時,調用StateStore::RegisterService()處理,主要做兩件事:
(1)根據TRegisterServiceRequest提供的service_id把該service加入StateStore.service_instances_。
通常在整個impala集群只存在名為“impala_backend_service”這一個服務,所以service_id=”impala_backend_service”。而每個backend捆綁的
(2)Impalad backend在向statestored RegisterService的時候,會把subscriber_address發送過去。在statestored端,會根據這個subscriber_address生成對應的Subscriber對象(表示與該Subscriber捆綁的backend)。把與該backend綁定的Subscriber加入StateStore.subscribers_這個map里。每個Subscriber有個唯一的id,這樣分布在集群內的impala backend就有了全局唯一id了。
這樣如果以后某個backend/StateStoreSubscriber fail或者其中運行的SQL任務出了問題,在statestored這里就會有體現了,那么就會通知給其他相關的backend。
那么每個backend是怎么update的呢?StateStore::UpdateLoop()負責定期向各個backend推送其所訂閱的service的所有成員的更新,目前的更新策略是全量更新,未來會考慮增量更新。
Impalad進程的服務被wrapper在ImpalaServer這個類中。ImpalaServer包括fe和be的功能,實現了ImpalaService(Beeswax), ImpalaHiveServer2Service(HiveServer2)和ImpalaInternelService API。
全局函數CreateImpalaServer()創建了一個ImpalaServer其中包含了多個ThriftServer:
(1)創建一個名為beeswax_server的ThriftServer對系統外提供ImpalaService(Beeswax)服務,主要服務于Query查詢,是fe/frontend的核心服務,端口21000
(2)創建一個名為hs2_server的ThriftServer對系統外提供ImpalaHiveServer2Service服務,提供Query, DML, DDL相關操作,端口21050
(3)創建一個名為be_server的ThriftServer對系統內其他impalad提供ImpalaInternalService,端口22000
(4)創建ImpalaServer對象,前面三個ThriftServer的TProcessor被賦值這個ImpalaServer對象,所以對前面三個Thrift服務的RPC請求都交由這個ImpalaServer對象處理。最典型的例子就是我們通過Beeswax接口提交了一個BeeswaxService.query()請求,在impalad端的處理邏輯是由void ImpalaServer::query(QueryHandle& query_handle, const Query& query)這個函數(在impala-beeswax-server.cc中實現)完成的。
下面是impalad-main.cc的主函數:
int main(int argc, char** argv) { //參數解析,開啟日志(基于Google gflags和glog) InitDaemon(argc, argv); LlvmCodeGen::InitializeLlvm(); // Enable Kerberos security if requested. if (!FLAGS_principal.empty()) { EXIT_IF_ERROR(InitKerberos("Impalad")); } //因為frontend, HBase等相關組件是由Java開發的,所以下面這幾行都是初始化JNI相關的reference和method id JniUtil::InitLibhdfs(); EXIT_IF_ERROR(JniUtil::Init()); EXIT_IF_ERROR(HBaseTableScanner::Init()); EXIT_IF_ERROR(HBaseTableCache::Init()); InitFeSupport(); //ExecEnv類是impalad backend上Query/PlanFragment的執行環境。 //生成SubscriptionManager, SimpleScheduler和各種Cache ExecEnv exec_env; //生成Beeswax, hive-server2和backend三種ThriftServer用于接收client請求,不過這三種服務的后端真正的處理邏輯都是ImpalaServer* server這個對象。 ThriftServer* beeswax_server = NULL; ThriftServer* hs2_server = NULL; ThriftServer* be_server = NULL; ImpalaServer* server = CreateImpalaServer(&exec_env, FLAGS_fe_port, FLAGS_hs2_port, FLAGS_be_port, &beeswax_server, &hs2_server, &be_server); //因為be_server是對系統內提供服務的,先啟動它。 be_server->Start(); //這里面關鍵是啟動了SubscriptionManager和Scheduler Status status = exec_env.StartServices(); if (!status.ok()) { LOG(ERROR) << "Impalad services did not start correctly, exiting"; ShutdownLogging(); exit(1); } // register be service *after* starting the be server thread and after starting // the subscription mgr handler thread scoped_ptr cb; if (FLAGS_use_statestore) { THostPort host_port; host_port.port = FLAGS_be_port; host_port.ipaddress = FLAGS_ipaddress; host_port.hostname = FLAGS_hostname; //注冊這個be服務到statestored,整個集群里所有的be服務組成一個group,這樣以后來了Query請求就可以在各個backend之間dispatch了。 Status status = exec_env.subscription_mgr()->RegisterService(IMPALA_SERVICE_ID, host_port); unordered_set services; services.insert(IMPALA_SERVICE_ID); //注冊callback函數,每當StateStoreSubscriber接收到來自statestored的update之后調用該函數。 cb.reset(new SubscriptionManager::UpdateCallback( bind(mem_fn(&ImpalaServer::MembershipCallback), server, _1))); exec_env.subscription_mgr()->RegisterSubscription(services, "impala.server", cb.get()); if (!status.ok()) { LOG(ERROR) << "Could not register with state store service: " << status.GetErrorMsg(); ShutdownLogging(); exit(1); } } // this blocks until the beeswax and hs2 servers terminate //前面對內服務的be_server已經成功啟動,下面啟動對外服務的beeswax_server和hs2_server beeswax_server->Start(); hs2_server->Start(); beeswax_server->Join(); hs2_server->Join(); delete be_server; delete beeswax_server; delete hs2_server; }
exec_env.StartServices()調用SubscriptionManager.Start(),進一步調用StateStoreSubscriber.Start()啟動一個ThriftServer。
StateStoreSubscriber實現了StateStoreSubscriberService(StateStoreSubscriberService.thrift中定義),用于接收來自statestored的update,并把與這個StateStoreSubscriber捆綁的backend的update反饋給statestored。這樣這個backend就可以對其他backend可見,這樣就可以接受其他impala backend發來的任務更新了(當然,接收backend更新是通過statestored中轉的)。
參考文獻:
http://www.sizeofvoid.net/wp-content/uploads/ImpalaIntroduction2.pdf
原文地址:Impala源代碼分析(1)-Impala架構和RPC, 感謝原作者分享。
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com