Introduction

This tutorial implements a text-to-speech (TTS) streaming model, parler_tts_mini, using the Parler_TTS library. It enables real-time TTS streaming: text inputs are converted into speech, and the audio is streamed chunk by chunk.

Defining Dependencies

We are using the Parler_TTS and Transformers libraries for the deployment.

Constructing the GitHub/GitLab Template

Now construct the GitHub/GitLab template. This step is mandatory. Make sure you don’t add any file named model.py.

Parler-tts-streaming/
├── app.py
├── inferless-runtime-config.yaml
├── inferless.yaml
├── input_schema.py
└── parler.py

You can also add other files to this directory.

Create the Input Schema

Let’s begin by creating the input_schema.py file, which defines the input structure for our model. You can find the complete file in our GitHub repository.

For this tutorial, we’ll use two text inputs:

  1. prompt_value: The main text to be converted to speech
  2. input_value: The voice instructions for the TTS model

Both inputs are of string data type. The output will be streamed using Server-Sent Events (SSE), delivering audio chunks as base64 encoded strings. This approach allows for real-time audio playback as the speech is generated.

To enable streaming with SSE, it’s crucial to set the IS_STREAMING_OUTPUT property to True in your model configuration. This tells the system to expect and handle a continuous output stream rather than a single response.

It’s important to note the limitations when working with streaming inputs:

  1. Supported datatypes: Only INT, STRING, and BOOLEAN are supported as input datatypes.
  2. Input shape: The shape of each parameter should be [1]. For multiple inputs or complex objects, use json.dumps(object) to convert them to a string before passing.
  3. Consistent output schema: All iterative responses in the output stream must adhere to the same schema.
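As a minimal sketch of the second limitation, a nested object can be serialized with json.dumps on the client and restored with json.loads inside infer. The `settings` field and its contents are hypothetical and are not part of this tutorial's schema. The single-element list mirrors the `example` fields used in the schema.

```python
import json

# Hypothetical nested settings that do not fit a shape-[1] STRING directly.
settings = {"speed": 1.2, "tags": ["calm", "clear"]}

# Client side: serialize to one string and wrap it in a single-element list.
payload = {"settings": [json.dumps(settings)]}

# Server side: take the single element and parse it back into a dict.
restored = json.loads(payload["settings"][0])
```

The round trip preserves the structure, so the model code can keep working with ordinary Python objects while the input itself remains a single STRING.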

Now, let’s create the input_schema.py file with the following content:

INPUT_SCHEMA = {
    "input_value": {
        'datatype': 'STRING',
        'required': True,
        'shape': [1],
        'example': ["A male speaker with a low-pitched voice delivering his words at a fast pace in a small, confined space with a very clear audio and an animated tone."]
    },
    "prompt_value": {
        'datatype': 'STRING',
        'required': True,
        'shape': [1],
        'example': ["Remember - this is only the first iteration of the model! To improve the prosody and naturalness of the speech further, we're scaling up the amount of training data by a factor of five times."]
    }
}
IS_STREAMING_OUTPUT = True
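
Before sending a request, it can help to check locally that the inputs match the schema. The sketch below is only an illustration: `missing_or_invalid` is a hypothetical helper, not part of Inferless, and it uses a trimmed copy of the schema so that it runs on its own.

```python
# Trimmed copy of the schema above, with the long example strings omitted.
INPUT_SCHEMA = {
    "input_value": {"datatype": "STRING", "required": True, "shape": [1]},
    "prompt_value": {"datatype": "STRING", "required": True, "shape": [1]},
}

def missing_or_invalid(inputs, schema=INPUT_SCHEMA):
    """Return names of fields that are required but absent, or not strings."""
    problems = []
    for name, spec in schema.items():
        value = inputs.get(name)
        if value is None:
            if spec.get("required"):
                problems.append(name)
        elif spec["datatype"] == "STRING" and not isinstance(value, str):
            problems.append(name)
    return problems

# A valid request and an invalid one (missing field, wrong type).
ok = missing_or_invalid({"input_value": "A calm voice.", "prompt_value": "Hello."})
bad = missing_or_invalid({"prompt_value": 42})
```

Here `ok` is an empty list, while `bad` names both fields: input_value is missing and prompt_value is not a string.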

Create the class for Text-to-Speech Streamer

In the parler.py file, we define the ParlerTTSStreamer class and import all the required functions.

import math
from queue import Queue
import numpy as np
import torch

from parler_tts import ParlerTTSForConditionalGeneration
from transformers import AutoTokenizer, AutoFeatureExtractor
from transformers.generation.streamers import BaseStreamer

class ParlerTTSStreamer(BaseStreamer):
    def __init__(self):
        self.device = "cuda:0"
        torch_dtype = torch.float16
       
        repo_id = "parler-tts/parler_tts_mini_v0.1"
        self.tokenizer = AutoTokenizer.from_pretrained(repo_id)
        self.feature_extractor = AutoFeatureExtractor.from_pretrained(repo_id)

        self.SAMPLE_RATE = self.feature_extractor.sampling_rate

        self.model = ParlerTTSForConditionalGeneration.from_pretrained(repo_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True).to(self.device)
        self.decoder = self.model.decoder
        self.audio_encoder = self.model.audio_encoder
        self.generation_config = self.model.generation_config

        self.sampling_rate = self.model.audio_encoder.config.sampling_rate
        frame_rate = self.model.audio_encoder.config.frame_rate

        play_steps_in_s=2.0
        play_steps = int(frame_rate * play_steps_in_s)

        # variables used in the streaming process
        self.play_steps = play_steps

        hop_length = math.floor(self.audio_encoder.config.sampling_rate / self.audio_encoder.config.frame_rate)
        self.stride = hop_length * (play_steps - self.decoder.num_codebooks) // 6
        self.token_cache = None
        self.to_yield = 0

        # variables used in the thread process
        self.audio_queue = Queue()
        self.stop_signal = None
        self.timeout = None

    def apply_delay_pattern_mask(self, input_ids):
        # build the delay pattern mask for offsetting each codebook prediction by 1 (this behaviour is specific to Parler)
        _, delay_pattern_mask = self.decoder.build_delay_pattern_mask(
            input_ids[:, :1],
            bos_token_id=self.generation_config.bos_token_id,
            pad_token_id=self.generation_config.decoder_start_token_id,
            max_length=input_ids.shape[-1],
        )
        # apply the pattern mask to the input ids
        input_ids = self.decoder.apply_delay_pattern_mask(input_ids, delay_pattern_mask)

        # revert the pattern delay mask by filtering the pad token id
        mask = (delay_pattern_mask != self.generation_config.bos_token_id) & (delay_pattern_mask != self.generation_config.pad_token_id)
        input_ids = input_ids[mask].reshape(1, self.decoder.num_codebooks, -1)
        # append the frame dimension back to the audio codes
        input_ids = input_ids[None, ...]

        # send the input_ids to the correct device
        input_ids = input_ids.to(self.audio_encoder.device)

        decode_sequentially = (
            self.generation_config.bos_token_id in input_ids
            or self.generation_config.pad_token_id in input_ids
            or self.generation_config.eos_token_id in input_ids
        )
        if not decode_sequentially:
            output_values = self.audio_encoder.decode(
                input_ids,
                audio_scales=[None],
            )
        else:
            sample = input_ids[:, 0]
            sample_mask = (sample >= self.audio_encoder.config.codebook_size).sum(dim=(0, 1)) == 0
            sample = sample[:, :, sample_mask]
            output_values = self.audio_encoder.decode(sample[None, ...], [None])

        audio_values = output_values.audio_values[0, 0]
        return audio_values.cpu().float().numpy()

    def put(self, value):
        batch_size = value.shape[0] // self.decoder.num_codebooks
     
        if self.token_cache is None:
            self.token_cache = value
        else:
            self.token_cache = torch.concatenate([self.token_cache, value[:, None]], dim=-1)

        if self.token_cache.shape[-1] % self.play_steps == 0:
            audio_values = self.apply_delay_pattern_mask(self.token_cache)
            self.on_finalized_audio(audio_values[self.to_yield : -self.stride])
            self.to_yield += len(audio_values) - self.to_yield - self.stride

    def end(self):
        # Flushes any remaining cache and appends the stop symbol
        if self.token_cache is not None:
            audio_values = self.apply_delay_pattern_mask(self.token_cache)
        else:
            audio_values = np.zeros(self.to_yield)

        self.on_finalized_audio(audio_values[self.to_yield :], stream_end=True)

    def on_finalized_audio(self, audio: np.ndarray, stream_end: bool = False):
        # Put the new audio in the queue. If the stream is ending, also put a stop signal in the queue.
        self.audio_queue.put(audio, timeout=self.timeout)
        if stream_end:
            self.audio_queue.put(self.stop_signal, timeout=self.timeout)

    def __iter__(self):
        return self

    def __next__(self):
        value = self.audio_queue.get(timeout=self.timeout)
        if not isinstance(value, np.ndarray) and value == self.stop_signal:
            raise StopIteration()
        else:
            return value
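
The chunking arithmetic in __init__ is easier to follow with concrete numbers. The values below are illustrative assumptions for a 44.1 kHz codec with nine codebooks; the real values are read from the model configuration at load time.

```python
import math

# Illustrative values (assumptions); the class reads the real ones from
# model.audio_encoder.config and model.decoder.
sampling_rate = 44100    # audio samples per second
frame_rate = 86          # codec frames (decoder steps) per second
num_codebooks = 9
play_steps_in_s = 2.0

# Number of decoder steps collected before a chunk is decoded.
play_steps = int(frame_rate * play_steps_in_s)

# Number of audio samples represented by one codec frame.
hop_length = math.floor(sampling_rate / frame_rate)

# Number of trailing samples held back from each chunk.
stride = hop_length * (play_steps - num_codebooks) // 6
```

With these numbers, play_steps is 172, hop_length is 512, and stride is 13909. Each time the token cache length reaches a multiple of play_steps, put decodes the whole cache and emits audio_values[to_yield:-stride]. The last stride samples are withheld and decoded again on the next pass, when more tokens are available.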

Create the class for inference

In the app.py file, we define the InferlessPythonModel class and import all the required functions. The class has three methods:

  1. def initialize: In this function, we will create an object of the ParlerTTSStreamer class which will load the model. You can define any variable that you want to use during inference.

  2. def infer: The infer function is the core of your model’s inference process. It’s invoked for each incoming request and is responsible for processing the input and generating the streamed output. Here’s a breakdown of its key components:

    a. Output Streaming Setup:

    • We create a dictionary output_dict with a key 'OUT'.
    • This dictionary will hold each chunk of the generated audio as a base64-encoded string.

    b. Processing and Streaming:

    • As the model generates audio chunks, we encode each chunk to base64.
    • For each encoded chunk (mp3_str), we update the output_dict:
      output_dict['OUT'] = mp3_str
      
    • We use stream_output_handler to stream the generated audio chunks. Its send_streamed_output() function sends each chunk to the client:
      stream_output_handler.send_streamed_output(output_dict)
      
    • This process repeats for each audio chunk, allowing real-time streaming of the generated speech.

    c. Finalizing the Stream:

    • After all chunks have been processed and sent, we call:
      stream_output_handler.finalise_streamed_output()
      
    • This function signals the end of the stream to the client, properly closing the event streamer.

  3. def finalize: This function cleans up the allocated memory by releasing the streamer object.

import io
import base64
import numpy as np
from threading import Thread
from pydub import AudioSegment
from parler import ParlerTTSStreamer

class InferlessPythonModel:
    def initialize(self):
        # Initialize the ParlerTTSStreamer object
        self.streamer = ParlerTTSStreamer()

    def numpy_to_mp3(self, audio_array, sampling_rate):
        # Convert numpy array to MP3 format
        if np.issubdtype(audio_array.dtype, np.floating):
            # Normalize floating-point audio data to 16-bit integer range;
            # skip the division for empty or silent chunks to avoid errors and NaNs
            max_val = np.max(np.abs(audio_array)) if audio_array.size else 0.0
            if max_val > 0:
                audio_array = (audio_array / max_val) * 32767
            audio_array = audio_array.astype(np.int16)
        
        # Create an AudioSegment object from the numpy array
        audio_segment = AudioSegment(
            audio_array.tobytes(),
            frame_rate=sampling_rate,
            sample_width=audio_array.dtype.itemsize,
            channels=1
        )
        
        # Export the AudioSegment to MP3 format
        mp3_io = io.BytesIO()
        audio_segment.export(mp3_io, format="mp3", bitrate="320k")
        mp3_bytes = mp3_io.getvalue()
        mp3_io.close()
        
        return mp3_bytes

    def infer(self, inputs, stream_output_handler):
        # Reset streamer properties
        self.streamer.token_cache = None
        self.streamer.to_yield = 0
        
        # Extract input and prompt values from the inputs dictionary
        input_value = inputs["input_value"]
        prompt_value = inputs["prompt_value"]
        
        # Tokenize input and prompt
        inputs_ = self.streamer.tokenizer(input_value, return_tensors="pt").to(self.streamer.device)
        prompt = self.streamer.tokenizer(prompt_value, return_tensors="pt").to(self.streamer.device)
        
        # Set up generation kwargs for the model
        generation_kwargs = dict(
            input_ids=inputs_.input_ids,
            prompt_input_ids=prompt.input_ids,
            streamer=self.streamer,
            do_sample=True,
            temperature=1.0,
            min_new_tokens=10)
        
        # Start a new thread for model generation
        thread = Thread(target=self.streamer.model.generate, kwargs=generation_kwargs)
        thread.start()
        
        # Process and stream the generated audio
        for new_audio in self.streamer:
            # Convert numpy array to MP3 and encode as base64 string
            mp3_bytes = self.numpy_to_mp3(new_audio, sampling_rate=self.streamer.sampling_rate)
            mp3_str = base64.b64encode(mp3_bytes).decode('utf-8')
            
            # Prepare and send the output dictionary
            output_dict = {}
            output_dict["OUT"] = mp3_str
            stream_output_handler.send_streamed_output(output_dict)
        
        # Wait for the generation thread to complete
        thread.join()
        
        # Finalize the streamed output
        stream_output_handler.finalise_streamed_output()

    def finalize(self, args):
        # Clean up resources
        self.streamer = None
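
The control flow of infer can be tried without a GPU or the model. The sketch below is a simplified stand-in, not the Inferless implementation: `RecordingHandler` is a hypothetical stub that only mirrors the two handler methods used above, and `fake_generate` replaces model.generate with a thread that produces dummy bytes.

```python
import base64
from queue import Queue
from threading import Thread

class RecordingHandler:
    """Hypothetical stub for stream_output_handler; it only records calls."""
    def __init__(self):
        self.chunks = []
        self.finalised = False

    def send_streamed_output(self, output_dict):
        self.chunks.append(output_dict)

    def finalise_streamed_output(self):
        self.finalised = True

def fake_generate(queue, n_chunks):
    # Stand-in for model.generate: push dummy chunks, then a stop signal.
    for i in range(n_chunks):
        queue.put(bytes([i]) * 4)
    queue.put(None)

def stream(handler, n_chunks=3):
    queue = Queue()
    thread = Thread(target=fake_generate, args=(queue, n_chunks))
    thread.start()
    # Consume chunks as they arrive until the stop signal is seen.
    while True:
        chunk = queue.get()
        if chunk is None:
            break
        encoded = base64.b64encode(chunk).decode("utf-8")
        handler.send_streamed_output({"OUT": encoded})
    thread.join()
    handler.finalise_streamed_output()

handler = RecordingHandler()
stream(handler)
```

The producer thread and the consuming loop run concurrently, which is why the first chunk can be sent before generation has finished. Every message uses the same {"OUT": ...} shape, matching the consistent output schema requirement.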

Creating the Custom Runtime

This step is mandatory. Inferless lets you define your own custom runtime through inferless-runtime-config.yaml. To enable streaming functionality, ensure you are using CUDA version 12.4.1.

build:
  cuda_version: "12.4.1"
  system_packages:
    - "ffmpeg"
  python_packages:
    - "accelerate==0.31.0"
    - "pydub==0.25.1"
    - "git+https://github.com/huggingface/parler-tts@8b8c576e2dbdc29172e30be7d68fac9357cd92c5#egg=parler-tts"
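
The ffmpeg system package is listed because pydub relies on an ffmpeg binary to export MP3 audio. A small check like the following can confirm that the binary is visible to Python inside the runtime. `ffmpeg_available` is a hypothetical helper name; the sketch only looks the executable up on PATH and does not verify MP3 encoder support.

```python
import shutil

def ffmpeg_available():
    """Return True if an ffmpeg executable is found on PATH."""
    return shutil.which("ffmpeg") is not None

has_ffmpeg = ffmpeg_available()
```

If has_ffmpeg is False, the MP3 export in numpy_to_mp3 is likely to fail, so this is a useful first thing to inspect when debugging a build.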

Method A: Deploying the model on Inferless Platform

Inferless supports multiple ways of importing your model. For this tutorial, we will use GitHub.

Step 1: Log in to the Inferless dashboard and click on the Import model button

Navigate to your desired workspace in Inferless and click on the Add a custom model button at the top right. An import wizard will open.

Step 2: Follow the UI to complete the model Import

  • Select the GitHub/GitLab Integration option to connect your source code repository with the deployment environment.
  • Navigate to the specific GitHub repository that contains your model’s code. Here, you will need to identify and enter the name of the model you wish to import.
  • Choose the appropriate type of machine that suits your model’s requirements. Additionally, specify the minimum and maximum number of replicas to define the scalability range for deploying your model.
  • Optionally, enable automatic build and deployment. This feature triggers a new deployment automatically whenever new code is pushed to your repository.
  • If your model requires additional software packages, configure the Custom Runtime settings by including necessary pip or apt packages. Also, set up environment variables such as Inference Timeout, Container Concurrency, and Scale Down Timeout to tailor the runtime environment according to your needs.
  • Wait for the validation process to complete, ensuring that all settings are correct and functional. Once validation is successful, click on the “Import” button to finalize the import of your model.

Step 3: Wait for the model build to complete (this usually takes ~5-10 minutes)

Step 4: Use the APIs to call the model

Once the model is in ‘Active’ status, you can click on the ‘API’ page to call the model.
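
On the client side, each streamed message carries one base64 string under the OUT key. How those strings are pulled out of the event stream depends on your HTTP client and is not shown here. The sketch below only covers the decoding step, using stand-in data instead of a real response, and `decode_chunks` is a hypothetical helper name.

```python
import base64

def decode_chunks(out_values):
    """Decode base64 'OUT' strings, in arrival order, into raw byte blobs."""
    return [base64.b64decode(value) for value in out_values]

# Stand-in data: two fake chunks encoded the same way infer encodes them.
fake_stream = [
    base64.b64encode(b"chunk-0").decode("utf-8"),
    base64.b64encode(b"chunk-1").decode("utf-8"),
]
blobs = decode_chunks(fake_stream)
```

Because numpy_to_mp3 exports every chunk separately, each decoded blob is a complete MP3 segment and can be played or saved as soon as it arrives.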

Method B: Deploying the model on Inferless CLI

Inferless also lets you deploy your model with the Inferless CLI. Follow the steps below.

Initialization of the model

Create the app.py and inferless-runtime-config.yaml files and move them to the working directory. Then run the following command to initialize your model:

inferless init

Upload the custom runtime

Once you have created the inferless-runtime-config.yaml file, you can run the following command:

inferless runtime upload

After you enter this command, you will be prompted for the configuration file name. Enter the name and make sure the same name is set in the inferless.yaml file. You are now ready for deployment.

Deploy the Model

Execute the following command to deploy your model. Once deployed, you can track the build logs on the Inferless platform:

inferless deploy