通过递归CTE获取具有SQLAlChemy的层次结构的顶级父ID
问题描述
我有这样的案例:
| Note table |
|---------------------|------------------|
| id | parent_id |
|---------------------|------------------|
| 1 | Null |
|---------------------|------------------|
| 2 | 1
|---------------------|------------------|
| 3 | 2
|---------------------|------------------|
| 4 | 3
|---------------------|------------------|
我想要实现的是获取顶级父ID。 在本例中,如果我传递ID号4,我将获得ID 1,因为ID 1是顶级父级。 当它达到parent_id上的空值时,表示该id是顶级父级。
我试过了,但返回的是我传递给函数的ID。
def get_top_level_Note(self, id: int):
hierarchy = self.db.session.query(Note).filter(Note.id == id).cte(name="hierarchy", recursive=True)
parent = aliased(hierarchy, name="p")
children = aliased(Note, name="c")
hierarchy = hierarchy.union_all(self.db.session.query(children).filter(children.parent_id == parent.c.id))
result = self.db.session.query(Note).select_entity_from(hierarchy).all()
解决方案
具有名为&Quot;Note&Quot;的现有表
id parent_id
----------- -----------
11 NULL
22 11
33 22
44 33
55 NULL
66 55
在PostgreSQL中稍作改动就会发现
WITH RECURSIVE parent (i, id, parent_id)
AS (
SELECT 0, id, parent_id FROM note WHERE id=44
UNION ALL
SELECT i + 1, n.id, n.parent_id
FROM note n INNER JOIN parent p ON p.parent_id = n.id
WHERE p.parent_id IS NOT NULL
)
SELECT * FROM parent ORDER BY i;
返回
i id parent_id
----------- ----------- -----------
0 44 33
1 33 22
2 22 11
3 11 NULL
,因此我们可以通过将最后一行更改为
来获得顶级父级WITH RECURSIVE parent (i, id, parent_id)
AS (
SELECT 0, id, parent_id FROM note WHERE id=44
UNION ALL
SELECT i + 1, n.id, n.parent_id
FROM note n INNER JOIN parent p ON p.parent_id = n.id
WHERE p.parent_id IS NOT NULL
)
SELECT id FROM parent ORDER BY i DESC LIMIT 1 ;
返回
id
-----------
11
因此,要将其转换为SQLAlChemy(1.4):
from sqlalchemy import (
create_engine,
Column,
Integer,
select,
literal_column,
)
from sqlalchemy.orm import declarative_base
connection_uri = "postgresql://scott:tiger@192.168.0.199/test"
engine = create_engine(connection_uri, echo=False)
Base = declarative_base()
class Note(Base):
__tablename__ = "note"
id = Column(Integer, primary_key=True)
parent_id = Column(Integer)
def get_top_level_note_id(start_id):
note_tbl = Note.__table__
parent_cte = (
select(
literal_column("0").label("i"), note_tbl.c.id, note_tbl.c.parent_id
)
.where(note_tbl.c.id == start_id)
.cte(name="parent_cte", recursive=True)
)
parent_cte_alias = parent_cte.alias("parent_cte_alias")
note_tbl_alias = note_tbl.alias()
parent_cte = parent_cte.union_all(
select(
literal_column("parent_cte_alias.i + 1"),
note_tbl_alias.c.id,
note_tbl_alias.c.parent_id,
)
.where(note_tbl_alias.c.id == parent_cte_alias.c.parent_id)
.where(parent_cte_alias.c.parent_id.is_not(None))
)
stmt = select(parent_cte.c.id).order_by(parent_cte.c.i.desc()).limit(1)
with engine.begin() as conn:
result = conn.execute(stmt).scalar()
return result
if __name__ == "__main__":
test_id = 44
print(
f"top level id for note {test_id} is {get_top_level_note_id(test_id)}"
)
# top level id for note 44 is 11
test_id = 66
print(
f"top level id for note {test_id} is {get_top_level_note_id(test_id)}"
)
# top level id for note 66 is 55
相关文章