python自动化统计TAPD缺陷,并定时通过企业微信群提醒开发处理

需求:

项目迭代过程,开发需要每天处理遗留缺陷,特别是严重/致命缺陷,需要在发布前处理完成。为了及时同步缺陷处理进展,需要每天统计bug状态及修复情况并在项目组(企业微信项目群)里同步信息

代码:

import requests
import time
import json
import pandas as pd
from dateutil.parser import parse
import schedule

# 缺陷年龄统计
def bugs_age(api_user,api_password,workspace_id,startime_endtime):
    
    url = "https://api.tapd.cn/bugs" #获取缺接口
    
    try:
        #============所有待处理问题:新/重新打开/已解决/挂起/延期postponed/接受处理/已拒绝(状态:ALL)
        send_bugs = []
        
        register_data = {
            "workspace_id":workspace_id,
            "status": "new|reopened|in_progress",
            "created": startime_endtime,
            "limit": 200,
            }
        
        result = requests.get(url,params=register_data,auth=(api_user,api_password))
        
        #print ('返回结果:
',json.dumps(result.json(),indent=4,ensure_ascii=False))
        
        all_bugs_num = len(result.json()["data"])
        
        print ("待处理所有问题:%s条"%all_bugs_num)

        send_age_bugs = []

        for i in result.json()["data"]:

            startdate = i["Bug"]["modified"]

            enddate = pd.to_datetime(startdate) + pd.DateOffset(days=7) # 未处理天数5个工作日+2天周末

            nowdate = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())

            start_date = parse(startdate)

            end_date = parse(str(enddate))
         
            now_date = parse(nowdate)

            if (now_date-end_date).days > -1:

                age = "%s天未处理且无评论"%(now_date-start_date).days

                # 判断是否有评论代码
                
                bug_id=i["Bug"]["id"]
                
                comments_num = bugs_comments(api_user,api_password,workspace_id,bug_id)

                if comments_num < 1:

                    print (bug_id,age)
                    
                    send_age_bugs.append([i["Bug"]["title"],i["Bug"]["status"],i["Bug"]["current_owner"],i["Bug"]["modified"],age])
##                else:
##                    pass
##                    #print (bug_id,"有评论")
        print ("需要发送未处理bug信息:",send_age_bugs)
        return send_age_bugs
    except Exception as e:
        print ('异常:',e)        



# 判断bug是否有评论
def bugs_comments(api_user,api_password,workspace_id,bug_id):
    
    url = "https://api.tapd.cn/comments" #获取缺接口
    
    try:
        send_bugs = []
        
        register_data = {
            "workspace_id":workspace_id,
            "entry_id":bug_id,
            #"created": startime_endtime,
            "limit": 1,
            }
        
        result = requests.get(url,params=register_data,auth=(api_user,api_password))
        
        #print ('返回结果:
',json.dumps(result.json(),indent=4,ensure_ascii=False))
        
        all_bugs_num = len(result.json()["data"])

        return all_bugs_num
        
        #print ("待处理所有问题:%s条"%all_bugs_num)

    except Exception as e:
        all_bugs_num = 0
        print ('异常:',e) 




# 监控新/重新打开/接受处理/延期等bug状态,是否会被测试重启或开发拒绝
def bugs_info(api_user,api_password,workspace_id,startime_endtime):
    
    url = "https://api.tapd.cn/bugs" #获取缺接口
    
    try:
        #============所有待处理问题:新/重新打开/已解决/挂起/延期/接受处理/已拒绝(状态:ALL)
        
        register_data = {
            "workspace_id":workspace_id,
            "status": "reopened|rejected|postponed",
            "created": startime_endtime,
            "limit": 200,
            }
        
        result = requests.get(url,params=register_data,auth=(api_user,api_password))
        
        #print ('返回结果:
',json.dumps(result.json(),indent=4,ensure_ascii=False))
        
        all_bugs_num = len(result.json()["data"])
        
        print ("待处理所有问题:%s条"%all_bugs_num)

        a = []
        b = []
        c = []
        for i in result.json()["data"]:
            #print (i)
            
            if i["Bug"]["status"] == "reopened":
                i["Bug"]["status"] = "重新打开"
            elif i["Bug"]["status"] == "rejected":
                i["Bug"]["status"] = "已拒绝"

            bugs = [i["Bug"]["title"],i["Bug"]["status"],i["Bug"]["current_owner"]]

            #print (bugs)
            if bugs[1] == "已拒绝":
                a.append(bugs)
            elif  bugs[1] == "重新打开":
                b.append(bugs)
            elif bugs[1] == "postponed":
                c.append(bugs)

        print (len(a),len(b),len(c))
        print ("需要发送的已拒绝bug&重新打开bug&延期bug",a,b,c)
        return a,b,c
    except Exception as e:
        print ('异常:',e)

# 按严重程度统计
def bugs_info(api_user,api_password,workspace_id,startime_endtime):
    url = "https://api.tapd.cn/bugs" #获取缺接口   
    try:
        #==============待处理严重问题:新/重新打开/接受处理(状态:严重/致命)
        all_data = []
        register_data = {
            "workspace_id":workspace_id,
            "status": "new|reopened|in_progress",
            "severity": "fatal|serious",
            "created": startime_endtime,
            "limit": 200,
            }
        r = requests.get(url,params=register_data,auth=(api_user,api_password))
        #print('获取接口返回结果:',r.json())
        bugs_num = len(r.json()["data"])
        print ("待处理严重以上问题:%s条"%bugs_num)
        for i in r.json()["data"]:

            if i["Bug"]["severity"] == "serious":
                i["Bug"]["severity"] = "严重"
                
            elif i["Bug"]["severity"] == "fatal":
                i["Bug"]["severity"] = "致命"

            if i["Bug"]["status"] == "new":
                i["Bug"]["status"] = "新"

            elif i["Bug"]["status"] == "reopened":
                i["Bug"]["status"] = "重新打开"

            elif i["Bug"]["status"] == "in_progress":
                i["Bug"]["status"] = "接受处理"

            #a = "标题:%s,状态:%s,严重程度:%s,处理人:%s,概率:%s"%(i["Bug"]["title"],i["Bug"]["status"],i["Bug"]["severity"],i["Bug"]["current_owner"],i["Bug"]["custom_field_7"])
            a = [i["Bug"]["title"],i["Bug"]["status"],i["Bug"]["severity"],i["Bug"]["current_owner"],i["Bug"]["frequency"],i["Bug"]["module"],i["Bug"]["id"]]
            #print (a)
            all_data.append(a)

        print (all_data)
    except Exception as e:
        print ('异常:',e)

   

def send_markdown(webhook,content):
    register_data = {
    "msgtype": "markdown",#支持文本(text)、markdown(markdown)、图片(image)、图文(news)四种消息类型
    "markdown": {
        "content": "%s"%content
        }
    }
    response = requests.post(url = webhook,json=register_data)
    #print (response.text)
    print ("send message sucess..")




def main():
    api_user = '账号'
    
    api_password = '密码'

    # 项目ID
    workspace_id = "项目ID,创建项目时生成"

    # 统计bug时间段
    startime_endtime = "2022-06-01~2022-08-30"

    # 企业微信群连接
    webhook = "在企业微信群,创建一个机器人,点击获取此链接"

    bugs_info(api_user,api_password,workspace_id,startime_endtime)

    # 超时未处理的bug>5天(工作日)且无评论bug
    age_bugs = bugs_age(api_user,api_password,workspace_id,startime_endtime)

    if  0【超时未处理的bug>5天(工作日)且无评论:%s个,处理提醒】**
 
"%len(age_bugs)+str(age_bugs).replace("'],","'],
 
")

        send_markdown(webhook,send_age_bugs)
    # 若数据大于20,只展示20个bug
    elif len(age_bugs)>20:
        send_age_bugs =" **【超时未处理的bug>5天(工作日)且无评论:%s个,处理提醒】**
 
"%len(age_bugs)+str(age_bugs[0:20]).replace("'],","'],
 
")

        send_markdown(webhook,send_age_bugs)


    # 拒绝&重开&延期bug
    a,b,c = bugs_info(api_user,api_password,workspace_id,startime_endtime)

    status_bugs = a+b
    if len(status_bugs)>0:
        send_status_bugs = " **【已拒绝:%s个,重开:%s个,延期:%s个,处理提醒】**
 
"%(len(a),len(b),len(c))+str(status_bugs).replace("'],","'],
 
")
        
        send_markdown(webhook,send_status_bugs)

    # 若数据大于20,只展示20个bug
    elif len(age_bugs)>20:
        send_status_bugs = " **【已拒绝:%s个,重开:%s个,延期:%s个,处理提醒】**
 
"%(len(a),len(b),len(c))+str(status_bugs[0:20]).replace("'],","'],
 
")
        
        send_markdown(webhook,send_status_bugs)
if __name__ == '__main__':
    main()
    # 周一到周五的18:30执行
    at_time = "18:30"
    schedule.every().monday.at(at_time).do(main)
    schedule.every().tuesday.at(at_time).do(main)
    schedule.every().wednesday.at(at_time).do(main)
    schedule.every().thursday.at(at_time).do(main)
    schedule.every().friday.at(at_time).do(main)

    print("定时任务执行中,每天%s(工作日)进行提醒"%at_time)
    while True:
        schedule.run_pending()
        time.sleep(1)

执行:

测试结果:

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章