pip install qcloudsms_py#!/usr/bin/env python# -*- coding:utf-8 -*-import ssl# ssl._create_default_https_context = ssl._create_unverified_contextfrom qcloudsms_py import SmsMultiSender, SmsSingleSenderfrom qcloudsms_py.httpclient import HTTPErrordef send_sms_single(phone_num, template_id, template_param_list): """ 单条发送短信 :param phone_num: 手机号 :param template_id: 腾讯云短信模板ID :param template_param_list: 短信模板所需参数列表,例如:【验证码:{1},描述:{2}】,则传递参数 [888,666]按顺序去格式化模板 :return: """ appid = 112142311 # 自己应用ID appkey = "8cc5b87123y423423412387930004" # 自己应用Key sms_sign = "Python之路" # 自己腾讯云创建签名时填写的签名内容(使用公众号的话这个值一般是公众号全称或简称) sender = SmsSingleSender(appid, appkey) try: response = sender.send_with_param(86, phone_num, template_id, template_param_list, sign=sms_sign) except HTTPError as e: response = {'result': 1000, 'errmsg': "网络异常发送失败"} return responsedef send_sms_multi(phone_num_list, template_id, param_list): """ 批量发送短信 :param phone_num_list:手机号列表 :param template_id:腾讯云短信模板ID :param param_list:短信模板所需参数列表,例如:【验证码:{1},描述:{2}】,则传递参数 [888,666]按顺序去格式化模板 :return: """ appid = 112142311 appkey = "8cc5b87123y423423412387930004" sms_sign = "Python之路" sender = SmsMultiSender(appid, appkey) try: response = sender.send_with_param(86, phone_num_list, template_id, param_list, sign=sms_sign) except HTTPError as e: response = {'result': 1000, 'errmsg': "网络异常发送失败"} return responseif __name__ == '__main__': result1 = send_sms_single("15131255089", 548760, [666, ]) print(result1) result2 = send_sms_single( ["15131255089", "15131255089", "15131255089", ],548760, [999, ]) print(result2)折叠
## settings.pyTENCENT_COS_SECRET_ID = 'AKIDEH8j4WuhRtBF' # 替换为用户的 secretIdTENCENT_COS_SECRET_KEY = '5apoY5XilhVd' # 替换为用户的 secretKey# -*- coding=utf-8# appid 已在配置中移除,请在参数 Bucket 中带上 appid。Bucket 由 BucketName-APPID 组成# 1. 设置用户配置, 包括 secretId,secretKey 以及 Regionfrom qcloud_cos import CosConfigfrom qcloud_cos import CosS3Clientfrom django.conf import settingsdef create_bucket(bucket, region='ap-shanghai'): # token = None # 使用临时密钥需要传入 Token,默认为空,可不填 # scheme = 'https' # 指定使用 http/https 协议来访问 COS,默认为 https,可不填 config = CosConfig(Region=region, SecretId=settings.TENCENT_COS_SECRET_ID, SecretKey=settings.TENCENT_COS_SECRET_KEY) # 2. 获取客户端对象 client = CosS3Client(config) # 创建存储桶 client.create_bucket( Bucket = bucket, # 桶名称 ACL = "public-read", # private / public-read / public-read-write ) # 解决跨域问题 cors_config = { 'CORSRule':[ { 'AllowedOrigin':'*', 'AllowedMethod':['GET', 'POST', 'PUT', 'DELETE', 'HEAD'], 'AllowedHeader':'*', 'ExposeHeader':'*', 'MaxAgeSeconds':500 } ] } client.put_bucket_cors(Bucket=bucket, CORSConfiguration=cors_config)def upload_file(bucket, file_obj, key, region='ap-shanghai'): config = CosConfig(Region=region, SecretId=settings.TENCENT_COS_SECRET_ID, SecretKey=settings.TENCENT_COS_SECRET_KEY) # 2. 获取客户端对象 client = CosS3Client(config) #### 高级上传接口(推荐) # 根据文件大小自动选择简单上传或分块上传,分块上传具备断点续传功能。 response = client.upload_file_from_buffer( Bucket=bucket, Body = file_obj, # 本地文件的路径 Key=key, # 上传到桶之后的文件名 ) ## 返回图片路径 # https://saas-1259576896.cos.ap-shanghai.myqcloud.com/QQ%E5%9B%BE%E7%89%8720190706214717.jpg return 'https://{}.cos.{}.myqcloud.com/{}'.format(bucket, region, key)def delete_file(bucket, key, region='ap-shanghai'): config = CosConfig(Region=region, SecretId=settings.TENCENT_COS_SECRET_ID, SecretKey=settings.TENCENT_COS_SECRET_KEY) client = CosS3Client(config) client.delete_object( Bucket=bucket, Key=key, # 文件名 )折叠
## settings.pyEMAIL_HOST = "smtp.163.com" # 服务器EMAIL_PORT = 25 # 一般情况下都为25EMAIL_HOST_USER = "m18055559006@163.com" # 账号EMAIL_HOST_PASSWORD = "MUQSSTXASITFJCUE" # 密码 (注意:这里的密码指的是授权码)EMAIL_USE_TLS = False # 一般都为FalseEMAIL_FROM = "m18055559006@163.com" # 邮箱来自from random import Random # 用于生成随机码from django.core.mail import send_mail # 发送邮件模块from django.conf import settings # setting.py添加的的配置信息def random_str(randomlength=8): """ 随机字符串 :param randomlength: 字符串长度 :return: String 类型字符串 """ str = '' chars = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789' length = len(chars) - 1 random = Random() for i in range(randomlength): str += chars[random.randint(0, length)] return stremail_title = "注册激活"# email_body = "请点击下面的链接激活你的账号:http://127.0.0.1:8000/active/{0}".format(code)email_body = "您的邮箱注册验证码为:{0}, 该验证码有效时间为两分钟,请及时进行验证。".format(random_str(16))# 发送邮件email = '1241880457@qq.com'send_status = send_mail(email_title, email_body, settings.EMAIL_FROM, [email])print(send_status)
pip install aliyun-python-sdk-core-v3pip install idna # 如果报没有idna包安装此包from aliyunsdkcore.client import AcsClientfrom aliyunsdkcore.request import CommonRequestdef ali_send_email(): client = AcsClient('', '', 'cn-hangzhou') request = CommonRequest() request.set_accept_format('json') request.set_domain('dm.aliyuncs.com') request.set_method('POST') request.set_protocol_type('https') request.set_version('2015-11-23') request.set_action_name('SingleSendMail') request.add_query_param('RegionId', "cn-hangzhou") request.add_query_param('AccountName', "admin@chenwenyin.top") request.add_query_param('AddressType', "1") request.add_query_param('ReplyToAddress', "false") request.add_query_param('ToAddress', "m17857589080@163.com") request.add_query_param('Subject', "注册") request.add_query_param('TagName', "注册") code = random_str() email_body = "您的邮箱注册验证码为:{0}, 该验证码有效时间为两分钟,请及时进行验证。".format(code) request.add_query_param('TextBody', email_body) response = client.do_action_with_exception(request) print(str(response, encoding = 'utf-8')) return responseali_send_email()
https://github.com/fzlee/alipay
# 安装pip install python-alipay-sdk# alipay_public_key.pem-----BEGIN PUBLIC KEY-----拿应用公钥跟支付宝换来的支付宝公钥-----END PUBLIC KEY-----# app_private_key.pem-----BEGIN RSA PRIVATE KEY-----通过支付宝公钥私钥签发软件签发的应用私钥-----END RSA PRIVATE KEY-----开发:https://openapi.alipay.com/gateway.do沙箱:https://openapi.alipaydev.com/gateway.do# pay.pyfrom alipay import AliPayfrom . import settings# 支付对象alipay = AliPay( appid=settings.APP_ID, app_notify_url=None, app_private_key_string=settings.APP_PRIVATE_KEY_STRING, alipay_public_key_string=settings.ALIPAY_PUBLIC_KEY_STRING, sign_type=settings.SIGN, debug=settings.DEBUG)# 支付网关gateway = settings.GATEWAY## 支付宝回调from utils.logging import logger# 支付回调接口class SuccessViewSet(ViewSet): authentication_classes = () permission_classes = () # 支付宝同步回调给前台,在同步通知给后台处理 def get(self, request, *args, **kwargs): # return Response('后台已知晓,Over!!!') # 不能在该接口完成订单修改操作 # 但是可以在该接口中校验订单状态(已经收到支付宝post异步通知,订单已修改),告诉前台 # print(type(request.query_params)) # django.http.request.QueryDict # print(type(request.query_params.dict())) # dict out_trade_no = request.query_params.get('out_trade_no') try: models.Order.objects.get(out_trade_no=out_trade_no, order_status=1) return APIResponse(result=True) except: return APIResponse(1, 'error', result=False) # 支付宝异步回调处理 def post(self, request, *args, **kwargs): try: result_data = request.data.dict() out_trade_no = result_data.get('out_trade_no') signature = result_data.pop('sign') from libs import iPay result = iPay.alipay.verify(result_data, signature) if result and result_data["trade_status"] in ("TRADE_SUCCESS", "TRADE_FINISHED"): # 完成订单修改:订单状态、流水号、支付时间 models.Order.objects.filter(out_trade_no=out_trade_no).update(order_status=1) # 完成日志记录 logger.warning('%s订单支付成功' % out_trade_no) return Response('success') else: logger.error('%s订单支付失败' % out_trade_no) except: pass return Response('failed')折叠
# settings.pyOTS_ID = "LTAI4G9b9XpoZyERTEQ1Xmz"OTS_SECRET = "JkUTv1nEWhGGERhjE09mmUbqjkVODdh"OTS_INSTANCE = "cwy-renran"OTS_ENDPOINT = "https://cwy-renran.cn-hangzhou.ots.aliyuncs.com"# 安装pip install tablestore# utils.pyfrom tablestore import *from django.conf import settingsclass OTS(object): """工具类: 表格存储""" def __init__(self): self.client = OTSClient(settings.OTS_ENDPOINT, settings.OTS_ID, settings.OTS_SECRET, settings.OTS_INSTANCE) def list_table(self): """列出所有的存储表""" return self.client.list_table() def create_table(self, table_name, schema_of_primary_key): """ :param table_name: 字符串,表名 :param schema_of_primary_key: 列表,每一个成员是一个元祖,表示字段的名称和类型 :return: """ # 通过表名和主键列的schema创建一个tableMeta。 table_meta = TableMeta(table_name, schema_of_primary_key) # 创建TableOptions,数据保留31536000秒,超过后自动删除;最大3个版本;写入时指定的版本值和当前标准时间相差不能超过1天。 table_options = TableOptions(31536000, 3, 86400) # 设置预留读吞吐量为0,预留写吞吐量为0。 reserved_throughput = ReservedThroughput(CapacityUnit(0, 0)) try: self.client.create_table(table_meta, table_options, reserved_throughput) return True # 处理异常。 except Exception: return False def delete_table(self, table_name): """ 删除表 :param table_name: 参数如果是字符串,则表示删除一张表, 参数如果是列表,则表示删除多张表 :return: """ tables = [] if type(table_name) is str: tables.append(table_name) else: tables = table_name ret = { "success": [], "fails": [] } for table in tables: try: self.client.delete_table(table) ret["success"].append(table) except Exception: ret["fails"].append(table) return ret def put_row(self, table_name, primary_key, attribute_columns={}): """ 添加一条数据 :param table_name: 本次添加数据的表名 :param primary_key: 主键列 :param attribute_columns: 其他属性列 :return: 新增数据的主键 """ primary_key_list = [] for key, value in primary_key.items(): primary_key_list.append((key, value)) attribute_columns_list = [] for key, value in attribute_columns.items(): attribute_columns_list.append((key, value)) row = Row(primary_key_list, attribute_columns_list) # 添加数据的条件 # EXPECT_NOT_EXIST 如果主键重复,则报错! condition = Condition(RowExistenceExpectation.EXPECT_NOT_EXIST) try: # 调用put_row方法, ReturnType.RT_PK表示返回新增数据的主键信息,如果不设置则返回None consumed, return_row = self.client.put_row(table_name, row, condition, ReturnType.RT_PK) return True, return_row.primary_key except OTSClientError as e: """网络异常""" return False, "put row failed, http_status:%d, error_message:%s" % ( e.get_http_status(), e.get_error_message()) # 服务端异常,一般为参数错误或者流控错误。 except OTSServiceError as e: """参数有误""" return False, "put row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % ( e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id()) except Exception as e: """其他错误""" return False, "未知的异常" def get_row(self, table_name, primary_key, columns_to_get=[], column_filter=None): """""" # 指定查询结果返回的属性列 # 过滤条件 """ # ====================== 多条件 ======================== # 逻辑条件 cond = CompositeColumnCondition(LogicalOperator.AND) cond = CompositeColumnCondition(LogicalOperator.OR) cond = CompositeColumnCondition(LogicalOperator.NOT) # 比较条件 ComparatorType.NOT_EQUAL != ComparatorType.EQUAL == GREATER_THAN > GREATER_EQUAL >= LESS_THAN < LESS_EQUAL <= 举例: 查询一个学生信息,性别为男的,小于20岁===> (sex=男 and age < 20) or (sex=女 and age < 17) cond = CompositeColumnCondition(LogicalOperator.AND) cond.add_sub_condition(SingleColumnCondition("sex", '男', ComparatorType.EQUAL)) cond.add_sub_condition(SingleColumnCondition("age", 20, ComparatorType.LESS_THAN)) # ====================== 单条件 ======================== cond = SingleColumnCondition("age", 20, ComparatorType.LESS_THAN) """ primary_key_list = [] for key, value in primary_key.items(): primary_key_list.append((key, value)) try: # 调用get_row接口查询 consumed, return_row, next_token = self.client.get_row(table_name, primary_key_list, columns_to_get, column_filter, 1) data = {} if return_row is not None: for att in return_row.primary_key: # 打印每一个字段的内容 data[att[0]] = att[1] if len(columns_to_get) > 0: """如果有指定要返回其他属性列""" for att in return_row.attribute_columns: # 打印每一个字段的内容 data[att[0]] = att[1] return True, data # 客户端异常,一般为参数错误或者网络异常。 except OTSClientError as e: return False, "网络异常,获取数据失败, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message()) # 服务端异常,一般为参数错误或者流控错误。 except OTSServiceError as e: return False, "参数有误,获取数据失败, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % ( e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id()) except Exception as e: return False, "未知异常, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % ( e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id()) def update_row(self, table_name, primary_key, attribute_columns): """更新一条数据""" primary_key_list = [] for key, value in primary_key.items(): primary_key_list.append((key, value)) attribute_columns_list = [] for key, value in attribute_columns.items(): attribute_columns_list.append((key, value)) update_of_attribute_columns = { 'PUT': attribute_columns_list, } row = Row(primary_key_list, update_of_attribute_columns) try: consumed, return_row = self.client.update_row(table_name, row, condition=None, return_type=ReturnType.RT_PK) data = {} for att in return_row.primary_key: # 打印每一个字段的内容 data[att[0]] = att[1] return True, data # 客户端异常,一般为参数错误或者网络异常。 except OTSClientError as e: return False, "更新失败, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message()) # 服务端异常,一般为参数错误或者流控错误。 except OTSServiceError as e: return False, "更新失败, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % ( e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id()) except Exception as e: return False, "未知异常" def delete_row(self, table_name, primary_key): """根据主键删除一条数据""" primary_key_list = [] for key, value in primary_key.items(): primary_key_list.append((key, value)) row = Row(primary_key_list) try: consumed, return_row = self.client.delete_row(table_name, row, None) return True, "删除成功" except OTSClientError as e: return False, "更新失败!网络异常, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message()) except OTSServiceError as e: return False, "更新失败,参数有误, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % ( e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id()) except Exception as e: return False, "未知异常" def get_list_0(self, table_name, primary_key_list, columns_to_get=[], column_filter=None): """ 根据指定主键查询多条数据 :param table: 字符串,表名 :param primary_key_list: 主键列表 :param columns_to_get: 返回属性字段列表 :param column_filter: 条件 :return: 列表, 查询结果 """ # 读取3行。 rows_to_get = [] for item in primary_key_list: primary_key = [] for key, value in item.items(): primary_key.append((key, value)) rows_to_get.append(primary_key) # 构造批量读请求。 request = BatchGetRowRequest() # 增加表table_name中需要读取的行,最后一个参数1表示读取最新的一个版本。 request.add(TableInBatchGetRowItem(table_name, rows_to_get, columns_to_get, column_filter, 1)) try: result = self.client.batch_get_row(request) if result.is_all_succeed(): """只有在获取到全部数据以后才表示读取多条数据成功""" table_result = result.get_result_by_table(table_name) data = [] for item in table_result: row = {} if item.is_ok: for att in item.row.primary_key: # 打印每一个字段的内容 row[att[0]] = att[1] for att in item.row.attribute_columns: # 打印每一个字段的内容 row[att[0]] = att[1] else: return False, '部分数据参数有误,读取数据失败。 error code: %s, error message: %s' % ( item.error_code, item.error_message) data.append(row) return True, data else: return False, '部分数据参数有误,读取数据失败。' # 客户端异常,一般为参数错误或者网络异常。 except OTSClientError as e: return False, "读取数据失败,网络异常。 http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message()) # 服务端异常,一般为参数错误或者流控错误。 except OTSServiceError as e: return False, "读取数据失败,参数有误。 http_status:%d, error_code:%s, error_message:%s, request_id:%s" % ( e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id()) except Exception as e: return False, "未知异常" def add_list(self, table_name, primary_key_list, attribute_columns_list=[]): """ 添加多条数据 :param table_name: 字符串,表名 :param primary_key_list: 主键列 :param attribute_columns_list: 属性列 :return: """ put_row_items = self.row_items(PutRowItem, primary_key_list, attribute_columns_list) status, result = self.request(table_name, put_row_items) if status == False: return False, result # 输出每一行添加数据的结果。 succ, fail = result.get_put() for item in succ: print('添加数据成功, consume %s write cu.' % item.consumed.write) for item in fail: print('添加数据失败, error code: %s, error message: %s' % (item.error_code, item.error_message)) return True def update_list(self, table_name, primary_key_list, attribute_columns_list=[]): """更新多条数据""" update_row_items = self.row_items(UpdateRowItem, primary_key_list, attribute_columns_list) status, result = self.request(table_name, update_row_items) if status == False: return False, result # 输出每一行更新数据的结果 succ, fail = result.get_update() for item in succ: print('更新数据成功, consume %s write cu.' % item.consumed.write) for item in fail: print('更新数据失败, error code: %s, error message: %s' % (item.error_code, item.error_message)) return True def delete_list(self, table_name, primary_key_list): """删除多条数据""" delete_row_items = self.row_items(DeleteRowItem, primary_key_list) status, result = self.request(table_name, delete_row_items) if status == False: return False, result # 输出每一行删除数据的结果。 succ, fail = result.get_delete() for item in succ: print('删除数据成功, consume %s write cu.' % item.consumed.write) for item in fail: print('删除数据失败, error code: %s, error message: %s' % (item.error_code, item.error_message)) return True def row_items(self, cls, primary_key_list, attribute_columns_list=[]): if len(primary_key_list) != len(attribute_columns_list) and len(attribute_columns_list) != 0: return False, "参数有误!主键和属性列数量不对应,无法完成添加操作" # 增加多行组件的组装。 row_items = [] for key0, primary_item in enumerate(primary_key_list): primary_row = [] for key1, value in primary_item.items(): primary_row.append((key1, value)) attribute_row = None if len(attribute_columns_list) > 0: if cls == PutRowItem: """添加多条""" attribute_row = [] for key2, value in attribute_columns_list[key0].items(): attribute_row.append((key2, value)) elif cls == UpdateRowItem: """更新多条""" attribute_row = {"put": []} for key2, value in attribute_columns_list[key0].items(): attribute_row["put"].append((key2, value)) row = Row(primary_row, attribute_row) condition = Condition(RowExistenceExpectation.IGNORE) item = cls(row, condition) row_items.append(item) return row_items def request(self, table_name, put_row_items): """ 构造批量写请求 :param table_name: :param put_row_items: :return: """ request = BatchWriteRowRequest() request.add(TableInBatchWriteRowItem(table_name, put_row_items)) try: result = self.client.batch_write_row(request) if result.is_all_succeed(): return True, result except OTSClientError as e: return False, "网络异常, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message()) except OTSServiceError as e: return False, "参数有误, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % ( e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id()) except Exception as e: return False, "未知异常" def get_list(self, table_name, start_key, end_key, columns_to_get=[], limit=90, cond=None, derection=Direction.FORWARD): """ 根据指定范围查询数据 :param table_name: 字符串,表名 :param start_key: 字典,查询的开始主键位置 :param end_key: 字典,查询的结束主键位置 :param columns_to_get: 列表,属性列 :param limit: 整型, 查询结果数量 :param cond: Cond类对象,查询条件 :param derection: 查询的排列方式,Direction.FORWARD 正序,Direction.BACKWARD 倒序 :return: """ # 设置范围查询的起始主键。 inclusive_start_primary_key = [] if type(start_key) is dict: for key, value in start_key.items(): inclusive_start_primary_key.append((key, value)) else: inclusive_start_primary_key=start_key # 设置范围查询的结束主键。 exclusive_end_primary_key = [] if type(end_key) is dict: for key, value in end_key.items(): exclusive_end_primary_key.append((key, value)) else: exclusive_end_primary_key=end_key try: # 读取数据 consumed, next_start_primary_key, row_list, next_token = self.client.get_range( table_name, derection, inclusive_start_primary_key, exclusive_end_primary_key, columns_to_get, limit, column_filter=cond, max_version=1, ) all_rows = [] all_rows.extend(row_list) # 打印主键和属性列。 data = [] for row in all_rows: data_item = {} for att in row.primary_key: data_item[att[0]] = att[1] if len(columns_to_get) > 0: """如果有指定要返回其他属性列""" for att in row.attribute_columns: data_item[att[0]] = att[1] data.append(data_item) return {"status": True, "data": data, "token": next_start_primary_key} # 客户端异常,一般为参数错误或者网络异常。 except OTSClientError as e: return False, "网络异常, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message()) # 服务端异常,一般为参数错误或者流控错误。 except OTSServiceError as e: return False, "参数有误, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % ( e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id()) except Exception as e: return False, "未知异常"
留言与评论(共有 0 条评论) “” |