Skip to main content

二、录制麦克风并向FunASR API 发送录音实现语音识别

 目前需要执行以下操作:

  1. 列出可用的麦克风设备,并让您选择一个。
  2. 持续监听所选麦克风的音频输入。
  3. 当检测到音量超过预设阈值并持续一段时间(例如0.3秒)后,开始录制音频。
  4. 当音量低于预设阈值并持续一段时间(例如0.5秒)后,停止录制。
  5. 将录制好的音频发送到 FunASR API (http://localhost:5000/transcribe) 进行语音识别。
  6. 打印 API 返回的识别文本。

重要前提:

  • FunASR API 服务正在运行:请确保您之前创建的 funasr_api_service Flask 应用正在本地 http://localhost:5000 上运行。
  • Python 库:您需要安装以下 Python 库。如果尚未安装,请使用 pip 安装:
    Bash

    pip install sounddevice numpy requests
    

关于0.3毫秒的说明: 您提到的“0.3毫秒”对于语音活动检测来说是一个非常短的时间(0.0003秒),通常不足以可靠地判断语音的开始或结束。在此脚本中,我将其理解为 0.3秒 (300毫秒) 用于判断语音开始,以及 0.5秒 (500毫秒) 用于判断语音结束。这些值可以在代码中调整。

import sounddevice as sd
import numpy as np
import requests
import wave
import io
import time
import threading
import queue

# --- 配置参数 ---
API_URL = "http://localhost:5000/transcribe"  # FunASR API 服务地址
# TARGET_SAMPLE_RATE 仍然用于尝试以该速率打开麦克风
# 但如果失败,将使用设备的默认采样率,并且该采样率将用于发送
TARGET_SAMPLE_RATE = 16000  # 尝试使用的目标采样率 (Hz)
CHANNELS = 1  # 声道数 (单声道)
BLOCK_DURATION_S = 0.05  # 每次回调处理的音频块时长(秒)
VOLUME_THRESHOLD = 0.02  # 音量阈值 (RMS, 范围 0.0 到 1.0)

SPEECH_CONFIRM_S = 0.3  # 声音持续超过阈值多少秒后开始录制
SILENCE_CONFIRM_S = 0.8  # 声音持续低于阈值多少秒后停止录制

# --- 全局变量 ---
recording_frames = []
is_currently_recording = False
consecutive_silent_blocks = 0
consecutive_speech_blocks = 0
audio_processing_queue = queue.Queue()
selected_device_id = None
actual_sample_rate = TARGET_SAMPLE_RATE # 麦克风实际使用的采样率
block_size_frames = 0
speech_trigger_blocks_count = 0
silence_trigger_blocks_count = 0


def list_microphones():
    """列出所有可用的输入设备 (麦克风) 并让用户选择。"""
    print("可用的麦克风设备:")
    devices = sd.query_devices()
    input_devices_info = []
    
    idx_counter = 0
    for i, device in enumerate(devices):
        if device['max_input_channels'] > 0:
            default_sr = device.get('default_samplerate', 'N/A')
            device_name = device['name']
            host_api_name = sd.query_hostapis(device['hostapi'])['name'] if 'hostapi' in device and device['hostapi'] is not None else 'N/A'
            print(f"  {idx_counter}) {device_name} (ID: {i}, 默认采样率: {default_sr} Hz, HostAPI: {host_api_name})")
            input_devices_info.append({'id': i, 'name': device_name, 'default_samplerate': default_sr})
            idx_counter += 1

    if not input_devices_info:
        print("错误:未找到可用的麦克风设备。")
        return None

    while True:
        try:
            choice = int(input("请选择麦克风设备的序号: "))
            if 0 <= choice < len(input_devices_info):
                return input_devices_info[choice]['id']
            else:
                print("无效的选择,请输入列表中的序号。")
        except ValueError:
            print("无效的输入,请输入数字。")

def calculate_rms(data):
    """计算音频数据的均方根 (RMS)。"""
    return np.sqrt(np.mean(data**2))

def save_and_send_audio(audio_data_np, recorded_sample_rate, api_url):
    """
    将音频数据保存为WAV格式,以其录制采样率发送到API,并打印结果。
    注意:API可能期望特定的采样率(如16000Hz)。以不同采样率发送可能导致识别问题。
    """
    print("\n正在处理录音数据...")
    if audio_data_np.size == 0:
        print("没有录到音频数据。")
        return

    final_sample_rate_for_api = recorded_sample_rate # 直接使用录制采样率

    try:
        # 将 float32 (-1.0 到 1.0) 转换为 int16 (PCM)
        audio_data_int16 = (audio_data_np * 32767).astype(np.int16)

        wav_buffer = io.BytesIO()
        with wave.open(wav_buffer, 'wb') as wf:
            wf.setnchannels(CHANNELS)
            wf.setsampwidth(2)  # int16 需要 2 字节
            wf.setframerate(int(final_sample_rate_for_api)) # 使用录制采样率
            wf.writeframes(audio_data_int16.tobytes())
        wav_buffer.seek(0)

        files = {'audio_file': ('recording.wav', wav_buffer, 'audio/wav')}
        duration_sec = len(audio_data_int16) / final_sample_rate_for_api
        print(f"发送 {duration_sec:.2f} 秒的音频 (采样率 {final_sample_rate_for_api}Hz) 到 API: {api_url}")
        
        response = requests.post(api_url, files=files, timeout=30)
        response.raise_for_status()

        result = response.json()
        if "transcription" in result and result["transcription"]:
            print(f"识别结果: {result['transcription']}")
        elif "error" in result:
            print(f"API 返回错误: {result['error']}")
        else:
            print(f"API 未返回有效识别结果或错误信息: {result}")

    except requests.exceptions.Timeout:
        print("错误: API 请求超时。")
    except requests.exceptions.ConnectionError:
        print(f"错误: 无法连接到 API ({api_url})。请确保服务正在运行。")
    except requests.exceptions.RequestException as e:
        print(f"发送到 API 时出错: {e}")
    except Exception as e:
        print(f"处理录音或 API 响应时发生意外错误: {e}")
    finally:
        if 'wav_buffer' in locals() and wav_buffer:
            wav_buffer.close()


def audio_processing_worker():
    """工作线程,从队列中获取录音数据并处理。"""
    while True:
        try:
            audio_data_np, rate = audio_processing_queue.get(timeout=1)
            save_and_send_audio(audio_data_np, rate, API_URL)
            audio_processing_queue.task_done()
        except queue.Empty:
            continue
        except Exception as e:
            print(f"音频处理工作线程发生错误: {e}")


def audio_callback(indata, frames, callback_time, status):
    """音频输入流的回调函数。"""
    global is_currently_recording, recording_frames
    global consecutive_silent_blocks, consecutive_speech_blocks
    global speech_trigger_blocks_count, silence_trigger_blocks_count, actual_sample_rate

    if status:
        print(f"音频流状态信息: {status}", flush=True)

    volume_norm = calculate_rms(indata[:, 0])

    if is_currently_recording:
        recording_frames.append(indata.copy())
        if volume_norm < VOLUME_THRESHOLD:
            consecutive_silent_blocks += 1
            if consecutive_silent_blocks >= silence_trigger_blocks_count:
                print(f"\n检测到静音 ({SILENCE_CONFIRM_S}s),停止录音。", flush=True)
                is_currently_recording = False
                if recording_frames:
                    full_recording = np.concatenate(recording_frames)
                    # 将录制的音频(以 actual_sample_rate 录制)和其采样率放入队列
                    audio_processing_queue.put((full_recording, actual_sample_rate))
                recording_frames = []
                consecutive_speech_blocks = 0
        else:
            consecutive_silent_blocks = 0
            print(".", end="", flush=True)
    else: 
        if volume_norm >= VOLUME_THRESHOLD:
            consecutive_speech_blocks += 1
            if consecutive_speech_blocks >= speech_trigger_blocks_count:
                print(f"\n检测到语音 ({SPEECH_CONFIRM_S}s),开始录音...", flush=True)
                is_currently_recording = True
                recording_frames = [indata.copy()]
                consecutive_silent_blocks = 0
        else:
            consecutive_speech_blocks = 0

def main():
    global selected_device_id, actual_sample_rate, block_size_frames
    global speech_trigger_blocks_count, silence_trigger_blocks_count

    selected_device_id = list_microphones()
    if selected_device_id is None:
        return

    print(f"\n您选择了麦克风设备 ID: {selected_device_id}")

    try:
        # 1. 尝试使用目标采样率 (e.g., 16000 Hz)
        sd.check_input_settings(device=selected_device_id, samplerate=TARGET_SAMPLE_RATE, channels=CHANNELS)
        actual_sample_rate = TARGET_SAMPLE_RATE
        print(f"设备 ID {selected_device_id} 支持目标采样率 {TARGET_SAMPLE_RATE} Hz 和 {CHANNELS} 声道。")
        print(f"将以 {actual_sample_rate} Hz 进行录制和发送。")
    except sd.PortAudioError:
        print(f"设备 ID {selected_device_id} 不直接支持目标采样率 {TARGET_SAMPLE_RATE} Hz。")
        try:
            # 2. 尝试使用设备的默认采样率
            device_info = sd.query_devices(selected_device_id)
            default_sr_str = device_info.get('default_samplerate', '')
            if not default_sr_str:
                 raise ValueError("设备未提供有效的默认采样率。")
            default_sr = int(float(default_sr_str))
            
            if default_sr <= 0:
                raise ValueError(f"设备的默认采样率 ({default_sr}) 无效。")

            print(f"尝试使用设备的默认采样率: {default_sr} Hz。")
            sd.check_input_settings(device=selected_device_id, samplerate=default_sr, channels=CHANNELS)
            actual_sample_rate = default_sr
            print(f"将以设备的默认采样率 {actual_sample_rate} Hz 进行录制和发送。")
            if actual_sample_rate != TARGET_SAMPLE_RATE:
                 print(f"警告: FunASR API 通常期望 {TARGET_SAMPLE_RATE} Hz。以 {actual_sample_rate} Hz 发送可能导致识别效果不佳。")


        except (sd.PortAudioError, ValueError, TypeError) as e_default:
            print(f"错误: 设备 ID {selected_device_id} 也不支持其默认采样率,或默认采样率无效/无法使用。")
            print(f"错误信息: {e_default}")
            print("请尝试选择其他麦克风设备,或检查您的麦克风驱动程序和操作系统音频设置。")
            return
        except Exception as e_check:
            print(f"检查设备默认采样率时发生未知错误: {e_check}")
            return
            
    except Exception as e:
        print(f"检查设备能力时发生未知错误: {e}")
        return

    block_size_frames = int(actual_sample_rate * BLOCK_DURATION_S)
    speech_trigger_blocks_count = int(SPEECH_CONFIRM_S / BLOCK_DURATION_S)
    silence_trigger_blocks_count = int(SILENCE_CONFIRM_S / BLOCK_DURATION_S)

    print(f"实际使用麦克风采样率: {actual_sample_rate} Hz")
    print(f"音频块大小: {block_size_frames} 帧 ({BLOCK_DURATION_S*1000:.0f} ms)")
    print(f"音量阈值 (RMS): {VOLUME_THRESHOLD}")
    print(f"语音确认时长: {SPEECH_CONFIRM_S} 秒 ({speech_trigger_blocks_count} 个连续音频块)")
    print(f"静音确认时长: {SILENCE_CONFIRM_S} 秒 ({silence_trigger_blocks_count} 个连续音频块)")
    print("\n正在监听麦克风... (按 Ctrl+C 退出)")

    worker_thread = threading.Thread(target=audio_processing_worker, daemon=True)
    worker_thread.start()

    try:
        with sd.InputStream(device=selected_device_id,
                             channels=CHANNELS,
                             samplerate=actual_sample_rate,
                             blocksize=block_size_frames,
                             callback=audio_callback,
                             dtype='float32'):
            while True:
                time.sleep(0.1)
    except KeyboardInterrupt:
        print("\n程序被用户中断。正在退出...")
    except Exception as e:
        print(f"打开音频流或主循环时发生错误: {e}")
    finally:
        print("程序结束。")
        if audio_processing_queue and not audio_processing_queue.empty():
            print("等待剩余音频处理完成...")
            audio_processing_queue.join() 
        print("所有任务已处理完毕。")

if __name__ == "__main__":
    main()

使用的注意事项:

  1. 确保 FunASR API 服务正在运行:在另一个终端中启动 funasr_api_service.py
  2. 保存脚本:将上面的代码保存为一个 Python 文件(例如 mic_client.py)。
  3. 运行脚本:在终端中执行 python mic_client.py
  4. 选择麦克风:脚本会列出可用的麦克风设备,您需要输入相应设备的序号并按 Enter。
  5. 开始说话:脚本会开始监听。当您说话的声音足够大并持续一段时间后,它会自动开始录音(您会看到提示信息和 . 字符表示正在录音)。
  6. 停止说话:当您停止说话一段时间后,录音会自动结束,音频数据会被发送到 API 进行识别,并在终端打印出识别结果。
  7. 重复:您可以继续说话,脚本会重复上述检测、录制和识别过程。
  8. 退出:按 Ctrl+C 可以停止脚本。

可调整参数:

  • VOLUME_THRESHOLD:这个值可能需要根据您的麦克风灵敏度和环境噪音进行调整。如果录音过于频繁或难以触发,请尝试调整此值。
  • SPEECH_CONFIRM_SSILENCE_CONFIRM_S:调整这些时长可以改变语音活动检测的灵敏度。
  • API_URL:如果您的 FunASR API 服务运行在不同的地址或端口,请修改此项。

这个脚本提供了一个基本的语音活动检测和通过 API 进行识别的框架。您可以根据需要进一步优化和扩展。