419 lines
17 KiB
Python
419 lines
17 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import requests
|
|
import re
|
|
import json
|
|
import time
|
|
import hashlib
|
|
import base64
|
|
import random
|
|
import os
|
|
import logging
|
|
import subprocess
|
|
from typing import Optional, Dict
|
|
|
|
# Process-wide logging setup: INFO and above, each record prefixed with a
# timestamp and its level. All code in this module logs through `logger`.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("VideoDownloader")
|
|
|
|
class Utils:
    """Utility class for X-Bogus generation and other helpers.

    Provides three unrelated services: fetching a ``ttwid`` cookie value,
    computing the ``X-Bogus`` query-string signature, and sanitizing file
    names.
    """

    # 64-symbol alphabet (plus a trailing '=') used to render the signature,
    # six bits per character.
    _XBOGUS_ALPHABET = "Dkdpgh4ZKsQB80/Mfvw36XI1R25-WUAlEi7NLboqYTOPuzmFjJnryx9HVGcaStCe="

    def getttwid(self):
        """Register with the ttwid endpoint and return the cookie it issues.

        Returns:
            The value of the cookie named ``ttwid`` when the response sets
            one; otherwise the value of the first cookie in the response;
            ``None`` when the request fails or no cookie is returned.
        """
        # Same named logger the rest of this module logs through.
        log = logging.getLogger("VideoDownloader")
        url = 'https://ttwid.bytedance.com/ttwid/union/register/'
        data = '{"region":"cn","aid":1768,"needFid":false,"service":"www.ixigua.com","migrate_info":{"ticket":"","source":"node"},"cbUrlProtocol":"https","union":true}'
        try:
            res = requests.post(url=url, data=data, timeout=10)
            first_value = None
            for name, value in res.cookies.items():
                if name == 'ttwid':
                    return value
                if first_value is None:
                    # Remembered as a fallback in case no cookie is
                    # literally named "ttwid".
                    first_value = value
            return first_value
        except Exception as e:
            # Best-effort helper: callers receive None instead of an exception.
            log.error(f"Failed to get ttwid: {e}")
        return None

    def getXbogus(self, payload, form='', ua='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36'):
        """Return ``payload`` with an ``&X-Bogus=<signature>`` suffix appended.

        Args:
            payload: Query string to sign.
            form: Request body to fold into the signature (empty by default).
            ua: User-Agent string to fold into the signature.
        """
        return payload + "&X-Bogus=" + self.get_xbogus(payload, ua, form)

    def get_xbogus(self, payload, ua, form):
        """Compute the 28-character X-Bogus signature.

        The 21 bytes produced by :meth:`get_garbled_string` are consumed three
        at a time; each 24-bit group is rendered as four 6-bit symbols from
        the custom alphabet.
        """
        alphabet = self._XBOGUS_ALPHABET
        garbled = self.get_garbled_string(self.get_arr2(payload, ua, form))
        symbols = []
        for start in range(0, 21, 3):
            high, mid, low = garbled[start:start + 3]
            group = (high << 16) | (mid << 8) | low
            symbols.extend(
                alphabet[(group >> shift) & 63] for shift in (18, 12, 6, 0)
            )
        return "".join(symbols)

    def get_garbled_string(self, arr2):
        """Interleave and encrypt the 19 values of ``arr2``.

        The values are reordered as ``arr2[0], arr2[10], arr2[1], arr2[11],
        ... arr2[8], arr2[18], arr2[9]``, encrypted with the key ``['ÿ']``,
        and prefixed with the two constants ``2`` and ``255``.

        Returns:
            A list of 21 integers.
        """
        # Even output positions walk arr2[0..9], odd positions arr2[10..18].
        interleaved = [
            arr2[pos // 2 + (10 if pos % 2 else 0)] for pos in range(19)
        ]
        plain = "".join(chr(value) for value in interleaved)
        return [2, 255, *self._0x30492c(['ÿ'], plain)]

    @staticmethod
    def _double_md5(text):
        """Return the bytes of ``md5(md5(text))`` as a list of integers."""
        return list(hashlib.md5(hashlib.md5(text.encode()).digest()).digest())

    def get_arr2(self, payload, ua, form):
        """Build the 19-value array that the signature is derived from.

        The intermediate array holds, in order: the constants 64, 0, 1, 14;
        the last two bytes of the double MD5 of ``payload`` and of ``form``;
        the last two bytes of the MD5 of the base64 of the encrypted ``ua``;
        the current Unix timestamp and the ``canvas`` constant as four
        big-endian bytes each; and a final byte that is turned into an XOR
        checksum. The result lists the even-indexed entries followed by the
        odd-indexed ones.

        Note:
            The output depends on ``time.time()``, so it changes every second.
        """
        salt_payload = self._double_md5(payload)
        salt_form = self._double_md5(form)

        ua_key = ['\u0000', '\u0001', '\u000e']
        salt_ua = list(
            hashlib.md5(base64.b64encode(self._0x30492c(ua_key, ua))).digest()
        )

        timestamp = int(time.time())
        canvas = 1489154074  # fixed constant; its origin is not visible here

        arr1 = [
            64, 0, 1, 14,
            salt_payload[14], salt_payload[15],
            salt_form[14], salt_form[15],
            salt_ua[14], salt_ua[15],
            *((timestamp >> shift) & 255 for shift in (24, 16, 8, 0)),
            *((canvas >> shift) & 255 for shift in (24, 16, 8, 0)),
            64,
        ]

        # Fold entries 1..17 into the last slot so it acts as a checksum.
        checksum = arr1[18]
        for value in arr1[1:18]:
            checksum ^= value
        arr1[18] = checksum

        # Even indices (0, 2, ..., 18) first, then odd indices (1, 3, ..., 17).
        return arr1[0::2] + arr1[1::2]

    def _0x30492c(self, a, b):
        """RC4-encrypt the string ``b`` with the key ``a``.

        Args:
            a: Key as a sequence of single characters (a list or a string).
            b: Text to encrypt; every character must have a code point below
                256, otherwise storing the result raises ``ValueError``.

        Returns:
            A ``bytearray`` with the same length as ``b``.
        """
        state = list(range(256))

        # Key scheduling: permute the state according to the key.
        j = 0
        key_len = len(a)
        for i in range(256):
            j = (j + state[i] + ord(a[i % key_len])) % 256
            state[i], state[j] = state[j], state[i]

        # Keystream generation, XORed onto the input one character at a time.
        result = bytearray(len(b))
        i = 0
        j = 0
        for pos, char in enumerate(b):
            i = (i + 1) % 256
            j = (j + state[i]) % 256
            state[i], state[j] = state[j], state[i]
            result[pos] = ord(char) ^ state[(state[i] + state[j]) % 256]
        return result

    def clean_filename(self, filename: str) -> str:
        """Sanitize filename.

        Removes the characters ``\\ / * ? : " < > |``, strips surrounding
        whitespace and truncates the result to 100 characters.
        """
        return re.sub(r'[\\/*?:"<>|]', "", filename).strip()[:100]
|
|
|
|
|
|
from playwright.sync_api import sync_playwright
|
|
|
|
class VideoDownloader:
    """Standalone class for downloading Douyin videos by URL using Playwright.

    A Chromium browser context is started in the constructor and must be
    released with :meth:`close`; the class is also a context manager, so
    ``with VideoDownloader() as d: d.download(url)`` cleans up automatically.
    """

    # Time (ms) the page is left running after navigation before it is inspected.
    PAGE_SETTLE_MS = 5000

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36',
            'Referer': 'https://www.douyin.com/',
        })
        self.utils = Utils()
        self.playwright = None
        self.browser = None
        self.context = None
        self._is_closed = False

        # Start Playwright
        self._start_browser()

    def _start_browser(self):
        """Start Playwright, launch Chromium and open a browser context.

        Failures are logged rather than raised; ``self.context`` then stays
        ``None`` and :meth:`download` reports "Browser not initialized.".
        """
        try:
            self.playwright = sync_playwright().start()
            # Use headless=False with args=['--headless=new'] to avoid headless-shell dependency issues
            self.browser = self.playwright.chromium.launch(headless=False, args=["--headless=new"])
            self.context = self.browser.new_context(
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
                viewport={'width': 1920, 'height': 1080}
            )
        except Exception as e:
            logger.error(f"Failed to start Playwright: {e}")

    def close(self):
        """Release the browser context, the browser and Playwright.

        Safe to call more than once. Each resource is released independently,
        so a failure while closing one does not leave the others running.
        """
        if self._is_closed:
            return

        steps = (
            ("context", self.context, "close"),
            ("browser", self.browser, "close"),
            ("playwright", self.playwright, "stop"),
        )
        try:
            for label, resource, method_name in steps:
                if not resource:
                    continue
                try:
                    getattr(resource, method_name)()
                except Exception as e:
                    # Shutdown is best-effort: record the failure, keep going.
                    logger.debug(f"Failed to release {label}: {e}")
        finally:
            self.context = None
            self.browser = None
            self.playwright = None
            self._is_closed = True

    def __del__(self):
        # `_is_closed` is missing when __init__ failed before setting it;
        # in that case there is nothing that can be cleaned up.
        if not getattr(self, '_is_closed', True):
            self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    @staticmethod
    def _extract_share_url(text: str) -> str:
        """Return the Douyin link embedded in share text, or ``text`` unchanged."""
        match = re.search(r'(https?://v\.douyin\.com/[a-zA-Z0-9\-_]+/?|https?://www\.douyin\.com/[^\s]+)', text)
        return match.group(0) if match else text

    @staticmethod
    def _is_video_response(response) -> bool:
        """Tell whether a network response looks like the video stream.

        ``douyinvod.com`` URLs that are not audio count when the URL looks
        like a video and the status is 200 or 206; everything else counts
        only when the ``content-type`` header contains ``video/mp4``.
        """
        # NOTE(review): the content-type branch is assumed to pair with the
        # outer douyinvod.com test; the pasted source lost its indentation.
        # This pairing is the only one under which download()'s "generic"
        # captured-URL branch is reachable — confirm against the real file.
        url = response.url
        if 'douyinvod.com' in url and 'mime_type=audio' not in url:
            looks_like_video = 'mime_type=video_mp4' in url or '.mp4' in url or 'video' in url
            return looks_like_video and response.status in (200, 206)
        return 'video/mp4' in response.headers.get('content-type', '')

    @staticmethod
    def _find_video_src_in_html(content: str) -> Optional[str]:
        """Search page HTML for a (possibly URL-encoded) douyinvod.com link.

        Returns the first URL-decoded match that is neither the ``uuu_265``
        placeholder nor an audio stream, or ``None`` when there is none.
        """
        import urllib.parse

        # Pattern: https%3A%2F%2F...douyinvod.com... (encoded) or the plain form.
        regex_pattern = r'https(?:%3A%2F%2F|://)[a-zA-Z0-9\-\.]*douyinvod\.com(?:%2F|/)[^"&\s]+'
        for raw in re.findall(regex_pattern, content):
            decoded = urllib.parse.unquote(raw)
            if 'uuu_265' not in decoded and 'mime_type=audio' not in decoded:
                return decoded
        return None

    @staticmethod
    def _write_atomically(filepath: str, chunks) -> None:
        """Write ``chunks`` (an iterable of bytes) to ``filepath``.

        Data goes to ``<filepath>.part`` first and is renamed only once fully
        written, so an interrupted download never leaves a truncated file
        that a later call would mistake for a finished one.
        """
        tmp_path = f"{filepath}.part"
        try:
            with open(tmp_path, 'wb') as f:
                for chunk in chunks:
                    f.write(chunk)
            os.replace(tmp_path, filepath)
        finally:
            # Still present only when writing or renaming failed.
            if os.path.exists(tmp_path):
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass

    def _wait_for_player(self, page) -> bool:
        """Give the page time to load; return ``False`` if a captcha blocks it."""
        try:
            logger.info("Waiting for video selector (using timeout)...")
            page.wait_for_timeout(self.PAGE_SETTLE_MS)
            logger.info("Timeout complete.")
            logger.info("Waiting for video source (skipped)...")
            logger.info("Video source check skipped.")
        except Exception as e:
            logger.warning(f"Timeout waiting for video: {e}")
            # NOTE(review): the captcha check is assumed to belong to this
            # error path; the pasted source lost its indentation — confirm.
            if "验证码" in page.title() or "slider" in page.content():
                logger.error("Encountered captcha. Skipping.")
                return False
        return True

    def _resolve_video_src(self, page, captured_video_urls) -> Optional[str]:
        """Pick the video URL, trying progressively weaker sources.

        Order: ``douyinvod.com`` links on the ``<video>`` element, URLs seen
        on the network, any other http link on the element, and finally a
        regex scan of the page HTML. Returns ``None`` (after logging) when
        nothing usable is found.
        """
        video_info = page.evaluate("""() => {
            const video = document.querySelector('video');
            if (video) {
                const sources = Array.from(video.querySelectorAll('source')).map(s => s.src);
                return {src: video.src, sources: sources};
            }
            return null;
        }""")

        candidates = []
        if video_info:
            if video_info.get('src'):
                candidates.append(video_info['src'])
            if video_info.get('sources'):
                candidates.extend(video_info['sources'])

        # Prioritize douyinvod.com links
        video_src = next(
            (c for c in candidates
             if c and 'douyinvod.com' in c and not c.startswith('blob:')),
            None,
        )

        # Check network captured URLs if DOM extraction failed
        if not video_src:
            valid_captured = [u for u in captured_video_urls if 'douyinvod.com' in u]
            if valid_captured:
                video_src = valid_captured[0]
                logger.info(f"Found video via network interception: {video_src}")
            elif captured_video_urls:
                # Avoid m3u8 or other non-mp4 if possible, but for now take what we have
                video_src = captured_video_urls[0]
                logger.info(f"Found video via network interception (generic): {video_src}")

        # Fallback to any valid http link (excluding uuu_265 placeholder)
        if not video_src:
            video_src = next(
                (c for c in candidates
                 if c and c.startswith('http') and 'uuu_265' not in c),
                None,
            )

        # Regex fallback if still not found
        if not video_src:
            logger.info("Trying regex fallback for video URL...")
            video_src = self._find_video_src_in_html(page.content())
            if video_src:
                logger.info(f"Found video via regex: {video_src}")

        if not video_src:
            logger.error(f"No valid video source found. Candidates: {candidates}")
            return None

        if video_src.startswith('blob:'):
            logger.error("Video src is still a blob and network interception failed.")
            return None

        return video_src

    def _build_name(self, title: str, url: str) -> str:
        """Derive a unique, filesystem-safe base name from the page title and URL."""
        # Clean title (remove " - 抖音" etc)
        desc = self.utils.clean_filename(re.sub(r' - 抖音$', '', title))
        if not desc:
            desc = f"video_{int(time.time())}"
        # Append URL hash to ensure uniqueness
        url_hash = hashlib.md5(url.encode()).hexdigest()[:8]
        return f"{desc}_{url_hash}"

    def _fetch_video(self, video_src: str, filepath: str) -> None:
        """Download ``video_src`` to ``filepath``.

        Tries the Playwright request API first (to avoid 403 Forbidden), then
        falls back to ``requests`` with the browser's cookies. Raises if the
        fallback fails as well.
        """
        try:
            response = self.context.request.get(
                video_src,
                headers={'Referer': 'https://www.douyin.com/'}
            )
            if response.ok:
                self._write_atomically(filepath, (response.body(),))
                return
            logger.error(f"Failed to download: {response.status} {response.status_text}")
        except Exception as e:
            logger.error(f"Playwright download failed: {e}")

        # Fallback to requests (old method)
        cookies = {c['name']: c['value'] for c in self.context.cookies()}
        headers = {
            'User-Agent': self.session.headers['User-Agent'],
            'Referer': 'https://www.douyin.com/',
        }
        with requests.get(video_src, cookies=cookies, headers=headers, stream=True, timeout=60) as r:
            r.raise_for_status()
            self._write_atomically(filepath, r.iter_content(chunk_size=8192))

    def download(self, url: str, save_dir: str = "./downloads") -> bool:
        """Main method to download a video from a share URL.

        Args:
            url: A Douyin link, or share text that contains one.
            save_dir: Directory the ``.mp4`` is written to; created if missing.

        Returns:
            ``True`` when the file was saved or already existed, ``False``
            when the browser is unavailable, a captcha blocks the page, no
            video URL can be found, or any step raises.
        """
        if not os.path.exists(save_dir):
            # exist_ok guards against another process creating it meanwhile.
            os.makedirs(save_dir, exist_ok=True)

        if not self.context:
            logger.error("Browser not initialized.")
            return False

        page = None
        try:
            page = self.context.new_page()

            # Network interception to catch video URLs
            captured_video_urls = []

            def handle_response(response):
                try:
                    if self._is_video_response(response):
                        captured_video_urls.append(response.url)
                except Exception as e:
                    # A response that cannot be inspected is simply not a candidate.
                    logger.debug(f"Ignoring uninspectable response: {e}")

            page.on("response", handle_response)

            # 1. Resolve URL and go to page
            logger.info(f"Processing URL: {url}")
            url = self._extract_share_url(url)  # handle share text
            page.goto(url, wait_until='domcontentloaded')
            logger.info("Page loaded.")

            if not self._wait_for_player(page):
                return False

            # 2. Extract info from page
            video_src = self._resolve_video_src(page, captured_video_urls)
            if not video_src:
                return False

            desc = self._build_name(page.title(), url)
            logger.info(f"Found video: {desc}")
            logger.info(f"Video URL: {video_src}")

            # 3. Download video
            filepath = os.path.join(save_dir, f"{desc}.mp4")
            if os.path.exists(filepath):
                logger.info(f"File exists: {filepath}")
                return True

            if video_src.startswith('//'):
                video_src = 'https:' + video_src

            self._fetch_video(video_src, filepath)
            logger.info(f"Saved to {filepath}")
            self._post_process_video(filepath)
            return True

        except Exception as e:
            logger.error(f"Error processing {url}: {e}")
            return False
        finally:
            # Close the page on every path, including errors, so repeated
            # calls do not accumulate open tabs.
            if page is not None:
                try:
                    page.close()
                except Exception as e:
                    logger.debug(f"Failed to close page: {e}")

    def _post_process_video(self, filepath: str):
        """Inspect the saved file's codecs with ``ffprobe``.

        Only reports when an HEVC stream is present; conversion to H.264 is
        intentionally not performed. Any failure (including a missing
        ``ffprobe`` binary) is logged as a warning and otherwise ignored.
        """
        try:
            # Check codec
            cmd = [
                "ffprobe", "-v", "error",
                "-show_entries", "stream=codec_name",
                "-of", "default=noprint_wrappers=1:nokey=1",
                filepath
            ]
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            codecs = result.stdout.strip().split('\n')

            if "hevc" in codecs:
                logger.info(f"HEVC codec detected in {filepath}. User has compatible player, skipping conversion.")

        except Exception as e:
            logger.warning(f"Post-processing (conversion) failed: {e}. File might be unplayable on some devices.")
|
|
|