使用 SQLAlchemy 进行异步操作
使用SQLAlchemy进行异步操作需要配合AsyncIO库一起使用,并使用异步的数据库驱动程序如asyncpg。以下是使用SQLAlchemy进行异步查询的示例代码:
import asyncio from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, create_engine, Column, Integer, String, DateTime, func, ForeignKey from sqlalchemy.orm import declarative_base, relationship Base = declarative_base() class User(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(50)) email = Column(String(120), unique=True) created_at = Column(DateTime, default=func.now()) def __repr__(self): return f"<User(name='{self.name}', email='{self.email}', created_at='{self.created_at}')>" class Post(Base): __tablename__ = 'posts' id = Column(Integer, primary_key=True) title = Column(String(50)) content = Column(String) user_id = Column(Integer, ForeignKey('users.id')) created_at = Column(DateTime, default=func.now()) user = relationship("User", back_populates="posts") def __repr__(self): return f"<Post(title='{self.title}', content='{self.content}', user={self.user}, created_at='{self.created_at}')>" User.posts = relationship("Post", order_by=Post.id, back_populates="user") async def main(): # create async engine using asyncpg driver POSTGRES_DB = "mydatabase" POSTGRES_USER = "myuser" POSTGRES_PASSWORD = "mypassword" POSTGRES_HOST = "localhost" POSTGRES_PORT = "5432" DATABASE_URI = f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}" engine = create_async_engine(DATABASE_URI, future=True, echo=True) # create tables async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) # create async session async with AsyncSession(engine) as session: # create a user new_user = User(name="pidancode.com", email="pidancode@example.com") session.add(new_user) await session.commit() # query all users stmt = select(User) results = await session.execute(stmt) for row in results: print(row) if __name__ == '__main__': asyncio.run(main())
在上述代码中,我们使用了PostgreSQL作为数据库,并使用create_async_engine创建异步引擎。我们还使用AsyncSession来执行异步操作。 在main函数中,我们创建了一个新用户并将其添加到数据库中。然后,我们查询了所有用户,并将结果打印到控制台上。
注意,上面的代码仅供参考。实际情况中,您可能需要先安装所需的数据库和库,并相应地配置数据库连接URI等参数。
相关文章