In today's fast-paced digital world, the ability to process and understand live audio streams is invaluable. Real-Time Messaging Protocol (RTMP) is widely used for streaming audio and video content. Coupling RTMP with advanced Automatic Speech Recognition (ASR) like FunASR opens up possibilities for live captioning, content analysis, monitoring, and much more. This guide provides a comprehensive Python solution to perform real-time speech recognition on RTMP streams using FunASR on an Ubuntu 24.04 system with Python 3.9.
Diagram illustrating the RTMP streaming process.
The provided Python solution integrates several key technologies and design patterns to achieve robust real-time speech recognition:
FFmpeg, a powerful multimedia framework, is employed to connect to the RTMP stream. It extracts the audio component and transcodes it into a format suitable for FunASR. Specifically, the audio is converted to Pulse Code Modulation (PCM) format, typically at a 16kHz sample rate, single channel (mono), and 16-bit depth (s16le). This standardization ensures compatibility and optimal performance with the FunASR model.
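As a quick sanity check outside of Python, you can replicate this conversion on the command line. The one-liner below (with a placeholder RTMP URL) captures ten seconds of the stream's audio as raw 16 kHz mono PCM, mirroring the options the script assembles later:

ffmpeg -hide_banner -loglevel error -i rtmp://your-rtmp-server/app/stream_key -vn -f s16le -acodec pcm_s16le -ar 16000 -ac 1 -t 10 test_audio.pcm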
While the script configures FFmpeg to output audio at a specific target sample rate (e.g., 16kHz) required by the FunASR Paraformer model, advanced implementations could use `ffprobe` (part of FFmpeg) to initially detect the source stream's characteristics if adaptive processing were needed. For this solution, we ensure the audio fed to FunASR is consistently formatted.
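To illustrate that optional probing step, here is a minimal sketch that uses `ffprobe`'s JSON output to read the source's audio parameters. The `probe_audio` helper is hypothetical (it is not part of the script below), and the URL is again a placeholder:

import json
import subprocess

def probe_audio(url: str) -> dict:
    """Return sample rate and channel count of the first audio stream (hypothetical helper)."""
    cmd = [
        'ffprobe', '-v', 'error',
        '-select_streams', 'a:0',  # inspect the first audio stream only
        '-show_entries', 'stream=sample_rate,channels',
        '-of', 'json',
        url,
    ]
    out = subprocess.run(cmd, capture_output=True, check=True).stdout
    stream = json.loads(out)['streams'][0]
    return {'sample_rate': int(stream['sample_rate']), 'channels': int(stream['channels'])}

# Example: probe_audio("rtmp://your-rtmp-server/app/stream_key")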
The FunASR `AutoModel` is initialized with the specified pre-trained model (e.g., "iic/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-pytorch"). The model is loaded with Voice Activity Detection (VAD) and punctuation restoration enabled, both of which are crucial for accurate sentence segmentation and readable output.
self.asr_model = AutoModel(
model=self.model_dir or "iic/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-pytorch",
model_type="offline", # Core model loaded, streaming handled by generate_stream
punc=True, # Use punctuation
vad=True # Enable VAD
)
Even with `model_type="offline"` for loading the core model, FunASR's `generate_stream()` method is utilized. This method is designed for real-time, continuous audio processing. Audio chunks are fed into this stream, and FunASR processes them incrementally, identifying speech segments and transcribing them. The `is_final` flag in the results indicates the completion of a sentence, enabling real-time, sentence-by-sentence output.
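Be aware that the exact streaming entry point differs across FunASR releases, and `generate_stream()` may not exist in yours. Recent FunASR versions instead expose incremental decoding through `generate()` with a `cache` dict and a dedicated streaming model. The following is a minimal sketch under that assumption (the model name and chunk settings follow FunASR's published streaming example; treat it as an illustrative alternative, not a drop-in replacement for the class below):

import numpy as np
from funasr import AutoModel

# Streaming Paraformer model; chunk_size [0, 10, 5] corresponds to 600 ms attention chunks.
model = AutoModel(model="paraformer-zh-streaming")
cache = {}  # decoder state carried across chunks
chunk_size = [0, 10, 5]
chunk_samples = chunk_size[1] * 960  # 600 ms of 16 kHz audio per push

def feed_chunk(pcm_s16le: bytes, is_final: bool = False) -> str:
    """Push one raw PCM chunk and return any newly decoded text."""
    audio = np.frombuffer(pcm_s16le, dtype=np.int16).astype(np.float32) / 32768.0
    res = model.generate(input=audio, cache=cache, is_final=is_final,
                         chunk_size=chunk_size,
                         encoder_chunk_look_back=4, decoder_chunk_look_back=1)
    return res[0]["text"] if res else ""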
The system uses a multithreaded architecture to handle concurrent tasks efficiently: a stream-reader thread pipes raw PCM audio from FFmpeg's stdout into a bounded queue, a recognition thread consumes that queue and feeds FunASR, a helper thread drains FFmpeg's stderr for logging, and the main control loop supervises the workers and handles restarts.
A key feature is its resilience. If the RTMP stream is interrupted or the FFmpeg process terminates unexpectedly, the main control loop detects this condition. After a short delay, it attempts to re-establish the connection to the RTMP stream and restart the audio processing pipeline, ensuring the speech recognition service remains available with minimal downtime.
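The class below waits a fixed 5 seconds between reconnection attempts. If your streams drop frequently, capped exponential backoff is a common refinement; here is a minimal sketch of that pattern, where `connect_and_process` is a hypothetical stand-in for one complete stream session:

import time

def run_with_backoff(connect_and_process, base_delay=1.0, max_delay=60.0):
    """Retry a stream session forever, doubling the wait after each failure."""
    delay = base_delay
    while True:
        started = time.monotonic()
        connect_and_process()  # returns when the stream ends or errors out
        if time.monotonic() - started > max_delay:
            delay = base_delay  # a long, healthy session resets the backoff
        time.sleep(delay)
        delay = min(delay * 2, max_delay)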
End to end, the data flows as follows: the RTMP stream is ingested and decoded by FFmpeg, the resulting PCM audio is buffered in a thread-safe queue, and FunASR transcribes it incrementally, all orchestrated by a Python script with multithreading and automatic recovery, finally emitting text in real time.
The core of the solution is a Python class, `RTMPSpeechRecognizer`, which encapsulates all the logic for stream handling, speech recognition, and process management. Key methods include initialization of FunASR and FFmpeg parameters, worker threads for reading the stream and performing recognition, and a main loop for starting and automatically restarting the process.
import subprocess
import threading
import queue
import time
import sys
import logging
# import numpy as np # Not strictly needed if FunASR generate_stream takes bytes for PCM
from funasr import AutoModel
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s')
class RTMPSpeechRecognizer:
def __init__(self, rtmp_url, model_dir=None):
self.rtmp_url = rtmp_url
self.model_dir = model_dir
self.target_sample_rate = 16000 # FunASR Paraformer typically expects 16kHz
self.target_channels = 1
logging.info("Initializing FunASR model...")
try:
# User's specified snippet for model initialization
self.asr_model = AutoModel(
model=self.model_dir or "iic/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-pytorch",
model_type="offline",
punc=True,
vad=True,
# Depending on FunASR version and model, device selection might be here or global
# device="cuda:0" # Example for GPU, ensure PyTorch CUDA is set up
)
logging.info("FunASR model initialized successfully.")
except Exception as e:
logging.error(f"Failed to initialize FunASR model: {e}")
raise
        self.audio_queue = queue.Queue(maxsize=100)  # Buffer for audio chunks (~10 s at 0.1 s per chunk)
self.main_running = threading.Event()
self.main_running.set() # Controls the main operational loop
self.ffmpeg_process = None
self.stream_thread = None
self.recognize_thread = None
self.ffmpeg_cmd = [] # To be populated by _configure_ffmpeg_command
def _configure_ffmpeg_command(self):
"""Configures the FFmpeg command for RTMP processing."""
logging.info(f"Configuring FFmpeg to output audio at {self.target_sample_rate}Hz, {self.target_channels} channel(s).")
        self.ffmpeg_cmd = [
            'ffmpeg',
            '-hide_banner',          # Suppress FFmpeg banner
            '-nostats',              # Suppress periodic statistics
            '-loglevel', 'error',    # Global option: log errors only (must precede inputs/outputs to take effect)
            '-i', self.rtmp_url,
            '-vn',                   # Drop the video track; only audio is needed
            '-f', 's16le',           # Output format: signed 16-bit little-endian PCM
            '-acodec', 'pcm_s16le',  # Audio codec
            '-ar', str(self.target_sample_rate),  # Target sample rate
            '-ac', str(self.target_channels),     # Target channels (mono)
            '-'                      # Write raw audio to stdout
        ]
def _read_rtmp_stream_worker(self):
"""Worker thread to read audio from RTMP stream via FFmpeg and put it into a queue."""
logging.info(f"Attempting to connect to RTMP stream: {self.rtmp_url}")
try:
self.ffmpeg_process = subprocess.Popen(self.ffmpeg_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
logging.info(f"FFmpeg process started (PID: {self.ffmpeg_process.pid}) for RTMP stream.")
# Thread to monitor FFmpeg's stderr for errors without blocking stdout reading
stderr_thread = threading.Thread(target=self._log_ffmpeg_stderr, daemon=True)
stderr_thread.start()
# Calculate chunk size for approximately 0.1 seconds of audio
# Bytes per sample = 2 (for 16-bit)
chunk_size = int(self.target_sample_rate * self.target_channels * 2 * 0.1)
while self.main_running.is_set():
in_bytes = self.ffmpeg_process.stdout.read(chunk_size)
if not in_bytes:
# This indicates that FFmpeg's stdout stream has closed.
# It could be due to the RTMP stream ending or an FFmpeg error.
exit_code = self.ffmpeg_process.poll()
if exit_code is not None:
logging.warning(f"FFmpeg process exited with code {exit_code}. Stream might have ended or an error occurred.")
else:
logging.warning("FFmpeg did not return audio data. Stream might be interrupted.")
break
self.audio_queue.put(in_bytes)
except FileNotFoundError:
logging.error("FFmpeg command not found. Please ensure FFmpeg is installed and in your system's PATH.")
self.main_running.clear() # Signal main loop to stop if FFmpeg is not found
except Exception as e:
logging.error(f"Error in FFmpeg stream reading worker: {e}")
finally:
if self.ffmpeg_process:
if self.ffmpeg_process.poll() is None: # If process still running
logging.info("Terminating FFmpeg process.")
self.ffmpeg_process.terminate()
try:
self.ffmpeg_process.wait(timeout=3) # Wait for FFmpeg to terminate
except subprocess.TimeoutExpired:
logging.warning("FFmpeg process did not terminate gracefully, killing.")
self.ffmpeg_process.kill()
# Ensure stderr is read fully if thread didn't catch it all
if self.ffmpeg_process.stderr:
remaining_stderr = self.ffmpeg_process.stderr.read()
if remaining_stderr:
logging.error(f"FFmpeg STDERR (final): {remaining_stderr.decode('utf-8', errors='ignore').strip()}")
self.ffmpeg_process.stderr.close()
if self.ffmpeg_process.stdout:
self.ffmpeg_process.stdout.close()
self.ffmpeg_process = None
logging.info("FFmpeg stream reading worker finished.")
# Signal recognizer that this stream part is done
if self.main_running.is_set(): # Only if not shutting down globally
self.audio_queue.put(None) # Sentinel value for the recognizer
def _log_ffmpeg_stderr(self):
"""Logs FFmpeg's stderr output in a separate thread."""
if self.ffmpeg_process and self.ffmpeg_process.stderr:
try:
for line in iter(self.ffmpeg_process.stderr.readline, b''):
if not self.main_running.is_set(): break # Stop if service is stopping
logging.error(f"FFmpeg STDERR: {line.decode('utf-8', errors='ignore').strip()}")
except Exception as e:
logging.warning(f"Error reading FFmpeg stderr: {e}")
def _recognize_audio_worker(self):
"""Worker thread to get audio from queue and perform speech recognition."""
logging.info("Starting audio recognition worker...")
try:
            # Initialize FunASR's stream generator.
            # param_dict specifies the format of the audio being pushed.
            # NOTE: generate_stream()/push() availability depends on the FunASR
            # version; recent releases expose incremental decoding via
            # model.generate(input=..., cache=..., is_final=...) instead (see
            # the sketch earlier in this guide).
            asr_stream_generator = self.asr_model.generate_stream(
param_dict={
"audio_format": "pcm", # Or "wav" if headers were included; "pcm" for raw s16le
"sample_rate": self.target_sample_rate,
"channel": self.target_channels,
"dtype": "int16" # Corresponds to s16le PCM
}
)
# Loop while the main service is running or there's data in the queue
while self.main_running.is_set() or not self.audio_queue.empty():
try:
audio_chunk_bytes = self.audio_queue.get(timeout=1)
if audio_chunk_bytes is None: # Sentinel: stream part ended
logging.info("Recognizer received end-of-stream part signal.")
# If main_running is still set, it means we expect a new stream soon.
# If not, we are shutting down.
if not self.main_running.is_set(): break # Exit if global shutdown
else: continue # Wait for more data from a new stream
# Push audio chunk to FunASR stream
# FunASR's generate_stream().push() typically takes bytes for PCM
results = asr_stream_generator.push(audio_in=audio_chunk_bytes)
if results: # FunASR may return a list of results
for res_dict in results:
if isinstance(res_dict, dict) and "text" in res_dict and res_dict["text"]:
sentence = res_dict["text"]
# Check for 'is_final' which indicates a complete sentence segment
is_final = res_dict.get("is_final", False)
if sentence.strip() and is_final:
print(f"识别结果: {sentence}")
logging.info(f"实时识别到句子: {sentence}")
except queue.Empty:
# Queue is empty. If main_running is false, it means we are shutting down.
if not self.main_running.is_set() and self.audio_queue.empty():
break
continue # Otherwise, just continue waiting for more data
except Exception as e:
logging.error(f"Error during speech recognition processing: {e}")
# Consider if this error should break the loop or attempt recovery
# After the loop, finalize the FunASR stream to get any remaining buffered audio
logging.info("Finalizing FunASR stream...")
final_results = asr_stream_generator.close()
if final_results:
for res_dict in final_results:
if isinstance(res_dict, dict) and "text" in res_dict and res_dict["text"]:
sentence = res_dict["text"]
print(f"最终识别结果 (on close): {sentence}")
logging.info(f"流结束, 最终识别结果 (on close): {sentence}")
except Exception as e:
logging.error(f"Critical error in recognition worker setup or finalization: {e}")
finally:
logging.info("Audio recognition worker finished.")
def start(self):
"""Starts the RTMP speech recognition service with automatic recovery."""
self._configure_ffmpeg_command() # Prepare the FFmpeg command
# Start the recognition thread once. It will process data from the queue.
self.recognize_thread = threading.Thread(target=self._recognize_audio_worker, name="RecognizerThread", daemon=True)
self.recognize_thread.start()
logging.info("Recognition thread started.")
# Main loop for managing the FFmpeg stream reading process and auto-recovery
while self.main_running.is_set():
logging.info("Starting/Restarting RTMP stream reading process.")
self.stream_thread = threading.Thread(target=self._read_rtmp_stream_worker, name="StreamReaderThread", daemon=True)
self.stream_thread.start()
# Wait for the stream_thread to finish
# This happens if FFmpeg exits (e.g., stream ends, error, or manual stop)
self.stream_thread.join()
if self.main_running.is_set(): # If not an intentional stop triggered by self.stop()
logging.warning("RTMP stream connection lost or FFmpeg process ended. Attempting to reconnect in 5 seconds...")
# Clear any potentially stale data from the queue from the previous connection
# Be cautious not to discard data too aggressively if recognizer is slow
# This simple clear assumes new stream is independent
with self.audio_queue.mutex:
self.audio_queue.queue.clear()
time.sleep(5) # Wait before attempting to reconnect
else:
logging.info("Main service loop is stopping. Not attempting reconnect.")
break # Exit the while loop if main_running is cleared
logging.info("Main service loop ended.")
def stop(self):
"""Stops the RTMP speech recognition service gracefully."""
logging.info("Attempting to stop RTMP speech recognition service...")
self.main_running.clear() # Signal all loops and threads to stop
# Terminate FFmpeg process if it's still running
if self.ffmpeg_process and self.ffmpeg_process.poll() is None:
logging.info("Terminating active FFmpeg process...")
self.ffmpeg_process.terminate()
try:
self.ffmpeg_process.wait(timeout=3)
except subprocess.TimeoutExpired:
logging.warning("FFmpeg did not terminate in time, killing.")
self.ffmpeg_process.kill()
# Wait for the stream reading thread to finish
if self.stream_thread and self.stream_thread.is_alive():
logging.info("Waiting for stream reader thread to stop...")
self.stream_thread.join(timeout=5)
# Signal recognition thread to stop and wait for it to finish
if self.recognize_thread and self.recognize_thread.is_alive():
logging.info("Signaling recognition thread to stop and waiting...")
self.audio_queue.put(None) # Ensure it wakes up if waiting on queue.get()
self.recognize_thread.join(timeout=10) # Give it time to process remaining queue items
logging.info("RTMP speech recognition service stopped successfully.")
# Example usage:
if __name__ == "__main__":
# IMPORTANT: Replace with your actual RTMP stream URL
rtmp_stream_url = "rtmp://your-rtmp-server/app/stream_key"
# Optional: If you have downloaded the FunASR model locally, specify its directory path
# model_local_path = "/path/to/your/downloaded/funasr_model"
model_local_path = None # Set to None to use default download from ModelScope
recognizer = RTMPSpeechRecognizer(rtmp_url=rtmp_stream_url, model_dir=model_local_path)
try:
recognizer.start() # Start the recognition service
except KeyboardInterrupt:
logging.info("Keyboard interrupt detected. Shutting down gracefully...")
except Exception as e:
# Catch any unexpected errors in the main execution flow
logging.critical(f"An unhandled critical error occurred in the main application: {e}", exc_info=True)
finally:
# Ensure stop() is called to clean up resources
recognizer.stop()
logging.info("Application exited.")
To run this Python script, you'll need to prepare your Ubuntu 24.04 environment:
sudo apt update && sudo apt install ffmpeg
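Optionally, create and activate a virtual environment before installing the Python packages (this assumes a `python3.9` interpreter is available on the system, matching the version targeted by this guide):

python3.9 -m venv asr-env
source asr-env/bin/activate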
pip install funasr numpy
(The script drives FFmpeg directly via `subprocess`, so the `ffmpeg-python` wrapper library is not required for this version, though it can be useful for other FFmpeg interactions. `numpy` is a common dependency of FunASR and handy if you extend the script with audio manipulation.)
On the first run, FunASR downloads the specified model (iic/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-pytorch) from ModelScope, which requires an internet connection. Alternatively, you can pre-download the model and pass its local directory path as `model_dir`. Compared with simpler offline batch transcription or fully managed cloud ASR services, this custom solution offers high robustness and good real-time performance with moderate setup complexity and resource usage.
To run the recognizer:

1. Save the code to a `.py` file (e.g., `rtmp_asr.py`).
2. Update the `rtmp_stream_url` variable in the `if __name__ == "__main__":` block to point to your actual RTMP stream.
3. Run the script: `python3.9 rtmp_asr.py`
The script's behavior is governed by several key parameters, some hardcoded for optimal performance with the target FunASR model and others configurable:
| Parameter | Value / Setting | Description |
|---|---|---|
| RTMP URL | User-defined (in script) | The network address of the input RTMP stream to be transcribed. |
| FunASR Model | `iic/speech_paraformer-large_asr_nat-zh-cn-16k-common-vocab8404-pytorch` or local path | The pre-trained speech recognition model used by FunASR; either a ModelScope ID or a path to a downloaded model. |
| Model Loading Type (`model_type`) | `offline` | How FunASR's `AutoModel` loads the core model; streaming is then accessed via the `generate_stream()` method. |
| Punctuation (`punc`) | `True` (in `AutoModel` init) | Enables automatic insertion of punctuation marks into the transcribed text. |
| Voice Activity Detection (`vad`) | `True` (in `AutoModel` init) | Detects speech segments and ignores silence, improving accuracy and efficiency. |
| Target Audio Sample Rate | 16000 Hz | The sample rate FFmpeg converts the audio to, matching the FunASR model's expectation. |
| Target Audio Channels | 1 (mono) | The number of audio channels FFmpeg converts to; typically mono for ASR. |
| Audio Chunk Duration (FFmpeg read) | ~0.1 seconds | The approximate duration of audio read from FFmpeg's output pipe per iteration; affects responsiveness. |
| Automatic Recovery | Enabled | The system automatically attempts to reconnect and restart processing if the RTMP stream is interrupted. |
| Logging Level | INFO (configurable) | Controls the verbosity of log messages printed to the console; useful for debugging. |
This Python-based solution provides a robust and efficient way to perform real-time speech recognition on RTMP streams using FunASR. By integrating FFmpeg for stream processing, leveraging FunASR's advanced ASR capabilities, and implementing multithreading with automatic recovery, the system meets the demands for continuous, accurate, and timely transcription. It serves as a solid foundation that can be further customized or scaled for various real-world applications requiring live speech-to-text conversion.