<span id="mktg5"></span>

<i id="mktg5"><meter id="mktg5"></meter></i>

        <label id="mktg5"><meter id="mktg5"></meter></label>
        最新文章專題視頻專題問答1問答10問答100問答1000問答2000關鍵字專題1關鍵字專題50關鍵字專題500關鍵字專題1500TAG最新視頻文章推薦1 推薦3 推薦5 推薦7 推薦9 推薦11 推薦13 推薦15 推薦17 推薦19 推薦21 推薦23 推薦25 推薦27 推薦29 推薦31 推薦33 推薦35 推薦37視頻文章20視頻文章30視頻文章40視頻文章50視頻文章60 視頻文章70視頻文章80視頻文章90視頻文章100視頻文章120視頻文章140 視頻2關鍵字專題關鍵字專題tag2tag3文章專題文章專題2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章專題3
        問答文章1 問答文章501 問答文章1001 問答文章1501 問答文章2001 問答文章2501 問答文章3001 問答文章3501 問答文章4001 問答文章4501 問答文章5001 問答文章5501 問答文章6001 問答文章6501 問答文章7001 問答文章7501 問答文章8001 問答文章8501 問答文章9001 問答文章9501
        當前位置: 首頁 - 科技 - 知識百科 - 正文

        Impala源代碼分析(1)-Impala架構和RPC

        來源:懂視網 責編:小采 時間:2020-11-09 13:24:14
        文檔

        Impala源代碼分析(1)-Impala架構和RPC

        Impala源代碼分析(1)-Impala架構和RPC:Impala總共分為3個組件:impalad, statestored, client/impala-shell。關于這三個組件的基本功能在這篇文章中已經介紹過了。 Client: 可以是Python CLI(官方提供的impala_shell.py),JDBC/ODBC或者Hue。無論哪個其實就是一個Thrif
        推薦度:
        導讀Impala源代碼分析(1)-Impala架構和RPC:Impala總共分為3個組件:impalad, statestored, client/impala-shell。關于這三個組件的基本功能在這篇文章中已經介紹過了。 Client: 可以是Python CLI(官方提供的impala_shell.py),JDBC/ODBC或者Hue。無論哪個其實就是一個Thrif

        Impala總共分為3個組件:impalad, statestored, client/impala-shell。關于這三個組件的基本功能在這篇文章中已經介紹過了。 Client?: 可以是Python CLI(官方提供的impala_shell.py),JDBC/ODBC或者Hue。無論哪個其實就是一個Thrift的client,連接到impala

        Impala總共分為3個組件:impalad, statestored, client/impala-shell。關于這三個組件的基本功能在這篇文章中已經介紹過了。

        Client?: 可以是Python CLI(官方提供的impala_shell.py),JDBC/ODBC或者Hue。無論哪個其實就是一個Thrift的client,連接到impalad的21000端口。

        Impalad: 分為frontend和backend兩部分,這個進程有三個ThriftServer(beeswax_server, hs2_server, be_server)對系統外和系統內提供服務。

        Statestored: 集群內各個backend service的數據交換中心,每個backend會在statestored注冊,以后statestored會與所有注冊過的backend交換update消息。

        RPC

        Component Service Port Access Requirement Comment
        ImpalaDaemon Impala Daemon Backend Port 22000 Internal ImpalaBackendService export
        Impala Daemon Frontend Port 21000 External ImpalaService export
        Impala Daemon HTTP Server Port 25000 External Impala debug web server
        StateStoreSubscriber Service Port 23000 Internal StateStoreSubscriberService
        ?ImpalaStateStore Daemon StateStore HTTP Server Port 25010 External StateStore debug web server
        StateStore Service Port 24000 Internal StateStoreService export

        下面介紹三個組件之間的Thrift RPC(“<->”前面的表示RPC client,“<->”后面的表示RPC server)

        (1)Client <-> impalad(frontend)

        BeeswaxService(beeswax.thrift): client通過query()提交SQL請求,然后異步調用get_state()監聽該SQL的查詢進度,一旦完成,調用fetch()取回結果。

        TCLIService(cli_service.thrift): client提交SQL請求,功能和上面類似,更豐富的就是對DDL操作的支持,例如GetTables()返回指定table的元數據。

        ImpalaService和ImpalaHiveServer2Service(ImpalaService.thrift)分別是上面兩個類的子類,各自豐富了點功能而已,核心功能沒啥大變化。

        (2)Impalad(backend) <-> statestored

        StateStoreService(StateStoreService.thrift): statestored保存整個系統所有backend service狀態的全局數據庫,這里是個單節點中央數據交換中心(該節點保存的狀態是soft state,一旦宕機,保存的狀態信息就沒了)。例如每個impala backend啟動的時候會調用StateStoreService.RegisterService()向statestored注冊自己(其實是通過跟這個backend service捆綁在一起的StateStoreSubscriber標識的),然后再調用StateStoreService.RegisterSubscription()表明這個StateStoreSubscriber接收來自statestored的update。

        (3)Statestord <-> impalad(backend)

        StateStoreSubscriberService(StateStoreSubscriberService.thrift): backend向statestored調用RegisterSubscription之后,statestored就會定期向backend這邊捆綁的StateStoreSubscriber發送該backend的狀態更新信息。然后backend這邊調用StateStoreSubscriberService.UpdateState()更新相關狀態。同時這個UpdateState()調用在impalad backend/StateStoreSubscriber這端還會返回該backend的一些update信息給statestored。

        (4)Impalad(backend) <-> other impalad(backend) (這兩個是互為client/server的)

        ImpalaInternalService(ImpalaInternalService.thrift):某個backend的coordinator要向其他backend的execute engine發送執行某個plan fragment的請求(提交ExecPlanFragment并要求返回ReportExecStatus)。這部分功能會在backend分析中詳細討論。

        (5)Impalad backend <-> other frontend

        ImpalaPlanService(ImpalaPlanService.thrift):可以由其他形式的frontend生成TExecRequest然后交給backend執行。

        另外,Impala frontend是用Java寫的,而backend使用C++寫的。Frontend負責把輸入的SQL解析,然后生成執行計劃,之后通過Thrift的序列化/反序列化的方式傳給backend。TExecRequest(frontend.thrift)是中間傳輸的數據結構,表示了一個Query/DML/DDL的查詢請求,也是SQL執行過程中在frontend和backend之間的數據接口。所以我們可以把impala-frontend換掉,用其他的形式拼湊出這個TExecRequest就可以傳給backend執行,這也就是前面說的ImpalaPlanService干的事。

        impala組件執行流程

        1, impala-shell

        client就可以通過Beeswax和HiveServer2的Thrift API向Impala提交query。這兩種訪問接口的作用是一樣的(都是用于client提交query,返回query result)。

        Impala_shell.py是通過Beeswax方式訪問impala的,下面我們看看impala_shell.py是怎么向impalad提交query的。

        (1)通過OptionParser()解析命令行參數。如果參數中有—query或者—query_file,則執行execute_queries_non_interactive_mode(options),這是非交互查詢(也就是就查詢一個SQL或者一個寫滿SQL的文件);否則進入ImpalaShell.cmdloop (intro)循環。

        (2)進入命令行循環后,一般是先connect某一個impalad,輸入”connect localhost:21000”,進入do_connect(self, args)函數。這個函數根據用戶指定的host和port,生成與相應的impalad的socket連接。最重要的就是這行代碼:

        self.imp_service = ImpalaService.Client(protocol)

        至此imp_service就是client端的代理了,所有請求都通過它提交。

        (3)下面以select命令為例說明,如果client輸入這樣的命令”select col1, col2 from tbl”,則進入do_select(self, args)函數。在這個函數里首先生成BeeswaxService.Query對象,向這個對象填充query statement和configuration。然后進入__query_with_result()函數通過imp_service.query(query)提交query。注意ImpalaService都是異步的,提交之后返回一個QueryHandle,然后就是在一個while循環里不斷__get_query_state()查詢狀態。如果發現這個SQL的狀態是FINISHED,那么就通過fetch() RPC獲取結果。

        2, statestored

        Statestored進程對外提供StateStoreService RPC服務,而StateStoreSubscriberService RPC服務是在impalad進程中提供的。StateStoreService這個RPC的邏輯實現是在StateStore這個類里面實現的。

        Statestored收到backend發送的RegisterService RPC請求時,調用StateStore::RegisterService()處理,主要做兩件事:

        (1)根據TRegisterServiceRequest提供的service_id把該service加入StateStore.service_instances_。

        通常在整個impala集群只存在名為“impala_backend_service”這一個服務,所以service_id=”impala_backend_service”。而每個backend捆綁的是不一樣的,所以就形成了service和backend一對多的關系,這個關系存儲在StateStore.service_instances_組。

        (2)Impalad backend在向statestored RegisterService的時候,會把subscriber_address發送過去。在statestored端,會根據這個subscriber_address生成對應的Subscriber對象(表示與該Subscriber捆綁的backend)。把與該backend綁定的Subscriber加入StateStore.subscribers_這個map里。每個Subscriber有個唯一的id,這樣分布在集群內的impala backend就有了全局唯一id了。

        這樣如果以后某個backend/StateStoreSubscriber fail或者其中運行的SQL任務出了問題,在statestored這里就會有體現了,那么就會通知給其他相關的backend。

        那么每個backend是怎么update的呢?StateStore::UpdateLoop()負責定期向各個backend推送其所訂閱的service的所有成員的更新,目前的更新策略是全量更新,未來會考慮增量更新。

        3, impalad

        Impalad進程的服務被wrapper在ImpalaServer這個類中。ImpalaServer包括fe和be的功能,實現了ImpalaService(Beeswax), ImpalaHiveServer2Service(HiveServer2)和ImpalaInternelService API。

        全局函數CreateImpalaServer()創建了一個ImpalaServer其中包含了多個ThriftServer:

        (1)創建一個名為beeswax_server的ThriftServer對系統外提供ImpalaService(Beeswax)服務,主要服務于Query查詢,是fe/frontend的核心服務,端口21000

        (2)創建一個名為hs2_server的ThriftServer對系統外提供ImpalaHiveServer2Service服務,提供Query, DML, DDL相關操作,端口21050

        (3)創建一個名為be_server的ThriftServer對系統內其他impalad提供ImpalaInternalService,端口22000

        (4)創建ImpalaServer對象,前面三個ThriftServer的TProcessor被賦值這個ImpalaServer對象,所以對前面三個Thrift服務的RPC請求都交由這個ImpalaServer對象處理。最典型的例子就是我們通過Beeswax接口提交了一個BeeswaxService.query()請求,在impalad端的處理邏輯是由void ImpalaServer::query(QueryHandle& query_handle, const Query& query)這個函數(在impala-beeswax-server.cc中實現)完成的。

        下面是impalad-main.cc的主函數:

        int main(int argc, char** argv) {
         //參數解析,開啟日志(基于Google gflags和glog)
         InitDaemon(argc, argv);
         LlvmCodeGen::InitializeLlvm();
         // Enable Kerberos security if requested.
         if (!FLAGS_principal.empty()) {
         EXIT_IF_ERROR(InitKerberos("Impalad"));
         }
         //因為frontend, HBase等相關組件是由Java開發的,所以下面這幾行都是初始化JNI相關的reference和method id
         JniUtil::InitLibhdfs();
         EXIT_IF_ERROR(JniUtil::Init());
         EXIT_IF_ERROR(HBaseTableScanner::Init());
         EXIT_IF_ERROR(HBaseTableCache::Init());
         InitFeSupport();
         //ExecEnv類是impalad backend上Query/PlanFragment的執行環境。
         //生成SubscriptionManager, SimpleScheduler和各種Cache
         ExecEnv exec_env;
         //生成Beeswax, hive-server2和backend三種ThriftServer用于接收client請求,不過這三種服務的后端真正的處理邏輯都是ImpalaServer* server這個對象。
         ThriftServer* beeswax_server = NULL;
         ThriftServer* hs2_server = NULL;
         ThriftServer* be_server = NULL;
         ImpalaServer* server =
         CreateImpalaServer(&exec_env, FLAGS_fe_port, FLAGS_hs2_port, FLAGS_be_port,
         &beeswax_server, &hs2_server, &be_server);
         //因為be_server是對系統內提供服務的,先啟動它。
         be_server->Start();
         //這里面關鍵是啟動了SubscriptionManager和Scheduler
         Status status = exec_env.StartServices();
         if (!status.ok()) {
         LOG(ERROR) << "Impalad services did not start correctly, exiting";
         ShutdownLogging();
         exit(1);
         }
         // register be service *after* starting the be server thread and after starting
         // the subscription mgr handler thread
         scoped_ptr cb;
         if (FLAGS_use_statestore) {
         THostPort host_port;
         host_port.port = FLAGS_be_port;
         host_port.ipaddress = FLAGS_ipaddress;
         host_port.hostname = FLAGS_hostname;
         //注冊這個be服務到statestored,整個集群里所有的be服務組成一個group,這樣以后來了Query請求就可以在各個backend之間dispatch了。
         Status status =
         exec_env.subscription_mgr()->RegisterService(IMPALA_SERVICE_ID, host_port);
         unordered_set services;
         services.insert(IMPALA_SERVICE_ID);
         //注冊callback函數,每當StateStoreSubscriber接收到來自statestored的update之后調用該函數。
         cb.reset(new SubscriptionManager::UpdateCallback(
         bind(mem_fn(&ImpalaServer::MembershipCallback), server, _1)));
         exec_env.subscription_mgr()->RegisterSubscription(services, "impala.server",
         cb.get());
         if (!status.ok()) {
         LOG(ERROR) << "Could not register with state store service: "
         << status.GetErrorMsg(); ShutdownLogging(); exit(1); } } // this blocks until the beeswax and hs2 servers terminate //前面對內服務的be_server已經成功啟動,下面啟動對外服務的beeswax_server和hs2_server beeswax_server->Start();
         hs2_server->Start();
         beeswax_server->Join();
         hs2_server->Join();
         delete be_server;
         delete beeswax_server;
         delete hs2_server;
        }
        

        exec_env.StartServices()調用SubscriptionManager.Start(),進一步調用StateStoreSubscriber.Start()啟動一個ThriftServer。

        StateStoreSubscriber實現了StateStoreSubscriberService(StateStoreSubscriberService.thrift中定義),用于接收來自statestored的update,并把與這個StateStoreSubscriber捆綁的backend的update反饋給statestored。這樣這個backend就可以對其他backend可見,這樣就可以接受其他impala backend發來的任務更新了(當然,接收backend更新是通過statestored中轉的)。

        參考文獻:

        http://www.sizeofvoid.net/wp-content/uploads/ImpalaIntroduction2.pdf

        聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com

        文檔

        Impala源代碼分析(1)-Impala架構和RPC

        Impala源代碼分析(1)-Impala架構和RPC:Impala總共分為3個組件:impalad, statestored, client/impala-shell。關于這三個組件的基本功能在這篇文章中已經介紹過了。 Client: 可以是Python CLI(官方提供的impala_shell.py),JDBC/ODBC或者Hue。無論哪個其實就是一個Thrif
        推薦度:
        標簽: 源代碼 分析 rpc
        • 熱門焦點

        最新推薦

        猜你喜歡

        熱門推薦

        專題
        Top
        主站蜘蛛池模板: 一区二区三区视频免费观看| 亚洲精品不卡视频| 波多野结衣亚洲一级| 精品国产污污免费网站| 老司机亚洲精品影视www| 男男黄GAY片免费网站WWW| 成人性生活免费视频| 亚洲午夜精品一区二区公牛电影院 | 亚洲激情校园春色| 精品国产麻豆免费人成网站| 亚洲中文字幕在线第六区| 日本中文字幕免费看| 免费播放特黄特色毛片| 国产成人综合亚洲| 国产乱子伦精品免费无码专区| 亚洲欧美成人一区二区三区| 亚洲欧洲免费无码| 亚洲av中文无码乱人伦在线观看| 麻花传媒剧在线mv免费观看| 亚洲综合综合在线| 日本片免费观看一区二区| 亚洲国产成人精品青青草原| 在线精品一卡乱码免费| 亚洲国产成人九九综合| 免费看国产精品3a黄的视频| 亚洲日韩中文字幕一区| 特级淫片国产免费高清视频| 99亚洲乱人伦aⅴ精品| 国产小视频免费观看| 美女露隐私全部免费直播| 免费一级大黄特色大片| selaoban在线视频免费精品| 亚洲熟妇丰满多毛XXXX| 午夜免费福利视频| 亚洲免费在线视频播放| 日本成人免费在线| 一级毛片免费不卡直观看| 情人伊人久久综合亚洲| 免费成人福利视频| 美女免费视频一区二区三区| 国产亚洲精品线观看动态图|