python环境测试MySQLdb、DB

2023-01-31 01:01:43 python 测试 环境

   首先介绍下Mysqldb、DBUtil、sqlobject:

   (1)mysqldb 是用于python连接Mysql数据库的接口,它实现了 Python 数据库 api 规范 V2.0,基于 MySQL C API 上建立的。除了MySQLdb外,python还可以通过oursql, PyMySQL, myconnpy等模块实现MySQL数据库操作;

   (2)DBUtil中提供了几种连接池,用以提高数据库的访问性能,例如PooledDB,PesistentDB等

   (3)sqlobject可以实现数据库ORM映射的第三方模块,可以以对象、实例的方式轻松操作数据库中记录。

   

    为测试这三者的性能,简单做一个例子:50个并发访问4000条记录的单表,数据库记录如下:

wKioL1UHAynA120PAACIghwJV_M537.jpg

    测试代码如下:

    1、MySQLdb的代码如下,其中在connDB()中把连接池相关代码暂时做了一个注释,去掉这个注释既可以使用连接池来创建数据库连接:

   (1)DBOperator.py

import MySQLdb
from stockmining.stocks.setting import LoggerFactory
import connectionpool

class DBOperator(object):
    
    def __init__(self):
        self.logger = LoggerFactory.getLogger('DBOperator')
        self.conn = None
              
    def connDB(self):
        self.conn=MySQLdb.connect(host="127.0.0.1",user="root",passwd="root",db="pystock",port=3307,charset="utf8")  
        #当需要使用连接池的时候开启
        #self.conn=connectionpool.pool.connection()
        return self.conn

    def closeDB(self):
        if(self.conn != None):
            self.conn.close()  
  
    def execute(self, sql):
        try:
            if(self.conn != None):
                cursor = self.conn.cursor()
            else:
                raise MySQLdb.Error('No connection')
            
            n = cursor.execute(sql)
            return n
        except MySQLdb.Error,e:
            self.logger.error("Mysql Error %d: %s" % (e.args[0], e.args[1]))
 
    def findBySQL(self, sql):
        try:
            if(self.conn != None):
                cursor = self.conn.cursor()
            else:
                raise MySQLdb.Error('No connection')
            
            cursor.execute(sql)
            rows = cursor.fetchall() 
            return rows
        except MySQLdb.Error,e:
            self.logger.error("Mysql Error %d: %s" % (e.args[0], e.args[1]))

   

   (2)测试代码testMysql.py,做了50个并发,对获取到的数据库记录做了个简单遍历。

import threading  
import time  
import DBOperator

def run(): 
    operator = DBOperator()
    operator.connDB()
    starttime = time.time()
    sql = "select * from stock_cash_tencent"
    peeps = operator.findBySQL(sql)
    for r in peeps: pass  
    operator.closeDB()
    endtime = time.time()
    diff =  (endtime - starttime)*1000
    print diff
    
def test():  
    for i in range(50):  
        threading.Thread(target = run).start() 
        time.sleep(1)
    
if __name__ == '__main__':  
     test()

   

    2、连接池相关代码:

    (1)connectionpool.py

from DBUtils import PooledDB
import MySQLdb
import string

maxconn = 30            #最大连接数
mincached = 10           #最小空闲连接
maxcached = 20          #最大空闲连接
maxshared = 30          #最大共享连接
connstring="root#root#127.0.0.1#3307#pystock#utf8" #数据库地址
dbtype = "mysql"  

def createConnectionPool(connstring, dbtype):
    db_conn = connstring.split("#");
    if dbtype=='mysql':
        try:
            pool = PooledDB.PooledDB(MySQLdb, user=db_conn[0],passwd=db_conn[1],host=db_conn[2],port=string.atoi(db_conn[3]),db=db_conn[4],charset=db_conn[5], mincached=mincached,maxcached=maxcached,maxshared=maxshared,maxconnections=maxconn)
            return pool
        except Exception, e:
            raise Exception,'conn datasource Excepts,%s!!!(%s).'%(db_conn[2],str(e))
            return None


pool = createConnectionPool(connstring, dbtype)

   

   3、sqlobject相关代码

   (1)connection.py

from sqlobject.mysql import builder

conn = builder()(user='root', passWord='root',
                 host='127.0.0.1', db='pystock', port=3307, charset='utf8')

  

    (2)StockCashTencent.py对应到数据库中的表,50个并发并作了一个简单的遍历。(实际上,如果不做遍历,只做count()计算,sqlobject性能是相当高的。)

import sqlobject
import time
from connection import conn
import threading

class StockCashTencent(sqlobject.SQLObject):
    _connection = conn
    
    code = sqlobject.StrinGCol()
    name = sqlobject.StringCol()
    date = sqlobject.StringCol()
    main_in_cash = sqlobject.FloatCol()   
    main_out_cash = sqlobject.FloatCol()  
    main_net_cash = sqlobject.FloatCol()
    main_net_rate= sqlobject.FloatCol()
    private_in_cash= sqlobject.FloatCol()
    private_out_cash= sqlobject.FloatCol()
    private_net_cash= sqlobject.FloatCol()
    private_net_rate= sqlobject.FloatCol()
    total_cash= sqlobject.FloatCol()

def test():
    starttime = time.time()
    query  = StockCashTencent.select(True)
    for result in query: pass
    endtime = time.time()
    diff =  (endtime - starttime)*1000
    print diff
        
if __name__ == '__main__':  
   for i in range(50):  
        threading.Thread(target = test).start()   
        time.sleep(1)


      测试结果如下:

MySQLdb平均(毫秒)99.63×××71
DBUtil平均(毫秒)97.07998276
sqlobject平均(毫秒)343.2999897


wKioL1UHAnXQKbJEAAD8JsxLQw8592.jpg

  

     结论:其实就测试数据而言,MySQLdb单连接和DBUtil连接池的性能并没有很大的区别(100个并发下也相差无几),相反sqlobject虽然具有的编程上的便利性,但是却带来性能上的巨大不足,在实际中使用哪个模块就要斟酌而定了。


     (冯智杰:PMP、信息系统项目管理师,大型企业高级开发经理,关注金融、供应链行业发展,擅长敏捷开发项目管理、技术研发等)

相关文章