Server-Sent Events (SSE) is a standard that describes how servers can push data to browser clients once an initial client connection has been established. It is particularly useful for creating a one-way communication channel from the server to the client, such as for real-time notifications, live updates, and streaming data.

SSE can be enabled and configured independently for each model using the IS_STREAMING_OUTPUT property in the model configuration. You can use the stream_output_handler to send each event and to close the event stream. There are some limitations on the types of input you can stream:
- Only INT, STRING, and BOOLEAN are supported as input datatypes.
- The shape of each parameter should be [1]; multiple inputs or objects must be serialized with json.dumps(object) and passed as a string (see the sketch after this list).
- The output must have the same schema in all the iterative responses.
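For example, because only scalar INT, STRING, and BOOLEAN inputs of shape [1] are accepted, a structured object has to be JSON-encoded on the client and decoded inside infer(). The sketch below illustrates this pattern; the PAYLOAD field name and the keys inside it are illustrative, not part of the template:

```python
import json

# input_schema.py -- a single STRING field carrying a JSON-encoded object (illustrative field name)
INPUT_SCHEMA = {
    "PAYLOAD": {
        'datatype': 'STRING',
        'required': True,
        'shape': [1],
        'example': [json.dumps({"prompt": "How to make an omelette", "max_new_tokens": 256})]
    }
}
IS_STREAMING_OUTPUT = True

# Inside app.py's infer(), decode the string back into a dict:
#   params = json.loads(inputs["PAYLOAD"])
#   prompt, max_new_tokens = params["prompt"], params["max_new_tokens"]
```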
You can use the repo below as an example: https://github.com/inferless/inferless_template_streaming. You will also need cuda_version: "12.4.1" to use streaming, so make sure you use the CUDA version below in your custom runtime:
```yaml
build:
  cuda_version: "12.4.1"   # this CUDA version is required for streaming
  system_packages:
    - "libssl-dev"
  python_packages:
    - "transformers==4.41.1"
    - "torch==2.1.2"
    - "autoawq==0.1.8"
```
Define IS_STREAMING_OUTPUT in input_schema.py, or in app.py if you are using Pydantic.
```python
INPUT_SCHEMA = {
    "TEXT": {
        'datatype': 'STRING',
        'required': True,
        'shape': [1],
        'example': ["How to make an omelette"]
    }
}
IS_STREAMING_OUTPUT = True
```
In app.py:
```python
import torch
from threading import Thread
from transformers import AutoTokenizer, TextIteratorStreamer
from awq import AutoAWQForCausalLM

MODEL_NAME = "TheBloke/zephyr-7B-beta-AWQ"

class InferlessPythonModel:

    def initialize(self):
        self.model = AutoAWQForCausalLM.from_quantized(MODEL_NAME, fuse_layers=False, version="GEMV")
        self.tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
        self.streamer = TextIteratorStreamer(self.tokenizer, skip_prompt=True, skip_special_tokens=True)

    def infer(self, inputs, stream_output_handler):
        prompt = inputs["TEXT"]
        messages = [{"role": "system", "content": "You are an agent that knows about cooking."}]
        messages.append({"role": "user", "content": prompt})
        tokenized_chat = self.tokenizer.apply_chat_template(
            messages, tokenize=True, add_generation_prompt=True, return_tensors="pt"
        ).cuda()
        generation_kwargs = dict(
            inputs=tokenized_chat,
            streamer=self.streamer,
            do_sample=True,
            temperature=0.9,
            top_p=0.95,
            repetition_penalty=1.2,
            max_new_tokens=1024,
        )
        # Run generation in a background thread so tokens can be streamed as they are produced
        thread = Thread(target=self.model.generate, kwargs=generation_kwargs)
        thread.start()

        for new_text in self.streamer:
            output_dict = {}
            output_dict["OUT"] = new_text
            # Send the partial response as an event
            stream_output_handler.send_streamed_output(output_dict)
        thread.join()

        # Call this to close the stream; if it is not called, the request may never be released
        stream_output_handler.finalise_streamed_output()

    # Perform any cleanup activity here
    def finalize(self, args):
        self.model = None
```
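On the client side, the streamed output can be consumed as a regular SSE response. Below is a rough sketch using the requests library; the endpoint URL, auth header, and request payload are placeholders that depend on your deployment, so check your model's API details for the exact format:

```python
import json
import requests

# Placeholders -- substitute your model's endpoint URL and auth token
URL = "https://<your-inferless-endpoint>/infer"
HEADERS = {"Authorization": "Bearer <token>", "Content-Type": "application/json"}

# Payload shaped to match the INPUT_SCHEMA above (single STRING input, shape [1]);
# the exact request format depends on your deployment
payload = {"inputs": [{"name": "TEXT", "shape": [1], "datatype": "STRING", "data": ["How to make an omelette"]}]}

with requests.post(URL, headers=HEADERS, json=payload, stream=True) as resp:
    resp.raise_for_status()
    # Each SSE event arrives as a "data: {...}" line followed by a blank line
    for line in resp.iter_lines(decode_unicode=True):
        if line and line.startswith("data:"):
            event = json.loads(line[len("data:"):].strip())
            print(event.get("OUT", ""), end="", flush=True)
```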
1. Simplicity: SSE is straightforward to implement on both the server and the client side. Unlike WebSockets, which require a special protocol and server setup, SSE works over standard HTTP and can be handled by traditional web servers without any special configuration.
2. Efficient Real-time Communication: SSE is designed for scenarios where the server needs to push data to the client. It is very efficient for use cases like live notifications, feeds, and real-time analytics dashboards where updates are frequent and originate from the server.
3. Built-in Reconnection: SSE has automatic reconnection support. If the connection between the client and server is lost, the client will automatically attempt to re-establish the connection after a timeout. This makes it resilient and ensures continuous data flow without manual intervention.
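For reference, the resilience described above comes from the wire format itself: each event is a small text frame, and optional id: and retry: fields let the client resume after a drop (browsers re-send the last seen id in a Last-Event-ID header). The snippet below only illustrates that framing and is not specific to Inferless:

```python
from typing import Optional

def format_sse(data: str, event_id: Optional[int] = None, retry_ms: Optional[int] = None) -> str:
    """Frame a single Server-Sent Event as it appears on the wire."""
    lines = []
    if retry_ms is not None:
        lines.append(f"retry: {retry_ms}")  # reconnection delay hint for the client
    if event_id is not None:
        lines.append(f"id: {event_id}")     # echoed back as Last-Event-ID when the client reconnects
    lines.append(f"data: {data}")
    return "\n".join(lines) + "\n\n"        # a blank line terminates the event

print(format_sse('{"OUT": "Hello"}', event_id=1, retry_ms=3000))
# retry: 3000
# id: 1
# data: {"OUT": "Hello"}
```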