需求:
项目迭代过程,开发需要每天处理遗留缺陷,特别是严重/致命缺陷,需要在发布前处理完成。为了及时同步缺陷处理进展,需要每天统计bug状态及修复情况并在项目组(企业微信项目群)里同步信息
代码:
import requests
import time
import json
import pandas as pd
from dateutil.parser import parse
import schedule
# 缺陷年龄统计
def bugs_age(api_user,api_password,workspace_id,startime_endtime):
url = "https://api.tapd.cn/bugs" #获取缺接口
try:
#============所有待处理问题:新/重新打开/已解决/挂起/延期postponed/接受处理/已拒绝(状态:ALL)
send_bugs = []
register_data = {
"workspace_id":workspace_id,
"status": "new|reopened|in_progress",
"created": startime_endtime,
"limit": 200,
}
result = requests.get(url,params=register_data,auth=(api_user,api_password))
#print ('返回结果:
',json.dumps(result.json(),indent=4,ensure_ascii=False))
all_bugs_num = len(result.json()["data"])
print ("待处理所有问题:%s条"%all_bugs_num)
send_age_bugs = []
for i in result.json()["data"]:
startdate = i["Bug"]["modified"]
enddate = pd.to_datetime(startdate) + pd.DateOffset(days=7) # 未处理天数5个工作日+2天周末
nowdate = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())
start_date = parse(startdate)
end_date = parse(str(enddate))
now_date = parse(nowdate)
if (now_date-end_date).days > -1:
age = "%s天未处理且无评论"%(now_date-start_date).days
# 判断是否有评论代码
bug_id=i["Bug"]["id"]
comments_num = bugs_comments(api_user,api_password,workspace_id,bug_id)
if comments_num < 1:
print (bug_id,age)
send_age_bugs.append([i["Bug"]["title"],i["Bug"]["status"],i["Bug"]["current_owner"],i["Bug"]["modified"],age])
## else:
## pass
## #print (bug_id,"有评论")
print ("需要发送未处理bug信息:",send_age_bugs)
return send_age_bugs
except Exception as e:
print ('异常:',e)
# 判断bug是否有评论
def bugs_comments(api_user,api_password,workspace_id,bug_id):
url = "https://api.tapd.cn/comments" #获取缺接口
try:
send_bugs = []
register_data = {
"workspace_id":workspace_id,
"entry_id":bug_id,
#"created": startime_endtime,
"limit": 1,
}
result = requests.get(url,params=register_data,auth=(api_user,api_password))
#print ('返回结果:
',json.dumps(result.json(),indent=4,ensure_ascii=False))
all_bugs_num = len(result.json()["data"])
return all_bugs_num
#print ("待处理所有问题:%s条"%all_bugs_num)
except Exception as e:
all_bugs_num = 0
print ('异常:',e)
# 监控新/重新打开/接受处理/延期等bug状态,是否会被测试重启或开发拒绝
def bugs_info(api_user,api_password,workspace_id,startime_endtime):
url = "https://api.tapd.cn/bugs" #获取缺接口
try:
#============所有待处理问题:新/重新打开/已解决/挂起/延期/接受处理/已拒绝(状态:ALL)
register_data = {
"workspace_id":workspace_id,
"status": "reopened|rejected|postponed",
"created": startime_endtime,
"limit": 200,
}
result = requests.get(url,params=register_data,auth=(api_user,api_password))
#print ('返回结果:
',json.dumps(result.json(),indent=4,ensure_ascii=False))
all_bugs_num = len(result.json()["data"])
print ("待处理所有问题:%s条"%all_bugs_num)
a = []
b = []
c = []
for i in result.json()["data"]:
#print (i)
if i["Bug"]["status"] == "reopened":
i["Bug"]["status"] = "重新打开"
elif i["Bug"]["status"] == "rejected":
i["Bug"]["status"] = "已拒绝"
bugs = [i["Bug"]["title"],i["Bug"]["status"],i["Bug"]["current_owner"]]
#print (bugs)
if bugs[1] == "已拒绝":
a.append(bugs)
elif bugs[1] == "重新打开":
b.append(bugs)
elif bugs[1] == "postponed":
c.append(bugs)
print (len(a),len(b),len(c))
print ("需要发送的已拒绝bug&重新打开bug&延期bug",a,b,c)
return a,b,c
except Exception as e:
print ('异常:',e)
# 按严重程度统计
def bugs_info(api_user,api_password,workspace_id,startime_endtime):
url = "https://api.tapd.cn/bugs" #获取缺接口
try:
#==============待处理严重问题:新/重新打开/接受处理(状态:严重/致命)
all_data = []
register_data = {
"workspace_id":workspace_id,
"status": "new|reopened|in_progress",
"severity": "fatal|serious",
"created": startime_endtime,
"limit": 200,
}
r = requests.get(url,params=register_data,auth=(api_user,api_password))
#print('获取接口返回结果:',r.json())
bugs_num = len(r.json()["data"])
print ("待处理严重以上问题:%s条"%bugs_num)
for i in r.json()["data"]:
if i["Bug"]["severity"] == "serious":
i["Bug"]["severity"] = "严重"
elif i["Bug"]["severity"] == "fatal":
i["Bug"]["severity"] = "致命"
if i["Bug"]["status"] == "new":
i["Bug"]["status"] = "新"
elif i["Bug"]["status"] == "reopened":
i["Bug"]["status"] = "重新打开"
elif i["Bug"]["status"] == "in_progress":
i["Bug"]["status"] = "接受处理"
#a = "标题:%s,状态:%s,严重程度:%s,处理人:%s,概率:%s"%(i["Bug"]["title"],i["Bug"]["status"],i["Bug"]["severity"],i["Bug"]["current_owner"],i["Bug"]["custom_field_7"])
a = [i["Bug"]["title"],i["Bug"]["status"],i["Bug"]["severity"],i["Bug"]["current_owner"],i["Bug"]["frequency"],i["Bug"]["module"],i["Bug"]["id"]]
#print (a)
all_data.append(a)
print (all_data)
except Exception as e:
print ('异常:',e)
def send_markdown(webhook,content):
register_data = {
"msgtype": "markdown",#支持文本(text)、markdown(markdown)、图片(image)、图文(news)四种消息类型
"markdown": {
"content": "%s"%content
}
}
response = requests.post(url = webhook,json=register_data)
#print (response.text)
print ("send message sucess..")
def main():
api_user = '账号'
api_password = '密码'
# 项目ID
workspace_id = "项目ID,创建项目时生成"
# 统计bug时间段
startime_endtime = "2022-06-01~2022-08-30"
# 企业微信群连接
webhook = "在企业微信群,创建一个机器人,点击获取此链接"
bugs_info(api_user,api_password,workspace_id,startime_endtime)
# 超时未处理的bug>5天(工作日)且无评论bug
age_bugs = bugs_age(api_user,api_password,workspace_id,startime_endtime)
if 0【超时未处理的bug>5天(工作日)且无评论:%s个,处理提醒】**
"%len(age_bugs)+str(age_bugs).replace("'],","'],
")
send_markdown(webhook,send_age_bugs)
# 若数据大于20,只展示20个bug
elif len(age_bugs)>20:
send_age_bugs =" **【超时未处理的bug>5天(工作日)且无评论:%s个,处理提醒】**
"%len(age_bugs)+str(age_bugs[0:20]).replace("'],","'],
")
send_markdown(webhook,send_age_bugs)
# 拒绝&重开&延期bug
a,b,c = bugs_info(api_user,api_password,workspace_id,startime_endtime)
status_bugs = a+b
if len(status_bugs)>0:
send_status_bugs = " **【已拒绝:%s个,重开:%s个,延期:%s个,处理提醒】**
"%(len(a),len(b),len(c))+str(status_bugs).replace("'],","'],
")
send_markdown(webhook,send_status_bugs)
# 若数据大于20,只展示20个bug
elif len(age_bugs)>20:
send_status_bugs = " **【已拒绝:%s个,重开:%s个,延期:%s个,处理提醒】**
"%(len(a),len(b),len(c))+str(status_bugs[0:20]).replace("'],","'],
")
send_markdown(webhook,send_status_bugs)
if __name__ == '__main__':
main()
# 周一到周五的18:30执行
at_time = "18:30"
schedule.every().monday.at(at_time).do(main)
schedule.every().tuesday.at(at_time).do(main)
schedule.every().wednesday.at(at_time).do(main)
schedule.every().thursday.at(at_time).do(main)
schedule.every().friday.at(at_time).do(main)
print("定时任务执行中,每天%s(工作日)进行提醒"%at_time)
while True:
schedule.run_pending()
time.sleep(1)
执行:
测试结果:
留言与评论(共有 0 条评论) “” |