Real-Time Communication with SSE in FastAPI: Enhancing Task Processing Efficiency
Real-Time Communication with SSE:
Introduction:
Efficient task processing involves not only managing background jobs but also keeping clients informed in real time. In our project, we use Server-Sent Events (SSE) with FastAPI to achieve this. Unlike WebSockets, which provide a full-duplex channel, SSE is one-way: the server pushes updates to the client over a single long-lived HTTP connection. That is all we need here, because clients only have to receive real-time updates on the status of their requests.
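As a rough illustration of what travels over an SSE connection, the sketch below builds the plain-text frame the server writes to the response: an optional `event:` field, one or more `data:` fields, and a blank line that ends the event. The helper name `format_sse` and the example URL are hypothetical and only for illustration; a library such as sse_starlette produces this framing for you.

```python
from typing import Optional


def format_sse(data: str, event: Optional[str] = None) -> str:
    """Serialize one Server-Sent Event in text/event-stream framing."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # A payload containing newlines becomes one "data:" field per line
    for chunk in data.split("\n"):
        lines.append(f"data: {chunk}")
    # A blank line marks the end of the event
    return "\n".join(lines) + "\n\n"


# Placeholder URL, standing in for a finished task's result
frame = format_sse("https://example.com/result.png", event="message")
print(frame, end="")
```

The client reads these frames one at a time as they arrive, so the server can report a finished task the moment it is ready without the client sending another request.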
SSE Implementation:
In this part, we’ll explore the implementation of SSE in our FastAPI application. The code below demonstrates how we use SSE to keep clients informed about the status of their tasks.
# SSE.py
import asyncio

from celery.result import AsyncResult
from fastapi import APIRouter, Depends, Request
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from sse_starlette.sse import EventSourceResponse

from core import models
from core.database import SessionLocal
from .views import task_to_sse_mapping

router = APIRouter()


# Dependency that provides a database session and closes it after the request
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


# Build one SSE message in the dict form that EventSourceResponse accepts
def sse_event(event, data):
    return {"event": event, "data": data}

@router.get('/sse')
async def sse_endpoint(request: Request, token: str, db: Session = Depends(get_db)):
    # Look up the user that owns this token
    user = db.query(models.User).filter(models.User.token == token).first()
    if user is None:
        return JSONResponse({"Message": "Invalid Token"}, status_code=401)

    async def generator():
        user_email = user.email
        while True:
            # Stop streaming once the client closes the connection
            if await request.is_disconnected():
                print(f"User {user_email} disconnected")
                break
            # Find a task that belongs to this user, if any
            task_id_1 = None
            for task_id, task_user in task_to_sse_mapping.items():
                if task_user == user_email:
                    task_id_1 = task_id
            if task_id_1:
                result = AsyncResult(task_id_1)
                if result.ready():
                    # Remove the finished task so it is reported only once
                    del task_to_sse_mapping[task_id_1]
                    yield sse_event(event="message", data=result.get()['output_url'])
            # Always wait before polling again, including while a task is pending
            await asyncio.sleep(1)

    return EventSourceResponse(generator())
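On the receiving side, a browser's EventSource handles the stream automatically, but it can help to see what a client does with it. The following is a minimal sketch, not part of the project code: `parse_sse` is a hypothetical helper that turns already-decoded lines of a text/event-stream response into event dicts. In a real client the lines would come from a streaming HTTP response rather than a list, and the URL is a placeholder.

```python
from typing import Dict, Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Yield one dict per event from decoded text/event-stream lines."""
    event: Dict[str, str] = {}
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # Blank line: the current event is complete
            if event:
                yield event
                event = {}
            continue
        if line.startswith(":"):
            # Comment line, commonly used as a keep-alive ping
            continue
        field, _, value = line.partition(":")
        # A single leading space after the colon is not part of the value
        value = value[1:] if value.startswith(" ") else value
        if field == "data" and "data" in event:
            # Several data lines in one event are joined with newlines
            event["data"] += "\n" + value
        else:
            event[field] = value


# Simulated stream: a ping comment followed by one finished-task event
stream = [
    ": ping",
    "event: message",
    "data: https://example.com/output.png",
    "",
]
for evt in parse_sse(stream):
    print(evt["event"], evt["data"])
```

Skipping comment lines matters in practice, because servers often send them periodically to keep idle connections open.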
Before working with SSE.py, follow the tutorial to set up and configure Celery. That tutorial uses RabbitMQ instead of Redis; which one you choose is up to you and depends on your project requirements.
OVERVIEW OF THE SETUP:
If you run into any problems with the FastAPI SSE setup, please feel free to contact me on LinkedIn.