正如@Pylander 指出的那样
到目前为止,Turbodbc 是数据摄取的最佳选择!
我对此感到非常兴奋,以至于我在我的 github 和媒体上写了一个“博客”:请查看https://medium.com/@erickfis/etl-process-with-turbodbc-1d19ed71510e
一个工作示例并与 pandas.to_sql 进行比较
长话短说,
使用 turbodbc 我在 3 秒内有 10000 行(77 列)
使用 pandas.to_sql 我在 198 秒内得到了相同的 10000 行(77 列)......
这是我正在做的全部细节
进口:
import sqlalchemy
import pandas as pd
import numpy as np
import turbodbc
import time
加载并处理一些数据 - 用我的 sample.pkl 代替你的:
df = pd.read_pickle('sample.pkl')
df.columns = df.columns.str.strip() # remove white spaces around column names
df = df.applymap(str.strip) # remove white spaces around values
df = df.replace('', np.nan) # map nans, to drop NAs rows and columns later
df = df.dropna(how='all', axis=0) # remove rows containing only NAs
df = df.dropna(how='all', axis=1) # remove columns containing only NAs
df = df.replace(np.nan, 'NA') # turbodbc hates null values...
使用 sqlAlchemy 创建表
不幸的是,turbodbc 需要大量的开销和大量的 sql 手工劳动,用于创建表和在其上插入数据。
幸运的是,Python 是纯粹的快乐,我们可以自动化编写 sql 代码的过程。
第一步是创建将接收我们数据的表。但是,如果您的表具有多个列,则手动编写 sql 代码创建表可能会出现问题。就我而言,这些表通常有 240 列!
这就是 sqlAlchemy 和 pandas 仍然可以帮助我们的地方:pandas 不适合写入大量行(在本例中为 10000 行),但是表头只有 6 行呢?这样,我们可以自动执行创建表的过程。
创建 sqlAlchemy 连接:
mydb = 'someDB'
def make_con(db):
"""Connect to a specified db."""
database_connection = sqlalchemy.create_engine(
'mssql+pymssql://{0}:{1}@{2}/{3}'.format(
myuser, mypassword,
myhost, db
)
)
return database_connection
pd_connection = make_con(mydb)
在 SQL Server 上创建表
使用 pandas + sqlAlchemy,但只是为前面提到的 turbodbc 准备空间。请注意此处的 df.head():我们使用 pandas + sqlAlchemy 仅插入 6 行数据。这将运行得非常快,并且正在自动创建表。
table = 'testing'
df.head().to_sql(table, con=pd_connection, index=False)
既然桌子已经就位,让我们在这里认真一点。
Turbodbc 连接:
def turbo_conn(mydb):
"""Connect to a specified db - turbo."""
database_connection = turbodbc.connect(
driver='ODBC Driver 17 for SQL Server',
server=myhost,
database=mydb,
uid=myuser,
pwd=mypassword
)
return database_connection
为 turbodbc 准备 sql 命令和数据。让我们创造性地自动化这个代码创建:
def turbo_write(mydb, df, table):
"""Use turbodbc to insert data into sql."""
start = time.time()
# preparing columns
colunas = '('
colunas += ', '.join(df.columns)
colunas += ')'
# preparing value place holders
val_place_holder = ['?' for col in df.columns]
sql_val = '('
sql_val += ', '.join(val_place_holder)
sql_val += ')'
# writing sql query for turbodbc
sql = f"""
INSERT INTO {mydb}.dbo.{table} {colunas}
VALUES {sql_val}
"""
# writing array of values for turbodbc
valores_df = [df[col].values for col in df.columns]
# cleans the previous head insert
with connection.cursor() as cursor:
cursor.execute(f"delete from {mydb}.dbo.{table}")
connection.commit()
# inserts data, for real
with connection.cursor() as cursor:
try:
cursor.executemanycolumns(sql, valores_df)
connection.commit()
except Exception:
connection.rollback()
print('something went wrong')
stop = time.time() - start
return print(f'finished in {stop} seconds')
使用 turbodbc 写入数据 - 我在 3 秒内有 10000 行(77 列):
turbo_write(mydb, df.sample(10000), table)
Pandas 方法比较 - 我在 198 秒内得到了相同的 10000 行(77 列)……</p>
table = 'pd_testing'
def pandas_comparisson(df, table):
"""Load data using pandas."""
start = time.time()
df.to_sql(table, con=pd_connection, index=False)
stop = time.time() - start
return print(f'finished in {stop} seconds')
pandas_comparisson(df.sample(10000), table)
环境和条件
Python 3.6.7 :: Anaconda, Inc.
TURBODBC version ‘3.0.0’
sqlAlchemy version ‘1.2.12’
pandas version ‘0.23.4’
Microsoft SQL Server 2014
user with bulk operations privileges
请检查https://erickfis.github.io/loose-code/以获取此代码中的更新!