MYSQL 从项目经理的一次查询, 到PYTHON 解决问题(2) --传统企业使用MYSQL的问题

2021-04-01 00:00:00 软件 数据库 企业 并发 外包

上一期的读者这个话题的读者浏览量不是太多,有点可惜了, 实际上这就是

传统企业在使用MYSQL时的问题. 解决方案很多,作为上一期的续集,我想从

几点来阐述一下传统企业使用MYSQL的一些问题.


1 不少传统企业的软件开发是外包性质的,外包企业都是有一些成熟的架构的,

大部分企业支持的数据库的列表都包含MYSQL ,并且MYSQL也是大部分企业使用

的开源数据库之一. 那问题在哪里


1 传统企业并未有互联网的企业的技术水平,包含运维的水平,MYSQL的维护水平差,

对MYSQL的认知水平也差,例如如果你问 MYSQL 是否适合所有业务的场景,大部分的

回答可能是YES.

2 部分软件外包企业的人员流动大,技术本身积累的一般,当然大的软件外包商还是

可以的,小的软件外包,就不好说了,问什么都支持,其实都是话术,真正能会使用MYSQL

的软件人员就更少了,并且为了和涨春笋形式的软件开发速度一致,部分软件外包将ORACLE

的表结构直接在MYSQL中实现,是部分企业的软件运行不畅和频频出问题的一个原因.


所以呢,真心希望某些软件外包上,能请一个的数据库专家,给你们普及一下表怎么设计,

怎么能符合数据库原理的使用数据库


2 另外在MYSQL 中火热的分表,尤其是多个物理主机形式的分表方式 ,逻辑分表

或者 DBLE 方式的分表,在不少传统企业做起来比较困难,维护MYSQL的难度也提高了.所以

软件外包上的分库分表,就变成了在一个MYSQL实例上的 分库分表, 通过逻辑关系将一个表

打散变为N 张表. 这样解决很好,可使用的人员,尤其是需要通过SQL 来查询业务问题的

一批人,就感到困惑了.


所以就有了下面的这个程序,(如果不清楚这个程序的产生的原因,和在MYSQL的之前通过SQL

来查询产生的问题可以翻翻上一篇前传)



这个程序主要的想法是充分利用MYSQL的高并发,将数据查询打散,通过一个SESSION 处理

一个逻辑的查询,将几十万与几千万的两个表进行程序方式的JOIN ,终获得需要的数据

这里我们开了200个并发,并且计算了120万次,在6分钟交付了数据的分析结果,下面是

相关的程序.


import configparser

import pymysql
import time
import asyncio #标准库异步线程的库,协程,与多并发不同的是这个是单线程的
from aiomysql import create_pool #第三方库,为MYSQL 来进行的异步线程操作库




class Solution: #定义类
def __init__(self, sql1, sql2): #初始化参数
config = configparser.ConfigParser()
config.read('config.ini') #读取配置文件
self.host = config.get('config', 'host')
self.user = config.get('config', 'user')
self.password = config.get('config', 'password')
self.database = config.get('config', 'database')
self.save_database = config.get('config', 'save_database')
self.save_user = config.get('config', 'save_user')
self.save_password = config.get('config', 'save_password')

self.conn = pymysql.connect(host=self.host, user=self.user, password=self.password, database=self.database,
charset='utf8mb4') #
self.sql1 = sql1 #定义两个SQL
self.sql2 = sql2

self.task_num = 300 #异步并发数量, 一次可以干300个事务
self.pool_size = 100 #连接池size读写各100
def get_ids(self, ):
cursor = self.conn.cursor()
cursor.execute(self.sql1)
self.data_a_full = cursor.fetchall()
self.data_a_full_len = len(self.data_a_full) # 获取查询的COUNT

def run(self, ):

self.get_ids()
cur_loop = asyncio.get_event_loop() #获取一个事件的循环
cur_loop.run_until_complete(self.start(cur_loop))

async def start(self, loop):
pool = await create_pool(host=self.host, port=3306,
user=self.user, password=self.password,
db=self.database, loop=loop, maxsize=self.pool_size)
save_pool = await create_pool(host=self.host, port=3306,
user=self.save_user, password=self.save_password,
db=self.save_database, loop=loop, maxsize=self.pool_size)
i = 0
while True:
start = time.time()
m = self.data_a_full[i:i + self.task_num]
i += self.task_num
tasks = []
if len(m) == 0: #每次给300个事务,直到这个池里面的没有数据取出
break
for s in m:
# print(s)
sql = f"{sql2}'{s[0]}'" #PYTHON3.X新特性简写,拼接SQL
c1 = self.select(loop=loop, sql=sql, pool=pool, save_pool=save_pool, data=s)
tasks.append(c1) #执行SQL 语句
await asyncio.gather(*tasks)
print(time.time() - start,'----------',i,'/',self.data_a_full_len)

async def select(self, loop, sql, pool, save_pool, data): #处理的业务逻辑
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(sql)
r = await cur.fetchall()
table_name_dict = {
'F': 'tb_f',
'T': 'tb_t',
'D': 'tb_d',
}
sum_dict = {
'F': 0, 'T': 0, 'D': 0
}
for item in r:
if item[1] is not None:
sum_dict[item[1]] = sum_dict.get(item[1]) + item[0]
data = list(data)
sql_list = []
for key,value in sum_dict.items():
table_name = table_name_dict.get(key)
new_data = data+[value]
new_data[2] = str(new_data[2])
if new_data[4] is not None:
new_data[4] = str(float(new_data[4]))
else:
new_data[4] = 0
new_data[3] = str(new_data[3])
if table_name: #结果插入到MYSQL数据库中
insert_sql = "insert into {} (CONTRACTNO,APPLYNO,ACTIVEDATE,term,AMORTIZEAMT) values ('{}','{}','{}',{},{})".format(
table_name, *new_data)
sql_list.append(insert_sql)
sql_do = ';'.join(sql_list)
# print(sql_do)
async with save_pool.acquire() as save_conn:
async with save_conn.cursor() as save_cur:
await save_cur.execute(sql_do)
await save_conn.commit()



if __name__ == '__main__':
s = time.time()
sql1 = "select CONTRACTNO,APPLYNO,ACTIVEDATE,term from tb_sync_contract where ACTIVEDATE>='2020-07-01'"
sql2 = "select AMORTIZEAMT,SAP_POSTING_IND from tb_amortize where CAMAINID="
solution = Solution(sql1, sql2)
solution.run()

print(time.time() - s)



终我们在一个16G 4CORE 核心的MYSQL 5.7.23 的数据库中,成功的产生200并发,模拟

了75万与2千600百万的数据的JOIN的计算,产生结果 时间在6分钟.


感谢程序的提供者,我们的TEAM的 PYTHON专家兼 REDIS DBA 闫树爽.

另外随着我的TEAM的人员增多, 有PYTHON专家,有POSTGRESQL, MYSQL 的专家,估计

以后能SHARE的文字会越来越多.







相关文章