Apache Airflow - problem connecting to MS SQL Server with pymssql + SQLAlchemy

2021-10-26 00:00:00 python airflow sql-server pymssql


I am facing a problem to connect to an Azure MS SQL Server 2014 database in Apache Airflow 1.10.1 using pymssql. I want to use the MsSqlHook class provided by Airflow, for the convenience to create my connection in the Airflow UI, and then create a context manager for my connection using SqlAlchemy:

from contextlib import contextmanager
from sqlalchemy.orm import sessionmaker
from airflow.hooks.mssql_hook import MsSqlHook

@contextmanager
def mssql_session(dt_conn_id):
    # Build an engine from the Airflow connection and hand out a session
    sqla_engine = MsSqlHook(mssql_conn_id=dt_conn_id).get_sqlalchemy_engine()
    session = sessionmaker(bind=sqla_engine)()
    try:
        yield session
    except:
        session.rollback()
        raise
    else:
        session.commit()
    finally:
        session.close()

But when I do that, I get this error when I run a query:

sqlalchemy.exc.InterfaceError: (pyodbc.InterfaceError) ('IM002', '[IM002] [unixODBC][Driver Manager]Data source name not found, and no default driver specified (0) (SQLDriverConnect)') (Background on this error at: http://sqlalche.me/e/rvf5)

The error seems to come from pyodbc, whereas I want to use pymssql (and in MsSqlHook, the get_conn method uses pymssql!).

I searched the Airflow source code for the cause. I noticed that the get_uri method of the DbApiHook class (from which MsSqlHook inherits) builds the connection string passed to SQLAlchemy like this:

'{conn.conn_type}://{login}{host}/{conn.schema}'

But conn.conn_type is simply equal to 'mssql', whereas we need to specify the DBAPI as described here: https://docs.sqlalchemy.org/en/latest/core/engines.html#microsoft-sql-server (for example: 'mssql+pymssql://scott:tiger@hostname:port/dbname').

So, by default, I think it uses pyodbc. But how can I properly set the connection's conn_type to 'mssql+pymssql' instead of 'mssql'? In the Airflow UI, you can only pick SQL Server from a dropdown list; you cannot set the conn_type to an arbitrary value.

To work around the issue, I override the get_uri method from DbApiHook in a new class that inherits from MsSqlHook and builds its own connection string, but that's not clean at all...
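A minimal sketch of that workaround, assuming Airflow 1.10's airflow.hooks.mssql_hook module (the class name PymssqlHook is illustrative, not part of Airflow):

from airflow.hooks.mssql_hook import MsSqlHook

class PymssqlHook(MsSqlHook):
    """MsSqlHook variant whose SQLAlchemy URI explicitly names the pymssql DBAPI."""

    def get_uri(self):
        # DbApiHook.get_uri() returns a plain 'mssql://...' URI; prepend the
        # DBAPI so SQLAlchemy resolves to pymssql instead of its default (pyodbc).
        return super(PymssqlHook, self).get_uri().replace("://", "+pymssql://", 1)

Because get_sqlalchemy_engine() builds its engine from get_uri(), this hook would hand back a pymssql-backed engine.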

Thanks for any help

Solution

You're right. There's no easy, straightforward way to get Airflow to do what you want. Personally, I would build the SQLAlchemy engine inside your context manager, something like create_engine(hook.get_uri().replace("://", "+pymssql://")) -- then I would put that code somewhere reusable.
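A sketch of that approach, applied to the context manager from the question (assuming the same Airflow 1.10 import path for MsSqlHook):

from contextlib import contextmanager

from airflow.hooks.mssql_hook import MsSqlHook
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

@contextmanager
def mssql_session(dt_conn_id):
    hook = MsSqlHook(mssql_conn_id=dt_conn_id)
    # Rewrite 'mssql://...' to 'mssql+pymssql://...' so SQLAlchemy uses pymssql.
    engine = create_engine(hook.get_uri().replace("://", "+pymssql://", 1))
    session = sessionmaker(bind=engine)()
    try:
        yield session
    except Exception:
        session.rollback()
        raise
    else:
        session.commit()
    finally:
        session.close()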
