#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Download Douyin videos from share URLs.

The module exposes two classes:

* ``Utils`` -- helpers for the X-Bogus request signature, the ``ttwid``
  cookie registration call and filename sanitising.
* ``VideoDownloader`` -- opens a share URL in a Playwright-driven Chromium,
  discovers the real video URL (DOM, network interception or page-source
  regex) and saves the file to disk.
"""
import base64
import hashlib
import json
import logging
import os
import random
import re
import subprocess
import time
import urllib.parse
from typing import Dict, Optional

import requests
from playwright.sync_api import sync_playwright

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("VideoDownloader")

# User agent used for signing and for plain `requests` calls.
DEFAULT_USER_AGENT = (
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36'
)
# User agent presented by the Playwright browser context.
BROWSER_USER_AGENT = (
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'
)
DOUYIN_REFERER = 'https://www.douyin.com/'

# Matches either a short share link or a full www.douyin.com link inside free text.
_SHARE_URL_RE = re.compile(
    r'(https?://v\.douyin\.com/[a-zA-Z0-9\-_]+/?|https?://www\.douyin\.com/[^\s]+)'
)
# Matches plain or percent-encoded douyinvod.com URLs embedded in page source.
_VOD_URL_RE = re.compile(
    r'https(?:%3A%2F%2F|://)[a-zA-Z0-9\-\.]*douyinvod\.com(?:%2F|/)[^"&\s]+'
)


class Utils:
    """Utility class for X-Bogus generation and other helpers."""

    # Custom 64-symbol alphabet (plus padding char) used to encode the signature.
    _XBOGUS_ALPHABET = "Dkdpgh4ZKsQB80/Mfvw36XI1R25-WUAlEi7NLboqYTOPuzmFjJnryx9HVGcaStCe="

    def getttwid(self) -> Optional[str]:
        """Register with the ttwid endpoint and return the issued cookie value.

        Returns:
            The value of the first cookie set by the response (expected to be
            the ``ttwid`` cookie), or ``None`` when the request fails or the
            response sets no cookie.
        """
        url = 'https://ttwid.bytedance.com/ttwid/union/register/'
        data = '{"region":"cn","aid":1768,"needFid":false,"service":"www.ixigua.com","migrate_info":{"ticket":"","source":"node"},"cbUrlProtocol":"https","union":true}'
        try:
            res = requests.post(url=url, data=data, timeout=10)
        except requests.RequestException as e:
            logger.error(f"Failed to get ttwid: {e}")
            return None
        return next(iter(res.cookies.values()), None)

    def getXbogus(self, payload, form='', ua=DEFAULT_USER_AGENT):
        """Return ``payload`` with an ``&X-Bogus=<signature>`` suffix appended.

        Args:
            payload: Query string to sign.
            form: Request body to include in the signature ('' when absent).
            ua: User agent string the signature is bound to.
        """
        xbogus = self.get_xbogus(payload, ua, form)
        return payload + "&X-Bogus=" + xbogus

    def get_xbogus(self, payload, ua, form):
        """Compute the 28-character X-Bogus signature for a request."""
        alphabet = self._XBOGUS_ALPHABET
        arr2 = self.get_arr2(payload, ua, form)
        garbled = self.get_garbled_string(arr2)  # 21 byte values

        # Base64-style encoding: every 3 bytes become 4 symbols of 6 bits each.
        parts = []
        for i in range(0, 21, 3):
            b0, b1, b2 = garbled[i:i + 3]
            base_num = b2 | b1 << 8 | b0 << 16
            parts.append(
                alphabet[(base_num & 0xFC0000) >> 18]
                + alphabet[(base_num & 0x3F000) >> 12]
                + alphabet[(base_num & 0xFC0) >> 6]
                + alphabet[base_num & 0x3F]
            )
        return "".join(parts)

    def get_garbled_string(self, arr2):
        """Interleave ``arr2``, encrypt it and prepend the fixed ``[2, 255]`` header.

        Returns:
            A list of 21 integers in the range 0..255.
        """
        # Interleave the first ten values with the remaining nine.
        p = [
            arr2[0], arr2[10], arr2[1], arr2[11], arr2[2], arr2[12],
            arr2[3], arr2[13], arr2[4], arr2[14], arr2[5], arr2[15],
            arr2[6], arr2[16], arr2[7], arr2[17], arr2[8], arr2[18],
            arr2[9],
        ]
        plain = "".join(chr(i) for i in p)
        # The key is the single character U+00FF.
        encrypted = self._0x30492c(['\u00ff'], plain)
        return [2, 255, *encrypted]

    def get_arr2(self, payload, ua, form):
        """Build the 19-value array the signature is derived from.

        The array mixes fixed constants, the last two bytes of salted digests
        of the payload, form and user agent, the current Unix timestamp, a
        fixed "canvas" constant and an XOR checksum, then reorders them as
        even-indexed values followed by odd-indexed values.
        """
        salt_payload = hashlib.md5(hashlib.md5(payload.encode()).digest()).digest()
        salt_form = hashlib.md5(hashlib.md5(form.encode()).digest()).digest()
        ua_key = ['\u0000', '\u0001', '\u000e']
        salt_ua = hashlib.md5(base64.b64encode(self._0x30492c(ua_key, ua))).digest()
        timestamp = int(time.time())
        canvas = 1489154074
        arr1 = [
            64, 0, 1, 14,
            salt_payload[14], salt_payload[15],
            salt_form[14], salt_form[15],
            salt_ua[14], salt_ua[15],
            (timestamp >> 24) & 255, (timestamp >> 16) & 255,
            (timestamp >> 8) & 255, (timestamp >> 0) & 255,
            (canvas >> 24) & 255, (canvas >> 16) & 255,
            (canvas >> 8) & 255, (canvas >> 0) & 255,
            64,
        ]
        # The last slot (seeded with 64) becomes an XOR checksum of slots 1..17.
        checksum = arr1[-1]
        for value in arr1[1:-1]:
            checksum ^= value
        arr1[-1] = checksum
        return arr1[::2] + arr1[1::2]

    def _0x30492c(self, a, b):
        """Encrypt string ``b`` with key ``a`` using an RC4-style stream cipher.

        Args:
            a: Non-empty sequence of single-character strings (the key).
            b: Text to encrypt; every character must have a code point < 256.

        Returns:
            A ``bytearray`` of the same length as ``b``.
        """
        # Key scheduling.
        d = list(range(256))
        c = 0
        for i in range(256):
            c = (c + d[i] + ord(a[i % len(a)])) % 256
            d[i], d[c] = d[c], d[i]

        # Keystream generation XORed onto the input.
        result = bytearray(len(b))
        t = 0
        c = 0
        for i, ch in enumerate(b):
            t = (t + 1) % 256
            c = (c + d[t]) % 256
            d[t], d[c] = d[c], d[t]
            result[i] = ord(ch) ^ d[(d[t] + d[c]) % 256]
        return result

    def clean_filename(self, filename: str) -> str:
        """Sanitize filename: drop reserved characters, trim, cap at 100 chars."""
        return re.sub(r'[\\/*?:"<>|]', "", filename).strip()[:100]


class VideoDownloader:
    """Standalone class for downloading Douyin videos by URL using Playwright.

    The browser is started in ``__init__``. Use the instance as a context
    manager, or call ``close()`` explicitly, to release it.
    """

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': DEFAULT_USER_AGENT,
            'Referer': DOUYIN_REFERER,
        })
        self.utils = Utils()
        self.playwright = None
        self.browser = None
        self.context = None
        self._is_closed = False
        # Start Playwright
        self._start_browser()

    def _start_browser(self):
        """Start Playwright and open a browser context; log (not raise) on failure."""
        try:
            self.playwright = sync_playwright().start()
            # Use headless=False with args=['--headless=new'] to avoid
            # headless-shell dependency issues
            self.browser = self.playwright.chromium.launch(headless=False, args=["--headless=new"])
            self.context = self.browser.new_context(
                user_agent=BROWSER_USER_AGENT,
                viewport={'width': 1920, 'height': 1080},
            )
        except Exception as e:
            # Leave self.context as None; download() reports it as uninitialised.
            logger.error(f"Failed to start Playwright: {e}")

    def close(self):
        """Release the browser context, browser, Playwright driver and HTTP session.

        Safe to call more than once and on a partially initialised instance.
        Each resource is closed independently so one failure does not leak
        the others.
        """
        if getattr(self, '_is_closed', False):
            return
        try:
            for attr, method in (
                ('context', 'close'),
                ('browser', 'close'),
                ('playwright', 'stop'),
                ('session', 'close'),
            ):
                resource = getattr(self, attr, None)
                if resource is None:
                    continue
                try:
                    getattr(resource, method)()
                except Exception as e:
                    logger.debug(f"Error while closing {attr}: {e}")
        finally:
            self._is_closed = True

    def __del__(self):
        if not getattr(self, '_is_closed', False):
            self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def download(self, url: str, save_dir: str = "./downloads") -> bool:
        """Main method to download a video from a share URL.

        Args:
            url: A Douyin URL, or share text containing one.
            save_dir: Directory the ``.mp4`` is written to (created if missing).

        Returns:
            ``True`` when the file was saved or already existed, ``False`` on
            any failure (browser not started, captcha, no source found,
            download error).
        """
        if not os.path.exists(save_dir):
            os.makedirs(save_dir, exist_ok=True)

        if not self.context:
            logger.error("Browser not initialized.")
            return False

        page = None
        try:
            page = self.context.new_page()

            # Network interception to catch video URLs
            captured_video_urls = []

            def handle_response(response):
                try:
                    if self._is_video_response(response):
                        captured_video_urls.append(response.url)
                except Exception as e:
                    # Best effort: a response we cannot inspect is just skipped.
                    logger.debug(f"Ignoring response inspection error: {e}")

            page.on("response", handle_response)

            # 1. Resolve URL and Go to Page
            logger.info(f"Processing URL: {url}")
            url = self._extract_share_url(url)
            page.goto(url, wait_until='domcontentloaded')
            logger.info("Page loaded.")

            # Give the player a fixed amount of time to start requesting media.
            try:
                logger.info("Waiting for video selector (using timeout)...")
                page.wait_for_timeout(5000)
                logger.info("Timeout complete.")
                logger.info("Waiting for video source (skipped)...")
                logger.info("Video source check skipped.")
            except Exception as e:
                logger.warning(f"Timeout waiting for video: {e}")

            if "验证码" in page.title() or "slider" in page.content():
                logger.error("Encountered captcha. Skipping.")
                return False

            # 2. Extract Info from Page
            candidates = self._dom_video_candidates(page)
            video_src = self._select_video_src(page, candidates, captured_video_urls)

            if not video_src:
                logger.error(f"No valid video source found. Candidates: {candidates}")
                return False
            if video_src.startswith('blob:'):
                logger.error("Video src is still a blob and network interception failed.")
                return False

            desc = self._build_basename(page.title(), url)
            logger.info(f"Found video: {desc}")
            logger.info(f"Video URL: {video_src}")

            # 3. Download Video
            filepath = os.path.join(save_dir, f"{desc}.mp4")
            if os.path.exists(filepath):
                logger.info(f"File exists: {filepath}")
                return True

            if video_src.startswith('//'):
                video_src = 'https:' + video_src

            self._fetch_video(video_src, filepath)
            logger.info(f"Saved to {filepath}")
            self._post_process_video(filepath)
            return True
        except Exception as e:
            logger.error(f"Error processing {url}: {e}")
            return False
        finally:
            # Always release the page, including on error paths.
            if page is not None:
                try:
                    page.close()
                except Exception as e:
                    logger.debug(f"Error while closing page: {e}")

    @staticmethod
    def _is_video_response(response) -> bool:
        """Return True when a network response looks like the video stream."""
        resp_url = response.url
        if 'douyinvod.com' in resp_url and 'mime_type=audio' not in resp_url:
            has_video_marker = (
                'mime_type=video_mp4' in resp_url
                or '.mp4' in resp_url
                or 'video' in resp_url
            )
            return has_video_marker and response.status in (200, 206)
        # NOTE(review): the content-type check is assumed to be the alternative
        # to the douyinvod.com host check (generic capture) -- confirm.
        return 'video/mp4' in response.headers.get('content-type', '')

    @staticmethod
    def _extract_share_url(text: str) -> str:
        """Pull the Douyin URL out of share text; return ``text`` unchanged if none."""
        match = _SHARE_URL_RE.search(text)
        return match.group(0) if match else text

    @staticmethod
    def _dom_video_candidates(page) -> list:
        """Collect ``src`` values from the page's first <video> and its <source> children."""
        video_info = page.evaluate("""() => {
            const video = document.querySelector('video');
            if (video) {
                const sources = Array.from(video.querySelectorAll('source')).map(s => s.src);
                return {src: video.src, sources: sources};
            }
            return null;
        }""")
        candidates = []
        if video_info:
            if video_info.get('src'):
                candidates.append(video_info['src'])
            if video_info.get('sources'):
                candidates.extend(video_info['sources'])
        return candidates

    @staticmethod
    def _select_video_src(page, candidates, captured_video_urls) -> Optional[str]:
        """Pick the best video URL from DOM candidates, captured traffic or page source."""
        # Prioritize douyinvod.com links
        video_src = next(
            (c for c in candidates
             if c and 'douyinvod.com' in c and not c.startswith('blob:')),
            None,
        )

        # Check network captured URLs if DOM extraction failed
        if not video_src:
            valid_captured = [u for u in captured_video_urls if 'douyinvod.com' in u]
            if valid_captured:
                video_src = valid_captured[0]
                logger.info(f"Found video via network interception: {video_src}")
            elif captured_video_urls:
                # Avoid m3u8 or other non-mp4 if possible, but for now take what we have
                video_src = captured_video_urls[0]
                logger.info(f"Found video via network interception (generic): {video_src}")

        # Fallback to any valid http link (excluding uuu_265 placeholder)
        if not video_src:
            video_src = next(
                (c for c in candidates
                 if c and c.startswith('http') and 'uuu_265' not in c),
                None,
            )

        # Regex fallback: look for (possibly percent-encoded) douyinvod.com
        # links in the page source.
        if not video_src:
            logger.info("Trying regex fallback for video URL...")
            for raw in _VOD_URL_RE.findall(page.content()):
                decoded = urllib.parse.unquote(raw)
                if 'uuu_265' not in decoded and 'mime_type=audio' not in decoded:
                    video_src = decoded
                    logger.info(f"Found video via regex: {video_src}")
                    break

        return video_src

    def _build_basename(self, title: str, url: str) -> str:
        """Derive a unique, filesystem-safe base name from the page title and URL."""
        # Clean title (remove " - 抖音" etc)
        desc = re.sub(r' - 抖音$', '', title)
        desc = self.utils.clean_filename(desc)
        if not desc:
            desc = f"video_{int(time.time())}"
        # Append URL hash to ensure uniqueness
        url_hash = hashlib.md5(url.encode()).hexdigest()[:8]
        return f"{desc}_{url_hash}"

    @staticmethod
    def _write_atomic(chunks, filepath: str) -> None:
        """Write ``chunks`` to ``filepath`` via a temporary ``.part`` file.

        The final path only appears once the whole payload is on disk, so an
        interrupted download is never mistaken for a finished file later.
        """
        tmp_path = f"{filepath}.part"
        try:
            with open(tmp_path, 'wb') as f:
                for chunk in chunks:
                    f.write(chunk)
            os.replace(tmp_path, filepath)
        finally:
            # After a successful replace the temp file no longer exists.
            if os.path.exists(tmp_path):
                try:
                    os.remove(tmp_path)
                except OSError as e:
                    logger.debug(f"Could not remove temporary file {tmp_path}: {e}")

    def _fetch_video(self, video_src: str, filepath: str) -> None:
        """Download ``video_src`` to ``filepath``; raise if every method fails."""
        # Use Playwright API Request to avoid 403 Forbidden
        try:
            response = self.context.request.get(
                video_src,
                headers={'Referer': DOUYIN_REFERER},
            )
            try:
                if response.ok:
                    self._write_atomic([response.body()], filepath)
                    return
                logger.error(f"Failed to download: {response.status} {response.status_text}")
            finally:
                # Free the buffered body instead of keeping it until the
                # context closes.
                try:
                    response.dispose()
                except Exception as e:
                    logger.debug(f"Error disposing Playwright response: {e}")
        except Exception as e:
            logger.error(f"Playwright download failed: {e}")

        # Fallback to requests (old method), reusing the browser's cookies.
        cookies: Dict[str, str] = {c['name']: c['value'] for c in self.context.cookies()}
        headers = {
            'User-Agent': self.session.headers['User-Agent'],
            'Referer': DOUYIN_REFERER,
        }
        with requests.get(video_src, cookies=cookies, headers=headers,
                          stream=True, timeout=60) as r:
            r.raise_for_status()
            self._write_atomic(r.iter_content(chunk_size=8192), filepath)

    def _post_process_video(self, filepath: str):
        """Check the video codec with ffprobe and report HEVC files.

        Failures (for example ffprobe missing or exiting non-zero) are logged
        as a warning and never raised.
        """
        try:
            # Check codec
            cmd = [
                "ffprobe", "-v", "error",
                "-show_entries", "stream=codec_name",
                "-of", "default=noprint_wrappers=1:nokey=1",
                filepath,
            ]
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            codecs = result.stdout.strip().split('\n')
            if "hevc" in codecs:
                # NOTE: conversion to H.264 (ffmpeg -c:v libx264 -c:a copy) is
                # intentionally disabled; the file is kept as downloaded.
                logger.info(f"HEVC codec detected in {filepath}. User has compatible player, skipping conversion.")
        except Exception as e:
            logger.warning(f"Post-processing (conversion) failed: {e}. File might be unplayable on some devices.")