将大型 Pandas 数据帧写入 SQL Server 数据库
我有 74 个相对较大的 Pandas DataFrame(大约 34,600 行和 8 列),我试图尽快将它们插入到 SQL Server 数据库中.在做了一些研究之后,我了解到好的 ole pandas.to_sql
函数并不适合将如此大的插入到 SQL Server 数据库中,这是我最初采用的方法(非常慢 - 将近一个小时应用程序完成,而使用 mysql 数据库时大约需要 4 分钟.)
I have 74 relatively large Pandas DataFrames (About 34,600 rows and 8 columns) that I am trying to insert into a SQL Server database as quickly as possible. After doing some research, I learned that the good ole pandas.to_sql
function is not good for such large inserts into a SQL Server database, which was the initial approach that I took (very slow - almost an hour for the application to complete vs about 4 minutes when using mysql database.)
这篇文章 和许多其他 StackOverflow 帖子都帮助我指明了正确的方向,但我遇到了障碍:
This article, and many other StackOverflow posts have been helpful in pointing me in the right direction, however I've hit a roadblock:
出于上面链接中解释的原因,我正在尝试使用 SQLAlchemy 的核心而不是 ORM.所以,我将数据帧转换为字典,使用 pandas.to_dict
然后执行 execute()
和 insert()
:>
I am trying to use SQLAlchemy's Core rather than the ORM for reasons explained in the link above. So, I am converting the dataframe to a dictionary, using pandas.to_dict
and then doing an execute()
and insert()
:
self._session_factory.engine.execute(
TimeSeriesResultValues.__table__.insert(),
data)
# 'data' is a list of dictionaries.
问题是插入没有得到任何值——它们显示为一堆空括号,我得到这个错误:
The problem is that insert is not getting any values -- they appear as a bunch of empty parenthesis and I get this error:
(pyodbc.IntegretyError) ('23000', "[23000] [FreeTDS][SQL Server]Cannot
insert the value NULL into the column...
我传入的字典列表中有值,所以我不知道为什么这些值没有显示出来.
There are values in the list of dictionaries that I passed in, so I can't figure out why the values aren't showing up.
这是我要离开的示例:
def test_sqlalchemy_core(n=100000):
init_sqlalchemy()
t0 = time.time()
engine.execute(
Customer.__table__.insert(),
[{"name": 'NAME ' + str(i)} for i in range(n)]
)
print("SQLAlchemy Core: Total time for " + str(n) +
" records " + str(time.time() - t0) + " secs")
推荐答案
我有一个悲伤的消息要告诉你,SQLAlchemy 实际上并没有为 SQL Server 实现批量导入,它实际上只是执行同样缓慢的单个 INSERTto_sql
正在执行的语句.我会说你最好的选择是尝试使用 bcp
命令行工具编写一些脚本.这是我过去使用过的脚本,但不能保证:
I've got some sad news for you, SQLAlchemy actually doesn't implement bulk imports for SQL Server, it's actually just going to do the same slow individual INSERT statements that to_sql
is doing. I would say that your best bet is to try and script something up using the bcp
command line tool. Here is a script that I've used in the past, but no guarantees:
from subprocess import check_output, call
import pandas as pd
import numpy as np
import os
pad = 0.1
tablename = 'sandbox.max.pybcp_test'
overwrite=True
raise_exception = True
server = 'P01'
trusted_connection= True
username=None
password=None
delimiter='|'
df = pd.read_csv('D:/inputdata.csv', encoding='latin', error_bad_lines=False)
def get_column_def_sql(col):
if col.dtype == object:
width = col.str.len().max() * (1+pad)
return '[{}] varchar({})'.format(col.name, int(width))
elif np.issubdtype(col.dtype, float):
return'[{}] float'.format(col.name)
elif np.issubdtype(col.dtype, int):
return '[{}] int'.format(col.name)
else:
if raise_exception:
raise NotImplementedError('data type {} not implemented'.format(col.dtype))
else:
print('Warning: cast column {} as varchar; data type {} not implemented'.format(col, col.dtype))
width = col.str.len().max() * (1+pad)
return '[{}] varchar({})'.format(col.name, int(width))
def create_table(df, tablename, server, trusted_connection, username, password, pad):
if trusted_connection:
login_string = '-E'
else:
login_string = '-U {} -P {}'.format(username, password)
col_defs = []
for col in df:
col_defs += [get_column_def_sql(df[col])]
query_string = 'CREATE TABLE {}
({})
GO
QUIT'.format(tablename, ',
'.join(col_defs))
if overwrite == True:
query_string = "IF OBJECT_ID('{}', 'U') IS NOT NULL DROP TABLE {};".format(tablename, tablename) + query_string
query_file = 'c:\pybcp_tempqueryfile.sql'
with open (query_file,'w') as f:
f.write(query_string)
if trusted_connection:
login_string = '-E'
else:
login_string = '-U {} -P {}'.format(username, password)
o = call('sqlcmd -S {} {} -i {}'.format(server, login_string, query_file), shell=True)
if o != 0:
raise BaseException("Failed to create table")
# o = call('del {}'.format(query_file), shell=True)
def call_bcp(df, tablename):
if trusted_connection:
login_string = '-T'
else:
login_string = '-U {} -P {}'.format(username, password)
temp_file = 'c:\pybcp_tempqueryfile.csv'
#remove the delimiter and change the encoding of the data frame to latin so sql server can read it
df.loc[:,df.dtypes == object] = df.loc[:,df.dtypes == object].apply(lambda col: col.str.replace(delimiter,'').str.encode('latin'))
df.to_csv(temp_file, index = False, sep = '|', errors='ignore')
o = call('bcp sandbox.max.pybcp_test2 in c:pybcp_tempqueryfile.csv -S "localhost" -T -t^| -r
-c')
相关文章