Offline and real-time integration in practice with the GitHub public events dataset
This article describes how to build an offline data warehouse with MaxCompute and a real-time data warehouse with Flink and Hologres from GitHub event data, and then run real-time analysis in Hologres and offline analysis in MaxCompute, giving an integrated real-time and offline solution.
Background information
As society becomes more digital, enterprises need fresher data. Traditional offline scenarios are designed for processing massive data in batches, but many businesses now also need real-time processing, real-time storage, and real-time analysis. The concept of offline and real-time integration was proposed to address this.
Real-time and offline integration means managing and processing real-time data and offline data on the same platform. It connects real-time data processing and offline data analysis seamlessly, which improves the efficiency and precision of data analysis. Its advantages are:
Higher processing efficiency: real-time and offline data are consolidated on one platform, which improves processing efficiency and lowers the cost of data transfer and conversion.
Higher analysis precision: real-time and offline data can be analyzed together, which improves the precision and accuracy of the analysis.
Lower system complexity: data management and processing become simpler and more efficient.
Higher data value: the data is put to fuller use and gives the enterprise better decision support.
Alibaba Cloud has designed several solutions in this direction and offers a simplified, integrated real-time and offline data warehouse. MaxCompute, the big data computing service, covers the offline scenario and Hologres, the real-time data warehouse, covers the real-time scenario. Together with the real-time processing capability of Flink, they form the core engines of the Alibaba Cloud integrated data warehouse.
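Solution architecture
The figure below shows the end-to-end pipeline for the real-time and offline integration practice on the GitHub public events dataset with MaxCompute and Hologres.
An ECS instance collects the real-time and offline GitHub event data and serves as the data source. The data then flows into a real-time pipeline and an offline pipeline, and both pipelines converge in Hologres, which serves all queries.
Real-time pipeline: Flink processes the data in Simple Log Service (SLS) in real time and writes it to Hologres. Flink is a stream processing engine, and Hologres supports real-time writes and updates, with data queryable as soon as it is written. The two are natively integrated and support high-throughput, low-latency, modeled, high-quality real-time warehouse development. This meets real-time business insight needs such as extracting the latest events and analyzing trending events.
Offline pipeline: MaxCompute processes and archives the large volume of offline data. Object Storage Service (OSS) is the Alibaba Cloud storage service for data of any type. The raw data in this practice is JSON, and OSS stores it conveniently, securely, reliably, and at low cost. MaxCompute is an enterprise-grade SaaS (Software as a Service) cloud data warehouse for data analysis. It reads and parses the semi-structured data in OSS directly through external tables and loads the valuable data into MaxCompute internal storage. Combined with DataWorks for data development, this produces the offline data warehouse.
Hologres and MaxCompute are seamlessly connected at the underlying layer, so Hologres can accelerate queries on the massive historical data in MaxCompute and serve low-frequency, high-performance queries on historical data. The offline pipeline can also correct real-time data, which addresses problems such as events missed by the real-time pipeline.
The solution has the following advantages:
Stable and efficient offline pipeline: supports hourly writes and updates, processes large-scale data in batches, runs complex computation and analysis, lowers computing cost, and improves processing efficiency.
Mature real-time pipeline: supports real-time writes, real-time event computation, and real-time analysis. The real-time pipeline is simplified and data responds within seconds.
Unified storage and service: Hologres serves all queries, data is stored centrally, and the external interface is consistent (OLAP and key-value access are both exposed through SQL).
Real-time and offline convergence: little data redundancy, little data movement, and correctable data.
With one-stop development, data responds within seconds and the status of the whole pipeline is visible. The architecture has few components and few dependencies, which lowers both O&M and labor costs.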
Understanding the business and the data
Many developers work on open source projects on GitHub and generate a large number of events in the process. GitHub records the type and details of each event, the developer, the code repository, and other information, and it publishes the public events, such as starring a repository or pushing code. For the event types, see Webhook events and payloads.
GitHub publishes its public events through an open API. The API only exposes events that occurred at least 5 minutes ago. For details, see Events. This API is the source of real-time data.
The GH Archive project aggregates GitHub public events by hour and makes them available to developers. For details about the project, see GH Archive. This project is the source of offline data.
Understanding the GitHub business
The core of GitHub is code management and collaboration. It involves three first-level entities: developer (Developer), code repository (Repository), and organization (Organization).
In this analysis of GitHub public events, each event is stored and recorded as an entity.
Understanding the raw public event data
A sample raw event, JSON-encoded:
{
"id": "19541192931",
"type": "WatchEvent",
"actor":
{
"id": 23286640,
"login": "herekeo",
"display_login": "herekeo",
"gravatar_id": "",
"url": "https://api.github.com/users/herekeo",
"avatar_url": "https://avatars.githubusercontent.com/u/23286640?"
},
"repo":
{
"id": 52760178,
"name": "crazyguitar/pysheeet",
"url": "https://api.github.com/repos/crazyguitar/pysheeet"
},
"payload":
{
"action": "started"
},
"public": true,
"created_at": "2022-01-01T00:03:04Z"
}
This practice covers 15 types of public events (excluding types that never appear or are no longer recorded). For the event types and their descriptions, see GitHub public event types.
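As a rough illustration of how the nested JSON above maps to flat columns, the sketch below parses a trimmed copy of the sample event. The function name `flatten_event` is hypothetical, and the output keys (`actor_id`, `repo_name`, `ds`, `hr`, and so on) simply mirror the column names used by the tables later in this article; this is not the code the pipeline itself runs.

```python
import json
from datetime import datetime, timezone

# Trimmed copy of the sample WatchEvent shown above.
SAMPLE_EVENT = """
{
  "id": "19541192931",
  "type": "WatchEvent",
  "actor": {"id": 23286640, "login": "herekeo"},
  "repo": {"id": 52760178, "name": "crazyguitar/pysheeet"},
  "payload": {"action": "started"},
  "public": true,
  "created_at": "2022-01-01T00:03:04Z"
}
"""


def flatten_event(raw):
    """Flatten one raw event (JSON text) into a dict of scalar columns."""
    event = json.loads(raw)
    actor = event.get("actor") or {}
    repo = event.get("repo") or {}
    org = event.get("org") or {}          # absent for events outside an organization
    payload = event.get("payload") or {}
    # created_at is an ISO 8601 string in UTC, e.g. 2022-01-01T00:03:04Z
    created_at = datetime.strptime(
        event["created_at"], "%Y-%m-%dT%H:%M:%SZ"
    ).replace(tzinfo=timezone.utc)
    return {
        "id": int(event["id"]),           # the event id arrives as a string
        "type": event["type"],
        "actor_id": actor.get("id"),
        "actor_login": actor.get("login"),
        "repo_id": repo.get("id"),
        "repo_name": repo.get("name"),
        "org_id": org.get("id"),
        "org_login": org.get("login"),
        "action": payload.get("action"),
        "created_at": created_at,
        "ds": created_at.strftime("%Y-%m-%d"),
        "hr": created_at.strftime("%H"),
    }


row = flatten_event(SAMPLE_EVENT)
print(row["type"], row["repo_name"], row["ds"], row["hr"])
```

Fields that are missing from an event (for example `org` here) come back as `None`, which corresponds to NULL columns in the warehouse tables.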
Prerequisites
An ECS instance is created and bound to an elastic IP address (EIP). It is used to extract real-time event data from the GitHub API. For details, see Instance creation overview and Bind and unbind an EIP.
OSS is activated and ossutil is installed on the ECS instance. OSS stores the JSON data files provided by GH Archive. For details, see Activate OSS and Install ossutil.
MaxCompute is activated and a project is created. For details, see Create a MaxCompute project.
DataWorks is activated and a workspace is created. It is used to create offline scheduling tasks. For details, see Create a workspace.
Simple Log Service (SLS) is activated, and a project and a Logstore are created. SLS collects the data extracted by ECS as logs. For details, see Quick start.
A Realtime Compute for Apache Flink instance is activated. It writes the log data collected by SLS to Hologres in real time. For details, see Activate Realtime Compute for Apache Flink.
Hologres is activated. For details, see Purchase a Hologres instance.
Build the offline data warehouse (hourly updates)
Download the raw data files on ECS and upload them to OSS
The ECS instance downloads the JSON data files provided by GH Archive. For historical data, use wget. For example, the following command downloads the data for every hour from 2012 to 2022:
    wget https://data.gharchive.org/{2012..2022}-{01..12}-{01..31}-{0..23}.json.gz
For the new data produced every hour from now on, set up an hourly scheduled download with the following steps.
Create a file named download_code.sh with the following command:
    vim download_code.sh
Press i to enter edit mode and add the following sample script.
Note: Make sure that ossutil is installed on the ECS instance. For details, see Install ossutil. The OSS bucket in this example is named githubevents.
    # d: previous UTC hour without zero padding, matching GH Archive file names
    d=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%-H')
    # h: previous UTC hour with zero padding, used as the OSS partition directory
    h=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%H')
    url=https://data.gharchive.org/${d}.json.gz
    echo ${url}

    # Download and decompress the hourly archive
    wget ${url} -P ./gh_data/
    cd gh_data
    gzip -d ${d}.json
    echo ${d}.json

    # Upload the data to OSS with ossutil
    cd /root
    ./ossutil64 mkdir oss://githubevents/hr=${h}
    ./ossutil64 cp -r /hourlydata/gh_data oss://githubevents/hr=${h} -u
    echo oss uploaded successfully!

    # Remove the local copy
    rm -rf /hourlydata/gh_data/${d}.json
    echo ecs deleted!
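Press Esc, type :wq, and press Enter to save and close the file.
Use the following commands to run download_code.sh at minute 10 of every hour.
    crontab -e
    10 * * * * cd /hourlydata && sh download_code.sh > download.log
After this is set up, the JSON file of the previous hour is downloaded at minute 10 of every hour, decompressed on the ECS instance, and uploaded to OSS (under oss://githubevents). So that each later run reads only the file of the previous hour, every file is uploaded into its own directory named 'hr=%Y-%m-%d-%H', which acts as a partition. Each subsequent write then reads only the files in the latest partition.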
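The script above uses two slightly different hour formats: the GH Archive file name has no zero padding on the hour (`%-H`), while the OSS partition directory does (`%H`). The following sketch reproduces that naming in Python so the difference is easy to check. The function name `previous_hour_names` is hypothetical; the URL pattern and the `hr=` prefix are taken from the script.

```python
from datetime import datetime, timedelta, timezone


def previous_hour_names(now_utc):
    """Return (download_url, oss_partition) for the hour before now_utc.

    now_utc must be a timezone-aware datetime; it is normalized to UTC.
    """
    prev = now_utc.astimezone(timezone.utc) - timedelta(hours=1)
    day = prev.strftime("%Y-%m-%d")
    # GH Archive file names use an unpadded hour, like date's %-H.
    url = f"https://data.gharchive.org/{day}-{prev.hour}.json.gz"
    # The OSS partition directory uses a zero-padded hour, like date's %H.
    partition = f"hr={day}-{prev.hour:02d}"
    return url, partition


# A run at 10:10 UTC handles the 09:00 hour.
print(previous_hour_names(datetime(2022, 1, 1, 10, 10, tzinfo=timezone.utc)))
```

Subtracting the hour from a full datetime, rather than from the hour number, keeps the result correct across midnight and month boundaries.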
Import OSS data into MaxCompute through an external table
Run the following commands in the MaxCompute client or in an ODPS SQL node in DataWorks. For details, see Connect with the local client (odpscmd) or Develop an ODPS SQL task.
Create the external table githubevents, which converts the JSON files stored in OSS:
    CREATE EXTERNAL TABLE IF NOT EXISTS githubevents
    (
        col STRING
    )
    PARTITIONED BY
    (
        hr STRING
    )
    STORED AS textfile
    LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/githubevents/'
    ;
For details about creating OSS external tables in MaxCompute, see Create an OSS external table.
Create the fact table dwd_github_events_odps to store the data. Its DDL is as follows:
    CREATE TABLE IF NOT EXISTS dwd_github_events_odps
    (
        id                     BIGINT   COMMENT 'Event ID'
        ,actor_id              BIGINT   COMMENT 'ID of the actor who triggered the event'
        ,actor_login           STRING   COMMENT 'Login name of the actor'
        ,repo_id               BIGINT   COMMENT 'Repository ID'
        ,repo_name             STRING   COMMENT 'Full repository name: owner/Repository_name'
        ,org_id                BIGINT   COMMENT 'ID of the organization that owns the repository'
        ,org_login             STRING   COMMENT 'Name of the organization that owns the repository'
        ,`type`                STRING   COMMENT 'Event type'
        ,created_at            DATETIME COMMENT 'Time the event occurred'
        ,action                STRING   COMMENT 'Event action'
        ,iss_or_pr_id          BIGINT   COMMENT 'issue/pull_request ID'
        ,number                BIGINT   COMMENT 'issue/pull_request number'
        ,comment_id            BIGINT   COMMENT 'Comment ID'
        ,commit_id             STRING   COMMENT 'Commit ID'
        ,member_id             BIGINT   COMMENT 'Member ID'
        ,rev_or_push_or_rel_id BIGINT   COMMENT 'review/push/release ID'
        ,ref                   STRING   COMMENT 'Name of the created/deleted resource'
        ,ref_type              STRING   COMMENT 'Type of the created/deleted resource'
        ,state                 STRING   COMMENT 'State of the issue/pull_request/pull_request_review'
        ,author_association    STRING   COMMENT 'Relationship between the actor and the repository'
        ,language              STRING   COMMENT 'Language of the code in the merge request'
        ,merged                BOOLEAN  COMMENT 'Whether the merge was accepted'
        ,merged_at             DATETIME COMMENT 'Time the code was merged'
        ,additions             BIGINT   COMMENT 'Lines of code added'
        ,deletions             BIGINT   COMMENT 'Lines of code deleted'
        ,changed_files         BIGINT   COMMENT 'Number of files changed by the pull request'
        ,push_size             BIGINT   COMMENT 'Number of commits'
        ,push_distinct_size    BIGINT   COMMENT 'Number of distinct commits'
        ,hr                    STRING   COMMENT 'Hour of the event, for example hr=00 for 00:23'
        ,`month`               STRING   COMMENT 'Month of the event, for example month=2015-10 for October 2015'
        ,`year`                STRING   COMMENT 'Year of the event, for example year=2015'
    )
    PARTITIONED BY
    (
        ds                     STRING   COMMENT 'Day of the event, ds=yyyy-mm-dd'
    )
    ;
Parse the JSON data and write it to the fact table.
Use the following commands to load the partitions, parse the JSON, and write the result to the dwd_github_events_odps table:
    msck repair table githubevents add partitions;

    set odps.sql.hive.compatible = true;
    set odps.sql.split.hive.bridge = true;

    INSERT INTO TABLE dwd_github_events_odps PARTITION (ds)
    SELECT  CAST(GET_JSON_OBJECT(col,'$.id') AS BIGINT) AS id
            ,CAST(GET_JSON_OBJECT(col,'$.actor.id') AS BIGINT) AS actor_id
            ,GET_JSON_OBJECT(col,'$.actor.login') AS actor_login
            ,CAST(GET_JSON_OBJECT(col,'$.repo.id') AS BIGINT) AS repo_id
            ,GET_JSON_OBJECT(col,'$.repo.name') AS repo_name
            ,CAST(GET_JSON_OBJECT(col,'$.org.id') AS BIGINT) AS org_id
            ,GET_JSON_OBJECT(col,'$.org.login') AS org_login
            ,GET_JSON_OBJECT(col,'$.type') AS type
            ,to_date(GET_JSON_OBJECT(col,'$.created_at'), 'yyyy-mm-ddThh:mi:ssZ') AS created_at
            ,GET_JSON_OBJECT(col,'$.payload.action') AS action
            ,CASE   WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.id') AS BIGINT)
                    WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.id') AS BIGINT)
            END AS iss_or_pr_id
            ,CASE   WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.number') AS BIGINT)
                    WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.number') AS BIGINT)
                    ELSE CAST(GET_JSON_OBJECT(col,'$.payload.number') AS BIGINT)
            END AS number
            ,CAST(GET_JSON_OBJECT(col,'$.payload.comment.id') AS BIGINT) AS comment_id
            ,GET_JSON_OBJECT(col,'$.payload.comment.commit_id') AS commit_id
            ,CAST(GET_JSON_OBJECT(col,'$.payload.member.id') AS BIGINT) AS member_id
            ,CASE   WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.review.id') AS BIGINT)
                    WHEN GET_JSON_OBJECT(col,'$.type')="PushEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.push_id') AS BIGINT)
                    WHEN GET_JSON_OBJECT(col,'$.type')="ReleaseEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.release.id') AS BIGINT)
            END AS rev_or_push_or_rel_id
            ,GET_JSON_OBJECT(col,'$.payload.ref') AS ref
            ,GET_JSON_OBJECT(col,'$.payload.ref_type') AS ref_type
            ,CASE   WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.state')
                    WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.state')
                    WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.state')
            END AS state
            ,CASE   WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.author_association')
                    WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.author_association')
                    WHEN GET_JSON_OBJECT(col,'$.type')="IssueCommentEvent" THEN GET_JSON_OBJECT(col,'$.payload.comment.author_association')
                    WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.author_association')
            END AS author_association
            ,GET_JSON_OBJECT(col,'$.payload.pull_request.base.repo.language') AS language
            ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.merged') AS BOOLEAN) AS merged
            ,to_date(GET_JSON_OBJECT(col,'$.payload.pull_request.merged_at'), 'yyyy-mm-ddThh:mi:ssZ') AS merged_at
            ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.additions') AS BIGINT) AS additions
            ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.deletions') AS BIGINT) AS deletions
            ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.changed_files') AS BIGINT) AS changed_files
            ,CAST(GET_JSON_OBJECT(col,'$.payload.size') AS BIGINT) AS push_size
            ,CAST(GET_JSON_OBJECT(col,'$.payload.distinct_size') AS BIGINT) AS push_distinct_size
            ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),12,2) AS hr
            ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,7),'/','-') AS month
            ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,4) AS year
            ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,10),'/','-') AS ds
    FROM    githubevents
    -- Read a single hourly partition: the hr value 9 hours before getdate()
    WHERE   hr = CAST(to_char(dateadd(getdate(),-9,'hh'), 'yyyy-mm-dd-hh') AS STRING);
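The `-9` in the partition filter is easier to read as two separate shifts. The sketch below assumes (this is not stated in the article) that `getdate()` returns UTC+8 wall-clock time: 8 hours convert it back to UTC, and 1 more hour selects the previous, already complete hour, which is the partition the download script named with `%Y-%m-%d-%H` in UTC. The function name `partition_for` is hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Assumption: the SQL session clock runs at UTC+8.
UTC8 = timezone(timedelta(hours=8))


def partition_for(getdate_local):
    """Mimic to_char(dateadd(getdate(), -9, 'hh'), 'yyyy-mm-dd-hh')."""
    shifted = getdate_local - timedelta(hours=9)
    return shifted.strftime("%Y-%m-%d-%H")


# 18:20 in UTC+8 is 10:20 UTC, so the previous complete UTC hour is 09.
local_now = datetime(2022, 1, 1, 18, 20, tzinfo=UTC8)
print(partition_for(local_now))

# Cross-check against "previous hour in UTC", the value the download script uses.
utc_prev = local_now.astimezone(timezone.utc) - timedelta(hours=1)
print(utc_prev.strftime("%Y-%m-%d-%H"))
```

If the session time zone differs from UTC+8, the offset in the SQL filter would need to change accordingly.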
Query the data.
Use the following commands to query the dwd_github_events_odps table:
    set odps.sql.allow.fullscan=true;
    select * from dwd_github_events_odps limit 10;
A sample result is shown below:
Build the real-time data warehouse
Collect real-time data on ECS
The ECS instance extracts real-time event data from the GitHub API. The script below is only one example of how to collect real-time data through the GitHub API.
Each run of the script lasts 1 minute. It collects the real-time event data that the API provides during that time and stores each event as JSON.
The script does not guarantee that every real-time event is collected.
Collecting data from the GitHub API continuously requires the Accept and Authorization headers. Accept is a fixed value, and Authorization must contain an access token requested from GitHub. For how to create an access token, see GitHub's documentation on creating a personal access token.
Create a file named download_realtime_data.py with the following command.
    vim download_realtime_data.py
Press i to enter edit mode and add the following sample content.
    #!/usr/bin/env python
    import json
    import sys
    import time

    import requests


    # Return the URL of the next page from the Link response header, or None
    def get_next_link(resp):
        resp_link = resp.headers.get('link', '')
        for l in resp_link.split(', '):
            parts = l.split('; ')
            if len(parts) >= 2 and parts[1] == 'rel="next"':
                return parts[0][1:-1]
        return None


    # Collect one page of data from the API and append it to fname
    def download(link, fname):
        # Accept and Authorization headers for the GitHub API.
        # Replace <github_api_token> with your own access token.
        headers = {
            "Accept": "application/vnd.github+json",
            "Authorization": "Bearer <github_api_token>",
        }
        resp = requests.get(link, headers=headers)

        if int(resp.status_code) != 200:
            return None

        events = resp.json()
        with open(fname, 'a') as f:
            for j in events:
                f.write(json.dumps(j))
                f.write('\n')

        print('downloaded {} events to {}'.format(len(events), fname))
        return resp


    # Collect all pages of data from the API
    def download_all_data(fname):
        link = 'https://api.github.com/events?per_page=100&page=1'
        while True:
            resp = download(link, fname)
            if resp is None:
                break
            link = get_next_link(resp)
            if link is None:
                break


    # Current time in milliseconds
    def get_current_ms():
        return round(time.time() * 1000)


    # Run for 1 minute per invocation
    def main(fname):
        current_ms = get_current_ms()
        while get_current_ms() - current_ms < 60 * 1000:
            download_all_data(fname)
            time.sleep(0.1)


    # Entry point
    if __name__ == '__main__':
        if len(sys.argv) < 2:
            print('usage: python {} <log_file>'.format(sys.argv[0]))
            sys.exit(0)
        main(sys.argv[1])
Press Esc, type :wq, and press Enter to save and close the file.
Create a file named run_py.sh that runs download_realtime_data.py and stores the data collected by each run in a separate file. Its content is as follows.
    python /root/download_realtime_data.py /root/gh_realtime_data/$(date '+%Y-%m-%d-%H:%M:%S').json
Create a file named delete_log.sh that deletes historical data. Its content is as follows.
    d=$(TZ=UTC date --date='2 day ago' '+%Y-%m-%d')
    rm -f /root/gh_realtime_data/*${d}*.json
Use the following commands to collect GitHub data every minute and delete historical data once a day.
    crontab -e
    * * * * * bash /root/run_py.sh
    1 1 * * * bash /root/delete_log.sh
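The collection script pages through the API by following the `Link` response header. For readers who want to see that parsing in isolation, here is a small standalone sketch. `parse_link_header` is a hypothetical helper, not part of the script above, and the header value in the example is illustrative rather than captured from a real response.

```python
def parse_link_header(value):
    """Parse an HTTP Link header into a {rel: url} dict.

    Simplified: assumes URLs contain no commas or semicolons,
    which holds for the paginated URLs used in this article.
    """
    links = {}
    for part in value.split(","):
        sections = part.split(";")
        if len(sections) < 2:
            continue
        url = sections[0].strip()
        if not (url.startswith("<") and url.endswith(">")):
            continue
        for param in sections[1:]:
            name, _, val = param.strip().partition("=")
            if name == "rel":
                links[val.strip('"')] = url[1:-1]
    return links


header = (
    '<https://api.github.com/events?per_page=100&page=2>; rel="next", '
    '<https://api.github.com/events?per_page=100&page=3>; rel="last"'
)
links = parse_link_header(header)
print(links.get("next"))
```

When the last page is reached there is no `next` entry, so `links.get("next")` returns `None` and the paging loop can stop. The `requests` library also exposes already-parsed links through the `links` attribute of a response, which may be simpler in real code.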
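Collect ECS data with SLS
SLS collects the real-time event data extracted on ECS as logs.
SLS can collect logs on ECS through Logtail. Because the data in this article is JSON, you can use the JSON mode of Logtail to collect the incremental JSON logs on ECS quickly. For the procedure, see Collect logs in JSON mode. In this article, SLS parses only the top-level key-value pairs of the raw data.
In this example, the log path parameter of the Logtail configuration is set to /root/gh_realtime_data/**/*.json.
After the configuration is complete, SLS continuously collects the incremental event data from ECS. The figure below shows an example of the collected data.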
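Parsing only the top-level key-value pairs means that nested objects such as `actor`, `repo`, and `payload` stay as JSON text inside a single field, which is why later processing still has to extract values from them. The sketch below imitates that shape in plain Python. It is an illustration under that assumption, not Logtail's implementation, and the name `top_level_fields` is hypothetical.

```python
import json


def top_level_fields(line):
    """Split one JSON log line into top-level fields, all as strings.

    Nested objects and arrays are kept as compact JSON text.
    """
    event = json.loads(line)
    fields = {}
    for key, value in event.items():
        if isinstance(value, (dict, list)):
            fields[key] = json.dumps(value, separators=(",", ":"))
        elif isinstance(value, bool):
            fields[key] = "true" if value else "false"
        elif value is None:
            fields[key] = "null"
        else:
            fields[key] = str(value)
    return fields


line = (
    '{"id": "19541192931", "type": "WatchEvent", '
    '"actor": {"id": 23286640, "login": "herekeo"}, '
    '"payload": {"action": "started"}, "public": true, '
    '"created_at": "2022-01-01T00:03:04Z"}'
)
fields = top_level_fields(line)
print(fields["actor"])    # still JSON text; needs a second parse
print(json.loads(fields["payload"])["action"])
```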
Write SLS data to Hologres in real time with Flink
Flink writes the log data collected by SLS to Hologres in real time. Using an SLS source table and a Hologres result table in Flink is enough to write data from SLS to Hologres in real time. For details, see Import from SLS.
Create a Hologres internal table.
The internal table created in this article keeps only some of the keys of the raw JSON data. The event id and the date ds form the primary key, id is the distribution key, ds is the partition key, and the event time created_at is the event_time_column. You can create indexes on other fields based on your query needs to improve query efficiency. For an introduction to indexes, see CREATE TABLE. The DDL for this example is as follows.
    DROP TABLE IF EXISTS gh_realtime_data;

    BEGIN;

    CREATE TABLE gh_realtime_data (
        id bigint,
        actor_id bigint,
        actor_login text,
        repo_id bigint,
        repo_name text,
        org_id bigint,
        org_login text,
        type text,
        created_at timestamp with time zone NOT NULL,
        action text,
        iss_or_pr_id bigint,
        number bigint,
        comment_id bigint,
        commit_id text,
        member_id bigint,
        rev_or_push_or_rel_id bigint,
        ref text,
        ref_type text,
        state text,
        author_association text,
        language text,
        merged boolean,
        merged_at timestamp with time zone,
        additions bigint,
        deletions bigint,
        changed_files bigint,
        push_size bigint,
        push_distinct_size bigint,
        hr text,
        month text,
        year text,
        ds text,
        PRIMARY KEY (id,ds)
    )
    PARTITION BY LIST (ds);

    CALL set_table_property('public.gh_realtime_data', 'distribution_key', 'id');
    CALL set_table_property('public.gh_realtime_data', 'event_time_column', 'created_at');
    CALL set_table_property('public.gh_realtime_data', 'clustering_key', 'created_at');

    COMMENT ON COLUMN public.gh_realtime_data.id IS 'Event ID';
    COMMENT ON COLUMN public.gh_realtime_data.actor_id IS 'ID of the actor who triggered the event';
    COMMENT ON COLUMN public.gh_realtime_data.actor_login IS 'Login name of the actor';
    COMMENT ON COLUMN public.gh_realtime_data.repo_id IS 'Repository ID';
    COMMENT ON COLUMN public.gh_realtime_data.repo_name IS 'Repository name';
    COMMENT ON COLUMN public.gh_realtime_data.org_id IS 'ID of the organization that owns the repository';
    COMMENT ON COLUMN public.gh_realtime_data.org_login IS 'Name of the organization that owns the repository';
    COMMENT ON COLUMN public.gh_realtime_data.type IS 'Event type';
    COMMENT ON COLUMN public.gh_realtime_data.created_at IS 'Time the event occurred';
    COMMENT ON COLUMN public.gh_realtime_data.action IS 'Event action';
    COMMENT ON COLUMN public.gh_realtime_data.iss_or_pr_id IS 'issue/pull_request ID';
    COMMENT ON COLUMN public.gh_realtime_data.number IS 'issue/pull_request number';
    COMMENT ON COLUMN public.gh_realtime_data.comment_id IS 'Comment ID';
    COMMENT ON COLUMN public.gh_realtime_data.commit_id IS 'Commit ID';
    COMMENT ON COLUMN public.gh_realtime_data.member_id IS 'Member ID';
    COMMENT ON COLUMN public.gh_realtime_data.rev_or_push_or_rel_id IS 'review/push/release ID';
    COMMENT ON COLUMN public.gh_realtime_data.ref IS 'Name of the created/deleted resource';
    COMMENT ON COLUMN public.gh_realtime_data.ref_type IS 'Type of the created/deleted resource';
    COMMENT ON COLUMN public.gh_realtime_data.state IS 'State of the issue/pull_request/pull_request_review';
    COMMENT ON COLUMN public.gh_realtime_data.author_association IS 'Relationship between the actor and the repository';
    COMMENT ON COLUMN public.gh_realtime_data.language IS 'Programming language';
    COMMENT ON COLUMN public.gh_realtime_data.merged IS 'Whether the merge was accepted';
    COMMENT ON COLUMN public.gh_realtime_data.merged_at IS 'Time the code was merged';
    COMMENT ON COLUMN public.gh_realtime_data.additions IS 'Lines of code added';
    COMMENT ON COLUMN public.gh_realtime_data.deletions IS 'Lines of code deleted';
    COMMENT ON COLUMN public.gh_realtime_data.changed_files IS 'Number of files changed by the pull request';
    COMMENT ON COLUMN public.gh_realtime_data.push_size IS 'Number of commits';
    COMMENT ON COLUMN public.gh_realtime_data.push_distinct_size IS 'Number of distinct commits';
    COMMENT ON COLUMN public.gh_realtime_data.hr IS 'Hour of the event, for example hr=00 for 00:23';
    COMMENT ON COLUMN public.gh_realtime_data.month IS 'Month of the event, for example month=2015-10 for October 2015';
    COMMENT ON COLUMN public.gh_realtime_data.year IS 'Year of the event, for example year=2015';
    COMMENT ON COLUMN public.gh_realtime_data.ds IS 'Day of the event, ds=yyyy-mm-dd';

    COMMIT;
Write data in real time with Flink.
Flink further parses the SLS data and writes it to Hologres in real time. The following Flink statements filter the data being written: they discard dirty records whose event ID or event time (created_at) is null, and they keep only recent events.
    CREATE TEMPORARY TABLE sls_input (
      actor varchar,
      created_at varchar,
      id bigint,
      org varchar,
      payload varchar,
      public varchar,
      repo varchar,
      type varchar
    )
    WITH (
      'connector' = 'sls',
      'endpoint' = '<endpoint>',            -- private endpoint of SLS
      'accessid' = '<accesskey id>',        -- AccessKey ID of the account
      'accesskey' = '<accesskey secret>',   -- AccessKey secret of the account
      'project' = '<project name>',         -- SLS project name
      'logstore' = '<logstore name>'        -- SLS Logstore name
    );

    CREATE TEMPORARY TABLE hologres_sink (
      id bigint,
      actor_id bigint,
      actor_login string,
      repo_id bigint,
      repo_name string,
      org_id bigint,
      org_login string,
      type string,
      created_at timestamp,
      action string,
      iss_or_pr_id bigint,
      number bigint,
      comment_id bigint,
      commit_id string,
      member_id bigint,
      rev_or_push_or_rel_id bigint,
      `ref` string,
      ref_type string,
      state string,
      author_association string,
      `language` string,
      merged boolean,
      merged_at timestamp,
      additions bigint,
      deletions bigint,
      changed_files bigint,
      push_size bigint,
      push_distinct_size bigint,
      hr string,
      `month` string,
      `year` string,
      ds string
    )
    WITH (
      'connector' = 'hologres',
      'dbname' = '<hologres dbname>',        -- Hologres database name
      'tablename' = '<hologres tablename>',  -- Hologres table that receives the data
      'username' = '<accesskey id>',         -- AccessKey ID of the current Alibaba Cloud account
      'password' = '<accesskey secret>',     -- AccessKey secret of the current Alibaba Cloud account
      'endpoint' = '<endpoint>',             -- VPC endpoint of the current Hologres instance
      'jdbcretrycount' = '1',                -- number of retries on connection failure
      'partitionrouter' = 'true',            -- whether to write to a partitioned table
      'createparttable' = 'true',            -- whether to create partitions automatically
      'mutatetype' = 'insertorignore'        -- data write mode
    );

    INSERT INTO hologres_sink
    SELECT  id
            ,CAST(JSON_VALUE(actor, '$.id') AS bigint) AS actor_id
            ,JSON_VALUE(actor, '$.login') AS actor_login
            ,CAST(JSON_VALUE(repo, '$.id') AS bigint) AS repo_id
            ,JSON_VALUE(repo, '$.name') AS repo_name
            ,CAST(JSON_VALUE(org, '$.id') AS bigint) AS org_id
            ,JSON_VALUE(org, '$.login') AS org_login
            ,type
            ,TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS created_at
            ,JSON_VALUE(payload, '$.action') AS action
            ,CASE   WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.id') AS bigint)
                    WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.id') AS bigint)
            END AS iss_or_pr_id
            ,CASE   WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.number') AS bigint)
                    WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.number') AS bigint)
                    ELSE CAST(JSON_VALUE(payload, '$.number') AS bigint)
            END AS number
            ,CAST(JSON_VALUE(payload, '$.comment.id') AS bigint) AS comment_id
            ,JSON_VALUE(payload, '$.comment.commit_id') AS commit_id
            ,CAST(JSON_VALUE(payload, '$.member.id') AS bigint) AS member_id
            ,CASE   WHEN type='PullRequestReviewEvent' THEN CAST(JSON_VALUE(payload, '$.review.id') AS bigint)
                    WHEN type='PushEvent' THEN CAST(JSON_VALUE(payload, '$.push_id') AS bigint)
                    WHEN type='ReleaseEvent' THEN CAST(JSON_VALUE(payload, '$.release.id') AS bigint)
            END AS rev_or_push_or_rel_id
            ,JSON_VALUE(payload, '$.ref') AS `ref`
            ,JSON_VALUE(payload, '$.ref_type') AS ref_type
            ,CASE   WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.state')
                    WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.state')
                    WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.state')
            END AS state
            ,CASE   WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.author_association')
                    WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.author_association')
                    WHEN type='IssueCommentEvent' THEN JSON_VALUE(payload, '$.comment.author_association')
                    WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.author_association')
            END AS author_association
            ,JSON_VALUE(payload, '$.pull_request.base.repo.language') AS `language`
            ,CAST(JSON_VALUE(payload, '$.pull_request.merged') AS boolean) AS merged
            ,TO_TIMESTAMP_TZ(replace(JSON_VALUE(payload, '$.pull_request.merged_at'),'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS merged_at
            ,CAST(JSON_VALUE(payload, '$.pull_request.additions') AS bigint) AS additions
            ,CAST(JSON_VALUE(payload, '$.pull_request.deletions') AS bigint) AS deletions
            ,CAST(JSON_VALUE(payload, '$.pull_request.changed_files') AS bigint) AS changed_files
            ,CAST(JSON_VALUE(payload, '$.size') AS bigint) AS push_size
            ,CAST(JSON_VALUE(payload, '$.distinct_size') AS bigint) AS push_distinct_size
            ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),12,2) AS hr
            ,REPLACE(SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,7),'/','-') AS `month`
            ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,4) AS `year`
            ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,10) AS ds
    FROM    sls_input
    WHERE   id IS NOT NULL
      AND   created_at IS NOT NULL
      AND   to_date(replace(created_at,'T',' ')) >= date_add(CURRENT_DATE, -1)
    ;
For parameter descriptions, see SLS source table and Hologres result table.
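The statement above tags each `created_at` string as UTC through `TO_TIMESTAMP_TZ(..., 'UTC')`. As a rough illustration of what that means for a reader in UTC+8, the sketch below parses a GitHub timestamp as UTC and renders the same instant at a fixed +08:00 offset. It only demonstrates the time zone arithmetic; it does not reproduce Flink's or Hologres's behavior, and the helper names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Fixed +08:00 offset standing in for Asia/Shanghai (assumption: no DST applies).
SHANGHAI = timezone(timedelta(hours=8))


def parse_github_time(text):
    """Parse a GitHub timestamp such as 2022-01-01T00:03:04Z as UTC."""
    return datetime.strptime(text, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)


def to_shanghai(dt):
    """Render the same instant at UTC+8."""
    return dt.astimezone(SHANGHAI)


event_time = parse_github_time("2022-01-01T00:03:04Z")
print(to_shanghai(event_time).isoformat())

# An event late in the UTC day falls on the next calendar day in UTC+8.
late = parse_github_time("2022-01-01T20:00:00Z")
print(to_shanghai(late).date())
```

The second example is the reason the time zone has to be attached explicitly: without it, an event's calendar day could be read 8 hours off.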
Note: GitHub raw event data uses UTC, the raw data carries no time zone attribute, and the default time zone of Hologres is UTC+8. The time zone therefore has to be adjusted while Flink writes to Hologres: in Flink SQL, assign the UTC time zone to the source table data, and when you start the job, add the table.local-time-zone:Asia/Shanghai statement in the Flink configuration section of the job startup configuration page to set the Flink system time zone to Asia/Shanghai.
Query the data.
In Hologres, query the SLS data that Flink wrote to Hologres. You can then continue with data development based on your business needs.
    SELECT * FROM public.gh_realtime_data limit 10;
A sample result is shown below:
Correct real-time data with offline data
In the scenario of this article, the real-time data may miss events, so the offline data can be used to correct it. The following steps correct the real-time data of the previous day. You can adjust the correction cycle to your business needs.
Create a foreign table in Hologres to access the offline data in MaxCompute.
    IMPORT FOREIGN SCHEMA <maxcompute_project_name> LIMIT to
    (
        <foreign_table_name>
    )
    FROM SERVER odps_server INTO public OPTIONS(if_table_exist 'update',if_unsupported_type 'error');
For parameter descriptions, see IMPORT FOREIGN SCHEMA.
Create a temporary table and use the offline data to correct the real-time data of the previous day.
Note: Hologres supports Serverless Computing from V2.1.17. For large offline imports, large ETL jobs, and large queries on foreign tables, Serverless Computing runs the task on additional serverless resources instead of the resources of the instance. You do not need to reserve extra computing resources for the instance, instance stability improves markedly, the chance of OOM drops, and you pay only for the task. For details, see Overview of Serverless Computing. For usage, see Serverless Computing user guide.
    -- Clean up a leftover temporary table, if any
    DROP TABLE IF EXISTS gh_realtime_data_tmp;

    -- Create the temporary table
    SET hg_experimental_enable_create_table_like_properties = ON;
    CALL HG_CREATE_TABLE_LIKE ('gh_realtime_data_tmp', 'select * from gh_realtime_data');

    -- (Optional) Serverless Computing is recommended for large offline imports and ETL jobs
    SET hg_computing_resource = 'serverless';

    -- Insert data into the temporary table and update statistics
    INSERT INTO gh_realtime_data_tmp
    SELECT
        *
    FROM
        <foreign_table_name>
    WHERE
        ds = current_date - interval '1 day'
    ON CONFLICT (id, ds)
        DO NOTHING;
    ANALYZE gh_realtime_data_tmp;

    -- Reset the setting so that other SQL statements do not use serverless resources
    RESET hg_computing_resource;

    -- Replace the existing child table with the temporary table
    BEGIN;
    DROP TABLE IF EXISTS "gh_realtime_data_<yesterday_date>";
    ALTER TABLE gh_realtime_data_tmp RENAME TO "gh_realtime_data_<yesterday_date>";
    ALTER TABLE gh_realtime_data ATTACH PARTITION "gh_realtime_data_<yesterday_date>" FOR VALUES IN ('<yesterday_date>');
    COMMIT;
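When the correction runs on a schedule, the `<yesterday_date>` placeholder has to be filled in for each run. The sketch below renders the final swap transaction from a date. It assumes the placeholder takes the same yyyy-mm-dd form as the `ds` column and that child tables are named `gh_realtime_data_<ds>`; both are assumptions to verify against your own instance. `render_swap_sql` is a hypothetical helper that only builds the SQL text and does not connect to Hologres.

```python
from datetime import date, timedelta

# Mirrors the swap transaction shown above, with {ds} in place of <yesterday_date>.
SWAP_TEMPLATE = """BEGIN;
DROP TABLE IF EXISTS "gh_realtime_data_{ds}";
ALTER TABLE gh_realtime_data_tmp RENAME TO "gh_realtime_data_{ds}";
ALTER TABLE gh_realtime_data ATTACH PARTITION "gh_realtime_data_{ds}" FOR VALUES IN ('{ds}');
COMMIT;"""


def render_swap_sql(today):
    """Return the partition swap SQL for the day before `today`."""
    ds = (today - timedelta(days=1)).isoformat()  # yyyy-mm-dd
    return SWAP_TEMPLATE.format(ds=ds)


print(render_swap_sql(date(2023, 4, 1)))
```

Because the drop, rename, and attach run in one transaction, queries against the parent table see either the old child table or the corrected one, never a missing partition.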
Data analysis
The collected data supports a wide range of analysis. Depending on the time range your business needs to analyze, you can add further layers to the data warehouse to serve real-time analysis, offline analysis, and integrated real-time and offline analysis.
The following examples analyze the real-time data collected above. You can also analyze a specific code repository or developer.
Query the total number of public events today.
    SELECT
        count(*)
    FROM
        gh_realtime_data
    WHERE
        created_at >= date_trunc('day', now());
A sample result:
    count
    ------
    1006
Query the most active projects (by number of events) in the past day.
    SELECT
        repo_name,
        COUNT(*) AS events
    FROM
        gh_realtime_data
    WHERE
        created_at >= now() - interval '1 day'
    GROUP BY
        repo_name
    ORDER BY
        events DESC
    LIMIT 5;
A sample result:
    repo_name                                 events
    ----------------------------------------+------
    leo424y/heysiri.ml                        29
    arm-on/plan                               10
    Christoffel-T/fiverr-pat-20230331         9
    mate-academy/react_dynamic-list-of-goods  9
    openvinotoolkit/openvino                  7
Query the most active developers (by number of events) in the past day.
    SELECT
        actor_login,
        COUNT(*) AS events
    FROM
        gh_realtime_data
    WHERE
        created_at >= now() - interval '1 day'
        AND actor_login NOT LIKE '%[bot]'
    GROUP BY
        actor_login
    ORDER BY
        events DESC
    LIMIT 5;
A sample result:
    actor_login        events
    ------------------+------
    direwolf-github    13
    arm-on             10
    sergii-nosachenko  9
    Christoffel-T      9
    yangwang201911     7
Query the most popular programming languages in the past hour.
    SELECT
        language,
        count(*) total
    FROM
        gh_realtime_data
    WHERE
        created_at > now() - interval '1 hour'
        AND language IS NOT NULL
    GROUP BY
        language
    ORDER BY
        total DESC
    LIMIT 10;
A sample result:
    language    total
    -----------+----
    JavaScript  25
    C++         15
    Python      14
    TypeScript  13
    Java        8
    PHP         8
Query the projects that gained the most stars in the past day.
Note: This example does not account for users removing stars.
    SELECT
        repo_id,
        repo_name,
        COUNT(actor_login) total
    FROM
        gh_realtime_data
    WHERE
        type = 'WatchEvent'
        AND created_at > now() - interval '1 day'
    GROUP BY
        repo_id,
        repo_name
    ORDER BY
        total DESC
    LIMIT 10;
A sample result:
    repo_id    repo_name                           total
    ---------+----------------------------------+-----
    618058471  facebookresearch/segment-anything   4
    619959033  nomic-ai/gpt4all                    1
    97249406   denysdovhan/wtfjs                   1
    9791525    digininja/DVWA                      1
    168118422  aylei/interview                     1
    343520006  joehillen/sysz                      1
    162279822  agalwood/Motrix                     1
    577723410  huggingface/swift-coreml-diffusers  1
    609539715  e2b-dev/e2b                         1
    254839429  maniackk/KKCallStack                1
Query the number of daily active users and daily active projects today.
    SELECT
        uniq (actor_id) actor_num,
        uniq (repo_id) repo_num
    FROM
        gh_realtime_data
    WHERE
        created_at > date_trunc('day', now());
A sample result:
    actor_num  repo_num
    ---------+--------
    743        816
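To sanity-check aggregation logic on a handful of rows before running it against the warehouse, the same grouping can be mirrored locally. The sketch below imitates the "most active developers" query, including the `NOT LIKE '%[bot]'` filter, on a tiny in-memory list. The events in it are made up for illustration and `top_actors` is a hypothetical helper.

```python
from collections import Counter


def top_actors(events, limit=5):
    """Count events per actor, skipping logins that end in [bot]."""
    counts = Counter(
        e["actor_login"]
        for e in events
        if not e["actor_login"].endswith("[bot]")
    )
    # most_common sorts by count, descending, like ORDER BY events DESC
    return counts.most_common(limit)


# Hypothetical rows, shaped like the actor_login column of gh_realtime_data.
events = [
    {"actor_login": "arm-on"},
    {"actor_login": "dependabot[bot]"},
    {"actor_login": "arm-on"},
    {"actor_login": "direwolf-github"},
    {"actor_login": "arm-on"},
    {"actor_login": "direwolf-github"},
    {"actor_login": "dependabot[bot]"},
    {"actor_login": "yangwang201911"},
]
print(top_actors(events, limit=2))
```

As in the SQL version, the order among actors with the same count is not meaningful.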