如何将SqlAlChemy联接查询序列化为JSON?

2022-03-10 00:00:00 python sqlalchemy json flask-sqlalchemy

问题描述

是否有人可以帮助我设计/查找一个JSON编码器,该编码器在我的sqlalChemy查询中使用联接时可以工作,并且可以序列化我指定的结果。

查询如下:data = db.session.query(Post).join(Users).filter(Post.area == id, Users.id == Post.user_id).all()

然后我像这样做一个json转储:json_object = json.dumps(data, cls=AlchemyEncoder)

传统上我使用自己的JSON编码器,如下所示:

class AlchemyEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o.__class__, DeclarativeMeta):
            data = {}
            fields = o.__json__() if hasattr(o, '__json__') else dir(o)
            for field in [f for f in fields if not f.startswith('_') and f not in ['metadata', 'query', 'query_class']]:
                value = o.__getattribute__(field)
                try:
                    json.dumps(value)
                    data[field] = value
                except TypeError:
                    data[field] = None
            return data
        return json.JSONEncoder.default(self, o)

它引用的函数将如下所示:

 def __json__(self):
        return ['id', 'user_id', 'message', 'media', 'time_uploaded', 'lifetime', 'votes', 'area']
但是,我在查询中联接的表中有一些列,我希望将它们包含在我创建的JSON对象中。我的JSON编码器显然只能使用我查询的第一个表中找到的列。

编辑:

下面是我的两个SQLAlChemy类的外观:

class Post(Base):
    __tablename__ = "post"

    id = db.Column('id', UUIDType(binary=False), primary_key=True)
    user_id = db.Column('user_id', UUIDType(binary=False), db.ForeignKey('users.id'))
    votes = db.Column('votes', db.Integer)
    message = db.Column('message', db.Unicode)
    media = db.Column('media', db.Unicode)
    lifetime = db.Column('lifetime', db.Integer)
    time_uploaded = db.Column('time_uploaded', db.DateTime, server_default=func.now())
    area = db.Column('area', db.Unicode)

    user = db.relationship('Users', foreign_keys=user_id)

    def __json__(self):
        return ['id', 'username', 'name', 'profile_picture', 'message', 'media', 'time_uploaded', 'lifetime', 'votes', 'area']


class Users(Base):
    __tablename__ = "users"

    id = db.Column('id', UUIDType(binary=False), primary_key=True)
    username = db.Column('username', db.Unicode, unique=True)
    email = db.Column('email', db.Unicode, unique=True)
    password = db.Column('password', db.Unicode)
    name = db.Column('name', db.Unicode)
    bio = db.Column('bio', db.Unicode)
    profile_picture = db.Column('profile_picture', db.Unicode) 

解决方案

sqlalChemy将您传递给query方法的任何内容都放在查询的select部分中,因此如果您希望获得2个类,您可以执行以下操作:

db.session.query(Post, Users).join(Users).filter(Post.area == id, Users.id == Post.user_id).all()

这将返回Post和Users类的元组数组(因此您必须更改编码器才能接收元组)

编辑:

以下是如何更改编码器以接受元组的示例:

class AlchemyEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, tuple):
            data = {}
            for obj in o:
                data.update(self.parse_sqlalchemy_object(obj))
            return data
        if isinstance(o.__class__, DeclarativeMeta):
            return self.parse_sqlalchemy_object(o)
        return json.JSONEncoder.default(self, o)

    def parse_sqlalchemy_object(self, o):
        data = {}
        fields = o.__json__() if hasattr(o, '__json__') else dir(o)
        for field in [f for f in fields if not f.startswith('_') and f not in ['metadata', 'query', 'query_class']]:
            value = o.__getattribute__(field)
            try:
                json.dumps(value)
                data[field] = value
            except TypeError:
                data[field] = None
        return data

相关文章