Real-Time Communication with SSE in FastAPI: Enhancing Task Processing Efficiency

Princy Prakash
Oct 14, 2023

Introduction:

Efficient task processing involves not only managing background jobs but also keeping clients informed in real time. In our project, we use Server-Sent Events (SSE) with FastAPI to achieve this. Unlike WebSockets, which provide a full-duplex channel, SSE is one-way: the server pushes updates to the client over a single long-lived HTTP connection. That is all we need to report the status of a client's request as soon as it changes.
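On the wire, an SSE stream is plain text sent from the server to the client over a long-lived HTTP response with the content type text/event-stream. Here is a minimal sketch of how a single event is framed. The helper name format_sse is only for illustration and is not part of our project; in the application itself, sse_starlette takes care of this serialization.

```python
def format_sse(data: str, event: str = "message") -> str:
    """Serialize one event in the text/event-stream wire format."""
    lines = [f"event: {event}"]
    # Each line of the payload gets its own "data:" field
    lines.extend(f"data: {part}" for part in data.split("\n"))
    # A blank line terminates the event
    return "\n".join(lines) + "\n\n"


frame = format_sse("task 42 finished")
print(repr(frame))
```

The client never sends anything back on this connection. It just reads events until it disconnects.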

SSE Implementation:

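In this part, we’ll explore the implementation of SSE in our FastAPI application. The code below demonstrates how we use SSE to keep clients informed about the status of their tasks. The endpoint authenticates the client by token, then checks Celery once per second for a task registered under that user's email in task_to_sse_mapping, and pushes the task's output URL to the client as soon as the task is ready.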

# SSE.py
import asyncio

from celery.result import AsyncResult
from fastapi import APIRouter, Depends, Request
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from sse_starlette.sse import EventSourceResponse

from core import models
from core.database import SessionLocal
from .views import task_to_sse_mapping

router = APIRouter()


# Define a dependency for getting the database session
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


def sse_event(event, data):
    # Shape of a single event as accepted by EventSourceResponse
    return {"event": event, "data": data}


@router.get('/sse')
async def sse_endpoint(request: Request, token: str, db: Session = Depends(get_db)):
    # Authenticate the client by the token passed as a query parameter
    user = db.query(models.User).filter(models.User.token == token).first()
    if user is None:
        # Reject unknown tokens with 401 instead of the default 200
        return JSONResponse({"Message": "Invalid Token"}, status_code=401)

    # Read the email once, before the stream starts
    user_email = user.email

    async def generator():
        while True:
            if await request.is_disconnected():
                print(f"User {user_email} disconnected")
                break

            # Look for a task registered for this user (the last match wins)
            task_id_1 = None
            for task_id, task_user in task_to_sse_mapping.items():
                if task_user == user_email:
                    task_id_1 = task_id

            if task_id_1:
                result = AsyncResult(task_id_1)
                if result.ready():
                    # Task finished: stop tracking it and push the result
                    del task_to_sse_mapping[task_id_1]
                    yield sse_event(event="message", data=result.get()['output_url'])

            # Always wait between polls so the loop never spins
            await asyncio.sleep(1)

    return EventSourceResponse(generator())
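On the client side, a browser's EventSource object reads this stream for you. If you consume the endpoint from Python instead, you have to split the stream into events yourself. The sketch below shows one way to do that. The function name parse_sse_lines and the sample lines (including the example.com URL) are made up for illustration, and the parser only handles the event and data fields.

```python
from typing import Dict, Iterable, Iterator


def parse_sse_lines(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Yield one dict per event from an iterable of text/event-stream lines."""
    event_type = "message"  # default type when no "event:" field is sent
    data_parts = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line ends the current event
            if data_parts:
                yield {"event": event_type, "data": "\n".join(data_parts)}
            event_type, data_parts = "message", []
            continue
        if line.startswith(":"):
            # Comment line, which servers may send as a keep-alive ping
            continue
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # drop the single optional space after the colon
        if field == "event":
            event_type = value
        elif field == "data":
            data_parts.append(value)
        # Other fields (id, retry) are ignored in this sketch


# Sample stream, as it might arrive line by line from the /sse endpoint
sample = [
    ": ping",
    "",
    "event: message",
    "data: https://example.com/output.png",
    "",
]
events = list(parse_sse_lines(sample))
print(events)
```

Any HTTP client that can stream a response line by line can feed its lines into this function.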

Before working with SSE.py, follow the tutorial to set up the Celery configuration. That tutorial uses RabbitMQ instead of Redis as the message broker. Which one you use is up to you and depends on your project requirements.
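For orientation, here is a minimal sketch of what such a Celery configuration could look like. Everything in it is an assumption for illustration and not taken from the tutorial: the module name, the app name, the connection URLs, and the process_file task. The one thing the SSE endpoint does depend on is a result backend, because AsyncResult needs it to check whether a task is ready and to fetch its return value.

```python
# celery_app.py (hypothetical module name)
from celery import Celery

celery_app = Celery(
    "tasks",
    # RabbitMQ as the broker; for Redis this could be "redis://localhost:6379/0"
    broker="amqp://guest:guest@localhost:5672//",
    # Result backend, required for AsyncResult.ready() and AsyncResult.get()
    backend="redis://localhost:6379/1",
)


@celery_app.task
def process_file(path: str) -> dict:
    # Placeholder body: the SSE endpoint only relies on the 'output_url' key
    return {"output_url": f"https://example.com/outputs/{path}"}
```

Switching the broker between RabbitMQ and Redis only changes the broker URL. The task code and the SSE endpoint stay the same.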

Overview of the Setup:

[Figure: Workflow using FastAPI with MinIO file storage and database]

If you run into any problems with the FastAPI SSE setup, feel free to contact me on LinkedIn.
