日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

批量刪除作業

1 背景

由于歷史原因,提交了大量的批量計算 DAG 作業,但是作業沒有及時清理;或者單次提交了大量的作業,需要做清理操作。目前批量計算控制臺支持單頁批量刪除,但若存在幾十頁作業的情況,清理動作也比較麻煩。因此提供了一個批量清理批量計算作業的工作。

2 工具

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import threading
import sys
PY2 = sys.version_info[0] == 2
if PY2:
    import Queue as bcQueue
else:
    import queue as bcQueue
import json
import time
from batchcompute import Client, ClientError
import argparse
from functools import wraps

def retryWrapper(func):
    @wraps(func)
    def wrapper(*args,**kwargs):
        index = 0
        while True:
            try:
                res = func(*args,**kwargs)
                break
            except ClientError as e:
                status = e.get_status_code()
                if status >= 400 and status < 500:
                    raise Exception(str(e))
                if status >= 500 and status < 600:
                    if index > 6:
                        raise Exception(str(e))
                    else:
                        time.sleep(0.5 * pow(2,index))
                        index += 1
            except Exception as e:
                if index > 6:
                    raise Exception(str(e))
                else:
                    time.sleep(0.5 * pow(2,index))
                    index += 1
        return res
    return wrapper

class ShowProcess():
    """
    顯示處理進度的類
    調用該類相關函數即可實現處理進度的顯示
    """
    i = 0 # 當前的處理進度
    max_steps = 0 # 總共需要處理的次數
    max_arrow = 50 #進度條的長度
    infoDone = 'done'

    # 初始化函數,需要知道總共的處理次數
    def __init__(self, max_steps, infoDone = 'Done'):
        self.max_steps = max_steps
        self.i = 0
        self.infoDone = infoDone

    # 顯示函數,根據當前的處理進度i顯示進度
    # 效果為[>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>]100.00%
    def show_process(self, i=None):
        if i is not None:
            self.i = i
        else:
            self.i += 1
        num_arrow = int(self.i * self.max_arrow / self.max_steps) #計算顯示多少個'>'
        num_line = self.max_arrow - num_arrow #計算顯示多少個'-'
        percent = self.i * 100.0 / self.max_steps #計算完成進度,格式為xx.xx%
        process_bar = '[' + '>' * num_arrow + '-' * num_line + ']'\
                      + '%.2f' % percent + '%' + '\r' #帶輸出的字符串,'\r'表示不換行回到最左邊
        sys.stdout.write(process_bar) #這兩句打印字符到終端
        sys.stdout.flush()
        if self.i >= self.max_steps:
            self.close()

    def close(self):
        print('')
        print(self.infoDone)
        self.i = 0

class DeleteJob(threading.Thread):
    def __init__(self, client, jobQueue, error_queue):
        threading.Thread.__init__(self)
        self.jobQueue = jobQueue
        self.error_queue = error_queue
        self.client = client

    @retryWrapper
    def delteSpecJob(self, jobId):
        self.client.delete_job(jobId)

    def run(self):
        while True:
            try:
                jobId = self.jobQueue.get(block=False)
                self.delteSpecJob(jobId)
                self.jobQueue.task_done()
            except bcQueue.Empty:
                break
            except Exception as e:
                self.jobQueue.task_done()
                self.error_queue.put('Delte Job(%s) exception %s' % (jobId, str(e)))

class DeleteJobs:
    def __init__(self, client, days):
        self.client = client
        self.days = days

    @retryWrapper
    def listSpecIdxJob(self, marker, max_item):
        response = self.client.list_jobs(marker, max_item)
        return response

    def listAndDeleteJobs(self):
        marker = ""
        max_item = 100
        index = 0
        jobQueue = bcQueue.Queue(0)
        error_queue = bcQueue.Queue(0)

        print("Begin List Spec Jobs...")
        while marker or index == 0:
            response = self.listSpecIdxJob(marker, max_item)
            marker = response.NextMarker
            for job in response.Items:
                if job.State == "Running" or job.State == "Waiting":
                    continue
                if self.days == 0:
                    jobQueue.put(job.Id)
                else:
                    createTime = job.CreationTime.strftime("%Y-%m-%d %H:%M:%S.%f")
                    timestap = int(time.mktime(time.strptime(createTime, "%Y-%m-%d %H:%M:%S.%f")))
                    detaTime = self.days * 3600 * 24
                    if int(time.time()) - timestap >= detaTime:
                        jobQueue.put(job.Id)
                        # print "jobId: %s" % job.Id
            index += 1

        print("List Spec Jobs Finish...")
        threadnum = multiprocessing.cpu_count()
        size = jobQueue.qsize()
        if size < threadnum:
            threadnum = size

        thread_pool = []
        for threadnum in range(threadnum):
            current = DeleteJob(self.client, jobQueue, error_queue)
            thread_pool.append(current)
            current.start()
        print("Begin Delete Jobs...")
        process_bar = ShowProcess(size, 'OK')
        totalSize = size
        while size != 0:
            size = jobQueue.qsize()
            process_bar.show_process(totalSize - size)
            time.sleep(0.5)

        jobQueue.join()
        for thread in thread_pool:
            thread.join()

        errs = []
        while True:
            try:
                error = error_queue.get(block=False)
                errs.append(error)
            except bcQueue.Empty:
                break
        return errs

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        formatter_class = argparse.ArgumentDefaultsHelpFormatter,
        description = 'python scripyt for Delete Batchcompute jobs',
        usage='deleteJobs.py <positional argument> [<args>]',
    )
    region_surpport = ["cn-beijing", "cn-zhangjiakou", "cn-hangzhou", "cn-huhehaote", "cn-shanghai", "cn-shenzhen"]

    parser.add_argument('--region', action='store', type = str, choices=region_surpport, help = 'the region info.')
    parser.add_argument('--id', action='store', type = str, required=True, help = 'keyid.')
    parser.add_argument('--key', action='store', type = str, required=True, help = 'key secrete.')
    parser.add_argument('--day', action='store', type=int, required=True, help = 'delete the jobs which creating time before the spec day.')
    args = parser.parse_args()

    BatchHost = "batchcompute.%s.aliyuncs.com" % args.region
    client = Client(BatchHost, args.id, args.key)

    deleteJobs = DeleteJobs(client, args.day)
    err = deleteJobs.listAndDeleteJobs()
    if len(err) != 0 :
        a = ""
        print("\n".join(err))
    else:
        print("delete Jobs success!!!")

3 執行刪除動作

3.1 準備工作

復制腳本到執行機器,執行機器 必須安裝批量計算的SDK:

pip install batchcompute

若執行機器已經安裝批量計算的 SDK ,則建議更新到最新版本。

pip install --upgrade batchcompute

準備好阿里云 AK 信息,且該 AK 已經開通批量計算相關權限。

執行機器安裝好 Python,建議 Python 2.7及以上。

3.2 執行腳本

python delete.py --region cn-beijing --id xxxx --key xxx --day 10
  • id 表示 AK的 ID 信息

  • key 表示 AK的 KEY 信息

  • day 表示保留最近 N 天的意思

以上的示例表示:刪除北京 region 10天前創建的非等待和運行狀態的作業。

3.3 執行結果

deletejobs