Event-driven architectures have become quite popular over the last few years, mainly because they are flexible and can model complex business scenarios. In this post, we will create a FastAPI event-driven architecture example using Redis.
We will cover the following aspects in this application:
- FastAPI Redis integration
- Handling events in FastAPI
- Building a Finite State Machine
If you are totally new to FastAPI, you can refer to this post on getting started with FastAPI.
1 – FastAPI Redis Project Setup
If this is your first use of FastAPI, you will have to install FastAPI on your system. Execute the below command:
$ pip install "fastapi[all]"
This will also include uvicorn. Basically, uvicorn is the server we use to run our FastAPI application.
Since we are going to work with Redis, we also need to install another package known as redis_om.
$ pip install redis_om
Redis OM is a library that provides high-level abstractions for modeling and querying Redis data in Python applications. Some of its important features are as follows:
- Declarative object mapping for Redis objects
- APIs for querying Redis data
- Index generation
Apart from the packages, we also need to set up a Redis instance. For this demo, we will use a Redis instance running in a Docker container. You can also install Redis on your system or use a Redis instance in the cloud.
2 – FastAPI Event Driven Architecture Example Application
For our demo, we will build an application for handling home delivery of products. The idea is to model the entire journey from creating a delivery to actually delivering the products at the customer’s doorstep.
This entire journey can consist of many events. However, we will focus on a few key events as follows:
- CREATE_DELIVERY – This is the first event in the lifecycle. Basically, here we create a delivery in the system.
- START_DELIVERY – Once the delivery is present in the system, we start the delivery process.
- PICKUP_PRODUCTS – In this step, the delivery executive picks up the products that are part of the delivery.
- DELIVER_PRODUCTS – This is the final event when the delivery is completed.
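The lifecycle above behaves like a small finite state machine. The following is a minimal sketch of that idea, assuming the status names ready, active, collected and completed. The TRANSITIONS table and the next_status helper are hypothetical names used only for illustration. The sketch is also stricter than the handlers built later in this post, which do not check the status before every event.

```python
from typing import Optional

# (current status, event type) -> next status.
# None stands for "no delivery exists yet".
TRANSITIONS = {
    (None, "CREATE_DELIVERY"): "ready",
    ("ready", "START_DELIVERY"): "active",
    ("active", "PICKUP_PRODUCTS"): "collected",
    ("collected", "DELIVER_PRODUCTS"): "completed",
}


def next_status(current: Optional[str], event_type: str) -> str:
    """Return the status reached by applying event_type, or raise ValueError."""
    try:
        return TRANSITIONS[(current, event_type)]
    except KeyError:
        raise ValueError(f"{event_type} is not allowed in status {current!r}") from None


# Walk through the happy path, one event at a time.
status = None
for event_type in ["CREATE_DELIVERY", "START_DELIVERY", "PICKUP_PRODUCTS", "DELIVER_PRODUCTS"]:
    status = next_status(status, event_type)
    print(event_type, "->", status)
```

Keeping the allowed transitions in one table makes it easy to see at a glance which events are valid in which status.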
With this plan ready, let us now start putting together the FastAPI application.
3 – FastAPI Redis Connection
The first step in creating our FastAPI application is to set it up for connecting to Redis.
The code below does this:
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from redis_om import get_redis_connection

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=['http://localhost:3000'],
    allow_methods=['*'],
    allow_headers=['*']
)

redis = get_redis_connection(
    host="localhost",
    port=6379,
    decode_responses=True
)
This is the bare minimum piece of code needed for establishing the connection to the Redis instance.
- First, we import FastAPI itself.
- Also, we import CORSMiddleware from FastAPI. This is useful if we want to allow CORS for certain origins and accept connections from some sort of user interface application.
- Next, we import get_redis_connection from redis_om.
- With the imports out of the way, we create a FastAPI application instance and add the CORS middleware to it.
- Lastly, we obtain a connection to our Redis instance using get_redis_connection. As you can see, get_redis_connection takes the host and port of our Redis instance as input. In our case, host is localhost and port is 6379. The decode_responses=True option makes the client return strings instead of raw bytes.
We can run this application using the below command.
$ uvicorn main:app --reload
Here, the --reload flag makes sure that the server restarts automatically whenever there is a code change.
4 – FastAPI Application Endpoints
Now that our basic structure is ready, we can create the actual API endpoints to interact with our FastAPI application.
See below the completed code for main.py.
import json

from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from redis_om import get_redis_connection, HashModel

import consumers

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=['http://localhost:3000'],
    allow_methods=['*'],
    allow_headers=['*']
)

redis = get_redis_connection(
    host="localhost",
    port=6379,
    decode_responses=True
)


class Delivery(HashModel):
    budget: int = 0
    notes: str = ''

    class Meta:
        database = redis


class Event(HashModel):
    delivery_id: str = None
    type: str
    data: str

    class Meta:
        database = redis


@app.get('/deliveries/{pk}/status')
async def get_state(pk: str):
    # Serve the cached state if it exists.
    state = redis.get(f'delivery:{pk}')
    if state is not None:
        return json.loads(state)

    # Cache miss: rebuild the state from the stored events and cache it.
    state = build_state(pk)
    redis.set(f'delivery:{pk}', json.dumps(state))
    return state


def build_state(pk: str):
    # Load every event, then keep only those that belong to this delivery.
    pks = Event.all_pks()
    all_events = [Event.get(event_pk) for event_pk in pks]
    events = [event for event in all_events if event.delivery_id == pk]

    # Replay the events one by one to arrive at the current state.
    state = {}
    for event in events:
        state = consumers.CONSUMERS[event.type](state, event)
    return state


@app.post('/deliveries/create')
async def create(request: Request):
    body = await request.json()
    delivery = Delivery(budget=body['data']['budget'], notes=body['data']['notes']).save()
    event = Event(delivery_id=delivery.pk, type=body['type'], data=json.dumps(body['data'])).save()
    state = consumers.CONSUMERS[event.type]({}, event)
    redis.set(f'delivery:{delivery.pk}', json.dumps(state))
    return state


@app.post('/event')
async def dispatch(request: Request):
    body = await request.json()
    delivery_id = body['delivery_id']
    event = Event(delivery_id=delivery_id, type=body['type'], data=json.dumps(body['data'])).save()
    state = await get_state(delivery_id)
    new_state = consumers.CONSUMERS[event.type](state, event)
    redis.set(f'delivery:{delivery_id}', json.dumps(new_state))
    return new_state
As you may have noticed, our imports have changed a little. We now also import the json library, the FastAPI Request class and HashModel from redis_om, along with our own consumers module.
Let us go through the various parts of the application:
- The Delivery class manages the delivery information, and the Event class manages the events. The Event class also has the delivery_id field to keep track of which delivery the event belongs to. Both are stored in our Redis instance.
- Then, we have an endpoint (/deliveries/{pk}/status) to get the current state of a particular delivery. Here, we first check whether the state is already cached in Redis. If the cached state is not available, we reconstruct it by replaying the events that have already occurred for that delivery, cache the result and return it.
- The next endpoint (/deliveries/create) creates a new delivery in the system. Here, we extract the request body from the FastAPI Request object. Then, we create a new Delivery object followed by a new Event object. Both are persisted into Redis. Then, we construct the state object using a helper function. Finally, we also store the state for the delivery in Redis as a key-value pair.
- The last endpoint (/event) adds further events for a particular delivery. Here, we create a new event and store it in Redis. Next, we fetch the existing state using the delivery_id as key, apply the event to it and store the new state. While the previous endpoint took care of the delivery creation case, this endpoint takes care of all further events for an existing delivery.
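The read path of the status endpoint (check the cache, otherwise replay the events) can be sketched without a running Redis instance. This is only an illustration under stated assumptions: a plain dict stands in for the Redis key-value store, a list stands in for the stored Event objects, and apply_event is a hypothetical, simplified reducer covering just two event types.

```python
import json

# In-memory stand-ins (assumptions for illustration): a dict instead of the
# Redis key-value cache, and a list instead of the stored Event objects.
cache = {}
event_log = [
    {"delivery_id": "d1", "type": "CREATE_DELIVERY", "data": {"budget": 50, "notes": "fragile"}},
    {"delivery_id": "d1", "type": "START_DELIVERY", "data": {}},
    {"delivery_id": "d2", "type": "CREATE_DELIVERY", "data": {"budget": 10, "notes": ""}},
]


def apply_event(state, event):
    """Tiny reducer covering two event types only."""
    if event["type"] == "CREATE_DELIVERY":
        return {"id": event["delivery_id"], **event["data"], "status": "ready"}
    if event["type"] == "START_DELIVERY":
        return {**state, "status": "active"}
    raise ValueError(f"unknown event type: {event['type']}")


def get_state(delivery_id):
    """Return the cached state, rebuilding it from the event log on a miss."""
    cached = cache.get(f"delivery:{delivery_id}")
    if cached is not None:
        return json.loads(cached)
    state = {}
    for event in event_log:
        if event["delivery_id"] == delivery_id:
            state = apply_event(state, event)
    cache[f"delivery:{delivery_id}"] = json.dumps(state)
    return state


print(get_state("d1"))  # cache miss: state is rebuilt by replaying two events
print(get_state("d1"))  # cache hit: same state, read back from the cache
```

Because the events are the source of truth, the cached state can be thrown away at any time and rebuilt on the next read.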
5 – Creating the Helper Utility for Event APIs
In the endpoints, we used a few helper methods to create the state as follows:
state = consumers.CONSUMERS[event.type]({}, event)
new_state = consumers.CONSUMERS[event.type](state, event)
To manage our application logic in a better way, we separated the functions for building state into a separate Python file, consumers.py.
See below the contents of the consumers.py file.
from fastapi import HTTPException
import json


def create_delivery(state, event):
    data = json.loads(event.data)
    return {
        "id": event.delivery_id,
        "budget": int(data['budget']),
        "notes": data["notes"],
        "status": "ready"
    }


def start_delivery(state, event):
    if state['status'] != 'ready':
        raise HTTPException(status_code=400, detail="Delivery already started")

    return state | {
        "status": "active"
    }


def pickup_products(state, event):
    data = json.loads(event.data)
    new_budget = state["budget"] - int(data["purchase_price"]) * int(data['quantity'])

    if new_budget < 0:
        raise HTTPException(status_code=400, detail="Not enough budget")

    return state | {
        "budget": new_budget,
        "purchase_price": int(data["purchase_price"]),
        "quantity": int(data["quantity"]),
        "status": "collected"
    }


def deliver_products(state, event):
    data = json.loads(event.data)
    new_budget = state["budget"] - int(data["sell_price"]) * int(data['quantity'])
    new_quantity = state["quantity"] - int(data['quantity'])

    if new_quantity < 0:
        raise HTTPException(status_code=400, detail="Not enough quantity")

    return state | {
        "budget": new_budget,
        "purchase_price": int(data["sell_price"]),
        "quantity": new_quantity,
        "status": "completed"
    }


def increase_budget(state, event):
    data = json.loads(event.data)
    state['budget'] += int(data['budget'])
    return state


CONSUMERS = {
    "CREATE_DELIVERY": create_delivery,
    "START_DELIVERY": start_delivery,
    "PICKUP_PRODUCTS": pickup_products,
    "DELIVER_PRODUCTS": deliver_products,
    "INCREASE_BUDGET": increase_budget
}
Let us walk through this module in detail:
- This module defines one function for each possible event. At the end of the file, the CONSUMERS dictionary maps every event type (CREATE_DELIVERY, START_DELIVERY, PICKUP_PRODUCTS, DELIVER_PRODUCTS and INCREASE_BUDGET) to its function.
- The create_delivery function builds the state for the first step. It returns a state with status set to ready.
- The start_delivery function validates that the delivery has not already started. If the status is still ready, it sets the delivery status to active. For a failed validation, we raise HTTPException.
- Next, we have the pickup_products function. In this function, we check if the budget is sufficient for the products. If it is not, we raise an exception. Otherwise, we set the status to collected.
- Next, we have the deliver_products function. Here, we sell the products to the customer and mark the delivery status as completed. If the event asks for more items than were picked up, we raise an exception.
- In case the budget is not sufficient, we also have the increase_budget function to modify the budget for the delivery.
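To make the budget check concrete, here is a small worked example of the pickup step. It is a sketch, not the application code: the function repeats the same arithmetic but raises ValueError instead of HTTPException so that it runs without FastAPI installed, and a SimpleNamespace stands in for the Event object. The numbers are made up for illustration. The dictionary merge with | needs Python 3.9 or later.

```python
import json
from types import SimpleNamespace


def pickup_products(state, event):
    """Same arithmetic as the pickup handler, raising ValueError instead of HTTPException."""
    data = json.loads(event.data)
    cost = int(data["purchase_price"]) * int(data["quantity"])
    new_budget = state["budget"] - cost
    if new_budget < 0:
        raise ValueError("Not enough budget")
    return state | {
        "budget": new_budget,
        "purchase_price": int(data["purchase_price"]),
        "quantity": int(data["quantity"]),
        "status": "collected",
    }


state = {"id": "d1", "budget": 50, "notes": "", "status": "active"}

# 4 items at 5 each cost 20, so the budget drops from 50 to 30.
ok = SimpleNamespace(data=json.dumps({"purchase_price": 5, "quantity": 4}))
print(pickup_products(state, ok))

# 20 items at 5 each cost 100, which exceeds the budget of 50.
too_expensive = SimpleNamespace(data=json.dumps({"purchase_price": 5, "quantity": 20}))
try:
    pickup_products(state, too_expensive)
except ValueError as exc:
    print("rejected:", exc)
```

Note that the handler returns a new dictionary and leaves the incoming state untouched, so a rejected event cannot leave the state half-updated.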
The great part about this approach of handling various events is that we can dynamically call the appropriate function based on the value of event.type from the main.py file. Adding a new event to the workflow depending on the business requirement is quite easy.
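As an illustration of how a new event could be added, the sketch below registers a handler through a small decorator and rejects unknown event types explicitly. Everything in it is an assumption made for the example: the consumer decorator, the dispatch helper and the CANCEL_DELIVERY event are hypothetical and not part of the application above, which fills the CONSUMERS dictionary by hand.

```python
CONSUMERS = {}


def consumer(event_type):
    """Decorator that registers a handler under the given event type."""
    def register(func):
        CONSUMERS[event_type] = func
        return func
    return register


@consumer("CANCEL_DELIVERY")  # hypothetical event, not part of this post's workflow
def cancel_delivery(state, event):
    if state.get("status") == "completed":
        raise ValueError("Delivery already completed")
    return {**state, "status": "cancelled"}


def dispatch(state, event_type, event=None):
    """Look up the handler by event type, rejecting unknown types explicitly."""
    handler = CONSUMERS.get(event_type)
    if handler is None:
        raise ValueError(f"Unknown event type: {event_type}")
    return handler(state, event)


print(dispatch({"id": "d1", "status": "active"}, "CANCEL_DELIVERY"))
```

With a lookup like this, an unknown event type produces a clear error message instead of a bare KeyError.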
The application is now ready and we can test it by triggering the various events and seeing the response. Also, we can check the data in Redis for additional confirmation.
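One way to trigger the events is a small client script. The sketch below uses only the standard library and assumes the application is served at http://localhost:8000, which is uvicorn's default port. The build_request and send helpers and the sample payload values are made up for illustration. Only the request is built here, so the script runs without the server. The commented lines show how the flow could continue once the server is up.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # assumption: uvicorn's default port


def build_request(path, payload):
    """Build a JSON POST request for the given API path."""
    return urllib.request.Request(
        BASE_URL + path,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request):
    """Send the request and decode the JSON response (needs the server running)."""
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())


create = build_request(
    "/deliveries/create",
    {"type": "CREATE_DELIVERY", "data": {"budget": 50, "notes": "Leave at the door"}},
)
print(create.get_method(), create.full_url)

# With the server running, the flow could continue like this:
# delivery = send(create)
# send(build_request("/event", {"type": "START_DELIVERY",
#                               "delivery_id": delivery["id"], "data": {}}))
```

The body of each request mirrors what the endpoints read: a type, a data object and, for the /event endpoint, the delivery_id of an existing delivery.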
Conclusion
With this, we have successfully implemented a FastAPI event-driven architecture example using Redis. Redis suits this type of use case well because we can use it both as the database for deliveries and events and as a key-value store for the computed state.
Once the backend state and event management system is ready, we can also build a UI wrapper to support the various operations.
If you have any comments or queries about this post, please feel free to mention them in the comments section below.