Server-Sent Events (SSE) is a standard that describes how servers can initiate data transmission towards browser clients once an initial client connection has been established. It’s particularly useful for creating a one-way communication channel from the server to the client, such as for real-time notifications, live updates, and streaming data.

Server-Sent Events (SSE) can be enabled and configured independently for each model using the IS_STREAMING_OUTPUT property in the model configuration. You can use the ‘stream_output_handler’ to send each event and close the event stream. There are some limitations in streaming the type of input

  • Only INT, STRING, BOOLEAN are supported as the datatypes in the INPUT
  • The shape of the parameter should be [1], multiple inputs or objects are by using “json.dumps(object)” and then passed as string
  • Output should have the same schema in all the iterative responses

You can use the below repo for example:

https://github.com/inferless/inferless_template_streaming

You will also need to be on cuda_version: “12.4.1” for using streaming make sure you use the below CUDA version in the custom runtime

build:
  cuda_version: "12.4.1" # This cuda version 
  system_packages:
    - "libssl-dev"
  python_packages:
    - "transformers==4.41.1"
    - "torch==2.1.2"
    - "autoawq==0.1.8"

Define IS_STREAMING_OUTPUT in the input_schema.py

/
├── app.py
├── input_schema.py 

input_schema.py

INPUT_SCHEMA = {
    "TEXT": {
        'datatype': 'STRING',
        'required': True,
        'shape': [1],
        'example': ["How to make a omellete"]
    }
}
IS_STREAMING_OUTPUT = True

in app.py

import json
import numpy as np
import torch
from transformers import pipeline
from threading import Thread
from transformers import AutoTokenizer, TextIteratorStreamer
from awq import AutoAWQForCausalLM

MODEL_NAME = "TheBloke/zephyr-7B-beta-AWQ"

class InferlessPythonModel:

    def initialize(self):
        self.model = AutoAWQForCausalLM.from_quantized(MODEL_NAME, fuse_layers=False, version="GEMV")
        self.tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
        self.streamer = TextIteratorStreamer(self.tokenizer, skip_prompt=True, skip_special_tokens=True)

    def infer(self, inputs, stream_output_handler):

        prompt = inputs["TEXT"]
        messages = [{ "role": "system", "content": "You are an agent that know about about cooking." }] 
        messages.append({ "role": "user", "content": prompt })
        tokenized_chat = self.tokenizer.apply_chat_template(messages, tokenize=True, add_generation_prompt=True, return_tensors="pt").cuda()
        generation_kwargs = dict(
            inputs=tokenized_chat,
            streamer=self.streamer,
            do_sample=True,
            temperature=0.9,
            top_p=0.95,
            repetition_penalty=1.2,
            max_new_tokens=1024,
        )
        thread = Thread(target=self.model.generate, kwargs=generation_kwargs)
        thread.start()

        for new_text in self.streamer:
            output_dict = {}
            output_dict["OUT"] = new_text
            # Sent the partial response as an event 
            stream_output_handler.send_streamed_output(output_dict)
        thread.join()

        # Call this to close the stream, If not called can lead to the issue of request not being released
        stream_output_handler.finalise_streamed_output()



    # perform any cleanup activity here
    def finalize(self,args):
        self.pipe = None

Key Advantages of Using SSE

1.Simplicity: SSE is straightforward to implement both on the server and the client side. Unlike WebSockets, which require a special protocol and server setup, SSE works over standard HTTP and can be handled by traditional web servers without any special configuration.

2.Efficient Real-time Communication: SSE is designed for scenarios where the server needs to push data to the client. It’s very efficient for use cases like live notifications, feeds, and real-time analytics dashboards where updates are frequent and originate from the server.

  1. Built-in Reconnection: SSE has automatic reconnection support. If the connection between the client and server is lost, the client will automatically attempt to reestablish the connection after a timeout. This makes it resilient and ensures continuous data flow without manual intervention.