72

我想向pandas.DataFrame运行 MS SQL 的远程服务器发送大数据。我现在这样做的方法是将对象转换为元组列表,然后使用 pyODBC 的函数data_frame将其发送出去。executemany()它是这样的:

 import pyodbc as pdb

 list_of_tuples = convert_df(data_frame)

 connection = pdb.connect(cnxn_str)

 cursor = connection.cursor()
 cursor.fast_executemany = True
 cursor.executemany(sql_statement, list_of_tuples)
 connection.commit()

 cursor.close()
 connection.close()

data_frame.to_sql()然后我开始怀疑是否可以通过使用方法来加快速度(或者至少更具可读性) 。我想出了以下解决方案:

 import sqlalchemy as sa

 engine = sa.create_engine("mssql+pyodbc:///?odbc_connect=%s" % cnxn_str)
 data_frame.to_sql(table_name, engine, index=False)

现在代码更具可读性,但上传速度至少慢了 150 倍......

有没有办法fast_executemany在使用 SQLAlchemy 时翻转?

我正在使用 pandas-0.20.3、pyODBC-4.0.21 和 sqlalchemy-1.1.13。

4

9 回答 9

79

编辑(2019-03-08): Gord Thompson 在下面评论了来自 sqlalchemy 更新日志的好消息:自 2019-03-04 发布的 SQLAlchemy 1.3.0 以来,sqlalchemy 现在支持engine = create_engine(sqlalchemy_url, fast_executemany=True)方言mssql+pyodbc。即,不再需要定义函数并使用@event.listens_for(engine, 'before_cursor_execute')意味着可以删除下面的函数,只需要在 create_engine 语句中设置标志 - 并且仍然保持加速。

原帖:

刚刚注册了一个帐户来发布这个。我想在上面的帖子下发表评论,因为它是对已经提供的答案的跟进。上面的解决方案对我来说适用于从基于 Ubuntu 的安装写入的 Microsft SQL 存储上的版本 17 SQL 驱动程序。

我用来显着加快速度的完整代码(加速 > 100 倍)如下。这是一个交钥匙代码段,前提是您使用相关详细信息更改连接字符串。对于上面的海报,非常感谢您的解决方案,因为我已经为此寻找了相当长的时间。

import pandas as pd
import numpy as np
import time
from sqlalchemy import create_engine, event
from urllib.parse import quote_plus


conn =  "DRIVER={ODBC Driver 17 for SQL Server};SERVER=IP_ADDRESS;DATABASE=DataLake;UID=USER;PWD=PASS"
quoted = quote_plus(conn)
new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted)
engine = create_engine(new_con)


@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    print("FUNC call")
    if executemany:
        cursor.fast_executemany = True


table_name = 'fast_executemany_test'
df = pd.DataFrame(np.random.random((10**4, 100)))


s = time.time()
df.to_sql(table_name, engine, if_exists = 'replace', chunksize = None)
print(time.time() - s)

根据下面的评论,我想花一些时间来解释一下关于 pandasto_sql实现和查询处理方式的一些限制。有两件事可能会导致MemoryError被提升 afaik:

1) 假设您正在写入远程 SQL 存储。当您尝试使用该to_sql方法编写大型 pandas DataFrame 时,它​​将整个数据帧转换为值列表。这种转换比原始 DataFrame 占用更多的 RAM(最重要的是,旧的 DataFrame 仍然存在于 RAM 中)。此列表提供给executemany您的 ODBC 连接器的最终调用。我认为 ODBC 连接器在处理如此大的查询时遇到了一些麻烦。解决这个问题的一种方法是为to_sql方法提供一个 chunksize 参数(10**5 似乎是最佳的,在 Azure 的 2 CPU 7GB ram MSSQL 存储应用程序上提供大约 600 mbit/s(!)的写入速度 - 不能推荐天蓝色顺便说一句)。因此,第一个限制,即查询大小,可以通过提供chunksize争论。但是,这不能让您编写大小为 10**7 或更大的数据帧(至少不是在我正在使用的具有 ~55GB RAM 的 VM 上),即问题 nr 2。

这可以通过将 DataFrame 分解为np.split(10**6 大小的 DataFrame 块)来规避。这些可以迭代地写掉。当我为 pandas 核心中的方法准备好解决方案时,我将尝试发出拉取请求,to_sql这样您就不必每次都进行预先分解。无论如何,我最终编写了一个与以下类似(不是交钥匙)的功能:

import pandas as pd
import numpy as np

def write_df_to_sql(df, **kwargs):
    chunks = np.split(df, df.shape()[0] / 10**6)
    for chunk in chunks:
        chunk.to_sql(**kwargs)
    return True

可以在此处查看上述代码段的更完整示例:https ://gitlab.com/timelord/timelord/blob/master/timelord/utils/connector.py

这是我编写的一个类,它包含了补丁并减轻了与 SQL 建立连接所带来的一些必要开销。还是要写一些文档。我还计划将补丁贡献给 pandas 本身,但还没有找到一个很好的方法来做到这一点。

我希望这有帮助。

于 2018-02-19T07:20:12.810 回答
40

在联系了 SQLAlchemy 的开发者之后,出现了解决这个问题的方法。非常感谢他们的出色工作!

必须使用游标执行事件并检查executemany标志是否已升起。如果确实如此,请打开该fast_executemany选项。例如:

from sqlalchemy import event

@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    if executemany:
        cursor.fast_executemany = True

更多关于执行事件的信息可以在这里找到。


更新:SQLAlchemy 1.3.0中添加了对fast_executemanyof 的支持,因此不再需要此 hack。pyodbc

于 2018-01-02T17:10:09.287 回答
20

我遇到了同样的问题,但使用的是 PostgreSQL。他们现在刚刚发布了pandas 0.24.0 版to_sql,并且函数中有一个新参数,method它解决了我的问题。

from sqlalchemy import create_engine

engine = create_engine(your_options)
data_frame.to_sql(table_name, engine, method="multi")

上传速度对我来说快了 100 倍。chunksize如果要发送大量数据,我还建议设置参数。

于 2019-02-01T09:52:33.183 回答
9

我只是想把这个完整的例子作为一个额外的高性能选项发布给那些可以使用新的 turbodbc 库的人:http: //turbodbc.readthedocs.io/en/latest/

显然,pandas .to_sql() 之间有很多选择,通过 sqlalchemy 触发 fast_executemany,直接使用 pyodbc 和 tuples/lists/等,甚至尝试使用平面文件进行 BULK UPLOAD。

希望随着功能在当前 pandas 项目中的发展或将来包括类似 turbodbc 集成的功能,以下内容可能会让生活变得更加愉快。

import pandas as pd
import numpy as np
from turbodbc import connect, make_options
from io import StringIO

test_data = '''id,transaction_dt,units,measures
               1,2018-01-01,4,30.5
               1,2018-01-03,4,26.3
               2,2018-01-01,3,12.7
               2,2018-01-03,3,8.8'''

df_test = pd.read_csv(StringIO(test_data), sep=',')
df_test['transaction_dt'] = pd.to_datetime(df_test['transaction_dt'])

options = make_options(parameter_sets_to_buffer=1000)
conn = connect(driver='{SQL Server}', server='server_nm', database='db_nm', turbodbc_options=options)

test_query = '''DROP TABLE IF EXISTS [db_name].[schema].[test]

                CREATE TABLE [db_name].[schema].[test]
                (
                    id int NULL,
                    transaction_dt datetime NULL,
                    units int NULL,
                    measures float NULL
                )

                INSERT INTO [db_name].[schema].[test] (id,transaction_dt,units,measures)
                VALUES (?,?,?,?) '''

cursor.executemanycolumns(test_query, [df_test['id'].values, df_test['transaction_dt'].values, df_test['units'].values, df_test['measures'].values]

turbodbc 在许多用例中应该非常快(特别是对于 numpy 数组)。请注意将底层 numpy 数组从数据框列作为参数直接传递给查询是多么简单。我也相信这有助于防止创建过度消耗内存的中间对象。希望这有帮助!

于 2018-02-21T19:30:47.573 回答
7

似乎 Pandas 0.23.0 和 0.24.0使用 PyODBC 的多值插入INSERT ... VALUES ...,这阻止了快速执行的帮助——每个块发出一条语句。多值插入块是对旧的慢速 executemany 默认值的改进,但至少在简单的测试中,快速 executemany 方法仍然占主导地位,更不用说不需要手动chunksize计算,就像多值插入所要求的那样。如果以后没有提供配置选项,可以通过monkeypatching来强制旧行为:

import pandas.io.sql

def insert_statement(self, data, conn):
    return self.table.insert(), data

pandas.io.sql.SQLTable.insert_statement = insert_statement

未来就在这里,至少在master分支中可以使用关键字参数来控制插入method=方法to_sql()。它默认为None,这会强制执行 executemany 方法。使用多值插入传递method='multi'结果。它甚至可以用于实现 DBMS 特定的方法,例如 Postgresql COPY

于 2018-08-23T17:59:38.917 回答
6

正如@Pylander 指出的那样

到目前为止,Turbodbc 是数据摄取的最佳选择!

我对此感到非常兴奋,以至于我在我的 github 和媒体上写了一个“博客”:请查看https://medium.com/@erickfis/etl-process-with-turbodbc-1d19ed71510e

一个工作示例并与 pandas.to_sql 进行比较

长话短说,

使用 turbodbc 我在 3 秒内有 10000 行(77 列)

使用 pandas.to_sql 我在 198 秒内得到了相同的 10000 行(77 列)......

这是我正在做的全部细节

进口:

import sqlalchemy
import pandas as pd
import numpy as np
import turbodbc
import time

加载并处理一些数据 - 用我的 sample.pkl 代替你的:

df = pd.read_pickle('sample.pkl')

df.columns = df.columns.str.strip()  # remove white spaces around column names
df = df.applymap(str.strip) # remove white spaces around values
df = df.replace('', np.nan)  # map nans, to drop NAs rows and columns later
df = df.dropna(how='all', axis=0)  # remove rows containing only NAs
df = df.dropna(how='all', axis=1)  # remove columns containing only NAs
df = df.replace(np.nan, 'NA')  # turbodbc hates null values...

使用 sqlAlchemy 创建表

不幸的是,turbodbc 需要大量的开销和大量的 sql 手工劳动,用于创建表和在其上插入数据。

幸运的是,Python 是纯粹的快乐,我们可以自动化编写 sql 代码的过程。

第一步是创建将接收我们数据的表。但是,如果您的表具有多个列,则手动编写 sql 代码创建表可能会出现问题。就我而言,这些表通常有 240 列!

这就是 sqlAlchemy 和 pandas 仍然可以帮助我们的地方:pandas 不适合写入大量行(在本例中为 10000 行),但是表头只有 6 行呢?这样,我们可以自动执行创建表的过程。

创建 sqlAlchemy 连接:

mydb = 'someDB'

def make_con(db):
    """Connect to a specified db."""
    database_connection = sqlalchemy.create_engine(
        'mssql+pymssql://{0}:{1}@{2}/{3}'.format(
            myuser, mypassword,
            myhost, db
            )
        )
    return database_connection

pd_connection = make_con(mydb)

在 SQL Server 上创建表

使用 pandas + sqlAlchemy,但只是为前面提到的 turbodbc 准备空间。请注意此处的 df.head():我们使用 pandas + sqlAlchemy 仅插入 6 行数据。这将运行得非常快,并且正在自动创建表。

table = 'testing'
df.head().to_sql(table, con=pd_connection, index=False)

既然桌子已经就位,让我们在这里认真一点。

Turbodbc 连接:

def turbo_conn(mydb):
    """Connect to a specified db - turbo."""
    database_connection = turbodbc.connect(
                                            driver='ODBC Driver 17 for SQL Server',
                                            server=myhost,
                                            database=mydb,
                                            uid=myuser,
                                            pwd=mypassword
                                        )
    return database_connection

为 turbodbc 准备 sql 命令和数据。让我们创造性地自动化这个代码创建:

def turbo_write(mydb, df, table):
    """Use turbodbc to insert data into sql."""
    start = time.time()
    # preparing columns
    colunas = '('
    colunas += ', '.join(df.columns)
    colunas += ')'

    # preparing value place holders
    val_place_holder = ['?' for col in df.columns]
    sql_val = '('
    sql_val += ', '.join(val_place_holder)
    sql_val += ')'

    # writing sql query for turbodbc
    sql = f"""
    INSERT INTO {mydb}.dbo.{table} {colunas}
    VALUES {sql_val}
    """

    # writing array of values for turbodbc
    valores_df = [df[col].values for col in df.columns]

    # cleans the previous head insert
    with connection.cursor() as cursor:
        cursor.execute(f"delete from {mydb}.dbo.{table}")
        connection.commit()

    # inserts data, for real
    with connection.cursor() as cursor:
        try:
            cursor.executemanycolumns(sql, valores_df)
            connection.commit()
        except Exception:
            connection.rollback()
            print('something went wrong')

    stop = time.time() - start
    return print(f'finished in {stop} seconds')

使用 turbodbc 写入数据 - 我在 3 秒内有 10000 行(77 列):

turbo_write(mydb, df.sample(10000), table)

Pandas 方法比较 - 我在 198 秒内得到了相同的 10000 行(77 列)……</p>

table = 'pd_testing'

def pandas_comparisson(df, table):
    """Load data using pandas."""
    start = time.time()
    df.to_sql(table, con=pd_connection, index=False)
    stop = time.time() - start
    return print(f'finished in {stop} seconds')

pandas_comparisson(df.sample(10000), table)

环境和条件

Python 3.6.7 :: Anaconda, Inc.
TURBODBC version ‘3.0.0’
sqlAlchemy version ‘1.2.12’
pandas version ‘0.23.4’
Microsoft SQL Server 2014
user with bulk operations privileges

请检查https://erickfis.github.io/loose-code/以获取此代码中的更新!

于 2018-11-07T19:02:45.737 回答
5

SQL Server INSERT 性能:pyodbc 与 turbodbc

to_sql用于将 pandas DataFrame 上传到 SQL Server 时,turbodbc 肯定会比不带fast_executemany. 但是,fast_executemany启用 pyodbc 后,两种方法的性能基本相同。

测试环境:

[venv1_pyodbc]
pyodbc 2.0.25

[venv2_turbodbc]
turbodbc 3.0.0
sqlalchemy-turbodbc 0.1.0

[两者通用]
Windows 上的 Python 3.6.4 64 位
SQLAlchemy 1.3.0b1
pandas 0.23.4
numpy 1.15.4

测试代码:

# for pyodbc
engine = create_engine('mssql+pyodbc://sa:whatever@SQL_panorama', fast_executemany=True)
# for turbodbc
# engine = create_engine('mssql+turbodbc://sa:whatever@SQL_panorama')

# test data
num_rows = 10000
num_cols = 100
df = pd.DataFrame(
    [[f'row{x:04}col{y:03}' for y in range(num_cols)] for x in range(num_rows)],
    columns=[f'col{y:03}' for y in range(num_cols)]
)

t0 = time.time()
df.to_sql("sqlalchemy_test", engine, if_exists='replace', index=None)
print(f"pandas wrote {num_rows} rows in {(time.time() - t0):0.1f} seconds")

对每个环境进行了十二 (12) 次测试,丢弃了每个环境的最佳和最差时间。结果(以秒为单位):

   rank  pyodbc  turbodbc
   ----  ------  --------
      1    22.8      27.5
      2    23.4      28.1
      3    24.6      28.2
      4    25.2      28.5
      5    25.7      29.3
      6    26.9      29.9
      7    27.0      31.4
      8    30.1      32.1
      9    33.6      32.5
     10    39.8      32.9
   ----  ------  --------
average    27.9      30.0
于 2019-01-01T20:29:02.230 回答
4

只是想添加到@JK 的答案。

如果您使用这种方法:

@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    if executemany:
        cursor.fast_executemany = True

你得到这个错误:

“sqlalchemy.exc.DBAPIError: (pyodbc.Error) ('HY010', '[HY010] [Microsoft][SQL Server Native Client 11.0]函数序列错误 (0) (SQLParamData)') [SQL: 'INSERT INTO .. . (...) VALUES (?, ?)'] [parameters: ((..., ...), (..., ...)] (此错误的背景: http://sqlalche .me/e/dbapi )"

像这样编码你的字符串值:'yourStringValue'.encode('ascii')

这将解决您的问题。

于 2019-02-22T01:36:27.573 回答
0

我只是修改引擎线,这有助于我将插入速度提高 100 倍。

旧代码 -

import json
import maya
import time
import pandas
import pyodbc
import pandas as pd
from sqlalchemy import create_engine

retry_count = 0
retry_flag = True

hostInfoDf = pandas.read_excel('test.xlsx', sheet_name='test')
print("Read Ok")

engine = create_engine("mssql+pyodbc://server_name/db_name?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server")

while retry_flag and retry_count < 5:
  try:
    df.to_sql("table_name",con=engine,if_exists="replace",index=False,chunksize=5000,schema="dbo")
    retry_flag = False
  except:
    retry_count = retry_count + 1
    time.sleep(30)

改装发动机线 -

从 -

engine = create_engine("mssql+pyodbc://server_name/db_name?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server")

至 -

engine = create_engine("mssql+pyodbc://server_name/db_name?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server", fast_executemany=True)

向我询问任何与 Python 到 SQL 连接相关的查询,我很乐意为您提供帮助。

于 2020-03-30T06:48:45.527 回答