import logging
import os
import re
import uuid
from typing import Optional, Tuple

import requests
import yt_dlp

logger = logging.getLogger(__name__)


class DouYinDownloader:
    """Parse Douyin share text and hold the request/yt-dlp settings used for it."""

    def __init__(self):
        """Build the mobile request headers and the yt-dlp option dict."""
        # A mobile browser User-Agent plus a douyin.com Referer; the same dict
        # is handed to yt-dlp through the 'http_headers' option below.
        self.mobile_headers = {
            "User-Agent": "Mozilla/5.0 (Linux; Android 10; SM-G960F) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.181 Mobile Safari/537.36",
            "Referer": "https://www.douyin.com/",
        }

        self.ydl_opts = {
            'format': 'best',  # Download best quality
            'outtmpl': '%(id)s.%(ext)s',
            'quiet': True,
            'no_warnings': True,
            'http_headers': self.mobile_headers,
            # Add a 'proxy' entry here if one is needed.
        }

    def parse_share_text(self, text: str) -> Optional[str]:
        """Return the first URL found in ``text``, or ``None`` if there is none."""
        urls = self.extract_urls(text)
        return urls[0] if urls else None

    def extract_urls(self, text: str) -> list[str]:
        """Return every http/https URL found in ``text``, in order of appearance.

        The character class ``[$-_@.&+]`` contains the range ``$``..``_``, so
        most ASCII punctuation is accepted as part of a URL; whitespace and
        non-ASCII characters end the match.
        """
        return re.findall(
            r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+',
            text,
        )

    def extract_title_from_text(self, text: str) -> str:
        """Derive a title from share text by stripping URLs and boilerplate.

        Args:
            text: Raw share text, e.g.
                "3.00 12/28 d@n.dN VYZ:/ 复制打开抖音极速版，看看【聚合能研的作品】...".

        Returns:
            The remaining text with surrounding whitespace removed, or
            "Unknown Title" when nothing is left. The result is never
            truncated, however long it is.
        """
        # 1. Remove URLs.
        clean_text = re.sub(r'http[s]?://\S+', '', text)

        # 2. Remove the "copy and open Douyin ..., take a look at" prefix:
        #    anything up to "复制打开抖音", then anything up to "看看" preceded
        #    by a comma. Both the ASCII comma and the full-width comma (U+FF0C)
        #    are accepted; the example in the docstring uses the full-width
        #    form, which an ASCII-only pattern fails to match.
        clean_text = re.sub(r'.*?复制打开抖音.*?[,，]看看', '', clean_text)

        # 3. Remove a leading 【...】 group (usually the author name).
        clean_text = re.sub(r'^\s*【.*?】', '', clean_text)

        # 4. Trim surrounding whitespace.
        clean_text = clean_text.strip()

        # 5. Fall back to a placeholder when nothing is left.
        return clean_text if clean_text else "Unknown Title"
# If empty, return "Unknown Title" return clean_text if clean_text else "Unknown Title" def get_video_info(self, url: str) -> Tuple[Optional[str], Optional[str]]: """ Get video title and real URL using yt-dlp Returns: (title, webpage_url) """ try: with yt_dlp.YoutubeDL(self.ydl_opts) as ydl: info = ydl.extract_info(url, download=False) return info.get('title'), info.get('webpage_url') except Exception as e: logger.error(f"Error getting video info: {e}") return None, None def download_video_fallback(self, url: str, output_dir: str) -> Tuple[Optional[str], Optional[str]]: """ Fallback download method using requests and mobile User-Agent """ try: logger.info(f"Attempting fallback download for {url}") # 1. Get real URL (follow redirects) session = requests.Session() response = session.get(url, headers=self.mobile_headers, allow_redirects=True, timeout=10) final_url = response.url content = response.text # 2. Extract video URL video_url = None urls = re.findall(r'"url_list":\["(.*?)"\]', content) if urls: for u in urls: if "playwm" in u: video_url = u.replace("\\u002F", "/") break if not video_url: logger.error("Fallback: No video URL found in page content") return None, None # 3. Download video file_uuid = str(uuid.uuid4()) filename = os.path.join(output_dir, f'{file_uuid}.mp4') logger.info(f"Fallback downloading video from {video_url}") # Use stream to download r = requests.get(video_url, headers=self.mobile_headers, stream=True, timeout=30) if r.status_code == 200: with open(filename, 'wb') as f: for chunk in r.iter_content(chunk_size=1024*1024): if chunk: f.write(chunk) # Try to extract title title = "Unknown Title" title_match = re.search(r'