Pages - Menu

Pages - Menu

Pages

2022年7月16日土曜日

Python、FastAPI + SQLAlchemyで非同期WebAPI Posted by rhoboro on 2021-06-12

https://www.rhoboro.com/2021/06/12/async-fastapi-sqlalchemy.html

少し前にSQLAlchemy 1.4がリリースされました。 このSQLAlchemy 1.4には大きな特徴として次の2点があります。

  • SQLAlchemy ORMでこれまでの記法(1.x Style)に加えて、2.0 Styleと呼ばれる新しい記法に対応
  • CoreとORMの両方でイベントループを使った非同期処理に対応

今後のロードマップでは、SQLAlchemy 2.0では2.0 Styleのみがサポートされます。 また、イベントループを使った非同期処理もこの数年で一気に広まってきました。 SQLAlchemyが対応したこともこの流れをさらに加速させると思います。

そこで最近よく利用しているFastAPIと組み合わせて、SQLAlchemy 2.0 Styleを使った非同期Web APIのサンプルプロジェクトを用意しました。 コードはrhoboro/async-fastapi-sqlalchemyリポジトリに置いています。

これはわたしが既存アプリケーションを実際にマイグレーションしたときの経験をベースに用意しました。 マイグレーションはまず同期処理のまま2.0 Styleに移行し、その後async対応しました。 振り返るとそれぞれ変更範囲が大きく大変でしたので、2ステップに分けて正解だったと思います。

2.0 Styleへの対応

詳細は公式ドキュメントを確認してください。
大きく変わったのはORMでのSELECT文です。 1.x StyleのORMでは次のようにSELECT文を書いていました。

Base = declarative_base()
class Note(Base):
  __tablename__ = "notes"
  id = Column(Integer, priamry_key=True)


results = session.query(Note).filter_by(id=1).all()

また、scoped_session.query_propertyをモデルのクラス属性queryに代入し、次のように使っている人もいたと思います。わたしはこの書き方にORMらしさを感じていて結構好きでした笑

session = scoped_session(...)
Base.query = session.query_property()

results = Note.query.filter_by(id=1).all()

これらと同等の処理を2.0 Styleで書くと次のようになります。 2.0 Styleではこのようにstatementを作成してからそれをsession.execute()に渡して実行します。

statement = session.select(Note).filter_by(id==1)
result = session.execute(statement).all()

実はこの書き方はCoreとほぼ同等の記法になっています。 2.0 StyleではこのようにORMとCoreで乖離していたいくつかの処理が標準化されています。

The biggest changes in SQLAlchemy 2.0 are targeting the residual assumptions left over from this early period in SQLAlchemy’s development as well as the leftover artifacts resulting from the incremental introduction of key API features such as Query and Declarative. It also hopes standardize some newer capabilities that have proven to be very effective. ── Migrating to SQLAlchemy 2.0

これだけ見ると書き方が少し違うだけに見えますが、Note.query...の書き方を多用していた身としては、これまで裏で行われていたセッションオブジェクトの受け渡しを自分で行う必要が出てきたためマイグレーションはそれなり大変でした。

マイグレーション後はセッションオブジェクトが必要なシーンで都度with文を使っています。 サンプルプロジェクトでは各use_caseモジュールでの処理の開始時で統一しています。(このコードはasync対応も入っています。)

class ReadNote:
    ...
    async def execute(self, note_id: int) -> NoteSchema:
        async with self.async_session() as session:
            note = await Note.read_by_id(session, note_id)
            if not note:
                raise HTTPException(status_code=404)
            return NoteSchema.from_orm(note)

with session() と with session.begin() の使い分け

Sessionを取得するのにwith session()の形式を使っている箇所とwith session.begin()の形式を使っているところがあります。 後者はトランザクションが開始されていて、withブロックを抜ける際に自動でコミット(またはロールバック)が行われます。

    async def execute(self, note_id: int) -> NoteSchema:
        async with self.async_session() as session:
            note = await Note.read_by_id(session, note_id)
            if not note:
                raise HTTPException(status_code=404)
            return NoteSchema.from_orm(note)
    async def execute(self, notebook_id: int, title: str, content: str) -> NoteSchema:
        async with self.async_session.begin() as session:
            notebook = await Notebook.read_by_id(session, notebook_id)
            if not notebook:
                raise HTTPException(status_code=404)
            note = await Note.create(session, notebook.id, title, content)
            return NoteSchema.from_orm(note)

async対応

async対応に関してはいくつかポイントを紹介します。

FastAPIでの対応

async defでリクエストハンドラを定義するだけでasync対応できます。 めちゃくちゃ簡単ですね。

@router.post("/", response_model=CreateNoteResponse)
async def create(
    request: Request,
    data: CreateNoteRequest,
    use_case: CreateNote = Depends(CreateNote),
) -> NoteSchema:
    return await use_case.execute(data.notebook_id, data.title, data.content)

FastAPIはドキュメントも充実していて、スキーマ定義をしていればOpenAPIドキュメントも自動で生成されたり、設定値管理も簡単なのでおすすめです。 FastAPIのベースになっているStarlettePydanticも個人的に好みで不満点も少ないため、最近はWebAPIを作るときはFastAPIを第一候補にしています。

can't call await_() here エラー

SQLAlchemyのasync対応中に一番見たエラーがこのエラーです笑
特にNoteSchema.from_orm(note)NotebookSchema.from_orm(notebook)などモデルのインスタンスからレスポンスデータを組み立てる際に遭遇していました。

sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called; can't call await_() here. Was IO attempted in an unexpected place? (Background on this error at: http://sqlalche.me/e/14/xd2s)
スタックトレース全体はこちら

このエラーはrelationshipを使って定義した属性にアクセスした際に良く遭遇します。 relationshipとはSQLAlchemy ORMのモデルでRDBのリレーションを表現するもので、次のように使います。

class Note(Base):
    __tablename__ = "notes"
    id: int = Column(
        "id", Integer(), autoincrement=True, nullable=False, unique=True, primary_key=True
    )
    ...
    notebook_id: int = Column("notebook_id", Integer(), ForeignKey("notebooks.id"), nullable=False)
    notebook: Notebook = relationship("Notebook", back_populates="notes")

class Notebook(Base):
    __tablename__ = "notebooks"
    id: int = Column(
        "id", Integer(), autoincrement=True, nullable=False, unique=True, primary_key=True
    )
    ...
    notes: list[Note] = relationship(
        "Note",
        back_populates="notebook",
        order_by="Note.id",
        cascade="save-update, merge, refresh-expire, expunge, delete, delete-orphan",
    )

このrelationshipを使った属性はデフォルトがlazy loadingとなっていて、その属性が初めてアクセスされたタイミングでクエリが実行されます。 これはasyncioを使っていない世界では多くの場合に効率的に働きますが、asyncな世界では機能しません。
したがって、async対応を行う場合は必要になるデータはeager loadingと呼ばれる方法で最初から全て取得しておく必要があります。

Eager loadingはクエリ実行時のstatementのoption()、もしくは属性定義時にrelationshipの引数lazyで指定できます。 詳細はRelationship Loading Techniquesにありますが、基本は次のパターンとなります。

  1. リレーション先が1でnullable=Falseの場合はJoined Eager Loading + innerjoin=True
  2. リレーション先が1でnullable=Trueの場合はJoined Eager Loading
  3. リレーション先が多(Many)の場合はSelect IN loading

Note.notebookはパターン1に該当します。

class Note(Base):
    ...
    @classmethod
    async def read_by_id(cls, session: AsyncSession, note_id: int) -> Optional[Note]:
        # joinloadで指定
        stmt = select(cls).where(cls.id == note_id).options(joinedload(cls.notebook))
        result = (await session.execute(stmt.order_by(cls.id))).first()
        if result:
            return result.Note
        else:
            return None

Notebook.notesはパターン3に該当します。

class Notebook(Base):
    ...
    @classmethod
    async def read_by_id(
        cls, session: AsyncSession, notebook_id: int, include_notes: bool = False
    ) -> Optional[Notebook]:
        stmt = select(cls).where(cls.id == notebook_id)
        if include_notes:
            # コレクションはselectinloadで指定
            stmt = stmt.options(selectinload(cls.notes))
        result = (await session.execute(stmt.order_by(cls.id))).first()
        if result:
            return result.Notebook
        else:
            return None

わたしは途中までrelationshipの引数lazyで指定して進めていたのですが、途中でクエリの実行回数が気になり、selectinloadの指定は必要な時のみとしました。

非同期用のイベントが用意されていない

マイグレーションしていたアプリケーションではPostgreSQLのMATERIALIZED VIEWを使っていて、リフレッシュ実行のトリガーをSQLAlchemy ORMのSession Eventsを使って実装していました。 Session Events機能自体が無くなったわけではないのですが、これらの非同期版は現時点では用意されていません。

処理自体はasync_engine.sync_engineを使うと実現できそうですが、asyncな世界でsync_engineを使うことに気がのらず、Readがメインのアプリケーションなのでこんなデコレータで凌いでいます。

@asynccontextmanager
async def mv_refresh_session(db: sessionmaker) -> AsyncIterator[AsyncSession]:
    """ブロックを抜ける際にマテリアライズドビューをリフレッシュします

    :param db: sessionmaker
    """
    try:
        async with db.begin() as session:
            yield session
            await session.flush()
            await refresh(session)
    finally:
        pass

テストケースのasync対応

テストは大事です。
テストがなければ今回のような大規模なマイグレーションは難しかったと思います。

イベントループを使った非同期アプリケーションでは、テストもイベントループの中で動かさないといけません。 pytestを使っている場合はpytest-asyncioを使うと@pytest.mark.asyncioをつけるだけで簡単にイベントループ内でそのテストケースを実行できます。 fixtureに関してもasync defでコルーチンとして定義すれば普通に利用できます。 ただし指定できるscopeは基本的にfunctionだけなので注意が必要です。(一応scopeを変える方法もあるようです

また、イベントループ内ではHTTPリクエストなどもすべてawaitをつけて非同期で実行する必要があります。 async対応したHTTPクライアントであれば何でも良いですが、FastAPIのドキュメントでも使われているhttpxを使っています。 元々はrequestsを使ってテストケースを作っていましたが、APIはほとんど同じためほぼクライアントの差し替えだけで済みました。

import pytest
from httpx import AsyncClient

@pytest.mark.asyncio
async def test_notebooks_create(ac: AsyncClient, session: AsyncSession) -> None:
    """Create a notebook"""
    # execute
    response = await ac.post("/api/notebooks", json={"title": "Test Notebook", "notes": []})

    print(response.content)
    assert 200 == response.status_code
    expected = {"id": ID_STRING, "title": "Test Notebook", "notes": []}
    assert expected == response.json()

テスト用のDB作成

DBを使う処理のテストではモックを使わずに実際にDBの読み書きをしたいものです。

FastAPIとSQLAlchemyでのDBを使ったテストに関しては、以前FastAPI+SQLAlchemyをpytestでテストという記事を書きました。 async対応後も基本的な流れは変わりませんが、テストケースに渡すセッションオブジェクトがAsyncSessionになるのでその作成方法が変わります。 これは少しハマったのですが、このIssueのコメントを参考に作成しました。

@pytest.fixture
async def session():
    # https://github.com/sqlalchemy/sqlalchemy/issues/5811#issuecomment-756269881
    async_engine = create_async_engine(f"{settings.DB_URI}/test")
    async with async_engine.connect() as conn:

        await conn.begin()
        await conn.begin_nested()
        AsyncSessionLocal = sessionmaker(
            autocommit=False,
            autoflush=False,
            bind=conn,
            future=True,
            class_=AsyncSession,
        )

        async_session = AsyncSessionLocal()

        @event.listens_for(async_session.sync_session, "after_transaction_end")
        def end_savepoint(session, transaction):
            if conn.closed:
                return
            if not conn.in_nested_transaction:
                conn.sync_connection.begin_nested()

        def test_get_session() -> Generator:
            try:
                yield AsyncSessionLocal
            except SQLAlchemyError as e:
                pass

        app.dependency_overrides[get_session] = test_get_session

        yield async_session
        await async_session.close()
        await conn.rollback()

なお、テスト用のDBスキーマ(ここでは/test)の作成はsession scopeで行う必要がありコルーチンでは動かせないため、同期処理であるcreate_engineを利用しています。

AsyncEngineでも同期処理を使いたいとき

SQLAlchemy ORMではBase.metadata.create_all(session)を実行するとORMで定義したモデルに対応するテーブルを作成してくれます。 このメソッドは同期処理のためそのままではAsyncEngine(AsyncSession)で扱えませんが、このようなときはrun_syncを使うと解決します。 run_syncの引数にConnection(Session)オブジェクトを受け取るCallableです。

(venv) $ APP_CONFIG_FILE=local python3 -m asyncio
>>> import asyncio
>>> from app.db import async_engine
>>> from app.models.base import Base
>>> async with async_engine.begin() as conn:
...   await conn.run_sync(Base.metadata.drop_all)
...   await conn.run_sync(Base.metadata.create_all)
>>>

余談ですが、このpython3コマンドの-m asyncioオプションはめっちゃ便利なのでもっと広まって欲しいです笑

0 件のコメント:

コメントを投稿