Files
aiData/Util/AlyTtsKit.py

194 lines
7.2 KiB
Python
Raw Permalink Normal View History

2026-01-31 15:44:39 +08:00
# coding=utf-8
import base64
import collections
import os
import pathlib
import sys
import threading
import time

import dashscope
import pyaudio
import requests
from dashscope.audio.qwen_tts_realtime import QwenTtsRealtime, QwenTtsRealtimeCallback, AudioFormat
# Put the project root (the parent of this file's directory) on sys.path so
# that the Config package can be imported.
root_dir = os.path.dirname(
    os.path.dirname(
        os.path.abspath(__file__)
    )
)
if not (root_dir in sys.path):
    sys.path.append(root_dir)

try:
    from Config.Config import ALY_LLM_API_KEY
except ImportError:
    # Config module is unavailable: fall back to the environment variable.
    ALY_LLM_API_KEY = os.environ.get("DASHSCOPE_API_KEY")

# ======= Constants =======
DEFAULT_TARGET_MODEL = "qwen3-tts-vc-realtime-2026-01-15"
DEFAULT_PREFERRED_NAME = "guanyu"
DEFAULT_AUDIO_MIME_TYPE = "audio/mpeg"
class MyCallback(QwenTtsRealtimeCallback):
    """Streaming TTS callback that buffers PCM audio before playing it.

    Decoded audio deltas are queued as they arrive. Playback runs on a
    background thread that is started once ``buffer_seconds`` worth of audio
    has been received, or when the session finishes if the threshold was never
    reached (short texts). Buffering up front is what prevents choppy output.

    Args:
        buffer_seconds: Seconds of audio to accumulate before playback starts.
    """

    # PCM layout the output stream is opened with: 24 kHz, mono, 16-bit.
    SAMPLE_RATE = 24000
    BYTES_PER_SAMPLE = 2

    def __init__(self, buffer_seconds=0.5):
        super().__init__()
        self.complete_event = threading.Event()
        # Set by the playback thread as soon as it starts running.
        self.playback_start_event = threading.Event()
        self._player = pyaudio.PyAudio()
        self._stream = self._player.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=self.SAMPLE_RATE,
            output=True,
            frames_per_buffer=1024,
        )
        # FIFO of decoded PCM chunks; deque gives O(1) pops from the left.
        self.audio_queue = collections.deque()
        # Threshold in bytes (16 bit = 2 bytes per sample).
        self.buffer_threshold = int(
            self.SAMPLE_RATE * self.BYTES_PER_SAMPLE * buffer_seconds
        )
        self.is_playing = False
        self.playback_thread = None
        self.all_data_received = False
        # Running total of received audio bytes. Kept as a counter so the
        # queue never has to be re-summed (and never iterated while the
        # playback thread is mutating it).
        self._received_bytes = 0

    def on_open(self) -> None:
        """Log that the connection has been established."""
        print('[TTS] 连接已建立')

    def on_close(self, close_status_code, close_msg) -> None:
        """Drain playback, release the audio device and unblock waiters.

        Safe to call more than once: audio resources are released only once.
        """
        self.all_data_received = True
        try:
            worker = self.playback_thread
            if worker is not None and worker is not threading.current_thread():
                worker.join()
            self._release_audio()
            print(f'[TTS] 连接关闭 code={close_status_code}, msg={close_msg}')
        finally:
            # If the connection closed without a 'session.finished' event,
            # nobody else will ever set this; without it wait_for_finished()
            # would block forever.
            self.complete_event.set()

    def _release_audio(self):
        """Close the output stream and terminate PyAudio exactly once."""
        stream, self._stream = self._stream, None
        if stream is not None:
            stream.stop_stream()
            stream.close()
        player, self._player = self._player, None
        if player is not None:
            player.terminate()

    def _start_playback(self):
        """Mark playback as started and launch the playback thread."""
        self.is_playing = True
        self.playback_thread = threading.Thread(target=self._playback_worker)
        self.playback_thread.start()

    def _playback_worker(self):
        """Playback thread: pop chunks from the buffer and write them out."""
        print("[TTS] 缓冲区达到阈值,开始播放...")
        self.playback_start_event.set()
        played_chunks = 0
        while True:
            # Read the flag BEFORE looking at the queue. If it was already
            # True, every chunk has been appended, so an empty queue really
            # means "done". Checking in the opposite order could drop a chunk
            # appended between the two checks.
            producer_done = self.all_data_received
            if self.audio_queue:
                self._stream.write(self.audio_queue.popleft())
                played_chunks += 1
            elif producer_done:
                break
            else:
                time.sleep(0.01)  # wait for more data
        print(f"[TTS] 播放线程结束,共播放 {played_chunks} 个数据块")

    def on_event(self, response: dict) -> None:
        """Handle one server event.

        Args:
            response: Event payload; its ``'type'`` key selects the handling.
        """
        try:
            event_type = response.get('type', '')
            if event_type == 'session.created':
                print(f'[TTS] 会话开始: {response["session"]["id"]}')
            elif event_type == 'response.audio.delta':
                audio_data = base64.b64decode(response['delta'])
                self.audio_queue.append(audio_data)
                self._received_bytes += len(audio_data)
                # Until playback starts nothing is consumed, so the received
                # total equals the amount currently buffered.
                if (not self.is_playing
                        and self._received_bytes >= self.buffer_threshold):
                    self._start_playback()
            elif event_type == 'response.done':
                # Report everything received, not just what is still queued
                # (the playback thread has already consumed part of it).
                print(f'[TTS] 响应完成,总接收音频大小: {self._received_bytes} 字节')
            elif event_type == 'session.finished':
                print('[TTS] 会话结束')
                self.all_data_received = True
                try:
                    # Short texts may never reach the buffer threshold: force
                    # playback now. This must happen BEFORE complete_event is
                    # set, otherwise wait_for_finished() can observe
                    # playback_thread is None and return before any audio
                    # has been played.
                    if not self.is_playing:
                        print("[TTS] 会话已结束但未达缓冲阈值,强制启动播放...")
                        self._start_playback()
                finally:
                    self.complete_event.set()
        except Exception as e:
            # Callback boundary: log instead of propagating the exception.
            print(f'[Error] 处理回调事件异常: {e}')

    def wait_for_finished(self):
        """Block until the session is complete and playback has drained."""
        self.complete_event.wait()
        worker = self.playback_thread
        if worker is not None and worker.is_alive():
            worker.join()

class QwenTTSManager:
    """Manager for Qwen realtime speech synthesis.

    Wraps voice enrollment (voice cloning) over HTTP and realtime synthesis
    with local playback over a websocket.

    Args:
        api_key: DashScope API key; also assigned to ``dashscope.api_key``.
        model: Target realtime TTS model name.
    """

    ENROLLMENT_URL = "https://dashscope.aliyuncs.com/api/v1/services/audio/tts/customization"
    REALTIME_URL = 'wss://dashscope.aliyuncs.com/api-ws/v1/realtime'
    # (connect, read) timeout in seconds for the enrollment request, so a
    # stalled connection cannot block the caller indefinitely.
    ENROLLMENT_TIMEOUT = (10, 120)

    def __init__(self, api_key=ALY_LLM_API_KEY, model=DEFAULT_TARGET_MODEL):
        self.api_key = api_key
        self.model = model
        dashscope.api_key = self.api_key
        self.callback = None
        self.qwen_tts_realtime = None

    def create_voice_enrollment(self, file_path, preferred_name=DEFAULT_PREFERRED_NAME):
        """Create a cloned voice from a local audio sample.

        Args:
            file_path: Path of the audio file to upload.
            preferred_name: Preferred name sent with the enrollment request.

        Returns:
            The ``output.voice`` value of the JSON response.

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
            RuntimeError: If the service answers with a non-200 status.
            requests.RequestException: On network failure or timeout.
        """
        file_path_obj = pathlib.Path(file_path)
        if not file_path_obj.exists():
            raise FileNotFoundError(f"音频文件不存在: {file_path}")

        # The sample is sent inline as a base64 data URI.
        base64_str = base64.b64encode(file_path_obj.read_bytes()).decode()
        data_uri = f"data:{DEFAULT_AUDIO_MIME_TYPE};base64,{base64_str}"

        payload = {
            "model": "qwen-voice-enrollment",
            "input": {
                "action": "create",
                "target_model": self.model,
                "preferred_name": preferred_name,
                "audio": {"data": data_uri},
            },
        }
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

        resp = requests.post(
            self.ENROLLMENT_URL,
            json=payload,
            headers=headers,
            timeout=self.ENROLLMENT_TIMEOUT,
        )
        if resp.status_code != 200:
            raise RuntimeError(f"创建 voice 失败: {resp.status_code}, {resp.text}")
        return resp.json()["output"]["voice"]

    def start_synthesis(self, voice_id, text_list, wait_finished=True, buffer_seconds=0.5):
        """Start realtime synthesis and play the audio locally.

        Args:
            voice_id: Voice to synthesize with.
            text_list: Iterable of text segments, sent in order. A single
                ``str`` is treated as one segment.
            wait_finished: When true, block until the session has finished
                and playback has drained.
            buffer_seconds: Seconds of audio to buffer before playback starts.

        Returns:
            The ``MyCallback`` instance handling this session.
        """
        # Iterating a bare string would send it one character at a time,
        # each followed by the pacing sleep below.
        if isinstance(text_list, str):
            text_list = [text_list]

        self.callback = MyCallback(buffer_seconds=buffer_seconds)
        self.qwen_tts_realtime = QwenTtsRealtime(
            model=self.model,
            callback=self.callback,
            url=self.REALTIME_URL,
        )
        self.qwen_tts_realtime.connect()
        self.qwen_tts_realtime.update_session(
            voice=voice_id,
            response_format=AudioFormat.PCM_24000HZ_MONO_16BIT,
            mode='server_commit',
        )

        for text in text_list:
            print(f'[发送文本]: {text}')
            self.qwen_tts_realtime.append_text(text)
            time.sleep(0.1)  # brief pause between segments
        self.qwen_tts_realtime.finish()

        if wait_finished:
            self.callback.wait_for_finished()
        return self.callback

    def wait_for_playback_start(self, timeout=None):
        """Wait until playback has actually started.

        Args:
            timeout: Maximum seconds to wait; ``None`` waits indefinitely.

        Returns:
            True if playback started, False if the wait timed out or no
            synthesis has been started yet.
        """
        if not self.callback:
            return False
        return self.callback.playback_start_event.wait(timeout)