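Synchronize index structures, index templates, and ILM policies by using scripts
When you migrate data between Elasticsearch (ES) clusters, for example from a self-managed ES cluster to Alibaba Cloud ES, make sure that the index structures on the source and destination clusters are consistent. If you rely on dynamic mapping instead, the destination cluster may infer different field types. This can cause data loss, formatting errors, and degraded query performance. Therefore, we recommend that you manually create the destination indexes and define their mappings and settings before you migrate data. This topic describes how to use Python scripts to read the mappings and settings of indexes in the source cluster and write the index structures, index templates, and index lifecycle management (ILM) policies to the destination cluster.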
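The following sketch shows what can go wrong with dynamic mapping. It approximates how ES 7.x would type the fields of the first document written to an index that has no explicit mapping. `infer_dynamic_mapping` is a hypothetical helper written for this illustration, not an Elasticsearch API. The model is deliberately simplified: it ignores date detection, numeric detection, and dynamic templates.

```python
# Simplified approximation of ES 7.x default dynamic mapping (assumption: no date
# detection, no numeric detection, no dynamic templates).
def infer_dynamic_mapping(value):
    """Return the mapping that dynamic mapping would roughly assign to a JSON value."""
    if value is None:
        return None  # null values do not add a field to the mapping
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return {"type": "boolean"}
    if isinstance(value, int):
        return {"type": "long"}
    if isinstance(value, float):
        return {"type": "float"}
    if isinstance(value, str):
        return {"type": "text",
                "fields": {"keyword": {"type": "keyword", "ignore_above": 256}}}
    if isinstance(value, list):
        # Arrays take the type of their first non-null element
        for item in value:
            if item is not None:
                return infer_dynamic_mapping(item)
        return None
    if isinstance(value, dict):
        props = {}
        for key, item in value.items():
            mapping = infer_dynamic_mapping(item)
            if mapping is not None:
                props[key] = mapping
        return {"properties": props}
    raise TypeError(f"unsupported JSON value: {value!r}")


# Explicit mapping used in this topic vs. what a first document would produce
explicit = {"annual_rate": {"type": "keyword"}}
doc = {"productName": "fund A", "annual_rate": 3.5, "describe": "low risk"}
inferred = infer_dynamic_mapping(doc)["properties"]
print(inferred["annual_rate"], "vs", explicit["annual_rate"])
```

Suppose the first document carries `annual_rate` as the number 3.5. Dynamic mapping would then type the field as `float` instead of `keyword`, and a later value such as "3.5%" would most likely be rejected as malformed. Creating the index with an explicit mapping avoids this.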
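Prerequisites
Two Alibaba Cloud ES instances are created. For more information, see Create an Alibaba Cloud Elasticsearch cluster.
In this topic, both the source and destination clusters run ES 7.10.
Note: The Python scripts in this topic are written for ES 7.10. Mappings may be structured differently in later ES versions, so adjust the scripts for your scenario if you use another version. For example, indexes with multiple mapping types, which earlier versions allow, are not supported in later versions and cannot be created by using the examples in this topic.
An Elastic Compute Service (ECS) instance is created and a Python environment is configured on it. For more information, see Quick start for Linux instances.
This topic uses Python 3.6.8 and the Requests library. The scripts use f-strings and therefore require Python 3.6 or later. If you use another version, adjust the code to match the Requests API of that version.
The network between the ECS instance and the source and destination clusters is connected: the public or private IP address of the ECS instance is added to the public IP address whitelist or the VPC private IP address whitelist of both the source cluster and the destination cluster.
Note: In a production environment, we recommend that you connect the ECS instance to the source and destination clusters over a private network for data security.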
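The scripts assume ES 7.10 on both sides. Before you run them, you can confirm that the ECS instance reaches both clusters and that their versions match. The sketch below only parses the JSON body that `GET /` returns and reads its `version.number` field. `parse_version` and `check_versions` are hypothetical helpers. To fetch the bodies, you could call `GET /` with the `send_http_request` function defined in the scripts of this topic.

```python
# Hypothetical pre-check: compare the versions reported by GET / on both clusters.
def parse_version(root_info):
    """Return (major, minor) from the JSON body returned by GET / on a cluster."""
    number = root_info["version"]["number"]  # for example "7.10.0"
    major, minor = number.split(".")[:2]
    return int(major), int(minor)


def check_versions(source_info, target_info):
    """Return a list of warnings; an empty list means no version concern was detected."""
    warnings = []
    src, dst = parse_version(source_info), parse_version(target_info)
    if src[0] != dst[0]:
        warnings.append(f"major versions differ: source {src[0]}.x, target {dst[0]}.x; "
                        "mappings, templates, and ILM policies may need manual changes")
    elif dst < src:
        warnings.append(f"target {dst[0]}.{dst[1]} is older than source {src[0]}.{src[1]}")
    return warnings


print(check_versions({"version": {"number": "7.10.0"}}, {"version": {"number": "7.10.0"}}))
```

An empty list means that no version concern was detected. Any warning suggests that the scripts may need the adjustments described in the note above.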
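Synchronize index structures
This section synchronizes the index structure (mappings) and the shard configuration in the settings. The number of primary shards is copied from the source index, and the number of replicas is set to the default_replicas value in the script. Other index settings are not copied.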
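The script copies only `number_of_shards` from the source index. To reuse more of the source settings, such as `analysis` or `refresh_interval`, you typically need to strip the values that Elasticsearch generates when it creates an index, because a create-index request that sets them is usually rejected. The sketch below does that. The set of generated keys is an assumption, and you should extend it for your version. Settings that depend on the source cluster's nodes, such as `index.routing.allocation.*`, may also need to be removed. `build_settings_body` is a hypothetical helper.

```python
# Assumption: keys under "index" that Elasticsearch generates when it creates an index.
# They are removed before the settings are reused; extend the set if your version needs it.
GENERATED_INDEX_SETTINGS = {"uuid", "creation_date", "provided_name", "version"}


def build_settings_body(get_settings_response, index, default_replicas=None):
    """Turn a GET /<index>/_settings response into the settings part of a PUT /<index> body."""
    index_settings = get_settings_response[index]["settings"]["index"]
    cleaned = {key: value for key, value in index_settings.items()
               if key not in GENERATED_INDEX_SETTINGS}
    if default_replicas is not None:
        # Override the replica count, as the script in this section does
        cleaned["number_of_replicas"] = default_replicas
    return {"settings": {"index": cleaned}}


# Illustrative response; the values are made up for this example
sample_response = {
    "product_info": {
        "settings": {
            "index": {
                "creation_date": "1700000000000",
                "number_of_shards": "3",
                "number_of_replicas": "1",
                "refresh_interval": "30s",
                "uuid": "example-uuid",
                "version": {"created": "7100099"},
                "provided_name": "product_info",
            }
        }
    }
}
print(build_settings_body(sample_response, "product_info", default_replicas=1))
```

The returned dictionary can be merged with the mappings in the same way that `create_index` merges `settings` and `mappings` in the script.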
Prepare test data.
Run the following command on the source cluster to create an index:
PUT /product_info
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "productName": { "type": "text" },
      "annual_rate": { "type": "keyword" },
      "describe": { "type": "text" }
    }
  }
}
Run the following script on the ECS instance to synchronize the index structures and settings:
import requests

# Configuration. Adjust the values to match your environment.
config = {
    # Endpoint of the source cluster (host:port)
    'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
    # Username of the source cluster
    'old_cluster_user': 'yourusername',
    # Password of the source cluster
    'old_cluster_password': 'yourpassword',
    # Protocol of the source cluster: http or https
    'old_cluster_protocol': 'http',
    # Endpoint of the destination cluster. You can obtain it on the Basic Information page of the Alibaba Cloud Elasticsearch instance.
    'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
    # Username of the destination cluster
    'new_cluster_user': 'yourusername',
    # Password of the destination cluster
    'new_cluster_password': 'yourpassword',
    # Protocol of the destination cluster: http or https
    'new_cluster_protocol': 'http',
    # Number of replicas for indexes created in the destination cluster
    'default_replicas': 1,
}


# Generic HTTP request function
def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
    url = f"{protocol}://{host}{endpoint}"
    auth = (username, password) if username and password else None
    headers = {'Content-Type': 'application/json'} if method != 'GET' else None
    response = None
    try:
        response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
        response.raise_for_status()
        return response.json()
    except requests.HTTPError as e:
        # Print the error details and re-raise so that callers do not continue with an empty result
        print(f"HTTP Error: {e.response.status_code} for {url}")
        print(e.response.text)
        raise
    except ValueError:
        # The response body is not valid JSON (or the URL is malformed): print what is available and re-raise
        if response is not None:
            print("Invalid JSON response:")
            print(response.text)
        raise


# Get the names of all open indexes in the source cluster
def get_indices():
    endpoint = "/_cat/indices?format=json"
    indices_result = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
    return [index['index'] for index in indices_result if index['status'] == 'open']


# Get the number of primary shards of an index; the number of replicas comes from the configuration
def get_index_settings(index):
    endpoint = f"/{index}/_settings"
    index_settings = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
    settings = index_settings[index]['settings']['index']
    shards_replicas_settings = {
        'number_of_shards': settings.get('number_of_shards'),
        'number_of_replicas': config['default_replicas']
    }
    return {'settings': shards_replicas_settings}


# Get the mappings of an index
def get_index_mapping(index):
    endpoint = f"/{index}/_mapping"
    index_mapping = send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])
    return {'mappings': index_mapping[index]['mappings']}


# Create an index in the destination cluster with the settings and mappings of the source index
def create_index(old_index_name, new_index_name=""):
    if not new_index_name:
        new_index_name = old_index_name
    settings = get_index_settings(old_index_name)
    mappings = get_index_mapping(old_index_name)
    body = {**settings, **mappings}
    endpoint = f"/{new_index_name}"
    create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=body)
    print(f"Index {new_index_name} created with result: {create_result}")


# Main function
def main():
    for index in get_indices():
        # Skip system indexes, whose names start with a period (.)
        if index.startswith("."):
            continue
        try:
            create_index(index)
        except requests.RequestException:
            # For example, the index already exists in the destination cluster; continue with the next index
            print(f"Failed to synchronize index {index}, skipped.")


if __name__ == '__main__':
    main()
Verify the result.
Run the following command on the destination cluster to view the synchronized index:
GET _cat/indices/product_info
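Listing the index only proves that it exists. To confirm the structure, you can compare the `mappings` objects returned by `GET /product_info/_mapping` on both clusters. `diff_mappings` is a hypothetical helper. The inputs below are illustrative, and the target mapping is deliberately altered to show the output.

```python
def diff_mappings(source, target, path=""):
    """Recursively compare two mapping dicts and describe every difference."""
    differences = []
    for key in sorted(set(source) | set(target)):
        location = f"{path}.{key}" if path else key
        if key not in target:
            differences.append(f"missing in target: {location}")
        elif key not in source:
            differences.append(f"only in target: {location}")
        elif isinstance(source[key], dict) and isinstance(target[key], dict):
            differences.extend(diff_mappings(source[key], target[key], location))
        elif source[key] != target[key]:
            differences.append(f"changed: {location}: {source[key]!r} -> {target[key]!r}")
    return differences


# Illustrative inputs: the "mappings" objects of GET /product_info/_mapping on each cluster
source_mapping = {"properties": {"productName": {"type": "text"},
                                 "annual_rate": {"type": "keyword"},
                                 "describe": {"type": "text"}}}
target_mapping = {"properties": {"productName": {"type": "text"},
                                 "annual_rate": {"type": "float"}}}
for line in diff_mappings(source_mapping, target_mapping):
    print(line)
```

An empty result means that the two mappings are identical. For a correctly synchronized index, that is the expected outcome.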
Synchronize index templates
Prepare test data.
Run the following command on the source cluster to create an index template:
PUT _template/product
{
  "index_patterns": ["product_*"],
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "productName": { "type": "text" },
      "annual_rate": { "type": "keyword" },
      "describe": { "type": "text" }
    }
  }
}
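Run the following script on the ECS instance to synchronize the index templates. The script synchronizes legacy index templates created by using the _template API. Composable index templates created by using the _index_template API, which is available since ES 7.8, are not included.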
import requests

# Configuration. Adjust the values to match your environment.
config = {
    # Endpoint of the source cluster (host:port)
    'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
    # Username of the source cluster
    'old_cluster_user': 'yourusername',
    # Password of the source cluster
    'old_cluster_password': 'yourpassword',
    # Protocol of the source cluster: http or https
    'old_cluster_protocol': 'http',
    # Endpoint of the destination cluster. You can obtain it on the Basic Information page of the Alibaba Cloud Elasticsearch instance.
    'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
    # Username of the destination cluster
    'new_cluster_user': 'yourusername',
    # Password of the destination cluster
    'new_cluster_password': 'yourpassword',
    # Protocol of the destination cluster: http or https
    'new_cluster_protocol': 'http',
}


# Generic HTTP request function
def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
    url = f"{protocol}://{host}{endpoint}"
    auth = (username, password) if username and password else None
    headers = {'Content-Type': 'application/json'} if method != 'GET' else None
    response = None
    try:
        response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
        response.raise_for_status()
        return response.json()
    except requests.HTTPError as e:
        # Print the error details and re-raise so that callers do not continue with an empty result
        print(f"HTTP Error: {e.response.status_code} for {url}")
        print(e.response.text)
        raise
    except ValueError:
        # The response body is not valid JSON (or the URL is malformed): print what is available and re-raise
        if response is not None:
            print("Invalid JSON response:")
            print(response.text)
        raise


# Get all legacy index templates in the source cluster
def get_index_templates():
    endpoint = "/_template"
    return send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])


# Create an index template in the destination cluster
def create_index_template(template_name, template_body):
    endpoint = f"/_template/{template_name}"
    create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=template_body)
    print(f"Template {template_name} created with result: {create_result}")


# Main function
def main():
    templates = get_index_templates()
    for template_name, template_body in templates.items():
        # Skip built-in system templates, whose names start with a period (.)
        if template_name.startswith("."):
            continue
        try:
            create_index_template(template_name, template_body)
        except requests.RequestException:
            print(f"Failed to synchronize template {template_name}, skipped.")


if __name__ == '__main__':
    main()
Verify the result.
Run the following command on the destination cluster to query the synchronized template:
GET _template/product
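A legacy template takes effect only when a matching index is created. In ES 7.x, when several legacy templates match, they are applied in ascending `order`, and higher orders override lower ones. The sketch below lists which templates an index name would pick up. `pattern_to_regex` and `matching_templates` are hypothetical helpers. The sketch assumes that `*` is the only wildcard in `index_patterns`, and it does not model composable templates.

```python
import re


def pattern_to_regex(pattern):
    """Translate an index pattern to a regex, treating only '*' as a wildcard (assumption)."""
    parts = (re.escape(part) for part in pattern.split("*"))
    return re.compile("^" + ".*".join(parts) + "$")


def matching_templates(index_name, templates):
    """Return the names of legacy templates that match index_name, lowest order first."""
    matches = []
    for name, body in templates.items():
        patterns = body.get("index_patterns", [])
        if isinstance(patterns, str):
            patterns = [patterns]
        if any(pattern_to_regex(p).match(index_name) for p in patterns):
            matches.append((int(body.get("order", 0)), name))
    return [name for _, name in sorted(matches)]


# Illustrative input shaped like the response of GET /_template
templates = {
    "product": {"order": 0, "index_patterns": ["product_*"]},
    "product_hot": {"order": 10, "index_patterns": ["product_hot_*"]},
}
print(matching_templates("product_info", templates))
print(matching_templates("product_hot_2024", templates))
```

The index `product_info` from the previous section matches `product_*`. If you synchronize templates before index structures, the index that the index-structure script creates would also receive the `product` template. Settings and mappings in the create-index request take precedence on conflicts, but non-conflicting content from the template would still be merged in.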
Synchronize ILM policies
Prepare test data.
Run the following command on the source cluster to create an ILM policy:
PUT _ilm/policy/product
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "1GB",
            "max_age": "1d",
            "max_docs": 1000
          }
        }
      },
      "delete": {
        "min_age": "2h",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
Run the following script on the ECS instance to synchronize the ILM policies:
import requests

# Configuration. Adjust the values to match your environment.
config = {
    # Endpoint of the source cluster (host:port)
    'old_cluster_host': 'es-cn-27a3jyyd2000770dd.public.elasticsearch.aliyuncs.com:9200',
    # Username of the source cluster
    'old_cluster_user': 'yourusername',
    # Password of the source cluster
    'old_cluster_password': 'yourpassword',
    # Protocol of the source cluster: http or https
    'old_cluster_protocol': 'http',
    # Endpoint of the destination cluster. You can obtain it on the Basic Information page of the Alibaba Cloud Elasticsearch instance.
    'new_cluster_host': 'es-cn-o493kbics000cw4pt.elasticsearch.aliyuncs.com:9200',
    # Username of the destination cluster
    'new_cluster_user': 'yourusername',
    # Password of the destination cluster
    'new_cluster_password': 'yourpassword',
    # Protocol of the destination cluster: http or https
    'new_cluster_protocol': 'http',
}


# Generic HTTP request function
def send_http_request(method, host, endpoint, username="", password="", params=None, json_body=None, protocol='http'):
    url = f"{protocol}://{host}{endpoint}"
    auth = (username, password) if username and password else None
    headers = {'Content-Type': 'application/json'} if method != 'GET' else None
    response = None
    try:
        response = requests.request(method, url, auth=auth, params=params, json=json_body, headers=headers)
        response.raise_for_status()
        return response.json()
    except requests.HTTPError as e:
        # Print the error details and re-raise so that callers do not continue with an empty result
        print(f"HTTP Error: {e.response.status_code} for {url}")
        print(e.response.text)
        raise
    except ValueError:
        # The response body is not valid JSON (or the URL is malformed): print what is available and re-raise
        if response is not None:
            print("Invalid JSON response:")
            print(response.text)
        raise


# Get all ILM policies in the source cluster
def get_ilm_policies():
    endpoint = "/_ilm/policy"
    return send_http_request('GET', config['old_cluster_host'], endpoint, config['old_cluster_user'], config['old_cluster_password'], protocol=config['old_cluster_protocol'])


# Create an ILM policy in the destination cluster
def create_ilm_policy(policy_name, policy_body):
    # Remove the read-only metadata returned by GET /_ilm/policy; only the "policy" object is written
    policy_body.pop('version', None)
    policy_body.pop('modified_date', None)
    policy_body.pop('modified_date_string', None)
    endpoint = f"/_ilm/policy/{policy_name}"
    create_result = send_http_request('PUT', config['new_cluster_host'], endpoint, config['new_cluster_user'], config['new_cluster_password'], protocol=config['new_cluster_protocol'], json_body=policy_body)
    print(f"Policy {policy_name} created with result: {create_result}")


# Main function
def main():
    policies = get_ilm_policies()
    for policy_name, policy_body in policies.items():
        try:
            create_ilm_policy(policy_name, policy_body)
        except requests.RequestException:
            print(f"Failed to synchronize policy {policy_name}, skipped.")


if __name__ == '__main__':
    main()
Verify the result.
Run the following command on the destination cluster to query the synchronized ILM policy:
GET _ilm/policy/product
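The script removes known metadata fields from each entry of `GET /_ilm/policy`. An alternative is to keep only the `policy` object, which continues to work if a later version adds other read-only fields to the response (for example, 8.x may also return `in_use_by`). The same helper can be used to check every policy after synchronization. Elasticsearch may fill in defaults when it stores a policy, such as a `min_age` of `0ms` for the hot phase, so compare the GET output of both clusters rather than the original PUT body. `extract_policy_bodies` and `find_unsynced_policies` are hypothetical helpers.

```python
def extract_policy_bodies(get_policy_response):
    """Keep only the writable "policy" object of each entry returned by GET /_ilm/policy."""
    return {name: {"policy": entry["policy"]}
            for name, entry in get_policy_response.items()}


def find_unsynced_policies(source_response, target_response):
    """Return the names of source policies that are missing on the target or differ from it."""
    source = extract_policy_bodies(source_response)
    target = extract_policy_bodies(target_response)
    return sorted(name for name, body in source.items() if target.get(name) != body)


# Illustrative GET /_ilm/policy responses; only the metadata differs between the clusters
policy = {"phases": {"delete": {"min_age": "2h", "actions": {"delete": {}}}}}
source_response = {"product": {"version": 1, "modified_date": "2024-01-01T00:00:00.000Z", "policy": policy}}
target_response = {"product": {"version": 1, "modified_date": "2024-02-01T00:00:00.000Z", "policy": policy}}
print(find_unsynced_policies(source_response, target_response))
```

Each value returned by `extract_policy_bodies` has the shape `{"policy": {...}}`. That is the body expected by `PUT /_ilm/policy/<name>`, so it could also replace the `pop` calls in `create_ilm_policy`.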