Event driven architectures have become quite popular over the last few years. This is mainly due to their flexibility and ability to handle complex business scenarios. In this post, we will create a FastAPI Event Driven Architecture example using Redis.

We will be covering the below aspects in this application

  • FastAPI Redis integration
  • Handling events in FastAPI
  • Building a Finite State Machine

If you are totally new to FastAPI, you can refer to this post on getting started with FastAPI.

1 – FastAPI Redis Project Setup

If this is your first use of FastAPI, you will have to install FastAPI on your system. Execute the below command:

$ pip install fastapi[all]

This will also include uvicorn. Basically, uvicorn is the server we use to run our FastAPI application.

Since we are going to work with Redis, we also need to install another package known as redis_om.

$ pip install redis_om

Basically, Redis OM is a library that provides high-level abstractions that make it easy to model and query data in Python applications. Some of the important features of Redis OM are as follows:

  • Declarative object mapping for Redis objects
  • APIs for querying Redis data
  • Index generation

Apart from the packages, we also need to setup a Redis instance. For this demo purpose, we will be using a Redis instance running on a Docker container. You can also install Redis on your system or obtain a Redis instance on the cloud.

2 – FastAPI Event Driven Architecture Example Application

For our demo purpose, we will be building an application for handling home delivery of products. The idea is to model the entire journey from creating a delivery to actually delivering the products at the customer’s doorstep.

This entire journey can consist of many events. However, we will focus on a few key events as follows:

  • CREATE_DELIVERY – This is the first event in the lifecycle. Basically, here we create a delivery in the system.
  • START_DELIVERY – Once the delivery is present in the system, we start the delivery process.
  • PICKUP_PRODUCTS – In this step, the delivery executive picks up the products that are part of the delivery.
  • DELIVER_PRODUCTS – This is the final event when the delivery is completed.

With this plan ready, let us now start putting together the FastAPI application.

3 – FastAPI Redis Connection

The first step in creating our FastAPI application is to set it up for connecting to Redis.

See below code for the same:

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from redis_om import get_redis_connection

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=['http://localhost:3000'],
    allow_methods=['*'],
    allow_headers=['*']
)

redis = get_redis_connection(
    host="localhost",
    port=6379,
    decode_responses=True
)

This is the bare minimum piece of code needed for establishing the connection to the Redis instance.

  • First, we import FastAPI itself.
  • Also, we import the CORSMiddleware library from FastAPI. This is useful if we want to allow CORS for certain origins and accept connections from some sort of user interface application.
  • Next, we import the get_redis_connection from redis_om.
  • With the imports out of the way, we create a FastAPI application instance and add the CORS middleware to the application instance.
  • Lastly, we obtain a connection to our Redis instance using get_redis_connection. As you can see, the get_redis_connection takes the host and port of our Redis instance as input. In our case, host is localhost and port is 6379.

We can run this application using the below command.

$ uvicorn main:app --reload

Here, the --reload flag makes sure that our application is redeployed whenever there is a code change.

4 – FastAPI Application Endpoints

Now that our basic structure is ready, we can create the actual API endpoints to interact with our FastAPI application.

See below the completed code for main.py.

import json
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from redis_om import get_redis_connection, HashModel

import consumers

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=['http://localhost:3000'],
    allow_methods=['*'],
    allow_headers=['*']
)

redis = get_redis_connection(
    host="localhost",
    port=6379,
    decode_responses=True
)

class Delivery(HashModel):
    budget: int = 0
    notes: str = ''

    class Meta:
        database = redis

class Event(HashModel):
    delivery_id: str = None
    type: str
    data: str

    class Meta:
        database = redis

@app.get('/deliveries/{pk}/status')
async def get_state(pk: str):
    state = redis.get(f'delivery:{pk}')

    if state is not None:
     return json.loads(state)

    state = build_state(pk)
    redis.set(f'delivery:{pk}', json.dumps(state))

    return {}

def build_state(pk: str):
    pks = Event.all_pks()
    all_events = [Event.get(pk) for pk in pks]
    events = [event for event in all_events if event.delivery_id == pk]

    state = {}

    for event in events:
        state = consumers.CONSUMERS[event.type](state, event)

    return state

@app.post('/deliveries/create')
async def create(request: Request):
    body = await request.json()
    delivery = Delivery(budget=body['data']['budget'], notes=body['data']['notes']).save()
    event = Event(delivery_id=delivery.pk, type=body['type'], data=json.dumps(body['data'])).save()
    state = consumers.CONSUMERS[event.type]({}, event)
    redis.set(f'delivery:{delivery.pk}', json.dumps(state))
    return state

@app.post('/event')
async def dispatch(request: Request):
    body = await request.json()
    delivery_id = body['delivery_id']
    event = Event(delivery_id=delivery_id, type=body['type'], data=json.dumps(body['data'])).save()
    state = await get_state(delivery_id)
    new_state = consumers.CONSUMERS[event.type](state, event)
    redis.set(f'delivery:{delivery_id}', json.dumps(new_state))
    return new_state

As you may have noticed, our imports have changed a little. We are also importing json library and FastAPI Request. Also, we are import HashModel from redis_om.

Let us go through the various parts of the application:

  • The Delivery class will manage the delivery information. Second, we also have the Event class for managing the events. The Event class also has the delivery_id field to keep track of which delivery the event is associated to. Both of these will be stored in our Redis instance.
  • Then, we have an end-point (/deliveries/{pk}/status) to get the current state for a particular delivery. In this case, we first check whether the state is present in Redis. But if Redis is down for some reason or the state is not available, we can reconstruct the events by replaying the events that have already occurred.
  • The next endpoint (/deliveries/create) is to create a new delivery in the system. Here, we extract the FastAPI request body from the overall Request object. Then, we create a new Delivery object followed by a new Event object. Both are persisted into Redis. Then, we construct the state object using another helper function. Finally, we also store the state for the delivery in Redis as a key-value pair.
  • The last endpoint (/event) is to basically add further events for a particular delivery. Basically, here we create a new event and store it in Redis. Next, we fetch the existing state from Redis using the delivery_id as key and update the new state. While the previous endpoint took care of the delivery creation case, the last endpoint takes care of all the further events for an existing delivery.

5 – Creating the Helper Utility for Event APIs

In the endpoints, we used a few helper methods to create the state as follows:

state = consumers.CONSUMERS[event.type]({}, event)
new_state = consumers.CONSUMERS[event.type](state, event)

To manage our application logic in a better way, we separated the functions for building state into a separate Python file consumers.py.

See below the contents of the consumers.py file.

from fastapi import HTTPException
import json

def create_delivery(state, event):
    data = json.loads(event.data)
    return {
        "id": event.delivery_id,
        "budget": int(data['budget']),
        "notes": data["notes"],
        "status": "ready"
    }

def start_delivery(state, event):

    if state['status'] != 'ready':
        raise HTTPException(status_code=400, detail="Delivery already started")

    return state | {
        "status": "active"
    }

def pickup_products(state, event):
    data = json.loads(event.data)

    new_budget = state["budget"] - int(data["purchase_price"]) * int(data['quantity'])

    if new_budget < 0:
        raise HTTPException(status_code=400, detail="Not enough budget")

    return state | {
        "budget": new_budget,
        "purchase_price": int(data["purchase_price"]),
        "quantity": int(data["quantity"]),
        "status": "collected"
    }

def deliver_products(state, event):
    data = json.loads(event.data)

    new_budget = state["budget"] - int(data["sell_price"]) * int(data['quantity'])
    new_quantity = state["quantity"] - int(data['quantity'])

    if new_quantity < 0:
        raise HTTPException(status_code=400, detail="Not enough quantity")

    return state | {
        "budget": new_budget,
        "purchase_price": int(data["sell_price"]),
        "quantity": new_quantity,
        "status": "completed"
    }

def increase_budget(state, event):
    data = json.loads(event.data)
    state['budget'] += int(data['budget'])
    return state

CONSUMERS = {
    "CREATE_DELIVERY": create_delivery,
    "START_DELIVERY": start_delivery,
    "PICKUP_PRODUCTS": pickup_products,
    "DELIVER_PRODUCTS": deliver_products,
    "INCREASE_BUDGET": increase_budget
}

Let us walkthrough this module in detail:

  • This module supports a bunch of functions for the various possible events. At the end of the file, we define all the events such as CREATE_DELIVERY, START_DELIVERY, PICKUP_PRODUCTS, DELIVERY_PRODUCTS and INCREASE_BUDGET. For each event, we declare the supporting function.
  • The create_delivery function builds the state for the first step. It returns a state with status set to ready.
  • The start_delivery function validates if the delivery already started. If not, it makes the delivery status as active. For a failed validation, we raise HTTPException.
  • Next, we have pickup_products function. In this function, we check if budget is sufficient for the products. In case of an error, we raise an exception. Else, we set the status to collected.
  • Next, we have the deliver_products function. Here, we basically sell the products to the customer and mark the delivery status as completed.
  • In case there is a situation where budget is not sufficient, we can also have the function increase_budget to modify the budget for the delivery.

The great part about this approach of handling various events is that we can dynamically call the appropriate function based on the value of event.type from the main.py file. Adding a new event to the workflow depending on the business requirement is quite easy.

The application is now ready and we can test it by triggering the various events and seeing the response. Also, we can check the data in Redis for additional confirmation.

Conclusion

With this, we have successfully implemented a FastAPI Event Driven architecture example using Redis. Basically, Redis is very versatile for this type of use-case as we can use it as a database and also for storing key-value pairs.

Once the backend state and event management system is ready, we can also build a UI wrapper to support the various operations.

If you have any comments or queries about this post, please feel free to mention them in the comments section below.

Categories: BlogFastAPI

Saurabh Dashora

Saurabh is a Software Architect with over 12 years of experience. He has worked on large-scale distributed systems across various domains and organizations. He is also a passionate Technical Writer and loves sharing knowledge in the community.

0 Comments

Leave a Reply

Your email address will not be published. Required fields are marked *