Streaming with SSE Events
Server-Sent Events (SSE) is a standard that describes how servers can initiate data transmission towards browser clients once an initial client connection has been established. It’s particularly useful for creating a one-way communication channel from the server to the client, such as for real-time notifications, live updates, and streaming data.
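To make the wire format concrete, here is a minimal sketch of how one event is framed in a text/event-stream response under the SSE standard: optional fields such as id, one or more data lines, and a blank line that ends the event. The helper name format_sse_event is hypothetical, and this shows only the generic format; the exact framing the platform emits is not described in this guide.

```python
import json
from typing import Optional


def format_sse_event(payload: dict, event_id: Optional[str] = None) -> str:
    """Serialize one payload as a single text/event-stream frame (hypothetical helper)."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    # json.dumps escapes newlines inside strings, so one data line is enough
    lines.append(f"data: {json.dumps(payload)}")
    # A blank line terminates the event
    return "\n".join(lines) + "\n\n"


frame = format_sse_event({"OUT": "Hello"}, event_id="1")
print(frame, end="")
```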
SSE can be enabled and configured independently for each model by setting the IS_STREAMING_OUTPUT property in the model configuration. Inside the model, use the stream_output_handler to send each event and then close the event stream. Streaming places the following limitations on inputs and outputs:
- Only INT, STRING, and BOOLEAN are supported as input datatypes.
- Each input parameter must have shape [1]. To pass multiple values or an object, serialize it with json.dumps(object) and pass the result as a single string.
- The output must have the same schema in every streamed response.
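The shape restriction can be worked around as described above by packing several values into one JSON string. The sketch below assumes a single STRING input named TEXT, as in the example schema in this guide; the keys inside request_params and the helper names are hypothetical and chosen only for illustration.

```python
import json


def pack_inputs(params: dict) -> str:
    """Client side: serialize several values into one string input."""
    return json.dumps(params)


def unpack_inputs(text: str) -> dict:
    """Model side: recover the original values inside infer()."""
    return json.loads(text)


# Hypothetical request parameters that would not fit a single shape-[1] input
request_params = {"prompt": "How to make an omelette", "max_new_tokens": 256, "temperature": 0.7}

packed = pack_inputs(request_params)
inputs = {"TEXT": packed}  # what infer() would receive, under the assumptions above

restored = unpack_inputs(inputs["TEXT"])
print(restored["prompt"], restored["max_new_tokens"])
```

The model code then reads individual values from the restored dictionary instead of treating the input as a raw prompt.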
The following repository contains a complete example:
https://github.com/inferless/inferless_template_streaming
Streaming requires CUDA 12.4.1. Make sure the custom runtime sets cuda_version to "12.4.1":
build:
  cuda_version: "12.4.1" # Use this CUDA version
  system_packages:
    - "libssl-dev"
  python_packages:
    - "transformers==4.41.1"
    - "torch==2.1.2"
    - "autoawq==0.1.8"
Define IS_STREAMING_OUTPUT in input_schema.py:
/
├── app.py
├── input_schema.py
input_schema.py
INPUT_SCHEMA = {
    "TEXT": {
        "datatype": "STRING",
        "required": True,
        "shape": [1],
        "example": ["How to make an omelette"]
    }
}
IS_STREAMING_OUTPUT = True
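Before deploying, a schema can be checked locally against the input limitations listed earlier. This is a minimal sketch, not part of the platform: check_streaming_schema is a hypothetical helper, and the FP32 entry in BAD_SCHEMA is just an example of a datatype outside the supported list.

```python
ALLOWED_DATATYPES = {"INT", "STRING", "BOOLEAN"}


def check_streaming_schema(schema: dict) -> list:
    """Return a list of problems; an empty list means the schema fits the stated limits."""
    problems = []
    for name, spec in schema.items():
        datatype = spec.get("datatype")
        if datatype not in ALLOWED_DATATYPES:
            problems.append(f"{name}: datatype {datatype!r} is not supported for streaming")
        if spec.get("shape") != [1]:
            problems.append(f"{name}: shape must be [1], got {spec.get('shape')!r}")
    return problems


GOOD_SCHEMA = {
    "TEXT": {"datatype": "STRING", "required": True, "shape": [1], "example": ["How to make an omelette"]}
}
# Hypothetical schema that violates both input limitations
BAD_SCHEMA = {
    "SCORES": {"datatype": "FP32", "required": True, "shape": [4], "example": [0.1, 0.2, 0.3, 0.4]}
}

print(check_streaming_schema(GOOD_SCHEMA))
for problem in check_streaming_schema(BAD_SCHEMA):
    print(problem)
```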
app.py
from threading import Thread

from awq import AutoAWQForCausalLM
from transformers import AutoTokenizer, TextIteratorStreamer

MODEL_NAME = "TheBloke/zephyr-7B-beta-AWQ"


class InferlessPythonModel:
    def initialize(self):
        # Load the AWQ-quantized model and its tokenizer once
        self.model = AutoAWQForCausalLM.from_quantized(MODEL_NAME, fuse_layers=False, version="GEMV")
        self.tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
        # The streamer yields decoded text chunks while generate() is running
        self.streamer = TextIteratorStreamer(self.tokenizer, skip_prompt=True, skip_special_tokens=True)

    def infer(self, inputs, stream_output_handler):
        prompt = inputs["TEXT"]
        messages = [
            {"role": "system", "content": "You are an agent that knows about cooking."},
            {"role": "user", "content": prompt},
        ]
        tokenized_chat = self.tokenizer.apply_chat_template(
            messages, tokenize=True, add_generation_prompt=True, return_tensors="pt"
        ).cuda()
        generation_kwargs = dict(
            inputs=tokenized_chat,
            streamer=self.streamer,
            do_sample=True,
            temperature=0.9,
            top_p=0.95,
            repetition_penalty=1.2,
            max_new_tokens=1024,
        )
        # Generate in a background thread so this thread can consume the streamer
        thread = Thread(target=self.model.generate, kwargs=generation_kwargs)
        thread.start()
        for new_text in self.streamer:
            output_dict = {"OUT": new_text}
            # Send the partial response as an event
            stream_output_handler.send_streamed_output(output_dict)
        thread.join()
        # Close the stream. If this is not called, the request may not be released.
        stream_output_handler.finalise_streamed_output()

    # Perform any cleanup here
    def finalize(self, args):
        self.model = None
        self.tokenizer = None
        self.streamer = None
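The streaming contract used in app.py (send each partial output, keep the same output schema, always close the stream) can be exercised locally without a GPU. The sketch below is a hypothetical test double: RecordingStreamHandler only mimics the two handler methods called above and is not the platform's implementation, and fake_infer stands in for a real model. Wrapping the loop in try/finally is one way to make sure the stream is closed even if generation raises.

```python
class RecordingStreamHandler:
    """Hypothetical stand-in for stream_output_handler, for local testing only."""

    def __init__(self):
        self.events = []
        self.closed = False

    def send_streamed_output(self, output_dict):
        if self.closed:
            raise RuntimeError("stream already finalised")
        # Enforce the rule that every streamed response has the same schema
        if self.events and set(output_dict) != set(self.events[0]):
            raise ValueError("output schema changed between events")
        self.events.append(dict(output_dict))

    def finalise_streamed_output(self):
        self.closed = True


def fake_infer(inputs, stream_output_handler):
    """Stream the prompt back word by word instead of calling a model."""
    try:
        for word in inputs["TEXT"].split():
            stream_output_handler.send_streamed_output({"OUT": word + " "})
    finally:
        # Always close the stream so the request is released
        stream_output_handler.finalise_streamed_output()


handler = RecordingStreamHandler()
fake_infer({"TEXT": "whisk two eggs"}, handler)
streamed_text = "".join(event["OUT"] for event in handler.events)
print(repr(streamed_text), handler.closed)
```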
Key Advantages of Using SSE
1. Simplicity: SSE is straightforward to implement both on the server and the client side. Unlike WebSockets, which require a special protocol and server setup, SSE works over standard HTTP and can be handled by traditional web servers without any special configuration.
2. Efficient Real-time Communication: SSE is designed for scenarios where the server needs to push data to the client. It’s very efficient for use cases like live notifications, feeds, and real-time analytics dashboards where updates are frequent and originate from the server.
3. Built-in Reconnection: SSE has automatic reconnection support. If the connection between the client and server is lost, the client will automatically attempt to reestablish the connection after a timeout. This makes it resilient and ensures continuous data flow without manual intervention.
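In the SSE standard, reconnection is driven by two fields in the stream: retry sets the reconnection delay in milliseconds, and id records the last event seen, which a browser EventSource sends back in the Last-Event-ID header when it reconnects. The sketch below is a simplified parser that tracks both; parse_sse is a hypothetical helper, the sample lines are made up, and whether a given endpoint sets these fields is an assumption to verify.

```python
def parse_sse(lines):
    """Parse text/event-stream lines into (events, state); simplified sketch."""
    state = {"last_event_id": None, "retry_ms": None}
    events = []
    data_parts = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line ends the current event
            if data_parts:
                events.append("\n".join(data_parts))
                data_parts = []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # the standard strips one leading space
        if field == "data":
            data_parts.append(value)
        elif field == "id":
            state["last_event_id"] = value
        elif field == "retry" and value.isdigit():
            state["retry_ms"] = int(value)
    return events, state


sample_lines = [
    "retry: 3000", "",
    "id: 1", 'data: {"OUT": "Hello"}', "",
    ": keep-alive",
    "id: 2", 'data: {"OUT": " world"}', "",
]
events, state = parse_sse(sample_lines)
print(events, state)
```

After a dropped connection, a client following the standard would wait retry_ms milliseconds and reconnect with last_event_id so the server can resume from the right point.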