Files
aiData/DouYin/VideoDownloader.py

419 lines
17 KiB
Python
Raw Permalink Normal View History

2026-01-20 19:06:36 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import requests
import re
import json
import time
import hashlib
import base64
import random
import os
import logging
import subprocess
from typing import Optional, Dict
# Configure logging.
# NOTE: basicConfig runs at import time and configures the root logger
# (INFO level, "timestamp - level - message" format) for the importing process.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-wide logger, registered under the name "VideoDownloader".
logger = logging.getLogger("VideoDownloader")
class Utils:
    """Utility class for X-Bogus generation and other helpers.

    The signing methods are deterministic for a given ``timestamp``; when no
    timestamp is supplied the current time is used.
    """

    # Default User-Agent mixed into the signature by getXbogus().
    DEFAULT_UA = ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                  '(KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36')
    # Custom 6-bit alphabet used to render the 21 signature bytes as 28 characters.
    _XBOGUS_ALPHABET = "Dkdpgh4ZKsQB80/Mfvw36XI1R25-WUAlEi7NLboqYTOPuzmFjJnryx9HVGcaStCe="
    # Fixed 32-bit constant embedded big-endian in every signature.
    _CANVAS = 1489154074

    def getttwid(self):
        """Register with the ttwid endpoint and return the issued cookie value.

        Returns:
            The value of the ``ttwid`` cookie when the response sets one,
            otherwise the value of the first cookie in the response, or
            ``None`` when the request fails or no cookie is set.
        """
        url = 'https://ttwid.bytedance.com/ttwid/union/register/'
        data = '{"region":"cn","aid":1768,"needFid":false,"service":"www.ixigua.com","migrate_info":{"ticket":"","source":"node"},"cbUrlProtocol":"https","union":true}'
        try:
            res = requests.post(url=url, data=data, timeout=10)
        except requests.RequestException as e:
            logger.error(f"Failed to get ttwid: {e}")
            return None

        first_value = None
        for name, value in res.cookies.items():
            if name == 'ttwid':
                return value
            if first_value is None:
                first_value = value
        # No cookie literally named "ttwid": keep the historical behaviour of
        # handing back whatever cookie came first.
        return first_value

    def getXbogus(self, payload, form='', ua=DEFAULT_UA, timestamp=None):
        """Return ``payload`` with an ``&X-Bogus=<signature>`` suffix appended.

        Args:
            payload: Query string to sign.
            form: Request body to sign (empty string when there is none).
            ua: User-Agent string mixed into the signature.
            timestamp: Unix time in seconds to embed; defaults to now.
        """
        xbogus = self.get_xbogus(payload, ua, form, timestamp)
        return payload + "&X-Bogus=" + xbogus

    def get_xbogus(self, payload, ua, form, timestamp=None):
        """Compute the 28-character X-Bogus signature.

        The 21 bytes from :meth:`get_garbled_string` are consumed three at a
        time; each 24-bit group is split into four 6-bit indexes into the
        custom alphabet.
        """
        alphabet = self._XBOGUS_ALPHABET
        garbled = self.get_garbled_string(self.get_arr2(payload, ua, form, timestamp))
        pieces = []
        for i in range(0, 21, 3):
            group = (garbled[i] << 16) | (garbled[i + 1] << 8) | garbled[i + 2]
            pieces.append(alphabet[(group >> 18) & 63])
            pieces.append(alphabet[(group >> 12) & 63])
            pieces.append(alphabet[(group >> 6) & 63])
            pieces.append(alphabet[group & 63])
        return "".join(pieces)

    def get_garbled_string(self, arr2):
        """Return the 21 signature bytes as a list of ints.

        ``arr2`` (19 values) is re-interleaved as arr2[0], arr2[10], arr2[1],
        arr2[11], ..., arr2[8], arr2[18], arr2[9], encrypted with the stream
        cipher in :meth:`_0x30492c` under the single-character key ``'ÿ'``,
        and prefixed with the two fixed bytes ``2, 255``.
        """
        interleaved = []
        for i in range(9):
            interleaved.append(arr2[i])
            interleaved.append(arr2[i + 10])
        interleaved.append(arr2[9])
        plaintext = "".join(chr(v) for v in interleaved)
        encrypted = self._0x30492c(['ÿ'], plaintext)
        return [2, 255] + list(encrypted)

    def get_arr2(self, payload, ua, form, timestamp=None):
        """Build the 19-value array that is encrypted into the signature.

        Args:
            payload: Query string; contributes bytes 14-15 of md5(md5(payload)).
            ua: User-Agent; encrypted with key (0, 1, 14), base64-encoded and
                md5-hashed, contributing bytes 14-15 of that digest.
            form: Request body; contributes bytes 14-15 of md5(md5(form)).
            timestamp: Unix time in seconds, embedded big-endian as four
                bytes. Defaults to ``int(time.time())``.

        Returns:
            The even-indexed entries of the working array followed by the
            odd-indexed ones. The last working entry is an XOR checksum, so
            XOR-ing all 19 returned values yields 0.
        """
        if timestamp is None:
            timestamp = int(time.time())
        canvas = self._CANVAS

        salt_payload = list(hashlib.md5(hashlib.md5(payload.encode()).digest()).digest())
        salt_form = list(hashlib.md5(hashlib.md5(form.encode()).digest()).digest())
        ua_key = ['\u0000', '\u0001', '\u000e']
        salt_ua = list(hashlib.md5(base64.b64encode(self._0x30492c(ua_key, ua))).digest())

        arr1 = [
            64, 0, 1, 14,
            salt_payload[14], salt_payload[15],
            salt_form[14], salt_form[15],
            salt_ua[14], salt_ua[15],
            (timestamp >> 24) & 255, (timestamp >> 16) & 255, (timestamp >> 8) & 255, timestamp & 255,
            (canvas >> 24) & 255, (canvas >> 16) & 255, (canvas >> 8) & 255, canvas & 255,
            64,
        ]
        # The last slot starts at 64 and accumulates the XOR of slots 1..17.
        for value in arr1[1:18]:
            arr1[18] ^= value
        return arr1[0::2] + arr1[1::2]

    def _0x30492c(self, a, b):
        """RC4-encrypt string ``b`` with key ``a`` and return a bytearray.

        Args:
            a: Non-empty sequence of single-character strings forming the key.
            b: Text to encrypt; every character must have a code point
               below 256, otherwise storing the result raises ValueError.
        """
        state = list(range(256))
        j = 0
        # Key scheduling.
        for i in range(256):
            j = (j + state[i] + ord(a[i % len(a)])) % 256
            state[i], state[j] = state[j], state[i]
        # Keystream generation XOR-ed over the input.
        result = bytearray(len(b))
        t = 0
        j = 0
        for pos, ch in enumerate(b):
            t = (t + 1) % 256
            j = (j + state[t]) % 256
            state[t], state[j] = state[j], state[t]
            result[pos] = ord(ch) ^ state[(state[t] + state[j]) % 256]
        return result

    def clean_filename(self, filename: str) -> str:
        """Sanitize filename.

        Removes characters that are reserved in file names (``\\ / * ? : " < > |``)
        as well as ASCII control characters, trims surrounding whitespace and
        limits the result to 100 characters.
        """
        return re.sub(r'[\\/*?:"<>|\x00-\x1f]', "", filename).strip()[:100]
from playwright.sync_api import sync_playwright
class VideoDownloader:
    """Standalone class for downloading Douyin videos by URL using Playwright.

    A Chromium browser and browser context are started in the constructor.
    Release them with :meth:`close`, or use the instance as a context manager.
    If the browser fails to start the error is logged and :meth:`download`
    returns ``False``.
    """

    # Short share link or full www.douyin.com link embedded in pasted share text.
    _SHARE_URL_RE = re.compile(
        r'(https?://v\.douyin\.com/[a-zA-Z0-9\-_]+/?|https?://www\.douyin\.com/[^\s]+)'
    )
    # Plain or percent-encoded douyinvod.com URL inside page HTML.
    _ENCODED_VOD_RE = re.compile(
        r'https(?:%3A%2F%2F|://)[a-zA-Z0-9\-\.]*douyinvod\.com(?:%2F|/)[^"&\s]+'
    )
    # Fixed delay after navigation, giving the player time to issue its requests.
    _PAGE_SETTLE_MS = 5000
    # Collects the <video> element's own src and the src of its <source> children.
    _VIDEO_PROBE_JS = """() => {
const video = document.querySelector('video');
if (video) {
const sources = Array.from(video.querySelectorAll('source')).map(s => s.src);
return {src: video.src, sources: sources};
}
return null;
}"""

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36',
            'Referer': 'https://www.douyin.com/',
        })
        self.utils = Utils()
        self.playwright = None
        self.browser = None
        self.context = None
        self._is_closed = False
        # Start Playwright
        self._start_browser()

    def _start_browser(self):
        """Start Playwright, launch Chromium and open a browser context.

        Failures are logged, not raised; ``self.context`` then stays ``None``.
        """
        try:
            self.playwright = sync_playwright().start()
            # Use headless=False with args=['--headless=new'] to avoid headless-shell dependency issues
            self.browser = self.playwright.chromium.launch(headless=False, args=["--headless=new"])
            self.context = self.browser.new_context(
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
                viewport={'width': 1920, 'height': 1080}
            )
        except Exception as e:
            logger.error(f"Failed to start Playwright: {e}")

    def close(self):
        """Release the context, the browser and Playwright. Safe to call repeatedly.

        Each resource is released independently so that a failure closing one
        does not leave the others running. Also works on an instance whose
        constructor did not finish.
        """
        if getattr(self, '_is_closed', False):
            return
        try:
            for attr, method in (("context", "close"), ("browser", "close"), ("playwright", "stop")):
                resource = getattr(self, attr, None)
                if not resource:
                    continue
                try:
                    getattr(resource, method)()
                except Exception as e:
                    # Best-effort teardown: keep going with the remaining resources.
                    logger.debug(f"Ignoring error while releasing {attr}: {e}")
                setattr(self, attr, None)
        finally:
            self._is_closed = True

    def __del__(self):
        if not getattr(self, '_is_closed', False):
            self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    @staticmethod
    def _extract_share_url(text: str) -> str:
        """Return the first Douyin link found in ``text``, or ``text`` unchanged."""
        found = VideoDownloader._SHARE_URL_RE.search(text)
        return found.group(0) if found else text

    @staticmethod
    def _is_video_response(url: str, status: int, content_type: str) -> bool:
        """Decide whether an observed network response looks like the video stream.

        A non-audio douyinvod.com URL qualifies only when it carries a video
        marker and the status is 200 or 206. Any other URL qualifies when its
        content type contains ``video/mp4``.
        """
        if 'douyinvod.com' in url and 'mime_type=audio' not in url:
            has_marker = 'mime_type=video_mp4' in url or '.mp4' in url or 'video' in url
            return has_marker and status in (200, 206)
        return 'video/mp4' in content_type

    @staticmethod
    def _select_video_source(candidates, captured):
        """Pick the best video URL from DOM candidates and captured responses.

        Priority: a non-blob douyinvod.com DOM candidate, then a captured
        douyinvod.com URL, then any captured URL, then any http DOM candidate
        that is not the ``uuu_265`` placeholder.

        Returns:
            ``(url, origin)`` where origin is ``'dom'``, ``'network'`` or
            ``'network-generic'``; ``(None, None)`` when nothing qualifies.
        """
        for c in candidates:
            if c and 'douyinvod.com' in c and not c.startswith('blob:'):
                return c, 'dom'
        for u in captured:
            if 'douyinvod.com' in u:
                return u, 'network'
        if captured:
            # Avoid m3u8 or other non-mp4 if possible, but for now take what we have
            return captured[0], 'network-generic'
        for c in candidates:
            if c and c.startswith('http') and 'uuu_265' not in c:
                return c, 'dom'
        return None, None

    @staticmethod
    def _find_encoded_video_url(content: str) -> Optional[str]:
        """Search page HTML for a (possibly percent-encoded) douyinvod.com URL.

        Returns the first decoded match that is neither the ``uuu_265``
        placeholder nor an audio stream, or ``None``.
        """
        import urllib.parse
        for raw in VideoDownloader._ENCODED_VOD_RE.findall(content):
            decoded = urllib.parse.unquote(raw)
            if 'uuu_265' not in decoded and 'mime_type=audio' not in decoded:
                return decoded
        return None

    def _fetch_with_playwright(self, video_src: str, dest: str) -> bool:
        """Download via the browser context's request API; return True on success."""
        # Use Playwright API Request to avoid 403 Forbidden
        try:
            response = self.context.request.get(
                video_src,
                headers={'Referer': 'https://www.douyin.com/'}
            )
            if response.ok:
                with open(dest, 'wb') as f:
                    f.write(response.body())
                return True
            logger.error(f"Failed to download: {response.status} {response.status_text}")
        except Exception as e:
            logger.error(f"Playwright download failed: {e}")
        return False

    def _fetch_with_requests(self, video_src: str, dest: str) -> None:
        """Download with ``requests`` using the browser context's cookies.

        Raises whatever ``requests`` raises, including for HTTP error statuses.
        """
        cookies = {c['name']: c['value'] for c in self.context.cookies()}
        headers = {'User-Agent': self.session.headers['User-Agent'], 'Referer': 'https://www.douyin.com/'}
        with requests.get(video_src, cookies=cookies, headers=headers, stream=True, timeout=60) as r:
            r.raise_for_status()
            with open(dest, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)

    def _fetch_to_file(self, video_src: str, filepath: str) -> None:
        """Download ``video_src`` to ``filepath`` without ever leaving a partial file there.

        Data is written to ``<filepath>.part`` and moved into place only after a
        complete download; otherwise a truncated file would later be mistaken
        for a finished one by the "file exists" shortcut in :meth:`download`.
        """
        tmp_path = filepath + ".part"
        finished = False
        try:
            if not self._fetch_with_playwright(video_src, tmp_path):
                # Fallback to requests (old method)
                self._fetch_with_requests(video_src, tmp_path)
            os.replace(tmp_path, filepath)
            finished = True
        finally:
            if not finished and os.path.exists(tmp_path):
                try:
                    os.remove(tmp_path)
                except OSError as e:
                    logger.debug(f"Could not remove partial file {tmp_path}: {e}")

    def download(self, url: str, save_dir: str = "./downloads") -> bool:
        """Main method to download a video from a share URL.

        Args:
            url: A Douyin link, or share text containing one.
            save_dir: Directory for the resulting ``.mp4``; created if missing.

        Returns:
            ``True`` when the video was saved or already existed, ``False`` on
            any failure (errors are logged, not raised).
        """
        try:
            os.makedirs(save_dir, exist_ok=True)
        except OSError as e:
            logger.error(f"Cannot create save directory {save_dir}: {e}")
            return False
        if not self.context:
            logger.error("Browser not initialized.")
            return False

        page = None
        try:
            page = self.context.new_page()
            # Network interception to catch video URLs
            captured_video_urls = []

            def handle_response(response):
                try:
                    resp_url = response.url
                    content_type = response.headers.get('content-type', '')
                    if self._is_video_response(resp_url, response.status, content_type):
                        captured_video_urls.append(resp_url)
                except Exception as e:
                    # A response we cannot inspect is simply not a candidate.
                    logger.debug(f"Ignoring uninspectable response: {e}")

            page.on("response", handle_response)

            # 1. Resolve URL and Go to Page
            logger.info(f"Processing URL: {url}")
            url = self._extract_share_url(url)
            page.goto(url, wait_until='domcontentloaded')
            logger.info("Page loaded.")
            try:
                logger.info("Waiting for video selector (using timeout)...")
                page.wait_for_timeout(self._PAGE_SETTLE_MS)
                logger.info("Timeout complete.")
            except Exception as e:
                logger.warning(f"Timeout waiting for video: {e}")
            if "验证码" in page.title() or "slider" in page.content():
                logger.error("Encountered captcha. Skipping.")
                return False

            # 2. Extract Info from Page
            video_info = page.evaluate(self._VIDEO_PROBE_JS)
            candidates = []
            if video_info:
                if video_info.get('src'):
                    candidates.append(video_info['src'])
                if video_info.get('sources'):
                    candidates.extend(video_info['sources'])

            video_src, origin = self._select_video_source(candidates, captured_video_urls)
            if origin == 'network':
                logger.info(f"Found video via network interception: {video_src}")
            elif origin == 'network-generic':
                logger.info(f"Found video via network interception (generic): {video_src}")

            # Regex Fallback if still not found
            if not video_src:
                logger.info("Trying regex fallback for video URL...")
                video_src = self._find_encoded_video_url(page.content())
                if video_src:
                    logger.info(f"Found video via regex: {video_src}")
            if not video_src:
                logger.error(f"No valid video source found. Candidates: {candidates}")
                return False
            if video_src.startswith('blob:'):
                logger.error("Video src is still a blob and network interception failed.")
                return False

            # Clean title (remove " - 抖音" etc)
            desc = re.sub(r' - 抖音$', '', page.title())
            desc = self.utils.clean_filename(desc)
            if not desc:
                desc = f"video_{int(time.time())}"
            # Append URL hash to ensure uniqueness
            url_hash = hashlib.md5(url.encode()).hexdigest()[:8]
            desc = f"{desc}_{url_hash}"
            logger.info(f"Found video: {desc}")
            logger.info(f"Video URL: {video_src}")

            # 3. Download Video
            filepath = os.path.join(save_dir, f"{desc}.mp4")
            if os.path.exists(filepath):
                logger.info(f"File exists: {filepath}")
                return True
            if video_src.startswith('//'):
                video_src = 'https:' + video_src
            self._fetch_to_file(video_src, filepath)
            logger.info(f"Saved to {filepath}")
            self._post_process_video(filepath)
            return True
        except Exception as e:
            logger.error(f"Error processing {url}: {e}")
            return False
        finally:
            # Always release the tab, including on error paths.
            if page is not None:
                try:
                    page.close()
                except Exception as e:
                    logger.debug(f"Ignoring error while closing page: {e}")

    def _post_process_video(self, filepath: str):
        """Probe the saved file's codecs with ffprobe and report HEVC streams.

        No conversion is performed. Any failure (including a missing ffprobe
        binary) is logged as a warning and otherwise ignored.
        """
        try:
            # Check codec
            cmd = [
                "ffprobe", "-v", "error",
                "-show_entries", "stream=codec_name",
                "-of", "default=noprint_wrappers=1:nokey=1",
                filepath
            ]
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            codecs = result.stdout.strip().split('\n')
            if "hevc" in codecs:
                logger.info(f"HEVC codec detected in {filepath}. User has compatible player, skipping conversion.")
        except Exception as e:
            logger.warning(f"Post-processing (conversion) failed: {e}. File might be unplayable on some devices.")