本文通過編寫函數代碼檢測ECS實例中的CPU核數為例,為您介紹基于函數計算2.0創建自定義規則的完整操作流程。
前提條件
請確保您已開通函數計算服務。具體操作,請參見開通服務。
關于函數計算服務的收費標準,請參見計費概覽。
背景信息
關于自定義函數規則的概念、應用場景、運行原理等,請參見自定義函數規則定義和運行原理。
操作步驟
本文通過編寫函數代碼檢測ECS實例中的CPU核數為例,當CPU核數小于或等于2時,ECS實例合規;反之,ECS實例不合規。
創建服務。
登錄函數計算控制臺。
在左側導航欄,單擊服務及函數。
在頂部菜單欄,選擇地域,例如:華東2(上海)。
在服務列表頁面,單擊創建服務。
在創建服務面板,輸入服務的名稱,其他參數均保持默認值。
單擊確定。
創建函數。
在步驟 1創建的服務中,單擊創建函數。
在創建函數頁面,輸入函數名稱,請求處理程序類型選擇處理事件請求,運行環境選擇Python 3.10,其他參數保持默認值。
單擊創建。
配置函數代碼。
拷貝并粘貼如下代碼至文件index.py。
# # !/usr/bin/env python # # -*- encoding: utf-8 -*- import json import logging from aliyunsdkcore.client import AcsClient from aliyunsdkcore.request import CommonRequest logger = logging.getLogger() # 合規類型 COMPLIANCE_TYPE_COMPLIANT = 'COMPLIANT' COMPLIANCE_TYPE_NON_COMPLIANT = 'NON_COMPLIANT' COMPLIANCE_TYPE_NOT_APPLICABLE = 'NOT_APPLICABLE' # 資源配置推送類型 CONFIGURATION_TYPE_COMMON = 'COMMON' CONFIGURATION_TYPE_OVERSIZE = 'OVERSIZE' CONFIGURATION_TYPE_NONE = 'NONE' # 入口函數,完成業務邏輯編排和處理。 def handler(event, context): """ 處理函數 :param event:事件 :param context:上下文 :return:評估結果 """ # 函數入參 logger.info(f'打印函數入參:{event}') # 校驗Event,代碼可直接復制。 evt = validate_event(event) if not evt: return None creds = context.credentials rule_parameters = evt.get('ruleParameters') result_token = evt.get('resultToken') invoking_event = evt.get('invokingEvent') ordering_timestamp = evt.get('orderingTimestamp') # 資源配置信息。當規則觸發機制設置為配置變更時,該入參有值。當您新建規則或手動執行規則時,配置審計會逐個調用函數觸發對所有資源的評估。如果資源配置變更,配置審計根據變更的資源信息自動調用函數觸發一次資源評估。 configuration_item = invoking_event.get('configurationItem') account_id = configuration_item.get('accountId') resource_id = configuration_item.get('resourceId') resource_type = configuration_item.get('resourceType') region_id = configuration_item.get('regionId') resource_name = configuration_item.get('resourceName') # 判斷當前推送的資源配置信息是否大于配置(100 KB),如果是,則需要調用資源詳情API進行完整數據查詢。 configuration_type = invoking_event.get('configurationType') if configuration_type and configuration_type == CONFIGURATION_TYPE_OVERSIZE: resource_result = get_discovered_resource(creds, resource_id, resource_type, region_id) resource_json = json.loads(resource_result) configuration_item["configuration"] = resource_json["DiscoveredResourceDetail"]["Configuration"] # 對資源進行評估,需要根據實際業務自行實現評估邏輯,以下代碼僅供參考。 compliance_type, annotation = evaluate_configuration_item( rule_parameters, configuration_item) # 設置評估結果,格式需符合以下示例要求。 evaluations = [ { 'accountId': account_id, 'complianceResourceId': resource_id, 'complianceResourceName': resource_name, 'complianceResourceType': resource_type, 'complianceRegionId': region_id, 'orderingTimestamp': ordering_timestamp, 'complianceType': compliance_type, 'annotation': annotation } ] # 將評估結果返回并寫入配置審計,代碼可直接復制。 put_evaluations(creds, result_token, evaluations) return evaluations # 評估ECS實例的CPU核數。 def evaluate_configuration_item(rule_parameters, configuration_item): """ 評估邏輯 :param rule_parameters:規則參數 :param configuration_item:配置項 :return:評估類型 """ # 初始化返回值 compliance_type = COMPLIANCE_TYPE_COMPLIANT annotation = None # 獲取資源配置完整信息 full_configuration = configuration_item['configuration'] if not full_configuration: annotation = 'Configuration is empty.' return compliance_type, annotation # 轉換為JSON configuration = parse_json(full_configuration) cpu_count = configuration.get('Cpu') eq_count = rule_parameters.get('CpuCount') if cpu_count and cpu_count <= int(eq_count): annotation = json.dumps({"configuration":cpu_count,"desiredValue":eq_count,"operator":"Greater","property":"$.Cpu"}) compliance_type = COMPLIANCE_TYPE_NON_COMPLIANT return compliance_type, annotation return compliance_type, annotation def validate_event(event): """ 校驗Event :param event:Event :return:JSON對象 """ if not event: logger.error('Event is empty.') evt = parse_json(event) logger.info('Loading event: %s .' % json.dumps(evt)) if 'resultToken' not in evt: logger.error('ResultToken is empty.') return None if 'ruleParameters' not in evt: logger.error('RuleParameters is empty.') return None if 'invokingEvent' not in evt: logger.error('InvokingEvent is empty.') return None return evt def parse_json(content): """ JSON類型轉換 :param content:JSON字符串 :return:JSON對象 """ try: return json.loads(content) except Exception as e: logger.error('Parse content:{} to json error:{}.'.format(content, e)) return None # 評估結果返回,并寫入配置審計,代碼可直接復制。 def put_evaluations(creds, result_token, evaluations): """ 調用API返回并寫入評估結果 :param context:函數計算上下文 :param result_token:回調令牌 :param evaluations:評估結果 :return: None """ # 需具備權限AliyunConfigFullAccess的函數計算FC的服務角色。 client = AcsClient(creds.access_key_id, creds.access_key_secret, region_id='cn-shanghai') # 新建Request,并設置參數,Domain為config.cn-shanghai.aliyuncs.com。 request = CommonRequest() request.set_domain('config.cn-shanghai.aliyuncs.com') request.set_version('2019-01-08') request.set_action_name('PutEvaluations') request.add_body_params('ResultToken', result_token) request.add_body_params('Evaluations', evaluations) request.add_body_params('SecurityToken', creds.security_token) request.set_method('POST') try: response = client.do_action_with_exception(request) logger.info('PutEvaluations with request: {}, response: {}.'.format(request, response)) except Exception as e: logger.error('PutEvaluations error: %s' % e)
單擊左上角的部署代碼。
基于函數計算創建自定義規則。
登錄配置審計控制臺。
(可選)在左上角選擇目標賬號組。
僅資源目錄下的管理賬號需要執行該操作。單個阿里云賬號不涉及。
在左側導航欄,選擇
。在規則頁面,單擊新建規則。
在選擇創建方式頁面,先選擇基于函數計算自定義,然后選擇函數ARN,再單擊下一步。
在設置基本屬性頁面,先輸入規則名稱,再單擊添加規則入參,規則入參名稱輸入CpuCount,期望值輸入2,然后觸發機制選擇配置變更,最后單擊下一步。
說明規則入參名稱需要與步驟 3代碼中的
rule_parameters
保持一致。當您在創建規則、修改規則和規則重新審計資源時,配置審計會將目標資源類型的所有資源配置信息逐條推送至函數計算,并觸發函數對其進行評估。
在設置生效范圍頁面,資源類型選擇ECS實例,單擊下一步。
說明請您選擇實際待評估的資源類型,避免選擇所有資源類型觸發無效評估。
選擇規則關聯的資源后,規則將檢測您賬號下該資源類型的所有資源。
在設置修正頁面,單擊提交。
說明您可以打開設置修正開關,根據控制臺提示,設置自定義修正。具體操作,請參見設置自定義修正。
查看規則對ECS實例中CPU核數的檢測結果。
在規則列表中,您可以看到目標規則檢測出的不合規資源數。
說明單擊規則ID或操作列詳情,您可以查看最新檢測數據列表。
當目標規則的不合規資源列顯示無數據,但您確認當前賬號下有該規則中設置的資源時,說明函數調用失敗或函數計算提交評估結果到配置審計失敗。您可以在目標函數的調用日志頁簽,單擊操作列的請求日志,定位失敗原因,并逐步修改。具體操作,請參見查看調用日志。