# Public names exported by this module.
__all__ = ["ParallelVoicebox"]
import logging
import time
from abc import abstractmethod
from queue import Empty
from threading import Thread, Event
from typing import TypeVar, Iterable, Optional

from voicebox.audio import Audio
from voicebox.effects import Effects, default_effects
from voicebox.sinks import Sink, default_sink
from voicebox.tts import TTS, default_tts
from voicebox.types import StrOrSSML
from voicebox.voiceboxes.base import VoiceboxWithTextSplitter
from voicebox.voiceboxes.queue import Queue
from voicebox.voiceboxes.splitter import Splitter
# Generic type of the items that a ``_QueueThread`` accepts and processes.
T = TypeVar("T")
class _QueueThread(Thread):
    """
    Worker thread that takes items off an internal queue and hands them, one
    at a time, to :meth:`_process_item`.

    Subclasses implement :meth:`_process_item`. Because the thread may be
    started from within ``__init__`` (see ``start``), subclasses must assign
    any attributes that :meth:`_process_item` reads *before* calling
    ``super().__init__()``.

    Args:
        start:
            Whether to start the thread at the end of construction.
        queue_max_size:
            Passed to the ``Queue`` constructor as its max size.
            NOTE(review): presumably ``0`` means "unbounded", as with the
            standard library queue -- confirm against the project ``Queue``.
        queue_get_timeout:
            Seconds to block on each queue read before re-checking the
            stop flag.
        **kwargs:
            Forwarded to :class:`threading.Thread`.
    """

    queue_get_timeout: float
    _queue: Queue
    _stop_event: Event

    def __init__(
        self,
        start: bool = True,
        queue_max_size: int = 0,
        queue_get_timeout: float = 1.0,
        **kwargs,
    ):
        super().__init__(**kwargs)

        self.queue_get_timeout = queue_get_timeout
        self._queue = Queue(queue_max_size)
        self._stop_event = Event()

        # Start last, once every attribute used by ``run`` exists.
        if start:
            self.start()

    def put(self, item: T) -> None:
        """Add ``item`` to the queue of things to process."""
        self._queue.put(item)

    def stop(self, wait: bool = False, timeout: Optional[float] = None) -> None:
        """
        Notify the thread to stop running.

        The thread only checks the stop flag between items, so an item that
        is currently being processed is finished first.

        Args:
            wait:
                Whether to join the thread after setting the stop flag.
            timeout:
                If waiting, passed to :meth:`join`. ``None`` waits forever.
        """
        self._stop_event.set()

        if wait:
            self.join(timeout=timeout)

    def wait_until_done(self, timeout: Optional[float] = None) -> None:
        """
        Wait until every queued item has been processed.

        Delegates to the queue's ``join(timeout=...)``; what happens when the
        timeout expires is defined by the project ``Queue`` class.
        """
        self._queue.join(timeout=timeout)

    def run(self):
        """Process queued items until the stop flag is set."""
        for item in self._get_items():
            try:
                self._process_item(item)
            except Exception:
                # One bad item must not kill the worker: if the thread died
                # here, everything queued afterwards would never be
                # processed and ``wait_until_done()`` would block forever.
                logging.getLogger(__name__).exception(
                    "Error while processing queued item; "
                    "continuing with the next item"
                )
            finally:
                # Always mark the item as handled so the queue's unfinished
                # count reaches zero even when processing failed.
                self._queue.task_done()

    def _get_items(self) -> Iterable[T]:
        """Yield items from the queue until the stop flag is set."""
        while not self._stop_event.is_set():
            try:
                yield self._queue.get(timeout=self.queue_get_timeout)
            except Empty:
                # Nothing arrived within the timeout; loop around so the
                # stop flag gets re-checked.
                continue

    # NOTE: this class does not use ``ABCMeta``, so ``@abstractmethod`` is
    # not enforced at instantiation time; it only documents the intent.
    @abstractmethod
    def _process_item(self, item: T) -> None: ...  # pragma: no cover
class _SinkQueueThread(_QueueThread):
    """Queue thread that plays each queued :class:`Audio` through a sink."""

    sink: Sink

    def __init__(self, sink: Sink, **kwargs):
        # The sink has to be in place before the base initializer runs,
        # since that initializer may start the thread immediately.
        self.sink = sink

        super().__init__(**kwargs)

    def _process_item(self, audio: Audio) -> None:
        """Play ``audio`` on whichever sink is currently assigned."""
        current_sink = self.sink
        current_sink.play(audio)
class _TTSAndEffectsQueueThread(_QueueThread):
    """
    Queue thread that turns queued text into audio, applies the effects in
    order, and forwards the result to a :class:`_SinkQueueThread`.
    """

    tts: TTS
    effects: Effects
    sink_queue_thread: _SinkQueueThread

    def __init__(
        self,
        tts: TTS,
        effects: Effects,
        sink_queue_thread: _SinkQueueThread,
        **kwargs,
    ):
        # Everything ``_process_item`` reads is assigned up front, because
        # the base initializer may start the thread immediately.
        self.tts = tts
        self.effects = effects
        self.sink_queue_thread = sink_queue_thread

        super().__init__(**kwargs)

    def _process_item(self, text: StrOrSSML) -> None:
        """Synthesize ``text``, run it through the effects, queue it to play."""
        speech = self.tts.get_speech(text)

        # Each effect receives the output of the previous one.
        for fx in self.effects:
            speech = fx.apply(speech)

        self.sink_queue_thread.put(speech)
class ParallelVoicebox(VoiceboxWithTextSplitter):
    """
    Handles speech on a separate thread so the main thread is not blocked
    waiting for speech to complete.

    Also eliminates loading time between messages by loading the audio for the
    next message while the current message is playing.

    Example:
        >>> voicebox = ParallelVoicebox(...)
        >>> voicebox.say('Hello, world!')  # Does not block; speech handled by thread
        >>> voicebox.say('How are you?')   # Does not block
        >>> # Do stuff in main thread while speech is happening...
        >>> voicebox.wait_until_done()     # Call before program end to prevent cutoff
        >>>
        >>> # Can be used as context manager
        >>> with ParallelVoicebox(...) as voicebox:
        ...     ...
        >>> # Voicebox threads are notified to stop when the `with` block exits

    Args:
        tts:
            The :class:`voicebox.tts.TTS` engine to use.
        effects:
            Sequence of :class:`voicebox.effects.Effect` instances to apply to
            the audio before playing it.
        sink:
            The :class:`voicebox.sinks.Sink` to use to play the audio.
        text_splitter:
            The :class:`voicebox.voiceboxes.splitter.Splitter` to use to split
            the text into chunks to be spoken. Defaults to no splitting.
        start:
            Whether to start the threads.
        queue_get_timeout:
            Seconds to wait for text to appear in the queue of things to say
            between checks of the stop flag.
        daemon:
            Whether the thread is daemonic (i.e. dies when the main thread exits).
    """

    _tts_and_effects_queue_thread: _TTSAndEffectsQueueThread
    _sink_queue_thread: _SinkQueueThread

    def __init__(
        self,
        tts: Optional[TTS] = None,
        effects: Optional[Effects] = None,
        sink: Optional[Sink] = None,
        text_splitter: Optional[Splitter] = None,
        start: bool = True,
        queue_get_timeout: float = 1.0,
        daemon: bool = True,
    ):
        super().__init__(text_splitter)

        tts = tts if tts is not None else default_tts()
        effects = effects if effects is not None else default_effects()
        sink = sink if sink is not None else default_sink()

        # The sink thread is built first because the TTS thread needs a
        # reference to it. Its queue holds at most one pending audio, so only
        # the next message is prepared while the current one is playing.
        self._sink_queue_thread = _SinkQueueThread(
            sink,
            queue_max_size=1,
            queue_get_timeout=queue_get_timeout,
            start=start,
            daemon=daemon,
        )

        self._tts_and_effects_queue_thread = _TTSAndEffectsQueueThread(
            tts=tts,
            effects=effects,
            sink_queue_thread=self._sink_queue_thread,
            queue_get_timeout=queue_get_timeout,
            start=start,
            daemon=daemon,
        )

    @property
    def tts(self) -> TTS:
        """The TTS engine used by the worker thread."""
        return self._tts_and_effects_queue_thread.tts

    @tts.setter
    def tts(self, tts: TTS) -> None:
        self._tts_and_effects_queue_thread.tts = tts

    @property
    def effects(self) -> Effects:
        """The effects applied to audio by the worker thread."""
        return self._tts_and_effects_queue_thread.effects

    @effects.setter
    def effects(self, effects: Effects) -> None:
        self._tts_and_effects_queue_thread.effects = effects

    @property
    def sink(self) -> Sink:
        """The sink used by the playback thread."""
        return self._sink_queue_thread.sink

    @sink.setter
    def sink(self, sink: Sink) -> None:
        self._sink_queue_thread.sink = sink

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Only signals the threads; does not wait for them to finish.
        self.stop()

    def _say_chunk(self, chunk: str) -> None:
        """Queue ``chunk`` to be spoken by the worker threads."""
        self._tts_and_effects_queue_thread.put(chunk)

    @staticmethod
    def _deadline_for(timeout: Optional[float]) -> Optional[float]:
        """Return the monotonic time at which ``timeout`` expires, or ``None``."""
        return None if timeout is None else time.monotonic() + timeout

    @staticmethod
    def _time_left(deadline: Optional[float]) -> Optional[float]:
        """Return the seconds left until ``deadline`` (never negative), or ``None``."""
        if deadline is None:
            return None
        return max(0.0, deadline - time.monotonic())

    def start(self) -> None:
        """
        Start the threads.

        Only needed when the voicebox was constructed with ``start=False``;
        a thread can be started at most once.
        """
        self._tts_and_effects_queue_thread.start()
        self._sink_queue_thread.start()

    def is_alive(self) -> bool:
        """Return whether both threads are alive."""
        return (
            self._tts_and_effects_queue_thread.is_alive()
            and self._sink_queue_thread.is_alive()
        )

    def join(self, timeout: Optional[float] = None) -> None:
        """
        Wait until the threads terminate.

        Args:
            timeout:
                Wait up to this many seconds in total, across both threads.
                ``None`` waits forever.
        """
        # One deadline is shared by both joins so the total wait cannot
        # exceed ``timeout`` (it used to be applied to each join in turn).
        deadline = self._deadline_for(timeout)

        self._tts_and_effects_queue_thread.join(timeout=timeout)
        self._sink_queue_thread.join(timeout=self._time_left(deadline))

    def stop(self, wait: bool = False, timeout: Optional[float] = None) -> None:
        """
        Notify the threads to stop running.

        Args:
            wait:
                Whether to wait for the threads to stop.
            timeout:
                If waiting, then wait up to this many seconds in total,
                across both threads. ``None`` waits forever.
        """
        deadline = self._deadline_for(timeout)

        # The order is deliberate: when waiting, the sink thread keeps
        # running until the TTS thread has stopped, so the TTS thread cannot
        # be left stuck handing audio to a full sink queue.
        self._tts_and_effects_queue_thread.stop(wait=wait, timeout=timeout)
        self._sink_queue_thread.stop(
            wait=wait,
            timeout=self._time_left(deadline),
        )

    def wait_until_done(self, timeout: Optional[float] = None) -> None:
        """
        Wait until all speech is done.

        Useful to run before program end to prevent speech from being cut off.

        Args:
            timeout:
                Wait up to this many seconds in total. ``None`` waits forever.

        Raises:
            NotFinished:
                If ``timeout`` is not ``None`` and the timeout is reached
                before all speech is done.
        """
        deadline = self._deadline_for(timeout)

        # Text queue first: once it has drained, every audio it produced has
        # already been handed to the sink thread's queue.
        self._tts_and_effects_queue_thread.wait_until_done(timeout=timeout)

        # NOTE(review): assumes the project ``Queue.join`` treats a timeout
        # of ``0`` as "check once, then time out" -- confirm.
        self._sink_queue_thread.wait_until_done(
            timeout=self._time_left(deadline),
        )