SLS Indexing Service是E-MapReduce推出的一個Druid插件,用于從日志服務(Log Service,簡稱SLS)消費數據。

背景介紹

SLS Indexing Service優點如下:
  • 極為便捷的數據采集,可以利用SLS的多種數據采集方式實時將數據導入SLS。
  • 無需額外維護一個Kafka集群,省去了數據流的一個環節。
  • 支持Exactly-Once語義。

    因為SLS Indexing Service消費原理與Kafka Indexing Service類似,所以也支持Kafka Indexing Service一樣的Exactly-Once語義。

  • 消費作業高可靠保證,例如,作業失敗重試,集群重啟等。

準備工作

  • 如果您還沒有開通SLS服務,請先開通SLS服務,并配置好相應的Project和Logstore。
  • 準備好以下配置項內容:
    • SLS服務的Endpoint(請使用內網服務入口)。
    • 阿里云賬號的AccessKey ID和對應的AccessKey Secret。

使用SLS Indexing Service

  1. 準備數據格式描述文件
    如果您熟悉 Kafka Indexing Service,那么 SLS Indexing Service 會非常簡單。具體請參見Kafka Indexing Service的介紹,我們用同樣的數據進行索引,那么數據源的數據格式描述文件如下(將其保存為 metrics-sls.json):
    {
        "type": "sls",
        "dataSchema": {
            "dataSource": "metrics-sls",
            "parser": {
                "type": "string",
                "parseSpec": {
                    "timestampSpec": {
                        "column": "time",
                        "format": "auto"
                    },
                    "dimensionsSpec": {
                        "dimensions": ["url", "user"]
                    },
                    "format": "json"
                }
            },
            "granularitySpec": {
                "type": "uniform",
                "segmentGranularity": "hour",
                "queryGranularity": "none"
            },
            "metricsSpec": [{
                    "type": "count",
                    "name": "views"
                },
                {
                    "name": "latencyMs",
                    "type": "doubleSum",
                    "fieldName": "latencyMs"
                }
            ]
        },
        "ioConfig": {
            "project": <your_project>,
            "logstore": <your_logstore>,
            "endpoint": "cn-hangzhou-intranet.log.aliyuncs.com",(以杭州為例,注意使用內網服務入口)
            "accessKeyId": <your_access_key_id>,
            "accessKeySec": <your_access_key_secret>,
            "collectMode": "simple"/"other"
            "taskCount": 1,
            "replicas": 1,
            "taskDuration": "PT1H"
        },
        "tuningConfig": {
            "type": "sls",
            "maxRowsInMemory": "100000"
        }
    }
    對比Kafka Indexing Service一節中的介紹,我們發現兩者基本上是一樣的。這里簡要列一下需要注意的字段:
    • type: sls。
    • dataSchema.parser.parseSpec.format:與ioConfig.consumerProperties.logtail.collection-mode有關,也就是與SLS日志的收集模式有關。如果是極簡模式(simple)收集,那么該處原本文件是什么格式,就填什么格式。如果是非極簡模式(other)收集,那么此處取值為json。
    • ioConfig.project:您要收集的日志的project。
    • ioConfig.logstore: 您要收集的日志的logstore。
    • ioConfig.consumerProperties.endpoint: SLS內網服務地址,例如杭州對應 cn-hangzhou-intranet.log.aliyuncs.com
    • ioConfig.consumerProperties.access-key-id:阿里云賬號的AccessKey ID。
    • ioConfig.consumerProperties.access-key-secret: 阿里云賬號的AccessKeySecret。
    • ioConfig.consumerProperties.logtail.collection-mode: SLS日志收集模式,極簡模式填simple,其他情況填 other。
    重要 上述配置文件中的ioConfig 配置格式僅適用于EMR-3.20.0及之前版本。自EMR-3.21.0開始,ioConfig配置變更如下:
    "ioConfig": {
            "project": <your_project>,
            "logstore": <your_logstore>,
            "endpoint": "cn-hangzhou-intranet.log.aliyuncs.com",(以杭州為例,注意使用內網服務入口)
            "accessKeyId": <your_access_key_id>,
            "accessKeySec": <your_access_key_secret>,
            "collectMode": "simple"/"other"
            "taskCount": 1,
            "replicas": 1,
            "taskDuration": "PT1H"
        },

    即,取消了 consumerProperties 層級、access-key-idaccess-key-secretlogtail.collection-mode 變更為 accessKeyIdaccessKeySeccollectMode

  2. 執行下述命令添加SLS supervisor。
    curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type: application/json' -d @metrics-sls.json http://emr-header-1.cluster-1234:18090/druid/indexer/v1/supervisor
    重要 其中--negotiate、-u、-b、-c等選項是針對安全Druid集群。
  3. 向SLS中導入數據。

    您可以采用多種方式向SLS中導入數據。

  4. 在Druid端進行相關查詢。