151 lines
5.6 KiB
Python
151 lines
5.6 KiB
Python
# coding=utf-8
|
||
import os
|
||
import sys
|
||
import time
|
||
import threading
|
||
import logging
|
||
|
||
# Put the project root (the directory one level above this script's own
# directory) on sys.path so that the Util package can be imported.
_script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(_script_dir)
if project_root not in sys.path:
    sys.path.append(project_root)
|
||
|
||
from Util.AlyTtsKit import QwenTTSManager
|
||
from WeiXin import WxUtil
|
||
|
||
# ======= Configuration =======
# This voice ID was copied over from T2_PlayVoice.py.
MY_VOICE_ID = "qwen-tts-vc-guanyu-voice-20260131160431051-8e51"

# Text to synthesize; run_t7_task() hands it to the TTS worker thread.
REPLY_TEXT = "我是少惠林的大张老师,您的孩子几年级了?我们周六周日上班,您可以带孩子过来试听一下。"
|
||
|
||
# Logging setup.
# Force a fresh configuration: every handler already attached to the root
# logger is detached first, so records are guaranteed to reach the log file
# configured below (and stdout).
root_logger = logging.getLogger()
for stale_handler in tuple(root_logger.handlers):
    root_logger.removeHandler(stale_handler)

# The log file is opened in 'w' mode, i.e. it is truncated on every run.
file_handler = logging.FileHandler(
    os.path.join(WxUtil.LOG_DIR, "T7_TTS_VoiceReply.log"),
    mode='w',
    encoding='utf-8',
)
stream_handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Both destinations share one format and are attached file-first.
for handler in (file_handler, stream_handler):
    handler.setFormatter(formatter)
    root_logger.addHandler(handler)
root_logger.setLevel(logging.INFO)

# Logger used by the functions in this script.
logger = logging.getLogger("T7_TTS")
|
||
|
||
def tts_worker(tts_manager, voice_id, text, finished_event, start_event):
    """Synthesize *text* and block until its playback has finished.

    Written to be used as a ``threading.Thread`` target. Any exception raised
    while synthesizing or playing is logged (with traceback) and swallowed so
    that the thread always terminates cleanly.

    Args:
        tts_manager: Object exposing ``start_synthesis()``,
            ``wait_for_playback_start()`` and a ``callback`` attribute
            (``run_t7_task`` passes a ``QwenTTSManager``).
        voice_id: Voice identifier forwarded to ``start_synthesis``.
        text: The single sentence to synthesize.
        finished_event: ``threading.Event`` that is ALWAYS set on exit,
            whether playback succeeded or failed.
        start_event: ``threading.Event`` set right after
            ``wait_for_playback_start()`` returns. It stays unset if an
            exception occurs before that point.
    """
    try:
        logger.info(f"[TTS] 启动语音合成,音色ID: {voice_id}")
        # Buffer raised to 1.0 s to fix playback continuity problems.
        tts_manager.start_synthesis(voice_id, [text], wait_finished=False, buffer_seconds=1.0)

        # Wait until playback has actually started, then notify the main thread.
        tts_manager.wait_for_playback_start()
        start_event.set()

        # From here on, wait for the whole utterance to finish playing.
        if tts_manager.callback:
            tts_manager.callback.wait_for_finished()

        logger.info("[TTS] 语音播放完成")
    except Exception as e:
        # logger.exception keeps the traceback; a bare error message from a
        # background thread is otherwise very hard to diagnose.
        logger.exception(f"[TTS] 播放过程中发生异常: {e}")
    finally:
        # Unblock the main thread no matter how we got here.
        finished_event.set()
|
||
|
||
def _locate_voice_button(d):
    """Return the centre of the hold-to-talk button, switching input mode if needed.

    Takes a screenshot and template-matches ``press_say.jpg``. When that fails,
    it looks for ``audio_reply.jpg`` (the mode-switch button), clicks it, waits
    1.5 s and matches ``press_say.jpg`` again on a fresh screenshot.

    Returns:
        Whatever ``WxUtil.match_template_center`` returned for the button
        (indexed as ``pos[0], pos[1]`` by the caller), or a falsy value when
        the button could not be found.
    """
    tmp_screen = os.path.join(WxUtil.OUTPUT_DIR, "t7_check_mode.jpg")
    press_say_template = os.path.join(WxUtil.TEMPLATE_DIR, "press_say.jpg")

    d.screenshot(tmp_screen)
    pos = WxUtil.match_template_center(tmp_screen, press_say_template, threshold=0.8)
    if pos:
        return pos

    logger.info(">>> [状态] 未发现 '按住说话' 按钮,尝试切换模式...")
    audio_reply_template = os.path.join(WxUtil.TEMPLATE_DIR, "audio_reply.jpg")
    switch_pos = WxUtil.match_template_center(tmp_screen, audio_reply_template, threshold=0.8)
    if not switch_pos:
        return pos

    logger.info(f">>> [切换] 找到切换按钮 {switch_pos},点击切换...")
    d.click(switch_pos[0], switch_pos[1])
    time.sleep(1.5)  # give the UI time to switch before re-checking
    d.screenshot(tmp_screen)
    return WxUtil.match_template_center(tmp_screen, press_say_template, threshold=0.8)


def run_t7_task():
    """Send a WeChat voice message whose audio is produced by the TTS engine.

    Flow:
      1. Connect to the device via ``WxUtil.connect_device()``.
      2. Create a ``QwenTTSManager``.
      3. Locate the hold-to-talk button (switching to voice input if needed).
      4. Press and hold the button, wait 2 s, start ``tts_worker`` in a thread
         and keep holding until the worker signals completion (or more than
         40 s have elapsed since playback started), then release.
      5. Save a final screenshot.

    Every failure in steps 1-3 is logged and makes the function return early.
    The function returns ``None`` in all cases.
    """
    logger.info("开始执行 T7: 集成 TTS 克隆引擎与微信语音回复 (优化延迟版)")

    # 1. Connect to the device.
    d = WxUtil.connect_device()
    if not d:
        logger.error("无法连接设备,任务终止")
        return

    # 2. Initialise the TTS manager.
    try:
        tts_manager = QwenTTSManager()
    except Exception as e:
        logger.error(f"初始化 TTS 失败: {e}")
        return

    # 3. Prepare WeChat: make sure the chat is in voice-input mode.
    logger.info("步骤1: 准备微信环境...")
    pos = _locate_voice_button(d)
    if not pos:
        logger.error("无法定位到 '按住说话' 按钮,请检查手机界面")
        return

    logger.info(f">>> [定位] 成功定位到语音按钮中心: {pos}")

    # 4. Synchronised sequence: press first -> wait 2 s -> play.
    finished_event = threading.Event()
    playback_start_event = threading.Event()
    tts_thread = threading.Thread(
        target=tts_worker,
        args=(tts_manager, MY_VOICE_ID, REPLY_TEXT, finished_event, playback_start_event),
    )

    logger.info("步骤2: 开始同步录音与播放 (先按住 2 秒再播放)...")

    # A. Press and hold the button.
    logger.info(">>> [发送] 1. 立即按住发送语音按钮...")
    d.touch.down(pos[0], pos[1])
    # Measure the recording from the moment the button goes down so the
    # reported total also covers the time spent waiting for playback to start.
    press_time = time.monotonic()
    try:
        # B. Explicit 2 s lead-in (fixes the start of the audio being cut off).
        logger.info(">>> [等待] 2. 录音已启动,等待 2 秒确保微信进入录音状态...")
        time.sleep(2.0)

        # C. Start the TTS thread (synthesis + playback).
        logger.info(">>> [播放] 3. 启动 TTS 播音...")
        tts_thread.start()

        # D. Wait (max 10 s) for audio to really start. Also stop waiting as
        #    soon as the worker has finished, e.g. because synthesis failed,
        #    instead of holding the button for the full timeout.
        start_deadline = time.monotonic() + 10
        while not playback_start_event.wait(timeout=0.1):
            if finished_event.is_set() or time.monotonic() >= start_deadline:
                break
        if playback_start_event.is_set():
            logger.info(">>> [同步] 检测到音频已开始从扬声器输出")

        # Progress counter, relative to the start of playback.
        start_time = time.monotonic()
        last_second = 0
        # Event.wait doubles as the 0.1 s poll interval and returns
        # immediately once the worker is done.
        while not finished_event.wait(timeout=0.1):
            elapsed = int(time.monotonic() - start_time)
            if elapsed > last_second:
                logger.info(f">>> [录音中] {elapsed}s")
                last_second = elapsed
            if elapsed > 40:  # generous safety timeout
                logger.warning("录音时间过长,强制结束")
                break
    finally:
        # E. Always release the button, even if something above raised,
        #    so the device is never left with a stuck touch.
        d.touch.up(pos[0], pos[1])

    total_duration = time.monotonic() - press_time
    logger.info(f">>> [完成] 录音结束,微信录音总时长约 {total_duration:.2f}s")

    tts_thread.join()

    # 5. Save a screenshot of the result.
    final_screen = os.path.join(WxUtil.OUTPUT_DIR, "T7_Final_Result_Optimized.jpg")
    d.screenshot(final_screen)
    logger.info(f"任务结束,结果截图已保存至: {final_screen}")
|
||
|
||
# Run the task only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    run_t7_task()
|