关注我《程序猿集锦》获取更多分享。
目录结构
使用python来操作MySQL数据库,有很多中方式。有如下几种:
后面实战中会使用到的SQL语句如下:
create database mydb1 default charset utf8mb4;use mydb1;-- mydb1.mytab1 definitionCREATE TABLE `mytab1` ( `id` smallint(6) NOT NULL AUTO_INCREMENT, `col1` varchar(255) DEFAULT NULL, `col2` varchar(255) DEFAULT NULL, `col3` varchar(255) DEFAULT NULL, `col4` int(11) DEFAULT NULL, `col5` varchar(16) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;use mydb1;begin;INSERT INTO mydb1.mytab1 (id, col1, col2, col3, col4, col5) VALUES(-2, 'g1', 'g2', 'g3', -1, '第-2条记录');INSERT INTO mydb1.mytab1 (id, col1, col2, col3, col4, col5) VALUES(-1, 'g1', 'g2', 'g3', -1, '第-1条记录');INSERT INTO mydb1.mytab1 (id, col1, col2, col3, col4, col5) VALUES(1, 'a1', 'a2', 'a3', 1, 'a1,"a2,"a3');INSERT INTO mydb1.mytab1 (id, col1, col2, col3, col4, col5) VALUES(2, 'b1', 'b2', 'b3', 2, 'b1,"b2,"b3');INSERT INTO mydb1.mytab1 (id, col1, col2, col3, col4, col5) VALUES(3, 'c1', 'c2', 'c3', 6, 'c1,"c2,"c3');INSERT INTO mydb1.mytab1 (id, col1, col2, col3, col4, col5) VALUES(4, 'd1', 'd2', 'd3', 8, '第四条记录');INSERT INTO mydb1.mytab1 (id, col1, col2, col3, col4, col5) VALUES(5, 'e1', 'e2', 'e3', 10, '新增加的记录');INSERT INTO mydb1.mytab1 (id, col1, col2, col3, col4, col5) VALUES(6, 'f1', 'f2', 'f3', 666, '第6条记录');INSERT INTO mydb1.mytab1 (id, col1, col2, col3, col4, col5) VALUES(9, 'A1', 'A2', 'A3', 4, 'A5记录');INSERT INTO mydb1.mytab1 (id, col1, col2, col3, col4, col5) VALUES(10, 'A1', 'A2', 'A3', 4, 'A5记录');commit;
MySQLdb:c语言编写,只支持Python2.x,最多支持到Python3.4版本,又叫MySQL-Python。
GitHub仓库的地址为:https://github.com/farcepest/MySQLdb1
仓库名称原先为MySQLdb,后来为了和MySQLdb支持Python3.x的版本区分开,就改为了MySQLdb1,而支持Python3的MySQLdb版本也不在叫MySQLdb了,而是改为了mysqlclient
mysqlclient:基于MySQLdb改进,支持了Python3.x,同时修复了一些问题。也就是Python3版本的MySQLdb。
Github的地址为:https://github.com/PyMySQL/mysqlclient
官方文档地址为:https://mysqlclient.readthedocs.io/index.html
安装的时候使用:pip3 install mysqlclient命令或者pip install mysqlclient来安装,GitHub仓库中的README文件中有详细的安装方式,这里不再赘述。
使用方式和原先的MySQLdb一样,没有什么区别。示例如下:
import MySQLdb# 如果提示没有安装MySQLdb,执行pip3 install mysqlclient,如果pip3命令保证,则执行brew install mysql,然后在执行pip3 install mysqlclient,这是在Mac系统上面的Python3的环境下面使用MySQLdb的时候这样操作。# 创建连接conn = MySQLdb.connect(host="127.0.0.1", user="root", password="root", port=33061, database="mydb1", charset="utf8")# 获取游标# cursor = conn.cursor() # 返回结果是元组形式,不带列名称,只有字段的值组成的元组cursor = conn.cursor(MySQLdb.cursors.DictCursor) # 返回结果是字典形式,列表为key,列值为value。# 执行SQL语句row = cursor.execute("select * from mytab1 limit 2;")print(row) # 影响的行数# 获取执行结果result = cursor.fetchall()# 关闭连接cursor.close()conn.close()# 遍历返回结果for item in result: print(type(item)) print(item)
mysql connector for python:是MySQL官方提供的纯Python驱动.
安装方式使用pip3 install mysql-connector-python
import mysql.connector# 安装方式使用 pip3 install mysql-connector-pythonconn=mysql.connector.connect(host = '127.0.0.1' # 连接名称,默认127.0.0.1 ,user = 'root' # 用户名 ,passwd='password' # 密码 ,port= 3306 # 端口,默认为3306 ,db='test' # 数据库名称 ,charset='utf8' # 字符编码)cur = conn.cursor() # 生成游标对象sql="select * from `student` " # SQL语句cur.execute(sql) # 执行SQL语句data = cur.fetchall() # 通过fetchall方法获得数据cur.close() # 关闭游标conn.close() # 关闭连接for i in data[:2]: # 打印输出前2条数据print (i)
pymysql:采用Python编写,性能低于MySQLdb和mysqlclient。
安装方式使用pip3 install pymysql,它的简单的示例如下:
import pymysql # 打开数据库连接conn = pymysql.connect(host='localhost', user='testuser', password='test123', database='TESTDB') # 使用 cursor() 方法创建一个游标对象 cursorcursor = conn.cursor() # 使用 execute() 方法执行 SQL 查询 cursor.execute("SELECT VERSION()") # 使用 fetchone() 方法获取单条数据.data = cursor.fetchone() print ("Database version : %s " % data) # 关闭数据库连接cursor.close()conn.close()
上面的方式适用于简单的数据库操作,如果想使用pymysql来作为操作数据库的工具,那么把它结合连接池一起来使用,将会更加的高效。下面介绍pymysql结合连接池PooledDB一起来操作MySQL数据库。
前提:
pip3 install pymysqlpip3 install PooledDB
在Python2的版本中,经常使用的是MySQLdb,但是这个MySQLdb只支持到Python3.4版本。所以还是直接用pymysql这个工具包或者mysqlclient也就是MySQLdb支持Python3的版本。
MySQLdb要快点,原因是这个是C写的,速度快
MySQLdb只支持Python2.x,还不支持3.x。可以用PyMySQL代替。安装方法:pip install PyMySQL
然后在需要的项目中,把 init.py中添加两行:意思是把pymysql当成MySQLdb使用。
import pymysqlpymysql.install_as_MySQLdb()
以后,单独介绍这个ORM框架。
import loggingimport pymysqlfrom dbutils.pooled_db import PooledDBclass PymysqlPooledSingletonDBUtil: def __init__(self): """ 构造方法,初始化数据库连接池。 :return: """ # TODO 连接信息从数据库中获取 self.connection_pool = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1, # 所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 maxshared=3, blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] # ping MySQL服务端,检查是否服务可用。 # 如:0 = None = never, # 1 = default = whenever it is requested, # 2 = when a cursor is created, # 4 = when a query is executed, # 7 = always ping=0, host='127.0.0.1', # 数据库主机地址 port=33061, # 数据库主机地址提供的监听端口 user='root', # 数据库用户名 password='root', # 数据库用户的密码 database='mydb1', # 连接到哪一个数据库 charset='utf8' # 连接数据库的时候使用的字符集编码 ) def get_db_conn_cursor(self): """ 获取数据库连接,即从连接池中获取一个数据库连接 :return: 数据库连接对象connection和cursor。 """ try: conn = self.connection_pool.connection() # cursor = conn.cursor() # 此时查询结果返回的是元组 cursor = conn.cursor(pymysql.cursors.DictCursor) # 此时查询结果返回的是字典 return conn, cursor except Exception as e: logging.error("获取连接对象失败,其错误信息如下:") logging.error(e) finally: pass def close_db_conn_cursor(self, *args): """ 关闭数据库连接,即把数据库连接放回连接池中 :param args: 待关闭的连接资源 :return: """ try: for item in args: item.close() except Exception as e: logging.error("关闭数据库对象失败,其错误信息如下:") logging.error(e) finally: pass def execute_sql(self, sql, args=None): """ 执行SQL语句 :param sql: 待执行的SQL语句 :param args: SQL语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: 执行结果 """ conn, cursor = self.get_db_conn_cursor() try: execute_result = cursor.execute(sql, args) conn.commit() print(type(execute_result)) print(execute_result) return execute_result # 如果执行成功,返回执行成功的结果,否则返回False except Exception as e: logging.error("SQL语句:{},执行失败,其错误信息如下:".format(sql)) logging.error(e) conn.rollback() return False finally: logging.info("SQL语句:{},执行成功!".format(sql)) self.close_db_conn_cursor(cursor, conn) # 关闭连接,其实是把练级放回连接池中 def fetch_one(self, sql, args=None): """ 根据指定的SQL查询语句和SQL查询语句需要的参数,查询一行数据。 :param sql: 待执行的SQL查询语句。 :param args: 待执行的SQL查询语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: """ conn, cursor = self.get_db_conn_cursor() try: cursor.execute(sql, args) result_one = cursor.fetchone() return result_one except Exception as e: logging.error("查询的SQL语句:{},查询失败,其错误信息如下:".format(sql)) logging.error(e) finally: logging.info("查询的SQL语句:{},查询成功!".format(sql)) self.close_db_conn_cursor(cursor, conn) def fetch_all(self, sql, args=None): """ 根据指定的SQL查询语句和SQL查询语句需要的参数,查询所有的数据行。 :param sql: 待执行的SQL查询语句 :param args: 待执行的SQL查询语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: """ conn, cursor = self.get_db_conn_cursor() try: cursor.execute(sql, args) result_all = cursor.fetchall() return result_all except Exception as e: logging.error("查询的SQL语句:{},查询失败,其错误信息如下:".format(sql)) logging.error(e) finally: logging.info("查询的SQL语句:{},查询成功!".format(sql)) self.close_db_conn_cursor(cursor, conn) def insert_one(self, sql, args=None): """ 执行INSERT SQL语句 :param sql: 待执行的INSERT SQL语句 :param args: INSERT SQL语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: 执行结果 """ conn, cursor = self.get_db_conn_cursor() try: insert_result = cursor.execute(sql, args) conn.commit() print(type(insert_result)) print(insert_result) return insert_result # 如果执行插入成功,返回受影响的行数 except Exception as e: logging.error("插入的SQL语句:{},执行失败,其错误信息如下:".format(sql)) logging.error(e) conn.rollback() return False finally: logging.info("插入的SQL语句:{},执行成功!".format(sql)) self.close_db_conn_cursor(cursor, conn) # 关闭连接,其实是把练级放回连接池中 def insert_many(self, sql, args=None): """ 执行批量插入语句 :param sql: 待执行的插入语句 :param args: 批量插入语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: 执行结果 """ conn, cursor = self.get_db_conn_cursor() try: insert_many_result = cursor.executemany(sql, args) conn.commit() print(type(insert_many_result)) print(insert_many_result) return insert_many_result # 如果执行插入成功,返回受影响的行数 except Exception as e: logging.error("批量插入的SQL语句:{},执行失败,其错误信息如下:".format(sql)) logging.error(e) conn.rollback() return False finally: logging.info("批量插入的SQL语句:{},执行成功!".format(sql)) self.close_db_conn_cursor(cursor, conn) # 关闭连接,其实是把练级放回连接池中 def delete(self, sql, args=None): """ 执行DELETE删除语句 :param sql: 待执行DELETE删除语句 :param args: 待执行DELETE删除语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: 执行结果 """ conn, cursor = self.get_db_conn_cursor() try: delete_result = cursor.execute(sql, args) conn.commit() print(type(delete_result)) print(delete_result) return delete_result # 如果执行删除成功,返回受影响的行数 except Exception as e: logging.error("删除的SQL语句:{},执行失败,其错误信息如下:".format(sql)) logging.error(e) conn.rollback() return False finally: logging.info("删除的SQL语句:{},执行成功!".format(sql)) self.close_db_conn_cursor(cursor, conn) # 关闭连接,其实是把练级放回连接池中 def update(self, sql, args=None): """ 执行UPDATE修改语句 :param sql: 待执行UPDATE修改语句 :param args: 待执行UPDATE修改语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: 执行结果 """ conn, cursor = self.get_db_conn_cursor() try: update_result = cursor.execute(sql, args) conn.commit() print(type(update_result)) print(update_result) return update_result # 如果执行修改成功,返回受影响的行数 except Exception as e: logging.error("修改的SQL语句:{},执行失败,其错误信息如下:".format(sql)) logging.error(e) conn.rollback() return False finally: logging.info("修改的SQL语句:{},执行成功!".format(sql)) self.close_db_conn_cursor(cursor, conn) # 关闭连接,其实是把练级放回连接池中# 初始化一次,以后其他地方使用该工具类,只要在前面执行from xxx import db就可以了。db = PymysqlPooledSingletonDBUtil()# if __name__ == '__main__':# db = PymysqlPooledSingletonDBUtil()# select_sql = """select * from mytab1;"""# result = db.fetch_all(select_sql)# print(result)
from pymysql_pool_singleton import dbdef query_one(): """查询一条记录""" sql = """select * from mytab1;""" result = db.fetch_one(sql) print(result)def query_all(): """查询所有记录""" sql = """select * from mytab1;""" result = db.fetch_all(sql) print(result) for item in result: print(type(item)) print(item) print()def query_with_condition_demo1(): """查询所有记录,带有多个where条件,参数严格按照顺序传入""" sql = """select * from mytab1 where id > %s and col5 like %s ;""" result = db.fetch_all(sql, (2, "%记录%")) print(result) for item in result: print(type(item)) print(item) print()def query_with_condition_demo2(): """查询所有记录,带有多个where条件,参数通过key指定""" sql = """select * from mytab1 where id > %(id)s and col5 like %(col5)s ;""" result = db.fetch_all(sql=sql, args={"col5": "%记录%", "id": 2}) print(result) for item in result: print(type(item)) print(item) print()def insert_one_demo1(): """执行插入数据的操作""" sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%s, %s, %s, %s, %s);""" result = db.insert_one(sql, args=("A1", "A2", "A3", 4, "A5记录")) print(result)def insert_one_demo2(): """执行插入数据的操作""" sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%(col1)s, %(col2)s, %(col3)s, %(col4)s, %(col5)s);""" result = db.insert_one(sql, args={"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"}) print(result)def insert_batch_demo1(): """执行批量插入数据的操作""" sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%s, %s, %s, %s, %s);""" li = [ ("A1", "A2", "A3", 4, "A5记录"), ("A1", "A2", "A3", 4, "A5记录"), ("A1", "A2", "A3", 4, "A5记录"), ("A1", "A2", "A3", 4, "A5记录"), ] result = db.insert_many(sql, args=li) print(result)def insert_batch_demo2(): """执行批量插入数据的操作""" # 问题:如何批量插入的时候,指定字段名称站位呢? sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%(col1)s, %(col2)s, %(col3)s, %(col4)s, %(col5)s);""" tp = [ {"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"}, {"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"}, {"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"}, ] result = db.insert_one(sql, args=tp) print(result)def delete(): """删除的操作""" sql = """delete from mytab1 where id > %s;""" # result = db.delete(sql, 30) # 如果采用这样的方式传入,因为30这是一个值,但是cursor.execute(sql, args)中需要的args类型需要为:list,tuple,dict三种的一种,所以采用下面的方式传入一个(30,)把它转换为一个tuple再传入。 result = db.delete(sql, (30,)) print("删除的影响的行数为:{}".format(result))def update(): """修改的操作""" sql = """update mytab1 set col1=%s, col2=%s where id > %s;""" result = db.delete(sql, ("aaaaaa", "bbbbbb", 10)) print("更新的影响的行数为:{}".format(result))if __name__ == "__main__": # query_one() # query_all() # query_with_condition_demo1() # query_with_condition_demo2() # insert_one_demo1() # insert_one_demo2() # insert_batch_demo1() # insert_batch_demo2() delete() # update()
import loggingimport pymysqlfrom dbutils.pooled_db import PooledDBconnection_pool = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1, # 所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 maxshared=3, blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] # ping MySQL服务端,检查是否服务可用。 # 如:0 = None = never, # 1 = default = whenever it is requested, # 2 = when a cursor is created, # 4 = when a query is executed, # 7 = always ping=0, host='127.0.0.1', # 数据库主机地址 port=33061, # 数据库主机地址提供的监听端口 user='root', # 数据库用户名 password='root', # 数据库用户的密码 database='mydb1', # 连接到哪一个数据库 charset='utf8' # 连接数据库的时候使用的字符集编码)class PymysqlPooledContextDBUtil: def __init__(self): """ 构造方法,初始化数据库连接池。 :return: """ # TODO 连接信息从数据库中获取 self.conn = conn = connection_pool.connection() # self.cursor = conn.cursor() # 此时查询结果返回的是元组 self.cursor = conn.cursor(pymysql.cursors.DictCursor) # 此时查询结果返回的是字典 def __enter__(self): """ 进入的方法 :return: """ return self def __exit__(self, exc_type, exc_val, exc_tb): """ 离开的方法 :param exc_type: :param exc_val: :param exc_tb: :return: """ self.cursor.close() self.conn.close() def execute_sql(self, sql, args=None): """ 执行SQL语句 :param sql: 待执行的SQL语句 :param args: SQL语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: 执行结果 """ try: execute_result = self.cursor.execute(sql, args) self.conn.commit() print(type(execute_result)) print(execute_result) return execute_result # 如果执行成功,返回执行成功的结果,否则返回False except Exception as e: logging.error("SQL语句:{},执行失败,其错误信息如下:".format(sql)) logging.error(e) self.conn.rollback() return False finally: logging.info("SQL语句:{},执行成功!".format(sql)) def fetch_one(self, sql, args=None): """ 根据指定的SQL查询语句和SQL查询语句需要的参数,查询一行数据。 :param sql: 待执行的SQL查询语句 :param args: 待执行的SQL查询语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: """ try: self.cursor.execute(sql, args) result_one = self.cursor.fetchone() return result_one except Exception as e: logging.error("查询的SQL语句:{},查询失败,其错误信息如下:".format(sql)) logging.error(e) finally: logging.info("查询的SQL语句:{},查询成功!".format(sql)) def fetch_all(self, sql, args=None): """ 根据指定的SQL查询语句和SQL查询语句需要的参数,查询所有的数据行。 :param sql: 待执行的SQL查询语句 :param args: 待执行的SQL查询语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: """ try: self.cursor.execute(sql, args) result_all = self.cursor.fetchall() return result_all except Exception as e: logging.error("查询的SQL语句:{},查询失败,其错误信息如下:".format(sql)) logging.error(e) finally: logging.info("查询的SQL语句:{},查询成功!".format(sql)) def insert_one(self, sql, args=None): """ 执行INSERT SQL语句 :param sql: 待执行的INSERT SQL语句 :param args: INSERT SQL语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: 执行结果 """ try: insert_result = self.cursor.execute(sql, args) self.conn.commit() print(type(insert_result)) print(insert_result) return insert_result # 如果执行插入成功,返回受影响的行数 except Exception as e: logging.error("插入的SQL语句:{},执行失败,其错误信息如下:".format(sql)) logging.error(e) self.conn.rollback() return False finally: logging.info("插入的SQL语句:{},执行成功!".format(sql)) def insert_many(self, sql, args=None): """ 执行批量插入语句 :param sql: 待执行的插入语句 :param args: 批量插入语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: 执行结果 """ try: insert_many_result = self.cursor.executemany(sql, args) self.conn.commit() print(type(insert_many_result)) print(insert_many_result) return insert_many_result # 如果执行插入成功,返回受影响的行数 except Exception as e: logging.error("批量插入的SQL语句:{},执行失败,其错误信息如下:".format(sql)) logging.error(e) self.conn.rollback() return False finally: logging.info("批量插入的SQL语句:{},执行成功!".format(sql)) def delete(self, sql, args=None): """ 执行DELETE删除语句 :param sql: 待执行DELETE删除语句 :param args: 待执行DELETE删除语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: 执行结果 """ try: delete_result = self.cursor.execute(sql, args) self.conn.commit() print(type(delete_result)) print(delete_result) return delete_result # 如果执行删除成功,返回受影响的行数 except Exception as e: logging.error("删除的SQL语句:{},执行失败,其错误信息如下:".format(sql)) logging.error(e) self.conn.rollback() return False finally: logging.info("删除的SQL语句:{},执行成功!".format(sql)) def update(self, sql, args=None): """ 执行UPDATE修改语句 :param sql: 待执行UPDATE修改语句 :param args: 待执行UPDATE修改语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: 执行结果 """ try: update_result = self.cursor.execute(sql, args) self.conn.commit() print(type(update_result)) print(update_result) return update_result # 如果执行修改成功,返回受影响的行数 except Exception as e: logging.error("修改的SQL语句:{},执行失败,其错误信息如下:".format(sql)) logging.error(e) self.conn.rollback() return False finally: logging.info("修改的SQL语句:{},执行成功!".format(sql))
from pymysql_pool_context import PymysqlPooledContextDBUtildef query_one():"""查询一条"""with PymysqlPooledContextDBUtil() as db:sql = """select * from mytab1;"""result = db.fetch_one(sql)print(result)def query_all():"""查询多条"""with PymysqlPooledContextDBUtil() as db:sql = """select * from mytab1;"""result = db.fetch_all(sql)print(result)def query_with_condition_demo1():"""查询所有记录,带有多个where条件,参数严格按照顺序传入"""with PymysqlPooledContextDBUtil() as db:sql = """select * from mytab1 where id > %s and col5 like %s ;"""result = db.fetch_all(sql, (2, "%记录%"))print(result)for item in result:print(type(item))print(item)print()def query_with_condition_demo2():"""查询所有记录,带有多个where条件,参数通过key指定"""with PymysqlPooledContextDBUtil() as db:sql = """select * from mytab1 where id > %(id)s and col5 like %(col5)s ;"""result = db.fetch_all(sql=sql, args={"col5": "%记录%", "id": 2})print(result)for item in result:print(type(item))print(item)print()def insert_one_demo1():"""执行插入数据的操作"""with PymysqlPooledContextDBUtil() as db:sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%s, %s, %s, %s, %s);"""result = db.insert_one(sql, args=("A1", "A2", "A3", 4, "A5记录"))print(result)def insert_one_demo2():"""执行插入数据的操作"""with PymysqlPooledContextDBUtil() as db:sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%(col1)s, %(col2)s, %(col3)s, %(col4)s, %(col5)s);"""result = db.insert_one(sql, args={"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"})print(result)def insert_batch_demo1():"""执行批量插入数据的操作"""with PymysqlPooledContextDBUtil() as db:sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%s, %s, %s, %s, %s);"""li = [("A1", "A2", "A3", 4, "A5记录"),("A1", "A2", "A3", 4, "A5记录"),("A1", "A2", "A3", 4, "A5记录"),("A1", "A2", "A3", 4, "A5记录"),]result = db.insert_many(sql, args=li)print(result)def insert_batch_demo2():"""执行批量插入数据的操作"""with PymysqlPooledContextDBUtil() as db:# 问题:如何批量插入的时候,指定字段名称站位呢?sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%(col1)s, %(col2)s, %(col3)s, %(col4)s, %(col5)s);"""tp = [{"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"},{"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"},{"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"},]result = db.insert_one(sql, args=tp)print(result)def delete():"""删除的操作"""with PymysqlPooledContextDBUtil() as db:sql = """delete from mytab1 where id > %s;"""# result = db.delete(sql, 30) # 如果采用这样的方式传入,因为30这是一个值,但是cursor.execute(sql, args)中需要的args类型需要为:list,tuple,dict三种的一种,所以采用下面的方式传入一个(30,)把它转换为一个tuple再传入。result = db.delete(sql, (30,))print("删除的影响的行数为:{}".format(result))def update():"""修改的操作"""with PymysqlPooledContextDBUtil() as db:sql = """update mytab1 set col1=%s, col2=%s where id > %s;"""result = db.delete(sql, ("aaaaaa", "bbbbbb", 10))print("更新的影响的行数为:{}".format(result))if __name__ == "__main__":# query_one()# query_all()# query_with_condition_demo1()# query_with_condition_demo2()# insert_one_demo1()# insert_one_demo2()# insert_batch_demo1()# insert_batch_demo2()delete()# update()
和pymysql类似,只要把代码中的pymysql替换成MySQLdb就可以了。体现creator=pymysql和cursor = conn.cursor(pymysql.cursors.DictCursor)这两行代码上。
import loggingimport MySQLdbfrom dbutils.pooled_db import PooledDBclass MySQLdbPooledSingletonDBUtil: def __init__(self): """ 构造方法,初始化数据库连接池。 :return: """ # TODO 连接信息从数据库中获取 self.connection_pool = PooledDB( creator=MySQLdb, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1, # 所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 maxshared=3, blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] # ping MySQL服务端,检查是否服务可用。 # 如:0 = None = never, # 1 = default = whenever it is requested, # 2 = when a cursor is created, # 4 = when a query is executed, # 7 = always ping=0, host='127.0.0.1', # 数据库主机地址 port=33061, # 数据库主机地址提供的监听端口 user='root', # 数据库用户名 password='root', # 数据库用户的密码 database='mydb1', # 连接到哪一个数据库 charset='utf8' # 连接数据库的时候使用的字符集编码 ) def get_db_conn_cursor(self): """ 获取数据库连接,即从连接池中获取一个数据库连接 :return: 数据库连接对象connection和cursor。 """ try: conn = self.connection_pool.connection() # cursor = conn.cursor() # 此时查询结果返回的是元组 cursor = conn.cursor(MySQLdb.cursors.DictCursor) # 此时查询结果返回的是字典 return conn, cursor except Exception as e: logging.error("获取连接对象失败,其错误信息如下:") logging.error(e) finally: pass def close_db_conn_cursor(self, *args): """ 关闭数据库连接,即把数据库连接放回连接池中 :param args: 待关闭的连接资源 :return: """ try: for item in args: item.close() except Exception as e: logging.error("关闭数据库对象失败,其错误信息如下:") logging.error(e) finally: pass def execute_sql(self, sql, args=None): """ 执行SQL语句 :param sql: 待执行的SQL语句 :param args: SQL语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: 执行结果 """ conn, cursor = self.get_db_conn_cursor() try: execute_result = cursor.execute(sql, args) conn.commit() print(type(execute_result)) print(execute_result) return execute_result # 如果执行成功,返回执行成功的结果,否则返回False except Exception as e: logging.error("SQL语句:{},执行失败,其错误信息如下:".format(sql)) logging.error(e) conn.rollback() return False finally: logging.info("SQL语句:{},执行成功!".format(sql)) self.close_db_conn_cursor(cursor, conn) # 关闭连接,其实是把练级放回连接池中 def fetch_one(self, sql, args=None): """ 根据指定的SQL查询语句和SQL查询语句需要的参数,查询一行数据。 :param sql: 待执行的SQL查询语句。 :param args: 待执行的SQL查询语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: """ conn, cursor = self.get_db_conn_cursor() try: cursor.execute(sql, args) result_one = cursor.fetchone() return result_one except Exception as e: logging.error("查询的SQL语句:{},查询失败,其错误信息如下:".format(sql)) logging.error(e) finally: logging.info("查询的SQL语句:{},查询成功!".format(sql)) self.close_db_conn_cursor(cursor, conn) def fetch_all(self, sql, args=None): """ 根据指定的SQL查询语句和SQL查询语句需要的参数,查询所有的数据行。 :param sql: 待执行的SQL查询语句 :param args: 待执行的SQL查询语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: """ conn, cursor = self.get_db_conn_cursor() try: cursor.execute(sql, args) result_all = cursor.fetchall() return result_all except Exception as e: logging.error("查询的SQL语句:{},查询失败,其错误信息如下:".format(sql)) logging.error(e) finally: logging.info("查询的SQL语句:{},查询成功!".format(sql)) self.close_db_conn_cursor(cursor, conn) def insert_one(self, sql, args=None): """ 执行INSERT SQL语句 :param sql: 待执行的INSERT SQL语句 :param args: INSERT SQL语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: 执行结果 """ conn, cursor = self.get_db_conn_cursor() try: insert_result = cursor.execute(sql, args) conn.commit() print(type(insert_result)) print(insert_result) return insert_result # 如果执行插入成功,返回受影响的行数 except Exception as e: logging.error("插入的SQL语句:{},执行失败,其错误信息如下:".format(sql)) logging.error(e) conn.rollback() return False finally: logging.info("插入的SQL语句:{},执行成功!".format(sql)) self.close_db_conn_cursor(cursor, conn) # 关闭连接,其实是把练级放回连接池中 def insert_many(self, sql, args=None): """ 执行批量插入语句 :param sql: 待执行的插入语句 :param args: 批量插入语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: 执行结果 """ conn, cursor = self.get_db_conn_cursor() try: insert_many_result = cursor.executemany(sql, args) conn.commit() print(type(insert_many_result)) print(insert_many_result) return insert_many_result # 如果执行插入成功,返回受影响的行数 except Exception as e: logging.error("批量插入的SQL语句:{},执行失败,其错误信息如下:".format(sql)) logging.error(e) conn.rollback() return False finally: logging.info("批量插入的SQL语句:{},执行成功!".format(sql)) self.close_db_conn_cursor(cursor, conn) # 关闭连接,其实是把练级放回连接池中 def delete(self, sql, args=None): """ 执行DELETE删除语句 :param sql: 待执行DELETE删除语句 :param args: 待执行DELETE删除语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: 执行结果 """ conn, cursor = self.get_db_conn_cursor() try: delete_result = cursor.execute(sql, args) conn.commit() print(type(delete_result)) print(delete_result) return delete_result # 如果执行删除成功,返回受影响的行数 except Exception as e: logging.error("删除的SQL语句:{},执行失败,其错误信息如下:".format(sql)) logging.error(e) conn.rollback() return False finally: logging.info("删除的SQL语句:{},执行成功!".format(sql)) self.close_db_conn_cursor(cursor, conn) # 关闭连接,其实是把练级放回连接池中 def update(self, sql, args=None): """ 执行UPDATE修改语句 :param sql: 待执行UPDATE修改语句 :param args: 待执行UPDATE修改语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: 执行结果 """ conn, cursor = self.get_db_conn_cursor() try: update_result = cursor.execute(sql, args) conn.commit() print(type(update_result)) print(update_result) return update_result # 如果执行修改成功,返回受影响的行数 except Exception as e: logging.error("修改的SQL语句:{},执行失败,其错误信息如下:".format(sql)) logging.error(e) conn.rollback() return False finally: logging.info("修改的SQL语句:{},执行成功!".format(sql)) self.close_db_conn_cursor(cursor, conn) # 关闭连接,其实是把练级放回连接池中# 初始化一次,以后其他地方使用该工具类,只要在前面执行from xxx import db就可以了。db = MySQLdbPooledSingletonDBUtil()# if __name__ == '__main__':# db = MySQLdbPooledSingletonDBUtil()# select_sql = """select * from mytab1;"""# result = db.fetch_all(select_sql)# print(result)
from mysqldb_pool_singleton import dbdef query_one(): """查询一条记录""" sql = """select * from mytab1;""" result = db.fetch_one(sql) print(result)def query_all(): """查询所有记录""" sql = """select * from mytab1;""" result = db.fetch_all(sql) print(result) for item in result: print(type(item)) print(item) print()def query_with_condition_demo1(): """查询所有记录,带有多个where条件,参数严格按照顺序传入""" sql = """select * from mytab1 where id > %s and col5 like %s ;""" result = db.fetch_all(sql, (2, "%记录%")) print(result) for item in result: print(type(item)) print(item) print()def query_with_condition_demo2(): """查询所有记录,带有多个where条件,参数通过key指定""" sql = """select * from mytab1 where id > %(id)s and col5 like %(col5)s ;""" result = db.fetch_all(sql=sql, args={"col5": "%记录%", "id": 2}) print(result) for item in result: print(type(item)) print(item) print()def insert_one_demo1(): """执行插入数据的操作""" sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%s, %s, %s, %s, %s);""" result = db.insert_one(sql, args=("A1", "A2", "A3", 4, "A5记录")) print(result)def insert_one_demo2(): """执行插入数据的操作""" sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%(col1)s, %(col2)s, %(col3)s, %(col4)s, %(col5)s);""" result = db.insert_one(sql, args={"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"}) print(result)def insert_batch_demo1(): """执行批量插入数据的操作""" sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%s, %s, %s, %s, %s);""" li = [ ("A1", "A2", "A3", 4, "A5记录"), ("A1", "A2", "A3", 4, "A5记录"), ("A1", "A2", "A3", 4, "A5记录"), ("A1", "A2", "A3", 4, "A5记录"), ] result = db.insert_many(sql, args=li) print(result)def insert_batch_demo2(): """执行批量插入数据的操作""" # 问题:如何批量插入的时候,指定字段名称站位呢? sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%(col1)s, %(col2)s, %(col3)s, %(col4)s, %(col5)s);""" tp = [ {"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"}, {"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"}, {"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"}, ] result = db.insert_one(sql, args=tp) print(result)def delete(): """删除的操作""" sql = """delete from mytab1 where id > %s;""" # result = db.delete(sql, 30) # 如果采用这样的方式传入,因为30这是一个值,但是cursor.execute(sql, args)中需要的args类型需要为:list,tuple,dict三种的一种,所以采用下面的方式传入一个(30,)把它转换为一个tuple再传入。 result = db.delete(sql, (30,)) print("删除的影响的行数为:{}".format(result))def update(): """修改的操作""" sql = """update mytab1 set col1=%s, col2=%s where id > %s;""" result = db.delete(sql, ("aaaaaa", "bbbbbb", 10)) print("更新的影响的行数为:{}".format(result))if __name__ == "__main__": # query_one() query_all() # query_with_condition_demo1() # query_with_condition_demo2() # insert_one_demo1() # insert_one_demo2() # insert_batch_demo1() # insert_batch_demo2() delete() # update()
import loggingimport MySQLdbfrom dbutils.pooled_db import PooledDBconnection_pool = PooledDB( creator=MySQLdb, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1, # 所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 maxshared=3, blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] # ping MySQL服务端,检查是否服务可用。 # 如:0 = None = never, # 1 = default = whenever it is requested, # 2 = when a cursor is created, # 4 = when a query is executed, # 7 = always ping=0, host='127.0.0.1', # 数据库主机地址 port=33061, # 数据库主机地址提供的监听端口 user='root', # 数据库用户名 password='root', # 数据库用户的密码 database='mydb1', # 连接到哪一个数据库 charset='utf8' # 连接数据库的时候使用的字符集编码)class MySQLdbPooledContextDBUtil: def __init__(self): """ 构造方法,初始化数据库连接池。 :return: """ # TODO 连接信息从数据库中获取 self.conn = conn = connection_pool.connection() # self.cursor = conn.cursor() # 此时查询结果返回的是元组 self.cursor = conn.cursor(MySQLdb.cursors.DictCursor) # 此时查询结果返回的是字典 def __enter__(self): """ 进入的方法 :return: """ return self def __exit__(self, exc_type, exc_val, exc_tb): """ 离开的方法 :param exc_type: :param exc_val: :param exc_tb: :return: """ self.cursor.close() self.conn.close() def execute_sql(self, sql, args=None): """ 执行SQL语句 :param sql: 待执行的SQL语句 :param args: SQL语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: 执行结果 """ try: execute_result = self.cursor.execute(sql, args) self.conn.commit() print(type(execute_result)) print(execute_result) return execute_result # 如果执行成功,返回执行成功的结果,否则返回False except Exception as e: logging.error("SQL语句:{},执行失败,其错误信息如下:".format(sql)) logging.error(e) self.conn.rollback() return False finally: logging.info("SQL语句:{},执行成功!".format(sql)) def fetch_one(self, sql, args=None): """ 根据指定的SQL查询语句和SQL查询语句需要的参数,查询一行数据。 :param sql: 待执行的SQL查询语句 :param args: 待执行的SQL查询语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: """ try: self.cursor.execute(sql, args) result_one = self.cursor.fetchone() return result_one except Exception as e: logging.error("查询的SQL语句:{},查询失败,其错误信息如下:".format(sql)) logging.error(e) finally: logging.info("查询的SQL语句:{},查询成功!".format(sql)) def fetch_all(self, sql, args=None): """ 根据指定的SQL查询语句和SQL查询语句需要的参数,查询所有的数据行。 :param sql: 待执行的SQL查询语句 :param args: 待执行的SQL查询语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: """ try: self.cursor.execute(sql, args) result_all = self.cursor.fetchall() return result_all except Exception as e: logging.error("查询的SQL语句:{},查询失败,其错误信息如下:".format(sql)) logging.error(e) finally: logging.info("查询的SQL语句:{},查询成功!".format(sql)) def insert_one(self, sql, args=None): """ 执行INSERT SQL语句 :param sql: 待执行的INSERT SQL语句 :param args: INSERT SQL语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: 执行结果 """ try: insert_result = self.cursor.execute(sql, args) self.conn.commit() print(type(insert_result)) print(insert_result) return insert_result # 如果执行插入成功,返回受影响的行数 except Exception as e: logging.error("插入的SQL语句:{},执行失败,其错误信息如下:".format(sql)) logging.error(e) self.conn.rollback() return False finally: logging.info("插入的SQL语句:{},执行成功!".format(sql)) def insert_many(self, sql, args=None): """ 执行批量插入语句 :param sql: 待执行的插入语句 :param args: 批量插入语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: 执行结果 """ try: insert_many_result = self.cursor.executemany(sql, args) self.conn.commit() print(type(insert_many_result)) print(insert_many_result) return insert_many_result # 如果执行插入成功,返回受影响的行数 except Exception as e: logging.error("批量插入的SQL语句:{},执行失败,其错误信息如下:".format(sql)) logging.error(e) self.conn.rollback() return False finally: logging.info("批量插入的SQL语句:{},执行成功!".format(sql)) def delete(self, sql, args=None): """ 执行DELETE删除语句 :param sql: 待执行DELETE删除语句 :param args: 待执行DELETE删除语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: 执行结果 """ try: delete_result = self.cursor.execute(sql, args) self.conn.commit() print(type(delete_result)) print(delete_result) return delete_result # 如果执行删除成功,返回受影响的行数 except Exception as e: logging.error("删除的SQL语句:{},执行失败,其错误信息如下:".format(sql)) logging.error(e) self.conn.rollback() return False finally: logging.info("删除的SQL语句:{},执行成功!".format(sql)) def update(self, sql, args=None): """ 执行UPDATE修改语句 :param sql: 待执行UPDATE修改语句 :param args: 待执行UPDATE修改语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。 :return: 执行结果 """ try: update_result = self.cursor.execute(sql, args) self.conn.commit() print(type(update_result)) print(update_result) return update_result # 如果执行修改成功,返回受影响的行数 except Exception as e: logging.error("修改的SQL语句:{},执行失败,其错误信息如下:".format(sql)) logging.error(e) self.conn.rollback() return False finally: logging.info("修改的SQL语句:{},执行成功!".format(sql))
from mysqldb_pool_context import MySQLdbPooledContextDBUtildef query_one():"""查询一条"""with MySQLdbPooledContextDBUtil() as db:sql = """select * from mytab1;"""result = db.fetch_one(sql)print(result)def query_all():"""查询多条"""with MySQLdbPooledContextDBUtil() as db:sql = """select * from mytab1;"""result = db.fetch_all(sql)print(result)def query_with_condition_demo1():"""查询所有记录,带有多个where条件,参数严格按照顺序传入"""with MySQLdbPooledContextDBUtil() as db:sql = """select * from mytab1 where id > %s and col5 like %s ;"""result = db.fetch_all(sql, (2, "%记录%"))print(result)for item in result:print(type(item))print(item)print()def query_with_condition_demo2():"""查询所有记录,带有多个where条件,参数通过key指定"""with MySQLdbPooledContextDBUtil() as db:sql = """select * from mytab1 where id > %(id)s and col5 like %(col5)s ;"""result = db.fetch_all(sql=sql, args={"col5": "%记录%", "id": 2})print(result)for item in result:print(type(item))print(item)print()def insert_one_demo1():"""执行插入数据的操作"""with MySQLdbPooledContextDBUtil() as db:sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%s, %s, %s, %s, %s);"""result = db.insert_one(sql, args=("A1", "A2", "A3", 4, "A5记录"))print(result)def insert_one_demo2():"""执行插入数据的操作"""with MySQLdbPooledContextDBUtil() as db:sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%(col1)s, %(col2)s, %(col3)s, %(col4)s, %(col5)s);"""result = db.insert_one(sql, args={"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"})print(result)def insert_batch_demo1():"""执行批量插入数据的操作"""with MySQLdbPooledContextDBUtil() as db:sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%s, %s, %s, %s, %s);"""li = [("A1", "A2", "A3", 4, "A5记录"),("A1", "A2", "A3", 4, "A5记录"),("A1", "A2", "A3", 4, "A5记录"),("A1", "A2", "A3", 4, "A5记录"),]result = db.insert_many(sql, args=li)print(result)def insert_batch_demo2():"""执行批量插入数据的操作"""with MySQLdbPooledContextDBUtil() as db:# 问题:如何批量插入的时候,指定字段名称站位呢?sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%(col1)s, %(col2)s, %(col3)s, %(col4)s, %(col5)s);"""tp = [{"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"},{"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"},{"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"},]result = db.insert_one(sql, args=tp)print(result)def delete():"""删除的操作"""with MySQLdbPooledContextDBUtil() as db:sql = """delete from mytab1 where id > %s;"""# result = db.delete(sql, 30) # 如果采用这样的方式传入,因为30这是一个值,但是cursor.execute(sql, args)中需要的args类型需要为:list,tuple,dict三种的一种,所以采用下面的方式传入一个(30,)把它转换为一个tuple再传入。result = db.delete(sql, (30,))print("删除的影响的行数为:{}".format(result))def update():"""修改的操作"""with MySQLdbPooledContextDBUtil() as db:sql = """update mytab1 set col1=%s, col2=%s where id > %s;"""result = db.delete(sql, ("aaaaaa", "bbbbbb", 10))print("更新的影响的行数为:{}".format(result))if __name__ == "__main__":# query_one()# query_all()# query_with_condition_demo1()# query_with_condition_demo2()# insert_one_demo1()# insert_one_demo2()# insert_batch_demo1()# insert_batch_demo2()delete()# update()
关注我《程序猿集锦》获取更多分享。
留言与评论(共有 0 条评论) “” |