Python操作MySQL数据库

关注我《程序猿集锦》获取更多分享。

目录结构


连接MySQL的工具

使用python来操作MySQL数据库,有很多中方式。有如下几种:

  • MySQLdb
  • mysqlclient
  • mysql connector for python
  • pymysql
  • SQLAlchemy

后面实战中会使用到的SQL语句如下:

create database mydb1 default charset utf8mb4;use mydb1;-- mydb1.mytab1 definitionCREATE TABLE `mytab1` (  `id` smallint(6) NOT NULL AUTO_INCREMENT,  `col1` varchar(255) DEFAULT NULL,  `col2` varchar(255) DEFAULT NULL,  `col3` varchar(255) DEFAULT NULL,  `col4` int(11) DEFAULT NULL,  `col5` varchar(16) DEFAULT NULL,  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;use mydb1;begin;INSERT INTO mydb1.mytab1 (id, col1, col2, col3, col4, col5) VALUES(-2, 'g1', 'g2', 'g3', -1, '第-2条记录');INSERT INTO mydb1.mytab1 (id, col1, col2, col3, col4, col5) VALUES(-1, 'g1', 'g2', 'g3', -1, '第-1条记录');INSERT INTO mydb1.mytab1 (id, col1, col2, col3, col4, col5) VALUES(1, 'a1', 'a2', 'a3', 1, 'a1,"a2,"a3');INSERT INTO mydb1.mytab1 (id, col1, col2, col3, col4, col5) VALUES(2, 'b1', 'b2', 'b3', 2, 'b1,"b2,"b3');INSERT INTO mydb1.mytab1 (id, col1, col2, col3, col4, col5) VALUES(3, 'c1', 'c2', 'c3', 6, 'c1,"c2,"c3');INSERT INTO mydb1.mytab1 (id, col1, col2, col3, col4, col5) VALUES(4, 'd1', 'd2', 'd3', 8, '第四条记录');INSERT INTO mydb1.mytab1 (id, col1, col2, col3, col4, col5) VALUES(5, 'e1', 'e2', 'e3', 10, '新增加的记录');INSERT INTO mydb1.mytab1 (id, col1, col2, col3, col4, col5) VALUES(6, 'f1', 'f2', 'f3', 666, '第6条记录');INSERT INTO mydb1.mytab1 (id, col1, col2, col3, col4, col5) VALUES(9, 'A1', 'A2', 'A3', 4, 'A5记录');INSERT INTO mydb1.mytab1 (id, col1, col2, col3, col4, col5) VALUES(10, 'A1', 'A2', 'A3', 4, 'A5记录');commit;

MySQLdb1

MySQLdb:c语言编写,只支持Python2.x,最多支持到Python3.4版本,又叫MySQL-Python

GitHub仓库的地址为:https://github.com/farcepest/MySQLdb1

仓库名称原先为MySQLdb,后来为了和MySQLdb支持Python3.x的版本区分开,就改为了MySQLdb1,而支持Python3MySQLdb版本也不在叫MySQLdb了,而是改为了mysqlclient

mysqlclient

mysqlclient:基于MySQLdb改进,支持了Python3.x,同时修复了一些问题。也就是Python3版本的MySQLdb

Github的地址为:https://github.com/PyMySQL/mysqlclient

官方文档地址为:https://mysqlclient.readthedocs.io/index.html

安装的时候使用:pip3 install mysqlclient命令或者pip install mysqlclient来安装,GitHub仓库中的README文件中有详细的安装方式,这里不再赘述。

使用方式和原先的MySQLdb一样,没有什么区别。示例如下:

import MySQLdb# 如果提示没有安装MySQLdb,执行pip3 install mysqlclient,如果pip3命令保证,则执行brew install mysql,然后在执行pip3 install mysqlclient,这是在Mac系统上面的Python3的环境下面使用MySQLdb的时候这样操作。# 创建连接conn = MySQLdb.connect(host="127.0.0.1", user="root", password="root", port=33061, database="mydb1", charset="utf8")# 获取游标# cursor = conn.cursor()  # 返回结果是元组形式,不带列名称,只有字段的值组成的元组cursor = conn.cursor(MySQLdb.cursors.DictCursor)  # 返回结果是字典形式,列表为key,列值为value。# 执行SQL语句row = cursor.execute("select * from mytab1 limit 2;")print(row)  # 影响的行数# 获取执行结果result = cursor.fetchall()# 关闭连接cursor.close()conn.close()# 遍历返回结果for item in result:    print(type(item))    print(item)

mysql connector for python

mysql connector for python:是MySQL官方提供的纯Python驱动.

安装方式使用pip3 install mysql-connector-python

import mysql.connector# 安装方式使用 pip3 install mysql-connector-pythonconn=mysql.connector.connect(host = '127.0.0.1' # 连接名称,默认127.0.0.1  ,user = 'root' # 用户名  ,passwd='password' # 密码  ,port= 3306 # 端口,默认为3306  ,db='test' # 数据库名称  ,charset='utf8' # 字符编码)cur = conn.cursor() # 生成游标对象sql="select * from `student` " # SQL语句cur.execute(sql) # 执行SQL语句data = cur.fetchall() # 通过fetchall方法获得数据cur.close() # 关闭游标conn.close() # 关闭连接for i in data[:2]: # 打印输出前2条数据print (i)

pymysql

pymysql:采用Python编写,性能低于MySQLdbmysqlclient

安装方式使用pip3 install pymysql,它的简单的示例如下:

import pymysql # 打开数据库连接conn = pymysql.connect(host='localhost', user='testuser', password='test123', database='TESTDB') # 使用 cursor() 方法创建一个游标对象 cursorcursor = conn.cursor() # 使用 execute()  方法执行 SQL 查询 cursor.execute("SELECT VERSION()") # 使用 fetchone() 方法获取单条数据.data = cursor.fetchone() print ("Database version : %s " % data) # 关闭数据库连接cursor.close()conn.close()

上面的方式适用于简单的数据库操作,如果想使用pymysql来作为操作数据库的工具,那么把它结合连接池一起来使用,将会更加的高效。下面介绍pymysql结合连接池PooledDB一起来操作MySQL数据库。

前提:

pip3 install pymysqlpip3 install PooledDB

Python2的版本中,经常使用的是MySQLdb,但是这个MySQLdb只支持到Python3.4版本。所以还是直接用pymysql这个工具包或者mysqlclient也就是MySQLdb支持Python3的版本。

MySQLdb要快点,原因是这个是C写的,速度快

MySQLdb只支持Python2.x,还不支持3.x。可以用PyMySQL代替。安装方法:pip install PyMySQL
然后在需要的项目中,把 init.py中添加两行:意思是把pymysql当成MySQLdb使用。

import pymysqlpymysql.install_as_MySQLdb()

SQLAlchemy

以后,单独介绍这个ORM框架。

pymysql结合PooledDB连接池

使用单例方式

工具类实现方式

import loggingimport pymysqlfrom dbutils.pooled_db import PooledDBclass PymysqlPooledSingletonDBUtil:    def __init__(self):        """        构造方法,初始化数据库连接池。        :return:        """        # TODO 连接信息从数据库中获取        self.connection_pool = PooledDB(            creator=pymysql,  # 使用链接数据库的模块            maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数            mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建            maxcached=5,  # 链接池中最多闲置的链接,0和None不限制            # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,            # 所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。            maxshared=3,            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错            maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制            setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]            # ping MySQL服务端,检查是否服务可用。            # 如:0 = None = never,            # 1 = default = whenever it is requested,            # 2 = when a cursor is created,            # 4 = when a query is executed,            # 7 = always            ping=0,            host='127.0.0.1',  # 数据库主机地址            port=33061,  # 数据库主机地址提供的监听端口            user='root',  # 数据库用户名            password='root',  # 数据库用户的密码            database='mydb1',  # 连接到哪一个数据库            charset='utf8'  # 连接数据库的时候使用的字符集编码        )    def get_db_conn_cursor(self):        """        获取数据库连接,即从连接池中获取一个数据库连接        :return: 数据库连接对象connection和cursor。        """        try:            conn = self.connection_pool.connection()            # cursor = conn.cursor()  # 此时查询结果返回的是元组            cursor = conn.cursor(pymysql.cursors.DictCursor)  # 此时查询结果返回的是字典            return conn, cursor        except Exception as e:            logging.error("获取连接对象失败,其错误信息如下:")            logging.error(e)        finally:            pass    def close_db_conn_cursor(self, *args):        """        关闭数据库连接,即把数据库连接放回连接池中        :param args: 待关闭的连接资源        :return:        """        try:            for item in args:                item.close()        except Exception as e:            logging.error("关闭数据库对象失败,其错误信息如下:")            logging.error(e)        finally:            pass    def execute_sql(self, sql, args=None):        """        执行SQL语句        :param sql: 待执行的SQL语句        :param args: SQL语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return: 执行结果        """        conn, cursor = self.get_db_conn_cursor()        try:            execute_result = cursor.execute(sql, args)            conn.commit()            print(type(execute_result))            print(execute_result)            return execute_result  # 如果执行成功,返回执行成功的结果,否则返回False        except Exception as e:            logging.error("SQL语句:{},执行失败,其错误信息如下:".format(sql))            logging.error(e)            conn.rollback()            return False        finally:            logging.info("SQL语句:{},执行成功!".format(sql))            self.close_db_conn_cursor(cursor, conn)  # 关闭连接,其实是把练级放回连接池中    def fetch_one(self, sql, args=None):        """        根据指定的SQL查询语句和SQL查询语句需要的参数,查询一行数据。        :param sql: 待执行的SQL查询语句。        :param args: 待执行的SQL查询语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return:        """        conn, cursor = self.get_db_conn_cursor()        try:            cursor.execute(sql, args)            result_one = cursor.fetchone()            return result_one        except Exception as e:            logging.error("查询的SQL语句:{},查询失败,其错误信息如下:".format(sql))            logging.error(e)        finally:            logging.info("查询的SQL语句:{},查询成功!".format(sql))            self.close_db_conn_cursor(cursor, conn)    def fetch_all(self, sql, args=None):        """        根据指定的SQL查询语句和SQL查询语句需要的参数,查询所有的数据行。        :param sql: 待执行的SQL查询语句        :param args: 待执行的SQL查询语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return:        """        conn, cursor = self.get_db_conn_cursor()        try:            cursor.execute(sql, args)            result_all = cursor.fetchall()            return result_all        except Exception as e:            logging.error("查询的SQL语句:{},查询失败,其错误信息如下:".format(sql))            logging.error(e)        finally:            logging.info("查询的SQL语句:{},查询成功!".format(sql))            self.close_db_conn_cursor(cursor, conn)    def insert_one(self, sql, args=None):        """        执行INSERT SQL语句        :param sql: 待执行的INSERT SQL语句        :param args: INSERT SQL语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return: 执行结果        """        conn, cursor = self.get_db_conn_cursor()        try:            insert_result = cursor.execute(sql, args)            conn.commit()            print(type(insert_result))            print(insert_result)            return insert_result  # 如果执行插入成功,返回受影响的行数        except Exception as e:            logging.error("插入的SQL语句:{},执行失败,其错误信息如下:".format(sql))            logging.error(e)            conn.rollback()            return False        finally:            logging.info("插入的SQL语句:{},执行成功!".format(sql))            self.close_db_conn_cursor(cursor, conn)  # 关闭连接,其实是把练级放回连接池中    def insert_many(self, sql, args=None):        """        执行批量插入语句        :param sql: 待执行的插入语句        :param args: 批量插入语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return: 执行结果        """        conn, cursor = self.get_db_conn_cursor()        try:            insert_many_result = cursor.executemany(sql, args)            conn.commit()            print(type(insert_many_result))            print(insert_many_result)            return insert_many_result  # 如果执行插入成功,返回受影响的行数        except Exception as e:            logging.error("批量插入的SQL语句:{},执行失败,其错误信息如下:".format(sql))            logging.error(e)            conn.rollback()            return False        finally:            logging.info("批量插入的SQL语句:{},执行成功!".format(sql))            self.close_db_conn_cursor(cursor, conn)  # 关闭连接,其实是把练级放回连接池中    def delete(self, sql, args=None):        """        执行DELETE删除语句        :param sql: 待执行DELETE删除语句        :param args: 待执行DELETE删除语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return: 执行结果        """        conn, cursor = self.get_db_conn_cursor()        try:            delete_result = cursor.execute(sql, args)            conn.commit()            print(type(delete_result))            print(delete_result)            return delete_result  # 如果执行删除成功,返回受影响的行数        except Exception as e:            logging.error("删除的SQL语句:{},执行失败,其错误信息如下:".format(sql))            logging.error(e)            conn.rollback()            return False        finally:            logging.info("删除的SQL语句:{},执行成功!".format(sql))            self.close_db_conn_cursor(cursor, conn)  # 关闭连接,其实是把练级放回连接池中    def update(self, sql, args=None):        """        执行UPDATE修改语句        :param sql: 待执行UPDATE修改语句        :param args: 待执行UPDATE修改语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return: 执行结果        """        conn, cursor = self.get_db_conn_cursor()        try:            update_result = cursor.execute(sql, args)            conn.commit()            print(type(update_result))            print(update_result)            return update_result  # 如果执行修改成功,返回受影响的行数        except Exception as e:            logging.error("修改的SQL语句:{},执行失败,其错误信息如下:".format(sql))            logging.error(e)            conn.rollback()            return False        finally:            logging.info("修改的SQL语句:{},执行成功!".format(sql))            self.close_db_conn_cursor(cursor, conn)  # 关闭连接,其实是把练级放回连接池中# 初始化一次,以后其他地方使用该工具类,只要在前面执行from xxx import db就可以了。db = PymysqlPooledSingletonDBUtil()# if __name__ == '__main__':#     db = PymysqlPooledSingletonDBUtil()#     select_sql = """select * from mytab1;"""#     result = db.fetch_all(select_sql)#     print(result)

工具类调用方式

from pymysql_pool_singleton import dbdef query_one():    """查询一条记录"""    sql = """select * from mytab1;"""    result = db.fetch_one(sql)    print(result)def query_all():    """查询所有记录"""    sql = """select * from mytab1;"""    result = db.fetch_all(sql)    print(result)    for item in result:        print(type(item))        print(item)        print()def query_with_condition_demo1():    """查询所有记录,带有多个where条件,参数严格按照顺序传入"""    sql = """select * from mytab1 where id > %s and col5 like %s ;"""    result = db.fetch_all(sql, (2, "%记录%"))    print(result)    for item in result:        print(type(item))        print(item)        print()def query_with_condition_demo2():    """查询所有记录,带有多个where条件,参数通过key指定"""    sql = """select * from mytab1 where id > %(id)s and col5 like %(col5)s ;"""    result = db.fetch_all(sql=sql, args={"col5": "%记录%", "id": 2})    print(result)    for item in result:        print(type(item))        print(item)        print()def insert_one_demo1():    """执行插入数据的操作"""    sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%s, %s, %s, %s, %s);"""    result = db.insert_one(sql, args=("A1", "A2", "A3", 4, "A5记录"))    print(result)def insert_one_demo2():    """执行插入数据的操作"""    sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%(col1)s, %(col2)s, %(col3)s, %(col4)s, %(col5)s);"""    result = db.insert_one(sql, args={"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"})    print(result)def insert_batch_demo1():    """执行批量插入数据的操作"""    sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%s, %s, %s, %s, %s);"""    li = [        ("A1", "A2", "A3", 4, "A5记录"),        ("A1", "A2", "A3", 4, "A5记录"),        ("A1", "A2", "A3", 4, "A5记录"),        ("A1", "A2", "A3", 4, "A5记录"),    ]    result = db.insert_many(sql, args=li)    print(result)def insert_batch_demo2():    """执行批量插入数据的操作"""    # 问题:如何批量插入的时候,指定字段名称站位呢?    sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%(col1)s, %(col2)s, %(col3)s, %(col4)s, %(col5)s);"""    tp = [        {"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"},        {"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"},        {"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"},    ]    result = db.insert_one(sql, args=tp)    print(result)def delete():    """删除的操作"""    sql = """delete from mytab1 where id > %s;"""    # result = db.delete(sql, 30)  # 如果采用这样的方式传入,因为30这是一个值,但是cursor.execute(sql, args)中需要的args类型需要为:list,tuple,dict三种的一种,所以采用下面的方式传入一个(30,)把它转换为一个tuple再传入。    result = db.delete(sql, (30,))    print("删除的影响的行数为:{}".format(result))def update():    """修改的操作"""    sql = """update mytab1 set col1=%s, col2=%s where id > %s;"""    result = db.delete(sql, ("aaaaaa", "bbbbbb", 10))    print("更新的影响的行数为:{}".format(result))if __name__ == "__main__":    # query_one()    # query_all()    # query_with_condition_demo1()    # query_with_condition_demo2()    # insert_one_demo1()    # insert_one_demo2()    # insert_batch_demo1()    # insert_batch_demo2()    delete()    # update()

使用上下文方式

工具类实现方式

import loggingimport pymysqlfrom dbutils.pooled_db import PooledDBconnection_pool = PooledDB(    creator=pymysql,  # 使用链接数据库的模块    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建    maxcached=5,  # 链接池中最多闲置的链接,0和None不限制    # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,    # 所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。    maxshared=3,    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]    # ping MySQL服务端,检查是否服务可用。    # 如:0 = None = never,    # 1 = default = whenever it is requested,    # 2 = when a cursor is created,    # 4 = when a query is executed,    # 7 = always    ping=0,    host='127.0.0.1',  # 数据库主机地址    port=33061,  # 数据库主机地址提供的监听端口    user='root',  # 数据库用户名    password='root',  # 数据库用户的密码    database='mydb1',  # 连接到哪一个数据库    charset='utf8'  # 连接数据库的时候使用的字符集编码)class PymysqlPooledContextDBUtil:    def __init__(self):        """        构造方法,初始化数据库连接池。        :return:        """        # TODO 连接信息从数据库中获取        self.conn = conn = connection_pool.connection()        # self.cursor = conn.cursor()  # 此时查询结果返回的是元组        self.cursor = conn.cursor(pymysql.cursors.DictCursor)  # 此时查询结果返回的是字典    def __enter__(self):        """        进入的方法        :return:        """        return self    def __exit__(self, exc_type, exc_val, exc_tb):        """        离开的方法        :param exc_type:        :param exc_val:        :param exc_tb:        :return:        """        self.cursor.close()        self.conn.close()    def execute_sql(self, sql, args=None):        """        执行SQL语句        :param sql: 待执行的SQL语句        :param args: SQL语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return: 执行结果        """        try:            execute_result = self.cursor.execute(sql, args)            self.conn.commit()            print(type(execute_result))            print(execute_result)            return execute_result  # 如果执行成功,返回执行成功的结果,否则返回False        except Exception as e:            logging.error("SQL语句:{},执行失败,其错误信息如下:".format(sql))            logging.error(e)            self.conn.rollback()            return False        finally:            logging.info("SQL语句:{},执行成功!".format(sql))    def fetch_one(self, sql, args=None):        """        根据指定的SQL查询语句和SQL查询语句需要的参数,查询一行数据。        :param sql: 待执行的SQL查询语句        :param args: 待执行的SQL查询语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return:        """        try:            self.cursor.execute(sql, args)            result_one = self.cursor.fetchone()            return result_one        except Exception as e:            logging.error("查询的SQL语句:{},查询失败,其错误信息如下:".format(sql))            logging.error(e)        finally:            logging.info("查询的SQL语句:{},查询成功!".format(sql))    def fetch_all(self, sql, args=None):        """        根据指定的SQL查询语句和SQL查询语句需要的参数,查询所有的数据行。        :param sql: 待执行的SQL查询语句        :param args: 待执行的SQL查询语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return:        """        try:            self.cursor.execute(sql, args)            result_all = self.cursor.fetchall()            return result_all        except Exception as e:            logging.error("查询的SQL语句:{},查询失败,其错误信息如下:".format(sql))            logging.error(e)        finally:            logging.info("查询的SQL语句:{},查询成功!".format(sql))    def insert_one(self, sql, args=None):        """        执行INSERT SQL语句        :param sql: 待执行的INSERT SQL语句        :param args: INSERT SQL语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return: 执行结果        """        try:            insert_result = self.cursor.execute(sql, args)            self.conn.commit()            print(type(insert_result))            print(insert_result)            return insert_result  # 如果执行插入成功,返回受影响的行数        except Exception as e:            logging.error("插入的SQL语句:{},执行失败,其错误信息如下:".format(sql))            logging.error(e)            self.conn.rollback()            return False        finally:            logging.info("插入的SQL语句:{},执行成功!".format(sql))    def insert_many(self, sql, args=None):        """        执行批量插入语句        :param sql: 待执行的插入语句        :param args: 批量插入语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return: 执行结果        """        try:            insert_many_result = self.cursor.executemany(sql, args)            self.conn.commit()            print(type(insert_many_result))            print(insert_many_result)            return insert_many_result  # 如果执行插入成功,返回受影响的行数        except Exception as e:            logging.error("批量插入的SQL语句:{},执行失败,其错误信息如下:".format(sql))            logging.error(e)            self.conn.rollback()            return False        finally:            logging.info("批量插入的SQL语句:{},执行成功!".format(sql))    def delete(self, sql, args=None):        """        执行DELETE删除语句        :param sql: 待执行DELETE删除语句        :param args: 待执行DELETE删除语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return: 执行结果        """        try:            delete_result = self.cursor.execute(sql, args)            self.conn.commit()            print(type(delete_result))            print(delete_result)            return delete_result  # 如果执行删除成功,返回受影响的行数        except Exception as e:            logging.error("删除的SQL语句:{},执行失败,其错误信息如下:".format(sql))            logging.error(e)            self.conn.rollback()            return False        finally:            logging.info("删除的SQL语句:{},执行成功!".format(sql))    def update(self, sql, args=None):        """        执行UPDATE修改语句        :param sql: 待执行UPDATE修改语句        :param args: 待执行UPDATE修改语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return: 执行结果        """        try:            update_result = self.cursor.execute(sql, args)            self.conn.commit()            print(type(update_result))            print(update_result)            return update_result  # 如果执行修改成功,返回受影响的行数        except Exception as e:            logging.error("修改的SQL语句:{},执行失败,其错误信息如下:".format(sql))            logging.error(e)            self.conn.rollback()            return False        finally:            logging.info("修改的SQL语句:{},执行成功!".format(sql))

工具类调用方式

from pymysql_pool_context import PymysqlPooledContextDBUtildef query_one():"""查询一条"""with PymysqlPooledContextDBUtil() as db:sql = """select * from mytab1;"""result = db.fetch_one(sql)print(result)def query_all():"""查询多条"""with PymysqlPooledContextDBUtil() as db:sql = """select * from mytab1;"""result = db.fetch_all(sql)print(result)def query_with_condition_demo1():"""查询所有记录,带有多个where条件,参数严格按照顺序传入"""with PymysqlPooledContextDBUtil() as db:sql = """select * from mytab1 where id > %s and col5 like %s ;"""result = db.fetch_all(sql, (2, "%记录%"))print(result)for item in result:print(type(item))print(item)print()def query_with_condition_demo2():"""查询所有记录,带有多个where条件,参数通过key指定"""with PymysqlPooledContextDBUtil() as db:sql = """select * from mytab1 where id > %(id)s and col5 like %(col5)s ;"""result = db.fetch_all(sql=sql, args={"col5": "%记录%", "id": 2})print(result)for item in result:print(type(item))print(item)print()def insert_one_demo1():"""执行插入数据的操作"""with PymysqlPooledContextDBUtil() as db:sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%s, %s, %s, %s, %s);"""result = db.insert_one(sql, args=("A1", "A2", "A3", 4, "A5记录"))print(result)def insert_one_demo2():"""执行插入数据的操作"""with PymysqlPooledContextDBUtil() as db:sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%(col1)s, %(col2)s, %(col3)s, %(col4)s, %(col5)s);"""result = db.insert_one(sql, args={"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"})print(result)def insert_batch_demo1():"""执行批量插入数据的操作"""with PymysqlPooledContextDBUtil() as db:sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%s, %s, %s, %s, %s);"""li = [("A1", "A2", "A3", 4, "A5记录"),("A1", "A2", "A3", 4, "A5记录"),("A1", "A2", "A3", 4, "A5记录"),("A1", "A2", "A3", 4, "A5记录"),]result = db.insert_many(sql, args=li)print(result)def insert_batch_demo2():"""执行批量插入数据的操作"""with PymysqlPooledContextDBUtil() as db:# 问题:如何批量插入的时候,指定字段名称站位呢?sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%(col1)s, %(col2)s, %(col3)s, %(col4)s, %(col5)s);"""tp = [{"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"},{"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"},{"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"},]result = db.insert_one(sql, args=tp)print(result)def delete():"""删除的操作"""with PymysqlPooledContextDBUtil() as db:sql = """delete from mytab1 where id > %s;"""# result = db.delete(sql, 30)  # 如果采用这样的方式传入,因为30这是一个值,但是cursor.execute(sql, args)中需要的args类型需要为:list,tuple,dict三种的一种,所以采用下面的方式传入一个(30,)把它转换为一个tuple再传入。result = db.delete(sql, (30,))print("删除的影响的行数为:{}".format(result))def update():"""修改的操作"""with PymysqlPooledContextDBUtil() as db:sql = """update mytab1 set col1=%s, col2=%s where id > %s;"""result = db.delete(sql, ("aaaaaa", "bbbbbb", 10))print("更新的影响的行数为:{}".format(result))if __name__ == "__main__":# query_one()# query_all()# query_with_condition_demo1()# query_with_condition_demo2()# insert_one_demo1()# insert_one_demo2()# insert_batch_demo1()# insert_batch_demo2()delete()# update()

mysqlclient结合PooledDB连接池

pymysql类似,只要把代码中的pymysql替换成MySQLdb就可以了。体现creator=pymysqlcursor = conn.cursor(pymysql.cursors.DictCursor)这两行代码上。

使用单例方式

工具类实现方式

import loggingimport MySQLdbfrom dbutils.pooled_db import PooledDBclass MySQLdbPooledSingletonDBUtil:    def __init__(self):        """        构造方法,初始化数据库连接池。        :return:        """        # TODO 连接信息从数据库中获取        self.connection_pool = PooledDB(            creator=MySQLdb,  # 使用链接数据库的模块            maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数            mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建            maxcached=5,  # 链接池中最多闲置的链接,0和None不限制            # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,            # 所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。            maxshared=3,            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错            maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制            setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]            # ping MySQL服务端,检查是否服务可用。            # 如:0 = None = never,            # 1 = default = whenever it is requested,            # 2 = when a cursor is created,            # 4 = when a query is executed,            # 7 = always            ping=0,            host='127.0.0.1',  # 数据库主机地址            port=33061,  # 数据库主机地址提供的监听端口            user='root',  # 数据库用户名            password='root',  # 数据库用户的密码            database='mydb1',  # 连接到哪一个数据库            charset='utf8'  # 连接数据库的时候使用的字符集编码        )    def get_db_conn_cursor(self):        """        获取数据库连接,即从连接池中获取一个数据库连接        :return: 数据库连接对象connection和cursor。        """        try:            conn = self.connection_pool.connection()            # cursor = conn.cursor()  # 此时查询结果返回的是元组            cursor = conn.cursor(MySQLdb.cursors.DictCursor)  # 此时查询结果返回的是字典            return conn, cursor        except Exception as e:            logging.error("获取连接对象失败,其错误信息如下:")            logging.error(e)        finally:            pass    def close_db_conn_cursor(self, *args):        """        关闭数据库连接,即把数据库连接放回连接池中        :param args: 待关闭的连接资源        :return:        """        try:            for item in args:                item.close()        except Exception as e:            logging.error("关闭数据库对象失败,其错误信息如下:")            logging.error(e)        finally:            pass    def execute_sql(self, sql, args=None):        """        执行SQL语句        :param sql: 待执行的SQL语句        :param args: SQL语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return: 执行结果        """        conn, cursor = self.get_db_conn_cursor()        try:            execute_result = cursor.execute(sql, args)            conn.commit()            print(type(execute_result))            print(execute_result)            return execute_result  # 如果执行成功,返回执行成功的结果,否则返回False        except Exception as e:            logging.error("SQL语句:{},执行失败,其错误信息如下:".format(sql))            logging.error(e)            conn.rollback()            return False        finally:            logging.info("SQL语句:{},执行成功!".format(sql))            self.close_db_conn_cursor(cursor, conn)  # 关闭连接,其实是把练级放回连接池中    def fetch_one(self, sql, args=None):        """        根据指定的SQL查询语句和SQL查询语句需要的参数,查询一行数据。        :param sql: 待执行的SQL查询语句。        :param args: 待执行的SQL查询语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return:        """        conn, cursor = self.get_db_conn_cursor()        try:            cursor.execute(sql, args)            result_one = cursor.fetchone()            return result_one        except Exception as e:            logging.error("查询的SQL语句:{},查询失败,其错误信息如下:".format(sql))            logging.error(e)        finally:            logging.info("查询的SQL语句:{},查询成功!".format(sql))            self.close_db_conn_cursor(cursor, conn)    def fetch_all(self, sql, args=None):        """        根据指定的SQL查询语句和SQL查询语句需要的参数,查询所有的数据行。        :param sql: 待执行的SQL查询语句        :param args: 待执行的SQL查询语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return:        """        conn, cursor = self.get_db_conn_cursor()        try:            cursor.execute(sql, args)            result_all = cursor.fetchall()            return result_all        except Exception as e:            logging.error("查询的SQL语句:{},查询失败,其错误信息如下:".format(sql))            logging.error(e)        finally:            logging.info("查询的SQL语句:{},查询成功!".format(sql))            self.close_db_conn_cursor(cursor, conn)    def insert_one(self, sql, args=None):        """        执行INSERT SQL语句        :param sql: 待执行的INSERT SQL语句        :param args: INSERT SQL语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return: 执行结果        """        conn, cursor = self.get_db_conn_cursor()        try:            insert_result = cursor.execute(sql, args)            conn.commit()            print(type(insert_result))            print(insert_result)            return insert_result  # 如果执行插入成功,返回受影响的行数        except Exception as e:            logging.error("插入的SQL语句:{},执行失败,其错误信息如下:".format(sql))            logging.error(e)            conn.rollback()            return False        finally:            logging.info("插入的SQL语句:{},执行成功!".format(sql))            self.close_db_conn_cursor(cursor, conn)  # 关闭连接,其实是把练级放回连接池中    def insert_many(self, sql, args=None):        """        执行批量插入语句        :param sql: 待执行的插入语句        :param args: 批量插入语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return: 执行结果        """        conn, cursor = self.get_db_conn_cursor()        try:            insert_many_result = cursor.executemany(sql, args)            conn.commit()            print(type(insert_many_result))            print(insert_many_result)            return insert_many_result  # 如果执行插入成功,返回受影响的行数        except Exception as e:            logging.error("批量插入的SQL语句:{},执行失败,其错误信息如下:".format(sql))            logging.error(e)            conn.rollback()            return False        finally:            logging.info("批量插入的SQL语句:{},执行成功!".format(sql))            self.close_db_conn_cursor(cursor, conn)  # 关闭连接,其实是把练级放回连接池中    def delete(self, sql, args=None):        """        执行DELETE删除语句        :param sql: 待执行DELETE删除语句        :param args: 待执行DELETE删除语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return: 执行结果        """        conn, cursor = self.get_db_conn_cursor()        try:            delete_result = cursor.execute(sql, args)            conn.commit()            print(type(delete_result))            print(delete_result)            return delete_result  # 如果执行删除成功,返回受影响的行数        except Exception as e:            logging.error("删除的SQL语句:{},执行失败,其错误信息如下:".format(sql))            logging.error(e)            conn.rollback()            return False        finally:            logging.info("删除的SQL语句:{},执行成功!".format(sql))            self.close_db_conn_cursor(cursor, conn)  # 关闭连接,其实是把练级放回连接池中    def update(self, sql, args=None):        """        执行UPDATE修改语句        :param sql: 待执行UPDATE修改语句        :param args: 待执行UPDATE修改语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return: 执行结果        """        conn, cursor = self.get_db_conn_cursor()        try:            update_result = cursor.execute(sql, args)            conn.commit()            print(type(update_result))            print(update_result)            return update_result  # 如果执行修改成功,返回受影响的行数        except Exception as e:            logging.error("修改的SQL语句:{},执行失败,其错误信息如下:".format(sql))            logging.error(e)            conn.rollback()            return False        finally:            logging.info("修改的SQL语句:{},执行成功!".format(sql))            self.close_db_conn_cursor(cursor, conn)  # 关闭连接,其实是把练级放回连接池中# 初始化一次,以后其他地方使用该工具类,只要在前面执行from xxx import db就可以了。db = MySQLdbPooledSingletonDBUtil()# if __name__ == '__main__':#     db = MySQLdbPooledSingletonDBUtil()#     select_sql = """select * from mytab1;"""#     result = db.fetch_all(select_sql)#     print(result)

工具类调用方式

from mysqldb_pool_singleton import dbdef query_one():    """查询一条记录"""    sql = """select * from mytab1;"""    result = db.fetch_one(sql)    print(result)def query_all():    """查询所有记录"""    sql = """select * from mytab1;"""    result = db.fetch_all(sql)    print(result)    for item in result:        print(type(item))        print(item)        print()def query_with_condition_demo1():    """查询所有记录,带有多个where条件,参数严格按照顺序传入"""    sql = """select * from mytab1 where id > %s and col5 like %s ;"""    result = db.fetch_all(sql, (2, "%记录%"))    print(result)    for item in result:        print(type(item))        print(item)        print()def query_with_condition_demo2():    """查询所有记录,带有多个where条件,参数通过key指定"""    sql = """select * from mytab1 where id > %(id)s and col5 like %(col5)s ;"""    result = db.fetch_all(sql=sql, args={"col5": "%记录%", "id": 2})    print(result)    for item in result:        print(type(item))        print(item)        print()def insert_one_demo1():    """执行插入数据的操作"""    sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%s, %s, %s, %s, %s);"""    result = db.insert_one(sql, args=("A1", "A2", "A3", 4, "A5记录"))    print(result)def insert_one_demo2():    """执行插入数据的操作"""    sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%(col1)s, %(col2)s, %(col3)s, %(col4)s, %(col5)s);"""    result = db.insert_one(sql, args={"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"})    print(result)def insert_batch_demo1():    """执行批量插入数据的操作"""    sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%s, %s, %s, %s, %s);"""    li = [        ("A1", "A2", "A3", 4, "A5记录"),        ("A1", "A2", "A3", 4, "A5记录"),        ("A1", "A2", "A3", 4, "A5记录"),        ("A1", "A2", "A3", 4, "A5记录"),    ]    result = db.insert_many(sql, args=li)    print(result)def insert_batch_demo2():    """执行批量插入数据的操作"""    # 问题:如何批量插入的时候,指定字段名称站位呢?    sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%(col1)s, %(col2)s, %(col3)s, %(col4)s, %(col5)s);"""    tp = [        {"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"},        {"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"},        {"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"},    ]    result = db.insert_one(sql, args=tp)    print(result)def delete():    """删除的操作"""    sql = """delete from mytab1 where id > %s;"""    # result = db.delete(sql, 30)  # 如果采用这样的方式传入,因为30这是一个值,但是cursor.execute(sql, args)中需要的args类型需要为:list,tuple,dict三种的一种,所以采用下面的方式传入一个(30,)把它转换为一个tuple再传入。    result = db.delete(sql, (30,))    print("删除的影响的行数为:{}".format(result))def update():    """修改的操作"""    sql = """update mytab1 set col1=%s, col2=%s where id > %s;"""    result = db.delete(sql, ("aaaaaa", "bbbbbb", 10))    print("更新的影响的行数为:{}".format(result))if __name__ == "__main__":    # query_one()    query_all()    # query_with_condition_demo1()    # query_with_condition_demo2()    # insert_one_demo1()    # insert_one_demo2()    # insert_batch_demo1()    # insert_batch_demo2()    delete()    # update()

使用上下文方式

工具类实现方式

import loggingimport MySQLdbfrom dbutils.pooled_db import PooledDBconnection_pool = PooledDB(    creator=MySQLdb,  # 使用链接数据库的模块    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建    maxcached=5,  # 链接池中最多闲置的链接,0和None不限制    # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,    # 所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。    maxshared=3,    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]    # ping MySQL服务端,检查是否服务可用。    # 如:0 = None = never,    # 1 = default = whenever it is requested,    # 2 = when a cursor is created,    # 4 = when a query is executed,    # 7 = always    ping=0,    host='127.0.0.1',  # 数据库主机地址    port=33061,  # 数据库主机地址提供的监听端口    user='root',  # 数据库用户名    password='root',  # 数据库用户的密码    database='mydb1',  # 连接到哪一个数据库    charset='utf8'  # 连接数据库的时候使用的字符集编码)class MySQLdbPooledContextDBUtil:    def __init__(self):        """        构造方法,初始化数据库连接池。        :return:        """        # TODO 连接信息从数据库中获取        self.conn = conn = connection_pool.connection()        # self.cursor = conn.cursor()  # 此时查询结果返回的是元组        self.cursor = conn.cursor(MySQLdb.cursors.DictCursor)  # 此时查询结果返回的是字典    def __enter__(self):        """        进入的方法        :return:        """        return self    def __exit__(self, exc_type, exc_val, exc_tb):        """        离开的方法        :param exc_type:        :param exc_val:        :param exc_tb:        :return:        """        self.cursor.close()        self.conn.close()    def execute_sql(self, sql, args=None):        """        执行SQL语句        :param sql: 待执行的SQL语句        :param args: SQL语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return: 执行结果        """        try:            execute_result = self.cursor.execute(sql, args)            self.conn.commit()            print(type(execute_result))            print(execute_result)            return execute_result  # 如果执行成功,返回执行成功的结果,否则返回False        except Exception as e:            logging.error("SQL语句:{},执行失败,其错误信息如下:".format(sql))            logging.error(e)            self.conn.rollback()            return False        finally:            logging.info("SQL语句:{},执行成功!".format(sql))    def fetch_one(self, sql, args=None):        """        根据指定的SQL查询语句和SQL查询语句需要的参数,查询一行数据。        :param sql: 待执行的SQL查询语句        :param args: 待执行的SQL查询语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return:        """        try:            self.cursor.execute(sql, args)            result_one = self.cursor.fetchone()            return result_one        except Exception as e:            logging.error("查询的SQL语句:{},查询失败,其错误信息如下:".format(sql))            logging.error(e)        finally:            logging.info("查询的SQL语句:{},查询成功!".format(sql))    def fetch_all(self, sql, args=None):        """        根据指定的SQL查询语句和SQL查询语句需要的参数,查询所有的数据行。        :param sql: 待执行的SQL查询语句        :param args: 待执行的SQL查询语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return:        """        try:            self.cursor.execute(sql, args)            result_all = self.cursor.fetchall()            return result_all        except Exception as e:            logging.error("查询的SQL语句:{},查询失败,其错误信息如下:".format(sql))            logging.error(e)        finally:            logging.info("查询的SQL语句:{},查询成功!".format(sql))    def insert_one(self, sql, args=None):        """        执行INSERT SQL语句        :param sql: 待执行的INSERT SQL语句        :param args: INSERT SQL语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return: 执行结果        """        try:            insert_result = self.cursor.execute(sql, args)            self.conn.commit()            print(type(insert_result))            print(insert_result)            return insert_result  # 如果执行插入成功,返回受影响的行数        except Exception as e:            logging.error("插入的SQL语句:{},执行失败,其错误信息如下:".format(sql))            logging.error(e)            self.conn.rollback()            return False        finally:            logging.info("插入的SQL语句:{},执行成功!".format(sql))    def insert_many(self, sql, args=None):        """        执行批量插入语句        :param sql: 待执行的插入语句        :param args: 批量插入语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return: 执行结果        """        try:            insert_many_result = self.cursor.executemany(sql, args)            self.conn.commit()            print(type(insert_many_result))            print(insert_many_result)            return insert_many_result  # 如果执行插入成功,返回受影响的行数        except Exception as e:            logging.error("批量插入的SQL语句:{},执行失败,其错误信息如下:".format(sql))            logging.error(e)            self.conn.rollback()            return False        finally:            logging.info("批量插入的SQL语句:{},执行成功!".format(sql))    def delete(self, sql, args=None):        """        执行DELETE删除语句        :param sql: 待执行DELETE删除语句        :param args: 待执行DELETE删除语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return: 执行结果        """        try:            delete_result = self.cursor.execute(sql, args)            self.conn.commit()            print(type(delete_result))            print(delete_result)            return delete_result  # 如果执行删除成功,返回受影响的行数        except Exception as e:            logging.error("删除的SQL语句:{},执行失败,其错误信息如下:".format(sql))            logging.error(e)            self.conn.rollback()            return False        finally:            logging.info("删除的SQL语句:{},执行成功!".format(sql))    def update(self, sql, args=None):        """        执行UPDATE修改语句        :param sql: 待执行UPDATE修改语句        :param args: 待执行UPDATE修改语句需要的参数,参数类型为list,tuple,dict三者中的任何一种。        :return: 执行结果        """        try:            update_result = self.cursor.execute(sql, args)            self.conn.commit()            print(type(update_result))            print(update_result)            return update_result  # 如果执行修改成功,返回受影响的行数        except Exception as e:            logging.error("修改的SQL语句:{},执行失败,其错误信息如下:".format(sql))            logging.error(e)            self.conn.rollback()            return False        finally:            logging.info("修改的SQL语句:{},执行成功!".format(sql))

工具类调用方式

from mysqldb_pool_context import MySQLdbPooledContextDBUtildef query_one():"""查询一条"""with MySQLdbPooledContextDBUtil() as db:sql = """select * from mytab1;"""result = db.fetch_one(sql)print(result)def query_all():"""查询多条"""with MySQLdbPooledContextDBUtil() as db:sql = """select * from mytab1;"""result = db.fetch_all(sql)print(result)def query_with_condition_demo1():"""查询所有记录,带有多个where条件,参数严格按照顺序传入"""with MySQLdbPooledContextDBUtil() as db:sql = """select * from mytab1 where id > %s and col5 like %s ;"""result = db.fetch_all(sql, (2, "%记录%"))print(result)for item in result:print(type(item))print(item)print()def query_with_condition_demo2():"""查询所有记录,带有多个where条件,参数通过key指定"""with MySQLdbPooledContextDBUtil() as db:sql = """select * from mytab1 where id > %(id)s and col5 like %(col5)s ;"""result = db.fetch_all(sql=sql, args={"col5": "%记录%", "id": 2})print(result)for item in result:print(type(item))print(item)print()def insert_one_demo1():"""执行插入数据的操作"""with MySQLdbPooledContextDBUtil() as db:sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%s, %s, %s, %s, %s);"""result = db.insert_one(sql, args=("A1", "A2", "A3", 4, "A5记录"))print(result)def insert_one_demo2():"""执行插入数据的操作"""with MySQLdbPooledContextDBUtil() as db:sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%(col1)s, %(col2)s, %(col3)s, %(col4)s, %(col5)s);"""result = db.insert_one(sql, args={"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"})print(result)def insert_batch_demo1():"""执行批量插入数据的操作"""with MySQLdbPooledContextDBUtil() as db:sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%s, %s, %s, %s, %s);"""li = [("A1", "A2", "A3", 4, "A5记录"),("A1", "A2", "A3", 4, "A5记录"),("A1", "A2", "A3", 4, "A5记录"),("A1", "A2", "A3", 4, "A5记录"),]result = db.insert_many(sql, args=li)print(result)def insert_batch_demo2():"""执行批量插入数据的操作"""with MySQLdbPooledContextDBUtil() as db:# 问题:如何批量插入的时候,指定字段名称站位呢?sql = """insert into mytab1 (col1, col2, col3, col4, col5) values(%(col1)s, %(col2)s, %(col3)s, %(col4)s, %(col5)s);"""tp = [{"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"},{"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"},{"col2": "A2", "col4": 4, "col1": "A1", "col5": "A5记录", "col3": "A3"},]result = db.insert_one(sql, args=tp)print(result)def delete():"""删除的操作"""with MySQLdbPooledContextDBUtil() as db:sql = """delete from mytab1 where id > %s;"""# result = db.delete(sql, 30)  # 如果采用这样的方式传入,因为30这是一个值,但是cursor.execute(sql, args)中需要的args类型需要为:list,tuple,dict三种的一种,所以采用下面的方式传入一个(30,)把它转换为一个tuple再传入。result = db.delete(sql, (30,))print("删除的影响的行数为:{}".format(result))def update():"""修改的操作"""with MySQLdbPooledContextDBUtil() as db:sql = """update mytab1 set col1=%s, col2=%s where id > %s;"""result = db.delete(sql, ("aaaaaa", "bbbbbb", 10))print("更新的影响的行数为:{}".format(result))if __name__ == "__main__":# query_one()# query_all()# query_with_condition_demo1()# query_with_condition_demo2()# insert_one_demo1()# insert_one_demo2()# insert_batch_demo1()# insert_batch_demo2()delete()# update()

关注我《程序猿集锦》获取更多分享。

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章