Saturday, July 16, 2022

Asynchronous WebAPI with Python, FastAPI + SQLAlchemy

Posted by rhoboro on 2021-06-12

https://www.rhoboro.com/2021/06/12/async-fastapi-sqlalchemy.html

SQLAlchemy 1.4 was released not long ago. It has the following two major features:

  • The ORM supports a new notation, 2.0 Style, in addition to the old notation (1.x Style)
  • Both Core and ORM support asynchronous processing with an event loop (asyncio)

According to the roadmap, SQLAlchemy 2.0 will support only 2.0 Style. Asynchronous processing with event loops has also become widespread over the last few years, and I think SQLAlchemy's support will accelerate this trend further.

So I prepared a sample asynchronous Web API project that combines SQLAlchemy 2.0 Style with FastAPI, which has become popular recently. The code is in the rhoboro/async-fastapi-sqlalchemy repository.

This is based on my experience migrating an existing application. I first migrated it to 2.0 Style while keeping synchronous processing, and then added async support. Looking back, the changes were extensive and difficult, so I think doing it in two steps was the right decision.

Support for 2.0 Style

See the official documentation for details.
The biggest change is how SELECT statements are written in the ORM. In 1.x Style ORM, I wrote a SELECT like this:

from sqlalchemy import Column, Integer
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Note(Base):
    __tablename__ = "notes"
    id = Column(Integer, primary_key=True)


results = session.query(Note).filter_by(id=1).all()

Some people may also have assigned scoped_session.query_property() to a query class attribute on the model and used it as follows. This style felt very ORM-like, and I liked it quite a lot lol

session = scoped_session(...)
Base.query = session.query_property()

results = Note.query.filter_by(id=1).all()

The equivalent in 2.0 Style is as follows. In 2.0 Style, you build a statement like this and then pass it to session.execute() to run it.

from sqlalchemy import select

statement = select(Note).filter_by(id=1)
result = session.execute(statement).all()

In fact, this style is almost identical to Core. 2.0 Style unifies some processing that used to differ between ORM and Core.

The biggest changes in SQLAlchemy 2.0 are targeting the residual assumptions left over from this early period in SQLAlchemy's development as well as the leftover artifacts resulting from the incremental introduction of key API features such as Query and Declarative. It also hopes standardize some newer capabilities that have proven to be very effective. (from "Migrating to SQLAlchemy 2.0")

At a glance the syntax changes only slightly. However, I had used the Note.query... style heavily. I now had to pass the session object around myself, which had previously been handled behind the scenes, so the migration was rather difficult.
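Here is a minimal runnable sketch that compares the two styles side by side. It assumes SQLAlchemy 1.4 or later and an in-memory SQLite database, and the title column and sample rows are made up for illustration. Both calls find the same row. The difference is that 2.0 Style builds a select() statement on its own and then passes it to session.execute().

```python
from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Note(Base):
    __tablename__ = "notes"
    id = Column(Integer, primary_key=True)
    title = Column(String, nullable=False)  # illustrative column, not from the sample project


def compare_styles() -> tuple[list[int], list[int]]:
    """Run the same lookup in 1.x Style and 2.0 Style and return the ids found."""
    engine = create_engine("sqlite://")  # in-memory SQLite, for illustration only
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        session.add_all([Note(id=1, title="first"), Note(id=2, title="second")])
        session.commit()

        # 1.x Style: the query is built from the session
        legacy = session.query(Note).filter_by(id=1).all()

        # 2.0 Style: build a statement first, then hand it to session.execute()
        statement = select(Note).filter_by(id=1)
        modern = session.execute(statement).scalars().all()

    return [n.id for n in legacy], [n.id for n in modern]


print(compare_styles())  # ([1], [1])
```

The .scalars() call unwraps the Note entity from each result row, so both lists contain Note objects.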

After the migration, I use a with statement wherever a session object is needed. In the sample project, this is done consistently at the start of processing in each use_case module. (This code also includes async support.)

class ReadNote:
    ...
    async def execute(self, note_id: int) -> NoteSchema:
        async with self.async_session() as session:
            note = await Note.read_by_id(session, note_id)
            if not note:
                raise HTTPException(status_code=404)
            return NoteSchema.from_orm(note)

Using with session() and with session.begin() appropriately

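Depending on the use case, the Session is obtained in either the with session() form or the with session.begin() form. The latter form starts a transaction. When the with block exits, that transaction is committed automatically, or rolled back if an exception occurs. In the sample, the read use case uses the former form and the create use case uses the latter.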

class ReadNote:
    ...
    # Read only: with session(), nothing needs to be committed
    async def execute(self, note_id: int) -> NoteSchema:
        async with self.async_session() as session:
            note = await Note.read_by_id(session, note_id)
            if not note:
                raise HTTPException(status_code=404)
            return NoteSchema.from_orm(note)


class CreateNote:
    ...
    # Writes: with session.begin(), committed automatically when the block exits
    async def execute(self, notebook_id: int, title: str, content: str) -> NoteSchema:
        async with self.async_session.begin() as session:
            notebook = await Notebook.read_by_id(session, notebook_id)
            if not notebook:
                raise HTTPException(status_code=404)
            note = await Note.create(session, notebook.id, title, content)
            return NoteSchema.from_orm(note)
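You can check the difference between the two forms without a database server. The sketch below is an assumption-laden stand-in: it uses the synchronous Session and an in-memory SQLite database, while the sample project itself uses AsyncSession. sessionmaker.begin() behaves the same way in both. A plain session() block discards uncommitted work when it exits. A begin() block commits on a normal exit and rolls back when the block raises.

```python
from sqlalchemy import Column, Integer, String, create_engine, func, select
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.pool import StaticPool

Base = declarative_base()


class Note(Base):
    __tablename__ = "notes"
    id = Column(Integer, primary_key=True)
    title = Column(String, nullable=False)


# StaticPool keeps one in-memory SQLite connection, so every session sees the same data.
engine = create_engine("sqlite://", poolclass=StaticPool)
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine)


def count_notes() -> int:
    with SessionLocal() as session:
        return session.execute(select(func.count(Note.id))).scalar_one()


counts = []

# with session(): flush() sends the INSERT, but nothing commits it,
# so it is rolled back when the session closes at the end of the block.
with SessionLocal() as session:
    session.add(Note(title="never committed"))
    session.flush()
counts.append(count_notes())

# with session.begin(): the transaction is committed when the block exits normally.
with SessionLocal.begin() as session:
    session.add(Note(title="committed"))
counts.append(count_notes())

# with session.begin(): an exception inside the block rolls the transaction back.
try:
    with SessionLocal.begin() as session:
        session.add(Note(title="rolled back"))
        session.flush()
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass
counts.append(count_notes())

print(counts)  # [0, 1, 1]
```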
             

Async support

Here are some points about async support.

Async support in FastAPI

In FastAPI, you get async support simply by defining the request handler with async def. It's really easy.

@router.post("/", response_model=CreateNoteResponse)
async def create(
    request: Request,
    data: CreateNoteRequest,
    use_case: CreateNote = Depends(CreateNote),
) -> NoteSchema:
    return await use_case.execute(data.notebook_id, data.title, data.content)

FastAPI has extensive documentation. It generates OpenAPI documents automatically from your schema definitions and makes settings management easy, so I recommend it. I personally like Starlette and Pydantic, which FastAPI is built on, and have few complaints about them. These days FastAPI is my first choice when building a Web API.

The "can't call await_() here" error

The error I saw most often while adding async support with SQLAlchemy was this one lol
I ran into it especially often when building response data from model instances, as in NoteSchema.from_orm(note) or NotebookSchema.from_orm(notebook).

sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called; can't call await_() here. Was IO attempted in an unexpected place? (Background on this error at: http://sqlalche.me/e/14/xd2s)

This error often occurs when you access attributes defined with relationship. relationship is the SQLAlchemy ORM construct that expresses relations between tables in the RDB. It is used as follows.

from __future__ import annotations

from sqlalchemy import Column, ForeignKey, Integer
from sqlalchemy.orm import relationship


class Note(Base):
    __tablename__ = "notes"
    id: int = Column(
        "id", Integer(), autoincrement=True, nullable=False, unique=True, primary_key=True
    )
    ...
    notebook_id: int = Column("notebook_id", Integer(), ForeignKey("notebooks.id"), nullable=False)
    notebook: Notebook = relationship("Notebook", back_populates="notes")


class Notebook(Base):
    __tablename__ = "notebooks"
    id: int = Column(
        "id", Integer(), autoincrement=True, nullable=False, unique=True, primary_key=True
    )
    ...
    notes: list[Note] = relationship(
        "Note",
        back_populates="notebook",
        order_by="Note.id",
        cascade="save-update, merge, refresh-expire, expunge, delete, delete-orphan",
    )

By default, attributes defined with relationship use lazy loading: the query runs the first time the attribute is accessed. This often works well in a world without asyncio, but not in the async world. There, that implicit query would have to perform IO inside a plain attribute access, where nothing can be awaited.
So when adding async support, you need to load all the data you need up front, using a technique called eager loading.
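Here is a minimal sketch of what lazy loading means in practice. It assumes SQLAlchemy 1.4+ and uses the synchronous Session on in-memory SQLite, so it runs without an async driver. An event listener counts the SQL statements sent to the database. With the default lazy loading, touching notebook.notes issues a new query at that moment. In the async world that query cannot be awaited, which is where MissingGreenlet comes from. With selectinload, the notes are already loaded by the time the attribute is accessed.

```python
from sqlalchemy import Column, ForeignKey, Integer, create_engine, event, select
from sqlalchemy.orm import Session, declarative_base, relationship, selectinload

Base = declarative_base()


class Notebook(Base):
    __tablename__ = "notebooks"
    id = Column(Integer, primary_key=True)
    # Default lazy="select": the collection is loaded on first access
    notes = relationship("Note", order_by="Note.id")


class Note(Base):
    __tablename__ = "notes"
    id = Column(Integer, primary_key=True)
    notebook_id = Column(Integer, ForeignKey("notebooks.id"), nullable=False)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as setup:
    setup.add(Notebook(id=1, notes=[Note(id=1), Note(id=2)]))
    setup.commit()

query_count = 0


@event.listens_for(engine, "before_cursor_execute")
def count_query(conn, cursor, statement, parameters, context, executemany):
    global query_count
    query_count += 1


def queries_on_access(eager: bool) -> int:
    """Return how many SQL queries run when notebook.notes is first touched."""
    global query_count
    with Session(engine) as session:
        stmt = select(Notebook).where(Notebook.id == 1)
        if eager:
            stmt = stmt.options(selectinload(Notebook.notes))
        notebook = session.execute(stmt).scalar_one()
        query_count = 0  # only count what the attribute access itself triggers
        len(notebook.notes)
        return query_count


print(queries_on_access(eager=False))  # 1: the lazy load fires on access
print(queries_on_access(eager=True))   # 0: already loaded up front
```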

You can specify eager loading in two ways. Per query, use the statement's options(). At attribute definition time, use the lazy argument of relationship. Details are in Relationship Loading Techniques, but the basic patterns are as follows.

  1. Joined Eager Loading + innerjoin=True if the relationship target is a single object and nullable=False
  2. Joined Eager Loading if the relationship target is a single object and nullable=True
  3. Select IN loading if the relationship target is a collection (many)
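You can compare the three patterns by looking at the SQL each one emits. The minimal sketch below again assumes SQLAlchemy 1.4+ and synchronous in-memory SQLite. Its models are trimmed-down versions of the Note and Notebook models above. Patterns 1 and 2 fetch the related notebook in the same statement, with an INNER JOIN or a LEFT OUTER JOIN respectively. Pattern 3 sends a second SELECT ... IN query for the collection.

```python
from sqlalchemy import Column, ForeignKey, Integer, create_engine, event, select
from sqlalchemy.orm import Session, declarative_base, joinedload, relationship, selectinload

Base = declarative_base()


class Notebook(Base):
    __tablename__ = "notebooks"
    id = Column(Integer, primary_key=True)
    notes = relationship("Note", back_populates="notebook")


class Note(Base):
    __tablename__ = "notes"
    id = Column(Integer, primary_key=True)
    notebook_id = Column(Integer, ForeignKey("notebooks.id"), nullable=False)
    notebook = relationship("Notebook", back_populates="notes")


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as setup:
    setup.add(Notebook(id=1, notes=[Note(id=1), Note(id=2)]))
    setup.commit()

emitted: list[str] = []


@event.listens_for(engine, "before_cursor_execute")
def capture(conn, cursor, statement, parameters, context, executemany):
    emitted.append(statement)


def sql_for(stmt) -> list[str]:
    """Return the SQL strings sent while fetching every row of stmt."""
    emitted.clear()
    with Session(engine) as session:
        session.execute(stmt).scalars().unique().all()
    return list(emitted)


# Pattern 1: single target, nullable=False -> joinedload with innerjoin=True
pattern1 = sql_for(select(Note).options(joinedload(Note.notebook, innerjoin=True)))
# Pattern 2: single target that may be NULL -> plain joinedload (LEFT OUTER JOIN)
pattern2 = sql_for(select(Note).options(joinedload(Note.notebook)))
# Pattern 3: collection -> selectinload (one extra SELECT ... IN query)
pattern3 = sql_for(select(Notebook).options(selectinload(Notebook.notes)))

for name, sql in [("pattern1", pattern1), ("pattern2", pattern2), ("pattern3", pattern3)]:
    print(name, len(sql), "statement(s)")
```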

Note.notebook corresponds to pattern 1.

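from __future__ import annotations

from typing import Optional

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload


class Note(Base):
    ...
    @classmethod
    async def read_by_id(cls, session: AsyncSession, note_id: int) -> Optional[Note]:
        # Pattern 1: joinedload with innerjoin=True (notebook_id is NOT NULL)
        stmt = select(cls).where(cls.id == note_id).options(joinedload(cls.notebook, innerjoin=True))
        result = (await session.execute(stmt.order_by(cls.id))).first()
        if result:
            return result.Note
        else:
            return None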

Notebook.notes corresponds to pattern 3.

from __future__ import annotations

from typing import Optional

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload


class Notebook(Base):
    ...
    @classmethod
    async def read_by_id(
        cls, session: AsyncSession, notebook_id: int, include_notes: bool = False
    ) -> Optional[Notebook]:
        stmt = select(cls).where(cls.id == notebook_id)
        if include_notes:
            # Pattern 3: the collection is specified with selectinload
            stmt = stmt.options(selectinload(cls.notes))
        result = (await session.execute(stmt.order_by(cls.id))).first()
        if result:
            return result.Notebook
        else:
            return None

At first I specified this with relationship's lazy argument. I was worried about how many queries that would run along the way, though, so I switched to specifying selectinload only where it is needed.

No events for async

The application I was migrating used PostgreSQL's MATERIALIZED VIEW and triggered its refresh with SQLAlchemy ORM Session Events. The Session Events feature itself has not gone away, but asynchronous versions of these events are not available at this time.

It seems the same processing could be done through async_engine.sync_engine, but I did not want to use sync_engine in the async world. Since the application mostly handles reads, I got by with a context manager built with a decorator like this.

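from contextlib import asynccontextmanager
from typing import AsyncIterator

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import sessionmaker


@asynccontextmanager
async def mv_refresh_session(db: sessionmaker) -> AsyncIterator[AsyncSession]:
    """Refresh materialized view as it exits the block

    :param db: sessionmaker
    """
    try:
        async with db.begin() as session:
            yield session
            await session.flush()
            # refresh() is the application's own helper (not shown in this article)
            await refresh(session)
    finally:
        pass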
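The article does not show the refresh() helper called in mv_refresh_session. One possible sketch assumes PostgreSQL and a hypothetical view named notebook_summary. It sends REFRESH MATERIALIZED VIEW through the same session, so the refresh runs inside the transaction opened by db.begin() and is committed together with the writes.

```python
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

# Hypothetical view names; replace them with the materialized views your schema defines.
MATERIALIZED_VIEWS = ("notebook_summary",)


async def refresh(session: AsyncSession) -> None:
    """Refresh each materialized view using the caller's session and transaction."""
    for view in MATERIALIZED_VIEWS:
        # View names come from the constant above, never from user input.
        await session.execute(text(f"REFRESH MATERIALIZED VIEW {view}"))
```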
             
             
             
    
        

Async support in test cases

Testing is important.
Without testing, I think it would have been difficult to do a large-scale migration like this one.

In an asynchronous application that uses an event loop, the tests must also run inside the event loop. With pytest, you can easily run a test case inside an event loop by adding pytest-asyncio and marking the test with @pytest.mark.asyncio. Fixtures also work as usual if you define them as coroutines with async def. Note, however, that the only fixture scope you can specify is basically function (there seems to be a way to change the scope).

Also, inside the event loop, every HTTP request and similar IO must be awaited and executed asynchronously. Any HTTP client that supports async will do, but I use httpx, which is also used in the FastAPI documentation. I originally wrote the test cases with requests. Since the two APIs are almost the same, I only had to swap the client.

import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession


@pytest.mark.asyncio
async def test_notebooks_create(ac: AsyncClient, session: AsyncSession) -> None:
    """Create a notebook"""
    # execute
    response = await ac.post("/api/notebooks", json={"title": "Test Notebook", "notes": []})

    print(response.content)
    assert 200 == response.status_code
    # ID_STRING is defined elsewhere in the test suite (not shown in this article)
    expected = {"id": ID_STRING, "title": "Test Notebook", "notes": []}
    assert expected == response.json()
       
           
       

Setting up the DB for tests

When testing code that uses the DB, I want to actually read from and write to the DB rather than use mocks.

I previously wrote an article about testing FastAPI + SQLAlchemy with pytest against a real DB. The basic flow stays the same with async support. However, the session object passed to test cases is now an AsyncSession, so the way it is created changes. This tripped me up a bit, but I built it with reference to the comments in this issue.

from typing import Generator

import pytest
from sqlalchemy import event
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

# settings, app and get_session come from the application itself (not shown)


@pytest.fixture
async def session():
    # https://github.com/sqlalchemy/sqlalchemy/issues/5811#issuecomment-756269881
    async_engine = create_async_engine(f"{settings.DB_URI}/test")
    async with async_engine.connect() as conn:
        # Outer transaction (rolled back at the end) plus a SAVEPOINT for the test
        await conn.begin()
        await conn.begin_nested()
        AsyncSessionLocal = sessionmaker(
            autocommit=False,
            autoflush=False,
            bind=conn,
            future=True,
            class_=AsyncSession,
        )

        async_session = AsyncSessionLocal()

        # Re-open the SAVEPOINT whenever the nested transaction has ended
        @event.listens_for(async_session.sync_session, "after_transaction_end")
        def end_savepoint(session, transaction):
            if conn.closed:
                return
            if not conn.in_nested_transaction():
                conn.sync_connection.begin_nested()

        def test_get_session() -> Generator:
            try:
                yield AsyncSessionLocal
            except SQLAlchemyError as e:
                pass

        app.dependency_overrides[get_session] = test_get_session

        yield async_session
        await async_session.close()
        await conn.rollback()

Note that the test database (here, /test) has to be created in a session-scoped fixture, which cannot run as a coroutine. That step therefore uses the synchronous create_engine.

Using synchronous processing with AsyncEngine

In SQLAlchemy ORM, running Base.metadata.create_all(engine) creates the tables for the models defined in the ORM. This method is synchronous, so it cannot be used directly with AsyncEngine (or AsyncSession). In such cases, you can use run_sync. run_sync takes a callable that receives a synchronous Connection (or Session) object as its argument.

(venv) $ APP_CONFIG_FILE=local python3 -m asyncio
>>> import asyncio
>>> from app.db import async_engine
>>> from app.models.base import Base
>>> async with async_engine.begin() as conn:
...     await conn.run_sync(Base.metadata.drop_all)
...     await conn.run_sync(Base.metadata.create_all)
...
>>>

As an aside, the -m asyncio option of python3 used above starts a REPL where you can use await at the top level. It is so convenient that I'd like to see it used more widely.
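Outside the REPL, you can wrap the same run_sync idea in a function. The minimal sketch below assumes SQLAlchemy 1.4+, and the Note model and the function names are hypothetical. create_tables is an ordinary synchronous function that receives a Connection, so it can be passed to AsyncConnection.run_sync() unchanged. run_sync then returns whatever the callable returns. Because the synchronous half takes a plain Connection, you can also exercise it with a regular SQLite engine, without an async driver.

```python
from sqlalchemy import Column, Integer, inspect
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Note(Base):
    __tablename__ = "notes"
    id = Column(Integer, primary_key=True)


def create_tables(sync_conn: Connection) -> list[str]:
    """Synchronous work: create the tables and report which ones now exist."""
    Base.metadata.create_all(sync_conn)
    return inspect(sync_conn).get_table_names()


async def init_models(async_engine: AsyncEngine) -> list[str]:
    """Async entry point: run the synchronous function on the async connection."""
    async with async_engine.begin() as conn:
        return await conn.run_sync(create_tables)
```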

tags: python, fastapi, asyncio, sqlalchemy
