This commit is contained in:
HuangHai
2026-01-20 21:43:54 +08:00
parent 66cb0faeff
commit 55e88777d9
32 changed files with 1112 additions and 60 deletions

View File

@@ -0,0 +1,301 @@
import os
import logging
import uuid
import shutil
import subprocess
import asyncio
from datetime import datetime
from typing import List, Optional
from fastapi import APIRouter, HTTPException, BackgroundTasks
from pydantic import BaseModel
import pymysql
# Import custom modules
from Config.Config import OBS_CLOUD_PREFIX, OBS_BUCKET, OBS_TMP_PREFIX, DORIS_HOST, DORIS_PORT, DORIS_USER, DORIS_PWD, DORIS_DATABASE, OBS_SERVER
from Util.DouYinDownloader import DouYinDownloader
from Util.ObsUtil import ObsUploader
from Util.ASRClient import ASRClient
from Util.LlmUtil import get_llm_response
# Logger setup
logger = logging.getLogger(__name__)
router = APIRouter()
# Database connection
def get_db_connection():
return pymysql.connect(
host=DORIS_HOST,
port=DORIS_PORT,
user=DORIS_USER,
password=DORIS_PWD,
database=DORIS_DATABASE,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
class ParseRequest(BaseModel):
text: str
def update_status(id, status, error_msg=None):
try:
conn = get_db_connection()
cursor = conn.cursor()
if error_msg:
sql = "UPDATE t_douyin_record SET status=%s, error_msg=%s WHERE id=%s"
cursor.execute(sql, (status, error_msg, id))
else:
sql = "UPDATE t_douyin_record SET status=%s WHERE id=%s"
cursor.execute(sql, (status, id))
conn.commit()
conn.close()
except Exception as e:
logger.error(f"DB Error update_status: {e}")
def update_record(id, title, obs_url, transcript, status):
try:
# Truncate title to 100 chars to fit DB schema (approx 400 bytes max for utf8mb4)
if title and len(title) > 100:
title = title[:100] + "..."
conn = get_db_connection()
cursor = conn.cursor()
sql = """
UPDATE t_douyin_record
SET video_name=%s, obs_url=%s, transcript=%s, status=%s
WHERE id=%s
"""
cursor.execute(sql, (title, obs_url, transcript, status, id))
conn.commit()
conn.close()
except Exception as e:
logger.error(f"DB Error update_record: {e}")
async def process_video_task(url: str, request_id: str, share_text: str = ""):
logger.info(f"Processing task {request_id}")
# 1. Update status
await asyncio.to_thread(update_status, request_id, "PROCESSING")
temp_dir = os.path.abspath(f"temp_{request_id}")
try:
if not os.path.exists(temp_dir):
os.makedirs(temp_dir)
# 2. Parse & Download
downloader = DouYinDownloader()
# url is passed directly now
if not url:
raise Exception("No valid URL found")
logger.info(f"Downloading from {url}")
# Run download in thread to avoid blocking main loop
local_video_path, title = await asyncio.to_thread(downloader.download_video, url, temp_dir)
# Title handling strategy:
# Priority 1: Extracted from share text (if available and valid)
# Priority 2: Extracted from video download (often "Unknown Title")
# Priority 3: Generated by LLM (done later)
extracted_title = downloader.extract_title_from_text(share_text)
logger.info(f"Extracted title from text: {extracted_title}")
# If we have a valid extracted title, use it.
# But if we don't have a title yet (or it's Unknown), we definitely want to use extracted_title.
# Even if we have a title from yt-dlp, if it's just "Unknown Title", we prefer extracted one.
if extracted_title and extracted_title != "Unknown Title":
title = extracted_title
elif not title:
title = "Unknown Title"
if not local_video_path or not os.path.exists(local_video_path):
raise Exception("Download failed")
# 3. Upload Video to OBS (Long term storage)
logger.info("Uploading video to OBS...")
uploader = ObsUploader()
video_filename = os.path.basename(local_video_path)
obs_video_key = f"{OBS_CLOUD_PREFIX}/DouYin/{video_filename}"
success, _ = await asyncio.to_thread(uploader.upload_file, obs_video_key, local_video_path, OBS_BUCKET)
if not success:
raise Exception("OBS Upload failed")
# Construct public URL (Assuming standard OBS pattern or Config logic)
obs_url = f"https://{OBS_BUCKET}.{OBS_SERVER}/{obs_video_key}"
# 4. Convert to MP3
logger.info("Converting to MP3...")
mp3_path = os.path.splitext(local_video_path)[0] + ".mp3"
cmd = [
"ffmpeg", "-y", "-i", local_video_path,
"-acodec", "libmp3lame", "-ar", "16000", "-ac", "1", "-q:a", "2",
mp3_path
]
# Run ffmpeg in thread
result = await asyncio.to_thread(subprocess.run, cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
if result.returncode != 0:
raise Exception(f"FFmpeg failed: {result.stderr.decode()}")
# 5. ASR (Upload MP3 to tmp and transcribe)
logger.info("Transcribing...")
asr = ASRClient()
# Run ASR in thread
transcript = await asyncio.to_thread(asr.upload_and_transcribe_sync, mp3_path)
if not transcript:
raise Exception("Transcription failed (returned empty)")
# 6. LLM Title Generation (Enhancement)
# If the title is still Unknown or weak, OR if we just want to ensure we have a good title.
# The user said: "Alternatively, call LlmUtil.py to summarize title".
# Let's do it if title is Unknown or matches default filename pattern, OR if extracted title was also missing.
if (not title or title == "Unknown Title" or title == "Unknown"):
try:
logger.info("Generating title from transcript via LLM...")
prompt = f"请根据以下视频文案总结一个简短的标题20字以内不要包含任何解释性文字直接返回标题\n\n{transcript[:1000]}"
llm_title_chunks = []
# get_llm_response is already async
async for chunk in get_llm_response(prompt, stream=False):
llm_title_chunks.append(chunk)
llm_title = "".join(llm_title_chunks)
if llm_title:
# Clean up quotes if any
llm_title = llm_title.strip().strip('"').strip('').strip('')
logger.info(f"LLM generated title: {llm_title}")
# We overwrite the title if LLM succeeds
title = llm_title
except Exception as llm_e:
logger.warning(f"LLM Title generation failed: {llm_e}")
# 7. Save to DB (Update)
logger.info("Saving to DB...")
await asyncio.to_thread(update_record, request_id, title, obs_url, transcript, "COMPLETED")
logger.info(f"Task {request_id} completed successfully.")
except Exception as e:
logger.error(f"Task {request_id} failed: {e}", exc_info=True)
await asyncio.to_thread(update_status, request_id, "FAILED", str(e))
finally:
# 8. Cleanup
if os.path.exists(temp_dir):
try:
# shutil.rmtree is sync, wrap it
await asyncio.to_thread(shutil.rmtree, temp_dir, ignore_errors=True)
except Exception as e:
logger.error(f"Cleanup failed: {e}")
@router.post("/api/parse")
def parse(request: ParseRequest, background_tasks: BackgroundTasks):
downloader = DouYinDownloader()
urls = downloader.extract_urls(request.text)
if not urls:
# If no URLs found, try using the text as is (might be a direct link not caught by regex)
# But regex is quite broad. Let's just fail or try one.
# Let's assume text might be the URL if it's clean.
if request.text.startswith("http"):
urls = [request.text]
else:
raise HTTPException(status_code=400, detail="No valid URLs found")
created_ids = []
try:
conn = get_db_connection()
cursor = conn.cursor()
for url in urls:
req_id = str(uuid.uuid4())
sql = """
INSERT INTO t_douyin_record (id, original_text, status, create_time)
VALUES (%s, %s, 'PENDING', %s)
"""
cursor.execute(sql, (req_id, url, datetime.now()))
created_ids.append(req_id)
# Pass request.text (the full share text) so we can extract title from it
background_tasks.add_task(process_video_task, url, req_id, request.text)
conn.commit()
conn.close()
except Exception as e:
raise HTTPException(status_code=500, detail=f"DB Init Error: {e}")
return {"id": created_ids[0] if created_ids else None, "ids": created_ids, "status": "PENDING"}
@router.get("/api/records")
def get_records():
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT * FROM t_douyin_record ORDER BY create_time DESC LIMIT 50")
records = cursor.fetchall()
conn.close()
# Manually handle datetime serialization to be safe
for r in records:
if 'create_time' in r and r['create_time']:
r['create_time'] = r['create_time'].strftime("%Y-%m-%d %H:%M:%S")
if 'update_time' in r and r['update_time']:
r['update_time'] = r['update_time'].strftime("%Y-%m-%d %H:%M:%S")
return records
except Exception as e:
logger.error(f"Get records error: {e}", exc_info=True)
return []
@router.delete("/api/records/{id}")
def delete_record(id: str):
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("DELETE FROM t_douyin_record WHERE id=%s", (id,))
conn.commit()
conn.close()
return {"status": "deleted"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
async def recover_pending_tasks():
"""
Check for tasks stuck in PENDING or PROCESSING state (due to server restart)
and restart them.
"""
logger.info("Scanning for interrupted Douyin tasks...")
try:
# Use asyncio.to_thread for DB operation
def fetch_pending():
conn = get_db_connection()
cursor = conn.cursor()
# Select recent pending/processing tasks (limit 20 to avoid storm)
sql = """
SELECT id, original_text, status
FROM t_douyin_record
WHERE status IN ('PENDING', 'PROCESSING')
ORDER BY create_time DESC LIMIT 20
"""
cursor.execute(sql)
tasks = cursor.fetchall()
conn.close()
return tasks
tasks = await asyncio.to_thread(fetch_pending)
if not tasks:
logger.info("No interrupted tasks found.")
return
logger.info(f"Found {len(tasks)} interrupted tasks. Restarting...")
for task in tasks:
req_id = task['id']
url = task['original_text']
# Restart task in background
# Note: We lost the original share text for title extraction,
# so we pass empty string. It will use the URL or 'Unknown Title'.
# If LLM is enabled, it might fix the title later.
asyncio.create_task(process_video_task(url, req_id, share_text=""))
except Exception as e:
logger.error(f"Failed to recover tasks: {e}", exc_info=True)