# Certified PlabWare
#!/usr/bin/env python3
from kokoro import KPipeline
from IPython.display import display, Audio
import soundfile as sf
import torch
from datetime import datetime
import re
import subprocess
import numpy as np
from IPython.display import display, Audio
# Module-level Kokoro TTS pipeline, created once at import time and used by main().
# lang_code='a' presumably selects American English — NOTE(review): confirm against
# the Kokoro language-code table.
pipeline = KPipeline(lang_code='a')
def _load_text(filename):
    """Read *filename* as UTF-8 text; return None if the file does not exist."""
    try:
        with open(filename, 'r', encoding='utf-8') as file:
            return file.read()
    except FileNotFoundError:
        return None


def _clean_text(text):
    """Normalize extracted text (e.g. from a PDF) into TTS-friendly prose."""
    # Step 1: drop lines with no ASCII letters (page numbers, rules, blank lines).
    lines = [line for line in text.splitlines() if re.search(r'[a-zA-Z]', line)]
    text = '\n'.join(lines)
    # Step 2: re-join words hyphenated across a line break.
    text = re.sub(r'-\s*\n', '', text)
    # Step 3: merge lines that do NOT end with sentence-ending punctuation.
    # (Original relied on accidental adjacent-string concatenation of a raw
    # triple-quoted literal and a non-raw literal; one raw string is equivalent.)
    text = re.sub(r'([^.!?"\'])\s*\n', r'\1 ', text)
    # Step 4: insert a space after a period that runs straight into a digit.
    # Bug fix: the old pattern r'\.\d' replaced the digit too ("3.14" -> "3. 4");
    # a lookahead keeps the digit, matching the stated intent "period + space".
    text = re.sub(r'\.(?=\d)', '. ', text)
    # Step 5: strip a single leading digit glued to a word at line start.
    text = re.sub(r'^(\d)(?=[a-zA-Z])', '', text, flags=re.MULTILINE)
    return text


def _build_ffmpeg_cmd(output_file):
    """FFmpeg argv: raw float32 mono 24 kHz PCM on stdin -> Ogg/Opus file."""
    return [
        'ffmpeg',
        '-y',                     # overwrite output without prompting
        '-hide_banner',           # suppress the version banner on stderr
        # '-f', 's16le',          # alternative: signed 16-bit little-endian input
        '-f', 'f32le',            # input format: 32-bit float little-endian
        '-ar', '24000',           # sample rate (must match the TTS output)
        '-ac', '1',               # mono audio
        '-i', '-',                # read PCM from stdin
        '-f', 'ogg',              # output container
        '-c:a', 'libopus',        # Opus encoder
        '-b:a', '14k',            # audio bitrate (adjust as needed)
        '-compression_level', '10',
        '-application', 'voip',   # Opus mode tuned for speech
        '-frame_duration', '60',
        '-flags', 'low_delay',    # for streaming
        '-flush_packets', '1',    # force real-time output
        output_file,
    ]


def main():
    """Prompt for a text file, clean it, and stream Kokoro TTS audio through
    FFmpeg into a timestamped Ogg Opus file.

    Side effects: reads stdin/the named file, spawns an ffmpeg subprocess,
    writes output-<timestamp>.ogg, and displays inline audio (IPython).
    """
    filename = input("Enter the filename: ")
    text = _load_text(filename)
    if text is None:
        # Bug fix: the message previously printed the literal '(unknown)'
        # instead of the filename the user typed.
        print(f"Error: File '{filename}' not found.")
        return

    text = _clean_text(text)

    # Bug fix: the old format "%Y-%m-%d-%S" kept only the seconds after the
    # date, so two runs in the same day could easily collide; include H/M/S.
    fdate = datetime.now().strftime("%Y-%m-%d-%H%M%S")
    output_file = f"output-{fdate}.ogg"

    process = subprocess.Popen(
        _build_ffmpeg_cmd(output_file),
        stdin=subprocess.PIPE,
        stderr=subprocess.PIPE,
        bufsize=0,  # unbuffered so each PCM chunk reaches ffmpeg immediately
    )

    # Drain FFmpeg's stderr on a daemon thread so the pipe buffer cannot fill
    # up and deadlock the encoder while we block writing to stdin.
    import threading

    def _read_stderr(stream):
        for line in iter(stream.readline, b''):
            print("FFmpeg stderr:", line.decode(), end='')

    threading.Thread(target=_read_stderr, args=(process.stderr,), daemon=True).start()

    print("Streaming audio to", output_file)
    generator = pipeline(text, voice='bm_lewis')
    for i, (gs, ps, audio) in enumerate(generator):
        print(f"Segment {i}: {gs}, {ps}")
        # Convert the tensor to raw float32 PCM bytes (matches '-f f32le' above).
        pcm_data = audio.cpu().numpy().astype(np.float32).tobytes()
        try:
            process.stdin.write(pcm_data)
        except BrokenPipeError:
            print("Error: FFmpeg process closed unexpectedly.")
            break
        # Inline playback; autoplay only the first segment.
        display(Audio(data=audio.cpu().numpy(), rate=24000, autoplay=i == 0))

    # Close stdin so FFmpeg can finalize the Ogg container, then wait for it.
    process.stdin.close()
    process.wait()
    print("Final audio saved to", output_file)
def tts_generator(text):
    """Stand-in TTS generator for testing the streaming path.

    Yields five float32 noise chunks of 12000 samples each — half a second
    apiece at 24 kHz — with amplitudes scaled into [-0.8, 0.8). Swap in a real
    TTS pipeline that yields audio arrays of the same shape/dtype.
    """
    from numpy.random import rand

    chunk_count = 5
    samples_per_chunk = 12000  # 24000 samples/second => 0.5 s per chunk
    for _ in range(chunk_count):
        noise = rand(samples_per_chunk) * 2 - 1  # uniform in [-1, 1)
        yield 0.8 * noise.astype(np.float32)
# Run the interactive TTS workflow only when executed as a script, not on import.
if __name__ == "__main__":
    main()