Dbutils 连接池

作者: ropon 分类: Flask 发布时间: 2019-04-10 16:18
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2019/4/10 13:27
# @Author  : Ropon
# @File    : dbpool.py

import pymysql
from DBUtils.PooledDB import PooledDB

POOL = PooledDB(
    creator=pymysql,  # 连接数据库的模块
    maxconnections=10,  # 连接池中允许最大连接数 0和None 表示不限制
    mincached=3,  # 初始化时,连接池中至少创建空闲连接数 0表示不创建
    maxcached=5,  # 连接池中最多闲置连接数 0和None 表示不限制
    maxshared=5,  # 连接池中最多共享的连接数 0和None 表示全部共享
    blocking=True,  # 连接池中无可用连接,是否阻塞等待 True 等待 False 不等待报错
    maxusage=None,  # 一个连接最多被重复使用次数 None表示无限制
    setsession=[],
    ping=0,
    host="x.x.x.x",
    port=3306,
    user="xxxxxx",
    password="xxxxxx",
    db="xxxxxx",
    charset="utf8"
)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2019/4/10 14:07
# @Author  : Ropon
# @File    : sqlhelper.py

import pymysql
from dbpool import POOL


def create_conn():
    conn = POOL.connection()
    cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
    return conn, cursor


def close_conn(conn, cursor):
    cursor.close()
    conn.close()


def insert(sql, args):
    conn, cursor = create_conn()
    res = cursor.execute(sql, args)
    conn.commit()
    close_conn(conn, cursor)
    return res


def fetch_one(sql, args=None):
    conn, cursor = create_conn()
    cursor.execute(sql, args)
    res = cursor.fetchone()
    close_conn(conn, cursor)
    return res


def fetch_all(sql, args=None):
    conn, cursor = create_conn()
    cursor.execute(sql, args)
    res = cursor.fetchall()
    close_conn(conn, cursor)
    return res


# sql = "select * from user where nid=%s"
# print(fetch_one(sql, 4))

sql = "select * from user"
print(fetch_all(sql))

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!