https://www.rhoboro.com/2021/06/12/async-fastapi-sqlalchemy.html
Not long ago, SQLAlchemy 1.4 was released. SQLAlchemy 1.4 has two major new features:
- SQLAlchemy ORM supports a new notation called 2.0 Style in addition to the old notation (1.x Style)
- Supports asynchronous processing using event loops in both Core and ORM
According to the roadmap, SQLAlchemy 2.0 will support only 2.0 Style. Asynchronous processing using event loops has also become widespread over the last few years, and I think SQLAlchemy's support will accelerate this trend further.
So I prepared a sample project for an asynchronous Web API that combines SQLAlchemy 2.0 Style with FastAPI, which has been widely used recently. The code is in the rhoboro/async-fastapi-sqlalchemy repository.
This is based on my experience actually migrating an existing application. In that migration I first moved to 2.0 Style while keeping synchronous processing, and then added async support. Looking back, the scope of changes was large and difficult, so I think doing it in two steps was the right call.
Support for 2.0 Style
Please check the official documentation for details.
The big change is how SELECT statements are written in the ORM. In 1.x Style ORM, a SELECT statement was written as follows.
Base = declarative_base()


class Note(Base):
    __tablename__ = "notes"
    id = Column(Integer, primary_key=True)


results = session.query(Note).filter_by(id=1).all()
Also, I think some people assigned scoped_session.query_property() to the model's class attribute query and used it as follows. This style of writing felt very ORM-like to me and I liked it quite a lot lol
session = scoped_session(...)
Base.query = session.query_property()

results = Note.query.filter_by(id=1).all()
The equivalent in 2.0 Style looks like this. In 2.0 Style you build a statement like the one below and then pass it to session.execute() to run it.
statement = select(Note).filter_by(id=1)
results = session.execute(statement).all()
In fact, this style is almost the same as Core. In 2.0 Style, processing that used to differ between the ORM and Core has been unified.
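To illustrate that unification, here is a minimal sketch (assuming the Note model, an ORM session, and a Core engine are already set up) of the same 2.0-style statement being executed through both the ORM and Core:

from sqlalchemy import select

stmt = select(Note).where(Note.id == 1)

# ORM: scalars() unwraps the rows into Note instances
notes = session.execute(stmt).scalars().all()

# Core: the same statement can be executed on a plain Connection
with engine.connect() as conn:
    rows = conn.execute(stmt).all()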
The biggest changes in SQLAlchemy 2.0 are targeting the residual assumptions left over from this early period in SQLAlchemy's development as well as the leftover artifacts resulting from the incremental introduction of key API features such as Query and Declarative. It also hopes to standardize some newer capabilities that have proven to be very effective. ── Migrating to SQLAlchemy 2.0
Looking at this, the difference in style may seem small, but as someone who used the Note.query... style a lot, I now had to pass around the session object myself, which used to be handled behind the scenes, so the migration was rather hard.
After the migration, a with statement is used every time a session object is needed. In the sample project this is done uniformly at the start of processing in each use_case module. (The following code already includes async support.)
class ReadNote:
    ...

    async def execute(self, note_id: int) -> NoteSchema:
        async with self.async_session() as session:
            note = await Note.read_by_id(session, note_id)
            if not note:
                raise HTTPException(status_code=404)
            return NoteSchema.from_orm(note)
Use with session() and with session.begin() appropriately
Some code gets a Session with the with session() form and some with the with session.begin() form. In the latter case a transaction is started, and a commit (or rollback) is performed automatically when the with block is exited.
async def execute(self, note_id: int) -> NoteSchema:
    async with self.async_session() as session:
        note = await Note.read_by_id(session, note_id)
        if not note:
            raise HTTPException(status_code=404)
        return NoteSchema.from_orm(note)
async def execute(self, notebook_id: int, title: str, content: str) -> NoteSchema:
    async with self.async_session.begin() as session:
        notebook = await Notebook.read_by_id(session, notebook_id)
        if not notebook:
            raise HTTPException(status_code=404)
        note = await Note.create(session, notebook.id, title, content)
        return NoteSchema.from_orm(note)
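For context, a minimal sketch of how an async_session factory like the one used above could be set up (the connection URI here is just a placeholder, not the sample project's configuration):

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

async_engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/app")
# sessionmaker() works as a factory for AsyncSession; it supports both
# async_session() and async_session.begin() as used in the code above.
async_session = sessionmaker(async_engine, class_=AsyncSession, expire_on_commit=False)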
Async support
Here are some points about async support.
Support with FastAPI
You can support async just by defining a request handler with async def. It's really easy.
@router.post("/", response_model=CreateNoteResponse)
async def create(
    request: Request,
    data: CreateNoteRequest,
    use_case: CreateNote = Depends(CreateNote),
) -> NoteSchema:
    return await use_case.execute(data.notebook_id, data.title, data.content)
FastAPI has abundant documentation, OpenAPI documents are generated automatically from your schema definitions, and managing configuration values is easy, so I recommend it. I personally like Starlette and Pydantic, which FastAPI is built on, and have few complaints, so lately FastAPI is my first choice when building a Web API.
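As a rough sketch of such a schema definition (the actual fields in the sample project may differ), a Pydantic model with orm_mode enabled is what makes calls like NoteSchema.from_orm(note) possible:

from pydantic import BaseModel


class NoteSchema(BaseModel):
    id: int
    title: str
    content: str

    class Config:
        orm_mode = True  # allows NoteSchema.from_orm(orm_instance)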
The "can't call await_() here" error
The error I saw most often while adding SQLAlchemy async support was this one lol
I encountered it especially when assembling response data from model instances with NoteSchema.from_orm(note) or NotebookSchema.from_orm(notebook).
sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called; can't call await_() here. Was IO attempted in an unexpected place? (Background on this error at: http://sqlalche.me/e/14/xd2s)
Click here for the entire stack trace
This error is most often encountered when accessing attributes defined with relationship. relationship expresses an RDB relation on a SQLAlchemy ORM model and is used as follows.
class Note(Base):
    __tablename__ = "notes"
    id: int = Column(
        "id", Integer(), autoincrement=True, nullable=False, unique=True, primary_key=True
    )
    ...
    notebook_id: int = Column("notebook_id", Integer(), ForeignKey("notebooks.id"), nullable=False)
    notebook: Notebook = relationship("Notebook", back_populates="notes")


class Notebook(Base):
    __tablename__ = "notebooks"
    id: int = Column(
        "id", Integer(), autoincrement=True, nullable=False, unique=True, primary_key=True
    )
    ...
    notes: list[Note] = relationship(
        "Note",
        back_populates="notebook",
        order_by="Note.id",
        cascade="save-update, merge, refresh-expire, expunge, delete, delete-orphan",
    )
By default, an attribute defined with relationship uses lazy loading: the query is executed the first time the attribute is accessed. This often works well in a world without asyncio, but it does not work in an async world.
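To make the failure concrete, here is a minimal sketch of the kind of access that triggers the error when the relationship has not been loaded yet:

result = await session.execute(select(Note).where(Note.id == 1))
note = result.scalars().first()
# The next line would have to run a lazy-load query synchronously,
# which raises sqlalchemy.exc.MissingGreenlet inside an async session.
notebook = note.notebook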
Therefore, when supporting async, all the necessary data has to be fetched up front using eager loading.
Eager loading can be specified per query with options() on the statement, or at attribute definition time with the lazy argument to relationship. Details can be found in Relationship Loading Techniques, but the basic patterns are as follows.
- Joined eager loading with innerjoin=True if the related object is one (to-one) and nullable=False
- Joined eager loading if the related object is one (to-one) and nullable=True
- Select IN loading if there are many related objects (to-many)
Note.notebook corresponds to pattern 1.
class Note(Base):
    ...

    @classmethod
    async def read_by_id(cls, session: AsyncSession, note_id: int) -> Optional[Note]:
        # Specify joined eager loading with joinedload
        stmt = select(cls).where(cls.id == note_id).options(joinedload(cls.notebook))
        result = (await session.execute(stmt.order_by(cls.id))).first()
        if result:
            return result.Note
        else:
            return None
Notebook.notes corresponds to pattern 3.
class Notebook(Base):
    ...

    @classmethod
    async def read_by_id(
        cls, session: AsyncSession, notebook_id: int, include_notes: bool = False
    ) -> Optional[Notebook]:
        stmt = select(cls).where(cls.id == notebook_id)
        if include_notes:
            # Specify select IN loading for the collection with selectinload
            stmt = stmt.options(selectinload(cls.notes))
        result = (await session.execute(stmt.order_by(cls.id))).first()
        if result:
            return result.Notebook
        else:
            return None
I initially proceeded by specifying the lazy argument on relationship, but along the way I became concerned about how many queries were being executed, so I ended up specifying selectinload only where it is actually needed.
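For reference, the relationship-level alternative would look roughly like this (a sketch of the approach I moved away from, not the sample project's final code):

notes: list[Note] = relationship(
    "Note",
    back_populates="notebook",
    order_by="Note.id",
    lazy="selectin",  # always eager-load the collection with SELECT IN
)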
No events for async
The application I was migrating used a PostgreSQL MATERIALIZED VIEW and triggered its refresh using SQLAlchemy ORM's Session Events. The Session Events feature itself is not gone, but asynchronous versions of these events are not available at this time.
It seems the processing itself could be achieved via async_engine.sync_engine, but I did not want to bring sync_engine into the async world, and since the application is mostly reads, I got by with a context manager like the following (built with the asynccontextmanager decorator).
@asynccontextmanager
async def mv_refresh_session(db: sessionmaker) -> AsyncIterator[AsyncSession]:
    """Refresh the materialized view when exiting the block.

    :param db: sessionmaker
    """
    try:
        async with db.begin() as session:
            yield session
            await session.flush()
            await refresh(session)
    finally:
        pass
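The refresh() helper is not shown above; as a rough sketch of what it might look like (the view name notes_summary is just a placeholder, not the application's actual view):

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


async def refresh(session: AsyncSession) -> None:
    # Refresh the materialized view inside the surrounding transaction.
    await session.execute(text("REFRESH MATERIALIZED VIEW notes_summary"))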
Async support for test cases
Testing is important.
Without testing, I think it would have been difficult to do a large-scale migration like this one.
In an asynchronous application that uses an event loop, tests also have to run inside an event loop. If you are using pytest, you can run a test case inside an event loop simply by installing pytest-asyncio and adding the @pytest.mark.asyncio marker.
Fixtures defined as async def coroutines can also be used as usual. However, note that the only scope you can specify is basically function. (There does seem to be a way to change the scope; a sketch follows.)
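A commonly used workaround is to redefine pytest-asyncio's event_loop fixture with a wider scope so that broader-scoped async fixtures can share one loop; roughly like this (a sketch, not code from the sample project):

import asyncio

import pytest


@pytest.fixture(scope="session")
def event_loop():
    # Override pytest-asyncio's default function-scoped event loop.
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()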
Also, inside the event loop, all HTTP requests and similar calls have to be awaited and executed asynchronously. Any HTTP client that supports async will do, but I use httpx, which is also used in the FastAPI documentation. I originally wrote the test cases with requests, but since the APIs are almost the same, all I had to do was swap out the client.
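A minimal sketch of an ac fixture like the one used in the test below (the app import path and base URL are assumptions, not the sample project's exact code):

from typing import AsyncIterator

import pytest
from httpx import AsyncClient

from app.main import app  # assumed location of the FastAPI instance


@pytest.fixture
async def ac() -> AsyncIterator[AsyncClient]:
    async with AsyncClient(app=app, base_url="http://test") as client:
        yield client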
import pytest
from httpx import AsyncClient


@pytest.mark.asyncio
async def test_notebooks_create(ac: AsyncClient, session: AsyncSession) -> None:
    """Create a notebook"""
    # execute
    response = await ac.post("/api/notebooks", json={"title": "Test Notebook", "notes": []})
    print(response.content)
    assert 200 == response.status_code
    expected = {"id": ID_STRING, "title": "Test Notebook", "notes": []}
    assert expected == response.json()
DB creation for testing
When testing processing that uses the DB, I want to actually read from and write to the DB rather than using mocks.
Regarding DB testing with FastAPI and SQLAlchemy, I previously wrote an article about testing FastAPI + SQLAlchemy with pytest. The basic flow does not change after adding async support, but since the session object passed to the test case is now an AsyncSession, the way it is created changes. I got a bit stuck here, but I built it with reference to the comments in this issue.
@pytest.fixture
async def session():
    # https://github.com/sqlalchemy/sqlalchemy/issues/5811#issuecomment-756269881
    async_engine = create_async_engine(f"{settings.DB_URI}/test")
    async with async_engine.connect() as conn:
        await conn.begin()
        await conn.begin_nested()
        AsyncSessionLocal = sessionmaker(
            autocommit=False, autoflush=False, bind=conn, future=True, class_=AsyncSession
        )
        async_session = AsyncSessionLocal()

        @event.listens_for(async_session.sync_session, "after_transaction_end")
        def end_savepoint(session, transaction):
            if conn.closed:
                return
            if not conn.in_nested_transaction():
                conn.sync_connection.begin_nested()

        def test_get_session() -> Generator:
            try:
                yield AsyncSessionLocal
            except SQLAlchemyError:
                pass

        app.dependency_overrides[get_session] = test_get_session

        yield async_session
        await async_session.close()
        await conn.rollback()
Note that the test DB (here, /test) has to be created in a session-scoped fixture, which cannot run as a coroutine, so it is created synchronously using create_engine.
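A rough sketch of that session-scoped setup might look like this (the synchronous URI setting settings.SYNC_DB_URI and the database name are assumptions for illustration only):

import pytest
from sqlalchemy import create_engine, text


@pytest.fixture(scope="session", autouse=True)
def setup_test_db():
    # A synchronous engine; CREATE DATABASE must run outside a transaction.
    engine = create_engine(settings.SYNC_DB_URI, isolation_level="AUTOCOMMIT")
    with engine.connect() as conn:
        conn.execute(text("DROP DATABASE IF EXISTS test"))
        conn.execute(text("CREATE DATABASE test"))
    yield
    engine.dispose()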
When you want to use synchronous processing with an AsyncEngine
In SQLAlchemy ORM, executing Base.metadata.create_all() creates the tables corresponding to the models defined in the ORM. Since this method is synchronous, it cannot be called directly on an AsyncEngine (AsyncSession), but in such cases it can be handled with run_sync. You pass run_sync a callable that takes a Connection (Session) object as its argument.
(venv) $ APP_CONFIG_FILE=local python3 -m asyncio
>>> import asyncio
>>> from app.db import async_engine
>>> from app.models.base import Base
>>> async with async_engine.begin() as conn:
...     await conn.run_sync(Base.metadata.drop_all)
...     await conn.run_sync(Base.metadata.create_all)
...
>>>
As an aside, the -m asyncio option of the python3 command is so convenient that I wish it were more widely known.