日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

通過腳本實(shí)現(xiàn)索引結(jié)構(gòu)、模板或ILM的同步

在進(jìn)行Elasticsearch(ES)集群間數(shù)據(jù)遷移時(shí),例如從自建ES遷移到阿里云ES,必須確保源端和目標(biāo)端的索引結(jié)構(gòu)保持一致,避免自動(dòng)映射可能引入的數(shù)據(jù)丟失、格式錯(cuò)誤及查詢性能下降等問題,因此一般要求在數(shù)據(jù)遷移前手動(dòng)創(chuàng)建目標(biāo)索引,預(yù)先定義索引的映射和設(shè)置。本文介紹運(yùn)用Python腳本預(yù)定義ES集群索引的映射和設(shè)置,并實(shí)現(xiàn)索引結(jié)構(gòu)、索引模板和索引生命周期管理(ILM)在目標(biāo)集群的寫入。

前提條件

  1. 已創(chuàng)建兩個(gè)阿里云ES實(shí)例。具體操作,請參見創(chuàng)建阿里云Elasticsearch實(shí)例

    本文以源端和目標(biāo)端的ES版本均為7.10為例。

    說明

    本文以ES 7.10版本為例提供相關(guān)的Python腳本示例。高版本ES在mapping構(gòu)造上可能存在差異,其他版本需結(jié)合場景進(jìn)行調(diào)整,如低版本多type結(jié)構(gòu)在高版本已不支持,無法通過文檔示例進(jìn)行創(chuàng)建。

  2. 已創(chuàng)建ECS實(shí)例,并配置python環(huán)境,具體操作,請參考Linux系統(tǒng)實(shí)例快速入門

    本文以Python 3.6.8為例,其他版本結(jié)合對應(yīng)版本接口的Requests模塊進(jìn)行調(diào)整。

  3. 已打通源端ES和目標(biāo)端ES與ECS的網(wǎng)絡(luò),將ECS公網(wǎng)或私網(wǎng)IP地址,分別配置到源端ES和目標(biāo)端ES的公網(wǎng)地址訪問白名單或VPC私網(wǎng)訪問白名單中。

    說明

    生產(chǎn)環(huán)境注重?cái)?shù)據(jù)安全性,建議您通過私網(wǎng)連通ECS與源端ES和目標(biāo)端ES。

同步索引結(jié)構(gòu)

同步索引結(jié)構(gòu)(mappings)和設(shè)置(settings)下的主副本配置。

  1. 準(zhǔn)備測試數(shù)據(jù)。

    在源端ES中執(zhí)行以下命令,創(chuàng)建索引。

    PUT /product_info
    {
      "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
      },
      "mappings": {
          "properties": {
            "productName": {
              "type": "text"
            },
            "annual_rate":{
              "type":"keyword"
            },
            "describe": {
              "type": "text"
            }
        }
      }
    }
  2. 在ECS中執(zhí)行以下腳本,同步索引結(jié)構(gòu)和設(shè)置。

    import requests
    from requests.auth import HTTPBasicAuth
    
    # 配置信息,按照實(shí)際環(huán)境調(diào)整
    config = {
        # 源集群host
        'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
        # 源集群用戶名
        'old_cluster_user': 'yourusername',
        # 源集群密碼
        'old_cluster_password': 'yourpassward',
        # 源集群http協(xié)議,可選 http/https
        'old_cluster_protocol': 'http',
        # 目標(biāo)集群host,可在阿里云Elasticsearch實(shí)例的基本信息頁面獲取。
        'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
        # 目標(biāo)集群用戶名
        'new_cluster_user': 'yourusername',
        # 目標(biāo)集群密碼
        'new_cluster_password': 'yourpassward',
        # 目標(biāo)集群http協(xié)議,可選 http/https
        'new_cluster_protocol': 'http',
        # 目標(biāo)集群默認(rèn)副本數(shù)
        'default_replicas': 1,
    }
    
    # 通用的 HTTP 請求函數(shù)
    def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
        url = f"{protocol}://{host}{endpoint}"
        auth = (username, password) if username and password else None
        headers = {'Content-Type': 'application/json'} if method != 'GET' else None
        try:
            response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
            response.raise_for_status()
            return response.json()
        except requests.HTTPError as e:
            # 打印錯(cuò)誤信息
            print(f"HTTP Error: {e.response.status_code} for {url}")
            print(e.response.text)
        except ValueError as e:
            # 如果響應(yīng)不是 JSON 格式,打印錯(cuò)誤并返回原始內(nèi)容
            print("Invalid JSON response:")
            print(response.text)
            raise
    
    # 獲取所有索引列表
    def get_indices():
        endpoint = "/_cat/indices?format=json"
        indices_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        index_list = [index['index'] for index in indices_result if index['status'] == 'open']
        return index_list
    
    # 獲取索引的設(shè)置
    def get_index_settings(index):
        endpoint = f"/{index}/_settings"
        index_settings = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        settings = index_settings[index]['settings']['index']
        shards_replicas_settings = {
            'number_of_shards': settings.get('number_of_shards'),
            'number_of_replicas': config['default_replicas']
        }
        return {'settings': shards_replicas_settings}
    
    # 獲取索引的映射
    def get_index_mapping(index):
        endpoint = f"/{index}/_mapping"
        index_mapping = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        return {'mappings': index_mapping[index]['mappings']}
    
    # 創(chuàng)建新索引
    def create_index(old_index_name, new_index_name=""):
        if not new_index_name:
            new_index_name = old_index_name
    
        settings = get_index_settings(old_index_name)
        mappings = get_index_mapping(old_index_name)
        body = {**settings, **mappings}
    
        endpoint = f"/{new_index_name}"
        create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=body)
    
        print(f"Index {new_index_name} created with result: {create_result}")
    
    # 主函數(shù)
    def main():
        index_list = get_indices()
        for index in index_list:
            if not index.startswith("."):  # 忽略系統(tǒng)索引
                create_index(index)
    
    
    if __name__ == '__main__':
        main()
    
  3. 驗(yàn)證結(jié)果。

    在目標(biāo)端ES中執(zhí)行以下命令,查看同步成功的索引。

    GET _cat/indices/product_info

同步索引模板

  1. 準(zhǔn)備測試數(shù)據(jù)。

    在源端ES中執(zhí)行以下命令,創(chuàng)建索引模板。

    PUT _template/product
    {
      "index_patterns": ["product_*"],
      "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
      },
      "mappings": {
        "properties": {
            "productName": {
              "type": "text"
            },
            "annual_rate":{
              "type":"keyword"
            },
            "describe": {
              "type": "text"
            }
        }
      }
    }
  2. 在ECS中執(zhí)行以下腳本,同步索引模板。

    import requests
    from requests.auth import HTTPBasicAuth
    
    # 配置信息,按照實(shí)際環(huán)境調(diào)整
    config = {
        # 源集群host
        'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
        # 源集群用戶名
        'old_cluster_user': 'yourusername',
        # 源集群密碼
        'old_cluster_password': 'yourpassward',
        # 源集群http協(xié)議,可選 http/https
        'old_cluster_protocol': 'http',
        # 目標(biāo)集群host,可在阿里云Elasticsearch實(shí)例的基本信息頁面獲取。
        'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
        # 目標(biāo)集群用戶名
        'new_cluster_user': 'yourusername',
        # 目標(biāo)集群密碼
        'new_cluster_password': 'yourpassward',
        # 目標(biāo)集群http協(xié)議,可選 http/https
        'new_cluster_protocol': 'http',
        # 目標(biāo)集群默認(rèn)副本數(shù)
        'default_replicas': 1,
    }
    
    # 通用的 HTTP 請求函數(shù)
    def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
        url = f"{protocol}://{host}{endpoint}"
        auth = (username, password) if username and password else None
        headers = {'Content-Type': 'application/json'} if method != 'GET' else None
        try:
            response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
            response.raise_for_status()
            return response.json()
        except requests.HTTPError as e:
            # 打印錯(cuò)誤信息
            print(f"HTTP Error: {e.response.status_code} for {url}")
            print(e.response.text)
        except ValueError as e:
            # 如果響應(yīng)不是 JSON 格式,打印錯(cuò)誤并返回原始內(nèi)容
            print("Invalid JSON response:")
            print(response.text)
            raise
    
    # 獲取源集群的所有索引模板
    def get_index_templates():
        endpoint = "/_template"
        templates_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        return templates_result
    
    # 創(chuàng)建目標(biāo)集群的索引模板
    def create_index_template(template_name, template_body):
        endpoint = f"/_template/{template_name}"
        create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=template_body)
        print(f"Template {template_name} created with result: {create_result}")
    
    # 主函數(shù)
    def main():
    
        # 同步索引模板
        templates = get_index_templates()
        for template_name, template_body in templates.items():
            create_index_template(template_name, template_body)
    
    if __name__ == '__main__':
        main()
  3. 驗(yàn)證結(jié)果。

    在目標(biāo)端ES中執(zhí)行以下命令,查詢目標(biāo)模板的信息。

    GET _template/product

同步索引生命周期管理(ILM)

  1. 準(zhǔn)備測試數(shù)據(jù)。

    在源端ES中執(zhí)行以下命令,創(chuàng)建索引生命周期管理(ILM)。

    PUT _ilm/policy/product
    {
      "policy": {
        "phases": {
          "hot": {
            "actions": {
              "rollover": {
                "max_size": "1GB",
                "max_age": "1d",
                "max_docs": 1000
              }
            }
          },
          "delete": {
            "min_age": "2h",
            "actions": {
              "delete": {}
            }
          }
        }
      }
    }
  1. 在ECS中執(zhí)行以下腳本,同步索引生命周期管理(ILM)。

    import requests
    from requests.auth import HTTPBasicAuth
    
    # 配置信息,按照實(shí)際環(huán)境調(diào)整
    config = {
        # 源集群host
        'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
        # 源集群用戶名
        'old_cluster_user': 'yourusername',
        # 源集群密碼
        'old_cluster_password': 'yourpassward',
        # 源集群http協(xié)議,可選 http/https
        'old_cluster_protocol': 'http',
        # 目標(biāo)集群host,可在阿里云Elasticsearch實(shí)例的基本信息頁面獲取。
        'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
        # 目標(biāo)集群用戶名
        'new_cluster_user': 'yourusername',
        # 目標(biāo)集群密碼
        'new_cluster_password': 'yourpassward',
        # 目標(biāo)集群http協(xié)議,可選 http/https
        'new_cluster_protocol': 'http',
        # 目標(biāo)集群默認(rèn)副本數(shù)
        'default_replicas': 1,
    }
    
    # 通用的 HTTP 請求函數(shù)
    def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
        url = f"{protocol}://{host}{endpoint}"
        auth = (username, password) if username and password else None
        headers = {'Content-Type': 'application/json'} if method != 'GET' else None
        try:
            response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
            response.raise_for_status()
            return response.json()
        except requests.HTTPError as e:
            # 打印錯(cuò)誤信息
            print(f"HTTP Error: {e.response.status_code} for {url}")
            print(e.response.text)
        except ValueError as e:
            # 如果響應(yīng)不是 JSON 格式,打印錯(cuò)誤并返回原始內(nèi)容
            print("Invalid JSON response:")
            print(response.text)
            raise
    
    # 獲取源集群的所有索引ILM
    def get_ilm_polices():
        endpoint = "/_ilm/policy"
        templates_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
        return templates_result
    
    # 創(chuàng)建目標(biāo)集群的索引ILM
    def create_ilm_policy(policy_name, policy_body):
        policy_body.pop('version', None)
        policy_body.pop('modified_date', None)
        policy_body.pop('modified_date_string', None)
    
        endpoint = f"/_ilm/policy/{policy_name}"
        create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=policy_body)
        print(f"Policy {policy_name} created with result: {create_result}")
    
    # 主函數(shù)
    def main():
    
        # 同步索引ILM
        policies = get_ilm_polices()
        for policy_name, policy_body in policies.items():
            create_ilm_policy(policy_name, policy_body)
    if __name__ == '__main__':
        main()
    
  2. 驗(yàn)證結(jié)果。

    在目標(biāo)端ES中執(zhí)行以下命令,查詢索引生命周期管理(ILM)的信息。

    GET _ilm/policy/product