python连接MySQL、SqlServer数据库

2022-05-30 00:00:00 数据 代码 获取 写入 游标

从事python进行数据挖掘、数据处理、数据分析等跟数据打交道的工作,不免会接触到MySQL和SqlServer数据库,下面是python连接MySQL和SqlServer常用的方法进行简单的封装,我从事的大部分工作是获取数据下来进行数据挖掘处理分析,然后对处理完的数据进行写入,所以我只对查询和写入进行封装。


终端直接执行pip命令:

pip install pymysql
pip install pymssql
pip install pandas
复制代码

下面直接上代码

# -*- coding: utf-8 -*-

import pymssql
import pymysql

# 连接MySQL和SqlServer的基类class DataBase(object):
    def __init__(self, host=None, port=None, user=None, pwd=None, db=None, conn=None):
        self.host = host
        self.port = port
        self.user = user
        self.pwd = pwd
        self.db = db
        self.conn = conn

    def get_cursor(self):
        raise NotImplementedError

    def query(self, sql):
        with self.get_cursor() as cur:
            cur.execute(sql)
            # 获取提交sql语句后获取到字段等信息
            # 先获取信息却没执行execute提交sql语句,将获取不到
            description_field_info = cur.description
            description_field = []
            for i in description_field_info:
                description_field.append(i[0])
            # 字段,结果
            return description_field, cur.fetchall()

    def query_one(self, sql):
        with self.get_cursor() as cur:
            cur.execute(sql)
            return cur.fetchone()

    def commit_sql(self, sql):
        with self.get_cursor() as cur:
            cur.execute(sql)
            self.conn.commit()

    def commit_many_sql(self, sql, param):
        with self.get_cursor() as cur:
            cur.executemany(sql, param)
            self.conn.commit()


class MssqlDataBase(DataBase):
    def __init__(self, host=None, port=None, user=None, pwd=None, db=None, conn=None):
        super(MssqlDataBase, self).__init__(host=host, port=port, user=user, pwd=pwd, db=db, conn=conn)
        self.cursor = None

    def get_cursor(self):
        if self.conn:
            self.cursor = self.conn.cursor()
            if not self.cursor:
                raise (NameError, "连接数据库失败")
            return self.cursor
        else:
            self.conn = pymssql.connect(host=self.host, port=self.port, user=self.user, password=self.pwd,
                                        database=self.db,
                                        charset="utf8")
            self.cursor = self.conn.cursor()
            if not self.cursor:
                raise (NameError, "连接数据库失败")
            return self.cursor


class MysqlDataBase(DataBase):
    def __init__(self, host=None, port=None, user=None, pwd=None, db=None, conn=None):
        super(MysqlDataBase, self).__init__(host=host, port=port, user=user, pwd=pwd, db=db, conn=conn)
        self.cursor = None

    def connect(self):
        self.conn = pymysql.connect(host=self.host, port=self.port, user=self.user, password=self.pwd,
                                    database=self.db,
                                    charset="utf8")

    def get_cursor(self, is_sscursor=None):
        if self.conn:
            # 防止太久不动作断开连接
            self.ping()
            if is_sscursor:
            # 获取流式游标
                self.cursor = pymysql.cursors.SSCursor(self.conn)
            else:
                self.cursor = self.conn.cursor()
            if not self.cursor:
                raise (NameError, "连接数据库失败")
            return self.cursor
        else:
            self.conn = pymysql.connect(host=self.host, port=self.port, user=self.user, password=self.pwd,
                                        database=self.db,
                                        charset="utf8")
            if is_sscursor:
                self.cursor = self.conn.cursor(cursor=pymysql.cursors.SSCursor)
            else:
                self.cursor = self.conn.cursor()
            if not self.cursor:
                raise (NameError, "连接数据库失败")
            return self.cursor

    def get_sscursor(self, sql):
        cur = self.get_cursor(is_sscursor=True)
        cur.execute(sql)
        return cur

    def ping(self, reconnect=True):
        if self.conn is None:
            self.connect()
        self.conn.ping(reconnect=reconnect)
        self.cursor = None
复制代码
# 提交写在函数里面,调用直接return连接
def get_mysql_world():
        return MysqlDataBase(
            host='', port=3306, user='', pwd='', db='world'
        )
复制代码
if __name__ == '__main__':
    import pandas
    res_field, res = get_mysql_world().query('select * from city limit 1')
    # 直接改变列名转换成DataFrame对象后续进行数据处理
    df = pd.DataFrame(res,columns=res_field)
复制代码

pandas也有读取写入MySQL的相关api,但是,当数据量过大时,使用pandas读取和写入MySQL都是非常耗时的,有可能差个10几倍的速度,大家后续工作遇到可以测试一下,我这边是在工作遇到的问题。

流式游标

当mysql数据量很大时,可以使用用流式游标

Python通过pymysql操作向mysql读取千万、百万级别的数据库时

如果用传统的fetchall()或fetchone()方法,都是先默认在内存里缓存下所有行然后再处理,大量的数据会导致内存资源消耗光,内存容易溢出

此时则建议使用SSCursor(流式游标),避免客户端占用大量内存

这个 cursor 实际上没有缓存下来任何数据,它不会读取所有所有到内存中,它的做法是从储存块中读取记录,并且一条一条返回给你,使用迭代器而不用 fetchall ,即省内存又能很快拿到数据

参考文献

blog.csdn.net/weixin_4205…

相关文章