使用 SQLAlchemy 进行异步操作

2023-04-05 00:00:00 sqlalchemy 操作

使用SQLAlchemy进行异步操作需要配合AsyncIO库一起使用,并使用异步的数据库驱动程序如asyncpg。以下是使用SQLAlchemy进行异步查询的示例代码:

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, create_engine, Column, Integer, String, DateTime, func, ForeignKey
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    email = Column(String(120), unique=True)
    created_at = Column(DateTime, default=func.now())

    def __repr__(self):
        return f"<User(name='{self.name}', email='{self.email}', created_at='{self.created_at}')>"

class Post(Base):
    __tablename__ = 'posts'

    id = Column(Integer, primary_key=True)
    title = Column(String(50))
    content = Column(String)
    user_id = Column(Integer, ForeignKey('users.id'))
    created_at = Column(DateTime, default=func.now())

    user = relationship("User", back_populates="posts")

    def __repr__(self):
        return f"<Post(title='{self.title}', content='{self.content}', user={self.user}, created_at='{self.created_at}')>"

User.posts = relationship("Post", order_by=Post.id, back_populates="user")

async def main():
    # create async engine using asyncpg driver
    POSTGRES_DB = "mydatabase"
    POSTGRES_USER = "myuser"
    POSTGRES_PASSWORD = "mypassword"
    POSTGRES_HOST = "localhost"
    POSTGRES_PORT = "5432"

    DATABASE_URI = f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"

    engine = create_async_engine(DATABASE_URI, future=True, echo=True)

    # create tables
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    # create async session
    async with AsyncSession(engine) as session:
        # create a user
        new_user = User(name="pidancode.com", email="pidancode@example.com")
        session.add(new_user)
        await session.commit()

        # query all users
        stmt = select(User)
        results = await session.execute(stmt)

        for row in results:
            print(row)

if __name__ == '__main__':
    asyncio.run(main())

在上述代码中,我们使用了PostgreSQL作为数据库,并使用create_async_engine创建异步引擎。我们还使用AsyncSession来执行异步操作。 在main函数中,我们创建了一个新用户并将其添加到数据库中。然后,我们查询了所有用户,并将结果打印到控制台上。

注意,上面的代码仅供参考。实际情况中,您可能需要先安装所需的数据库和库,并相应地配置数据库连接URI等参数。

相关文章