This commit is contained in:
HuangHai
2026-01-20 19:06:36 +08:00
parent 733ff659fd
commit f2f7a38210
49 changed files with 1470 additions and 0 deletions

Binary file not shown.

View File

@@ -0,0 +1,166 @@
[1/30] Processing line...
Found URL: https://v.douyin.com/gHWfWVgDVRo/
SUCCESS: https://v.douyin.com/gHWfWVgDVRo/
[3/30] Processing line...
Found URL: https://v.douyin.com/w3LQC4t1f2A/
SUCCESS: https://v.douyin.com/w3LQC4t1f2A/
[5/30] Processing line...
Found URL: https://v.douyin.com/8y-r-kniwXY/
SUCCESS: https://v.douyin.com/8y-r-kniwXY/
[7/30] Processing line...
Found URL: https://v.douyin.com/_AYAw2SMXX4/
SUCCESS: https://v.douyin.com/_AYAw2SMXX4/
[9/30] Processing line...
Found URL: https://v.douyin.com/_TFLCp9kwKw/
SUCCESS: https://v.douyin.com/_TFLCp9kwKw/
[11/30] Processing line...
Found URL: https://v.douyin.com/CRVXcWcXj40/
SUCCESS: https://v.douyin.com/CRVXcWcXj40/
[13/30] Processing line...
Found URL: https://v.douyin.com/-x8xMg-rke8/
SUCCESS: https://v.douyin.com/-x8xMg-rke8/
[16/30] Processing line...
Found URL: https://v.douyin.com/wjnXK8g9K7s/
SUCCESS: https://v.douyin.com/wjnXK8g9K7s/
[19/30] Processing line...
Found URL: https://v.douyin.com/N_2XTr-C93g/
SUCCESS: https://v.douyin.com/N_2XTr-C93g/
[21/30] Processing line...
Found URL: https://v.douyin.com/aSE5j289oPM/
SUCCESS: https://v.douyin.com/aSE5j289oPM/
[23/30] Processing line...
Found URL: https://v.douyin.com/thSn_LBExrY/
SUCCESS: https://v.douyin.com/thSn_LBExrY/
[25/30] Processing line...
Found URL: https://v.douyin.com/IoSGYcAvQ4U/
SUCCESS: https://v.douyin.com/IoSGYcAvQ4U/
[27/30] Processing line...
Found URL: https://v.douyin.com/o7K6_gXUyHg/
SUCCESS: https://v.douyin.com/o7K6_gXUyHg/
[29/30] Processing line...
Found URL: https://v.douyin.com/EdWGe2eOe_M/
SUCCESS: https://v.douyin.com/EdWGe2eOe_M/
MXX4/
[9/30] Processing line...
Found URL: https://v.douyin.com/_TFLCp9kwKw/
SUCCESS: https://v.douyin.com/_TFLCp9kwKw/
[11/30] Processing line...
Found URL: https://v.douyin.com/CRVXcWcXj40/
SUCCESS: https://v.douyin.com/CRVXcWcXj40/
[13/30] Processing line...
Found URL: https://v.douyin.com/-x8xMg-rke8/
SUCCESS: https://v.douyin.com/-x8xMg-rke8/
[16/30] Processing line...
Found URL: https://v.douyin.com/wjnXK8g9K7s/
SUCCESS: https://v.douyin.com/wjnXK8g9K7s/
[19/30] Processing line...
Found URL: https://v.douyin.com/N_2XTr-C93g/
SUCCESS: https://v.douyin.com/N_2XTr-C93g/
[21/30] Processing line...
Found URL: https://v.douyin.com/aSE5j289oPM/
SUCCESS: https://v.douyin.com/aSE5j289oPM/
[23/30] Processing line...
Found URL: https://v.douyin.com/thSn_LBExrY/
SUCCESS: https://v.douyin.com/thSn_LBExrY/
[25/30] Processing line...
Found URL: https://v.douyin.com/IoSGYcAvQ4U/
SUCCESS: https://v.douyin.com/IoSGYcAvQ4U/
[27/30] Processing line...
Found URL: https://v.douyin.com/o7K6_gXUyHg/
SUCCESS: https://v.douyin.com/o7K6_gXUyHg/
[29/30] Processing line...
Found URL: https://v.douyin.com/EdWGe2eOe_M/
SUCCESS: https://v.douyin.com/EdWGe2eOe_M/
--- Batch Download Ended at 2026-01-20 15:15:11 ---
--- Batch Download Started at 2026-01-20 15:26:55 ---
[1/30] Processing line...
Found URL: https://v.douyin.com/gHWfWVgDVRo/
SUCCESS: https://v.douyin.com/gHWfWVgDVRo/
[3/30] Processing line...
Found URL: https://v.douyin.com/w3LQC4t1f2A/
SUCCESS: https://v.douyin.com/w3LQC4t1f2A/
[5/30] Processing line...
Found URL: https://v.douyin.com/8y-r-kniwXY/
SUCCESS: https://v.douyin.com/8y-r-kniwXY/
[7/30] Processing line...
Found URL: https://v.douyin.com/_AYAw2SMXX4/
SUCCESS: https://v.douyin.com/_AYAw2SMXX4/
[9/30] Processing line...
Found URL: https://v.douyin.com/_TFLCp9kwKw/
SUCCESS: https://v.douyin.com/_TFLCp9kwKw/
[11/30] Processing line...
Found URL: https://v.douyin.com/CRVXcWcXj40/
SUCCESS: https://v.douyin.com/CRVXcWcXj40/
[13/30] Processing line...
Found URL: https://v.douyin.com/-x8xMg-rke8/
SUCCESS: https://v.douyin.com/-x8xMg-rke8/
[16/30] Processing line...
Found URL: https://v.douyin.com/wjnXK8g9K7s/
Attempt 1 failed. Retrying in 2s...
Attempt 2 failed. Retrying in 2s...
SUCCESS: https://v.douyin.com/wjnXK8g9K7s/
[19/30] Processing line...
Found URL: https://v.douyin.com/N_2XTr-C93g/
SUCCESS: https://v.douyin.com/N_2XTr-C93g/
[21/30] Processing line...
Found URL: https://v.douyin.com/aSE5j289oPM/
SUCCESS: https://v.douyin.com/aSE5j289oPM/
[23/30] Processing line...
Found URL: https://v.douyin.com/thSn_LBExrY/
SUCCESS: https://v.douyin.com/thSn_LBExrY/
[25/30] Processing line...
Found URL: https://v.douyin.com/IoSGYcAvQ4U/
SUCCESS: https://v.douyin.com/IoSGYcAvQ4U/
[27/30] Processing line...
Found URL: https://v.douyin.com/o7K6_gXUyHg/
SUCCESS: https://v.douyin.com/o7K6_gXUyHg/
[29/30] Processing line...
Found URL: https://v.douyin.com/EdWGe2eOe_M/
SUCCESS: https://v.douyin.com/EdWGe2eOe_M/
--- Batch Download Ended at 2026-01-20 15:32:38 ---
--- Batch Download Started at 2026-01-20 16:56:45 ---
[1/28] Processing line...
Found URL: https://v.douyin.com/gHWfWVgDVRo/
SUCCESS: https://v.douyin.com/gHWfWVgDVRo/
[3/28] Processing line...
Found URL: https://v.douyin.com/w3LQC4t1f2A/
SUCCESS: https://v.douyin.com/w3LQC4t1f2A/
[5/28] Processing line...
Found URL: https://v.douyin.com/8y-r-kniwXY/
SUCCESS: https://v.douyin.com/8y-r-kniwXY/
[7/28] Processing line...
Found URL: https://v.douyin.com/_AYAw2SMXX4/
SUCCESS: https://v.douyin.com/_AYAw2SMXX4/
[9/28] Processing line...
Found URL: https://v.douyin.com/_TFLCp9kwKw/
SUCCESS: https://v.douyin.com/_TFLCp9kwKw/
[11/28] Processing line...
Found URL: https://v.douyin.com/CRVXcWcXj40/
SUCCESS: https://v.douyin.com/CRVXcWcXj40/
[13/28] Processing line...
Found URL: https://v.douyin.com/-x8xMg-rke8/
SUCCESS: https://v.douyin.com/-x8xMg-rke8/
[15/28] Processing line...
Found URL: https://v.douyin.com/wjnXK8g9K7s/
SUCCESS: https://v.douyin.com/wjnXK8g9K7s/
[17/28] Processing line...
Found URL: https://v.douyin.com/N_2XTr-C93g/
SUCCESS: https://v.douyin.com/N_2XTr-C93g/
[19/28] Processing line...
Found URL: https://v.douyin.com/aSE5j289oPM/
SUCCESS: https://v.douyin.com/aSE5j289oPM/
[21/28] Processing line...
Found URL: https://v.douyin.com/thSn_LBExrY/
SUCCESS: https://v.douyin.com/thSn_LBExrY/
[23/28] Processing line...
Found URL: https://v.douyin.com/IoSGYcAvQ4U/
SUCCESS: https://v.douyin.com/IoSGYcAvQ4U/
[25/28] Processing line...
Found URL: https://v.douyin.com/o7K6_gXUyHg/
SUCCESS: https://v.douyin.com/o7K6_gXUyHg/
[27/28] Processing line...
Found URL: https://v.douyin.com/EdWGe2eOe_M/
SUCCESS: https://v.douyin.com/EdWGe2eOe_M/
--- Batch Download Ended at 2026-01-20 16:59:36 ---

Binary file not shown.

28
DouYin/Url.txt Normal file
View File

@@ -0,0 +1,28 @@
3.00 12/28 d@n.dN VYZ:/ 复制打开抖音极速版看看【聚合能研的作品】2026年电力市场的 “大洗牌” 正式开始 告别... https://v.douyin.com/gHWfWVgDVRo/
8.76 TYZ:/ p@d.Nw 06/06 复制打开抖音极速版看看【东哥新能源real的作品】峰谷平电价取消 灵活分时电价将全面替代划定分时电价... https://v.douyin.com/w3LQC4t1f2A/
7.46 08/02 q@R.kP lPK:/ 复制打开抖音极速版,看看【马哥能源频道的作品】政策深度解读:取消行政分时电价,为何是电力市场化的... https://v.douyin.com/8y-r-kniwXY/
9.79 X@M.Jv 04/22 jpQ:/ 复制打开抖音极速版看看【学习笔记的作品】2026电力市场的变革 # 电力 https://v.douyin.com/_AYAw2SMXX4/
9.43 12/11 b@A.gb cAt:/ 复制打开抖音极速版看看【华电丹姐说电力的作品】必须认真听的136号文详细解读# 知识分享 # ... https://v.douyin.com/_TFLCp9kwKw/
5.84 vfb:/ 11/01 P@X.mQ 复制打开抖音极速版,看看【特哥来电的作品】# 新能源充电桩 # 新能源汽车 # 电价 # 汉... https://v.douyin.com/CRVXcWcXj40/
4.84 12/16 WzT:/ I@V.lC 复制打开抖音极速版,看看【售电小蛮腰的作品】国家电网直接降电费多省事?为什么非要搞售电公司来绕... https://v.douyin.com/-x8xMg-rke8/
5.61 uFH:/ R@K.jc 08/17 复制打开抖音极速版,看看【耀昇集团的作品】电力现货交易:随机波动,持续运行# 售电 # 电改... https://v.douyin.com/wjnXK8g9K7s/
4.64 C@u.se 05/20 pqR:/ 复制打开抖音极速版,看看【晓莹她与电的那些事儿的作品】国家能源局关于电力市场典型违规问题的通报!# 全国... https://v.douyin.com/N_2XTr-C93g/
5.33 zGi:/ N@w.sR 11/24 复制打开抖音极速版,看看【高照-企业智库的作品】售电公司是什么?售电政策红利 # 售电# 售电居间... https://v.douyin.com/aSE5j289oPM/
0.20 07/07 s@e.OK sRk:/ 复制打开抖音极速版,看看【售电咨询的作品】深度解读!什么是现货电价?# 电力 # 储能 # ... https://v.douyin.com/thSn_LBExrY/
1.51 M@J.II 04/29 trE:/ 复制打开抖音极速版看看【东哥新能源real的作品】峰谷电价正式取消 很多人误以为“取消峰谷电价”是要... https://v.douyin.com/IoSGYcAvQ4U/
4.61 mqe:/ 03/16 m@Q.kP 复制打开抖音极速版看看【老严聊售电的作品】售电报价“35几”别高兴太早这可能是你踩过最贵... https://v.douyin.com/o7K6_gXUyHg/
7.10 oDH:/ 06/04 o@d.Nj 复制打开抖音极速版,看看【华电丹姐说电力的作品】为什么售电公司的电比电网的便宜,售电公司怎么赚钱?... https://v.douyin.com/EdWGe2eOe_M/

418
DouYin/VideoDownloader.py Normal file
View File

@@ -0,0 +1,418 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import requests
import re
import json
import time
import hashlib
import base64
import random
import os
import logging
import subprocess
from typing import Optional, Dict
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("VideoDownloader")
class Utils:
    """Helpers for Douyin web requests: ttwid cookie, X-Bogus signing, filenames."""

    # Custom 65-character alphabet used to render the X-Bogus signature.
    _XBOGUS_ALPHABET = "Dkdpgh4ZKsQB80/Mfvw36XI1R25-WUAlEi7NLboqYTOPuzmFjJnryx9HVGcaStCe="

    def __init__(self):
        pass

    def getttwid(self):
        """Register with the ttwid endpoint and return the ``ttwid`` cookie.

        Returns:
            The cookie value, or ``None`` when the request fails or the
            response sets no cookie at all.
        """
        url = 'https://ttwid.bytedance.com/ttwid/union/register/'
        data = '{"region":"cn","aid":1768,"needFid":false,"service":"www.ixigua.com","migrate_info":{"ticket":"","source":"node"},"cbUrlProtocol":"https","union":true}'
        try:
            res = requests.post(url=url, data=data, timeout=10)
            cookies = dict(res.cookies.items())
        except Exception as e:
            logging.error(f"Failed to get ttwid: {e}")
            return None
        # Prefer the cookie we actually want; the previous code returned
        # whichever cookie happened to be listed first.
        if 'ttwid' in cookies:
            return cookies['ttwid']
        # Backward-compatible fallback: first cookie, or None when there is none.
        return next(iter(cookies.values()), None)

    def getXbogus(self, payload, form='', ua='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36'):
        """Return *payload* with an ``&X-Bogus=<signature>`` parameter appended."""
        xbogus = self.get_xbogus(payload, ua, form)
        return payload + "&X-Bogus=" + xbogus

    def get_xbogus(self, payload, ua, form):
        """Compute the 28-character X-Bogus signature for a query string.

        Args:
            payload: Query string to sign.
            ua: User-Agent string mixed into the signature.
            form: Request body text (empty string when there is none).
        """
        alphabet = self._XBOGUS_ALPHABET
        arr2 = self.get_arr2(payload, ua, form)
        garbled = self.get_garbled_string(arr2)
        pieces = []
        # 21 bytes are consumed three at a time; each 24-bit group yields
        # four 6-bit indexes into the alphabet.
        for i in range(0, 21, 3):
            base_num = garbled[i + 2] | garbled[i + 1] << 8 | garbled[i] << 16
            pieces.append(alphabet[(base_num & 16515072) >> 18])
            pieces.append(alphabet[(base_num & 258048) >> 12])
            pieces.append(alphabet[(base_num & 4032) >> 6])
            pieces.append(alphabet[base_num & 63])
        return "".join(pieces)

    def get_garbled_string(self, arr2):
        """Interleave *arr2*, encrypt it and prefix the bytes ``2, 255``.

        Returns:
            A list of 21 integers: the two-byte prefix followed by the 19
            encrypted bytes.
        """
        order = [0, 10, 1, 11, 2, 12, 3, 13, 4, 14, 5, 15, 6, 16, 7, 17, 8, 18, 9]
        interleaved = "".join(chr(arr2[i]) for i in order)
        f = [2, 255]
        f.extend(self._0x30492c(['ÿ'], interleaved))
        return f

    def get_arr2(self, payload, ua, form):
        """Build the 19-value vector that feeds the signature.

        It combines bytes 14 and 15 of three digests (payload, form, UA), the
        current Unix timestamp, a fixed canvas constant and an XOR checksum.
        """
        salt_payload = list(hashlib.md5(hashlib.md5(payload.encode()).digest()).digest())
        salt_form = list(hashlib.md5(hashlib.md5(form.encode()).digest()).digest())
        ua_key = ['\u0000', '\u0001', '\u000e']
        salt_ua = list(hashlib.md5(base64.b64encode(self._0x30492c(ua_key, ua))).digest())
        timestamp = int(time.time())
        canvas = 1489154074
        arr1 = [
            64, 0, 1, 14,
            salt_payload[14], salt_payload[15],
            salt_form[14], salt_form[15],
            salt_ua[14], salt_ua[15],
            (timestamp >> 24) & 255, (timestamp >> 16) & 255, (timestamp >> 8) & 255, (timestamp >> 0) & 255,
            (canvas >> 24) & 255, (canvas >> 16) & 255, (canvas >> 8) & 255, (canvas >> 0) & 255,
            64
        ]
        # The last slot starts at 64 and accumulates the XOR of slots 1..17.
        for i in range(1, len(arr1) - 1):
            arr1[18] ^= arr1[i]
        # Even-indexed slots first, then the odd-indexed ones.
        return arr1[0::2] + arr1[1::2]

    def _0x30492c(self, a, b):
        """Encrypt string *b* with RC4 using the characters of *a* as the key.

        Returns:
            A ``bytearray`` with one output byte per character of *b*.
        """
        d = list(range(256))
        c = 0
        # Key-scheduling pass.
        for i in range(256):
            c = (c + d[i] + ord(a[i % len(a)])) % 256
            d[i], d[c] = d[c], d[i]
        result = bytearray(len(b))
        t = 0
        c = 0
        # Keystream generation, XORed onto the input.
        for i, ch in enumerate(b):
            t = (t + 1) % 256
            c = (c + d[t]) % 256
            d[t], d[c] = d[c], d[t]
            result[i] = ord(ch) ^ d[(d[t] + d[c]) % 256]
        return result

    def clean_filename(self, filename: str) -> str:
        """Drop characters that are invalid in filenames and cap the length at 100."""
        return re.sub(r'[\\/*?:"<>|]', "", filename).strip()[:100]
from playwright.sync_api import sync_playwright
class VideoDownloader:
    """Standalone class for downloading Douyin videos by URL using Playwright.

    A Chromium browser is launched when the object is constructed. Release it
    with :meth:`close` or by using the instance as a context manager.
    """

    # A short share link or a full www.douyin.com URL embedded in share text.
    _SHARE_URL_RE = re.compile(
        r'(https?://v\.douyin\.com/[a-zA-Z0-9\-_]+/?|https?://www\.douyin\.com/[^\s]+)'
    )
    # Plain or percent-encoded douyinvod.com links inside page HTML.
    _VOD_URL_RE = re.compile(
        r'https(?:%3A%2F%2F|://)[a-zA-Z0-9\-\.]*douyinvod\.com(?:%2F|/)[^"&\s]+'
    )

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36',
            'Referer': 'https://www.douyin.com/',
        })
        self.utils = Utils()
        self.playwright = None
        self.browser = None
        self.context = None
        self._is_closed = False
        # Start Playwright
        self._start_browser()

    def _start_browser(self):
        """Launch Chromium and create the browser context; log on failure."""
        try:
            self.playwright = sync_playwright().start()
            # Use headless=False with args=['--headless=new'] to avoid headless-shell dependency issues
            self.browser = self.playwright.chromium.launch(headless=False, args=["--headless=new"])
            self.context = self.browser.new_context(
                user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
                viewport={'width': 1920, 'height': 1080}
            )
        except Exception as e:
            logger.error(f"Failed to start Playwright: {e}")

    def close(self):
        """Release the context, browser and Playwright driver (idempotent).

        Each resource is shut down independently so that a failure in one
        does not leave the others running.
        """
        if getattr(self, '_is_closed', False):
            return
        shutdown_steps = (
            (getattr(self, 'context', None), 'close'),
            (getattr(self, 'browser', None), 'close'),
            (getattr(self, 'playwright', None), 'stop'),
        )
        for resource, method_name in shutdown_steps:
            if not resource:
                continue
            try:
                getattr(resource, method_name)()
            except Exception:
                # Best-effort shutdown; this may run from __del__ at
                # interpreter exit, where logging is not reliable.
                pass
        self._is_closed = True

    def __del__(self):
        try:
            self.close()
        except Exception:
            pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    @classmethod
    def _extract_share_url(cls, text):
        """Return the Douyin URL embedded in *text*, or *text* itself if none matches."""
        match = cls._SHARE_URL_RE.search(text)
        return match.group(0) if match else text

    @staticmethod
    def _is_video_response(url, status, content_type):
        """Decide whether a network response looks like a downloadable video.

        URLs flagged ``mime_type=audio`` are always rejected so that an
        audio-only track is never saved as the video.
        """
        if 'mime_type=audio' in url:
            return False
        if 'douyinvod.com' in url:
            looks_like_video = 'mime_type=video_mp4' in url or '.mp4' in url or 'video' in url
            return looks_like_video and status in (200, 206)
        return 'video/mp4' in content_type

    @staticmethod
    def _select_video_src(candidates, captured_video_urls):
        """Pick the best video URL and report where it came from.

        Preference order: a douyinvod.com link from the DOM, a douyinvod.com
        link seen on the network, any captured network link, then any http
        DOM link that is not the ``uuu_265`` placeholder.

        Returns:
            ``(url, origin)`` where origin is one of ``"dom"``, ``"network"``,
            ``"network-generic"``, ``"dom-generic"``; ``(None, None)`` when
            nothing qualifies.
        """
        for c in candidates:
            if c and 'douyinvod.com' in c and not c.startswith('blob:'):
                return c, "dom"
        for u in captured_video_urls:
            if 'douyinvod.com' in u:
                return u, "network"
        if captured_video_urls:
            return captured_video_urls[0], "network-generic"
        for c in candidates:
            if c and c.startswith('http') and 'uuu_265' not in c:
                return c, "dom-generic"
        return None, None

    @classmethod
    def _find_video_src_in_html(cls, content):
        """Return the first usable douyinvod.com link found in page HTML, else None."""
        import urllib.parse
        for m in cls._VOD_URL_RE.findall(content):
            decoded = urllib.parse.unquote(m)
            if 'uuu_265' not in decoded and 'mime_type=audio' not in decoded:
                return decoded
        return None

    def download(self, url: str, save_dir: str = "./downloads") -> bool:
        """Download the video behind a share URL (or share text) into *save_dir*.

        Args:
            url: A Douyin link, or share text that contains one.
            save_dir: Target directory; created when missing.

        Returns:
            True when the file was saved or already existed, False otherwise.
            Errors are logged rather than raised.
        """
        os.makedirs(save_dir, exist_ok=True)
        if not self.context:
            logger.error("Browser not initialized.")
            return False
        page = None
        try:
            page = self.context.new_page()
            # Network interception to catch video URLs
            captured_video_urls = []

            def handle_response(response):
                try:
                    content_type = response.headers.get('content-type', '')
                    if self._is_video_response(response.url, response.status, content_type):
                        captured_video_urls.append(response.url)
                except Exception as e:
                    # Sniffing is best-effort and must never break page loading.
                    logger.debug(f"Ignoring response inspection error: {e}")

            page.on("response", handle_response)

            # 1. Resolve URL and go to page
            logger.info(f"Processing URL: {url}")
            url = self._extract_share_url(url)
            page.goto(url, wait_until='domcontentloaded')
            logger.info("Page loaded.")
            try:
                # A fixed pause lets the player issue its media requests.
                logger.info("Waiting for video selector (using timeout)...")
                page.wait_for_timeout(5000)
                logger.info("Timeout complete.")
            except Exception as e:
                logger.warning(f"Timeout waiting for video: {e}")
                if "验证码" in page.title() or "slider" in page.content():
                    logger.error("Encountered captcha. Skipping.")
                    return False

            # 2. Extract info from page
            video_info = page.evaluate("""() => {
                const video = document.querySelector('video');
                if (video) {
                    const sources = Array.from(video.querySelectorAll('source')).map(s => s.src);
                    return {src: video.src, sources: sources};
                }
                return null;
            }""")
            candidates = []
            if video_info:
                if video_info.get('src'):
                    candidates.append(video_info['src'])
                if video_info.get('sources'):
                    candidates.extend(video_info['sources'])

            video_src, origin = self._select_video_src(candidates, captured_video_urls)
            if origin == "network":
                logger.info(f"Found video via network interception: {video_src}")
            elif origin == "network-generic":
                logger.info(f"Found video via network interception (generic): {video_src}")
            if not video_src:
                logger.info("Trying regex fallback for video URL...")
                video_src = self._find_video_src_in_html(page.content())
                if video_src:
                    logger.info(f"Found video via regex: {video_src}")
            if not video_src:
                logger.error(f"No valid video source found. Candidates: {candidates}")
                return False

            desc = page.title()
            # Clean title (remove " - 抖音" etc)
            desc = re.sub(r' - 抖音$', '', desc)
            desc = self.utils.clean_filename(desc)
            if not desc:
                desc = f"video_{int(time.time())}"
            # Append URL hash to ensure uniqueness
            url_hash = hashlib.md5(url.encode()).hexdigest()[:8]
            desc = f"{desc}_{url_hash}"
            logger.info(f"Found video: {desc}")
            logger.info(f"Video URL: {video_src}")

            # 3. Download video
            filepath = os.path.join(save_dir, f"{desc}.mp4")
            if os.path.exists(filepath):
                logger.info(f"File exists: {filepath}")
                return True
            if video_src.startswith('//'):
                video_src = 'https:' + video_src
            self._save_video(video_src, filepath)
            logger.info(f"Saved to {filepath}")
            self._post_process_video(filepath)
            return True
        except Exception as e:
            logger.error(f"Error processing {url}: {e}")
            return False
        finally:
            # Close the page on every path, including errors, so tabs do not
            # pile up across a batch run.
            if page is not None:
                try:
                    page.close()
                except Exception as e:
                    logger.debug(f"Ignoring error while closing page: {e}")

    def _save_video(self, video_src: str, filepath: str) -> None:
        """Fetch *video_src* into *filepath* via a temporary ``.part`` file.

        The final name only appears once the download is complete, so an
        interrupted transfer is never mistaken for an existing video on retry.

        Raises:
            Exception: whatever the requests fallback raises when both
                download strategies fail.
        """
        part_path = f"{filepath}.part"
        try:
            if not self._download_via_playwright(video_src, part_path):
                self._download_via_requests(video_src, part_path)
            os.replace(part_path, filepath)
        except Exception:
            try:
                os.remove(part_path)
            except OSError:
                pass
            raise

    def _download_via_playwright(self, video_src: str, target: str) -> bool:
        """Download through the browser context's request API; True on success."""
        # Use Playwright API Request to avoid 403 Forbidden
        try:
            response = self.context.request.get(
                video_src,
                headers={'Referer': 'https://www.douyin.com/'}
            )
            if response.ok:
                with open(target, 'wb') as f:
                    f.write(response.body())
                return True
            logger.error(f"Failed to download: {response.status} {response.status_text}")
        except Exception as e:
            logger.error(f"Playwright download failed: {e}")
        return False

    def _download_via_requests(self, video_src: str, target: str) -> None:
        """Fallback download with ``requests``, reusing the browser's cookies."""
        cookies = {c['name']: c['value'] for c in self.context.cookies()}
        headers = {
            'User-Agent': self.session.headers['User-Agent'],
            'Referer': 'https://www.douyin.com/',
        }
        with requests.get(video_src, cookies=cookies, headers=headers, stream=True, timeout=60) as r:
            r.raise_for_status()
            with open(target, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)

    def _post_process_video(self, filepath: str):
        """Probe *filepath* with ffprobe and log when it contains HEVC.

        No conversion is performed; the file is left untouched. A probe
        failure is logged as a warning and otherwise ignored.
        """
        cmd = [
            "ffprobe", "-v", "error",
            "-show_entries", "stream=codec_name",
            "-of", "default=noprint_wrappers=1:nokey=1",
            filepath
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        except Exception as e:
            logger.warning(f"Post-processing (conversion) failed: {e}. File might be unplayable on some devices.")
            return
        if "hevc" in result.stdout.strip().split('\n'):
            logger.info(f"HEVC codec detected in {filepath}. User has compatible player, skipping conversion.")

102
DouYin/batch_download.py Normal file
View File

@@ -0,0 +1,102 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import time
import re
from VideoDownloader import VideoDownloader
def main():
    """Download every Douyin share link listed in the URL file.

    Each non-empty line is searched for a ``v.douyin.com`` link, which is
    downloaded with up to three attempts. Progress is printed and appended to
    ``batch_log.txt`` inside the download directory.
    """
    url_file = r"d:\dsWork\aiData\DouYin\Url.txt"
    save_dir = r"d:\dsWork\aiData\DouYin\DownloadedVideos"
    max_attempts = 3
    retry_delay = 2  # seconds to wait between attempts

    if not os.path.exists(url_file):
        print(f"Error: File not found: {url_file}")
        return
    os.makedirs(save_dir, exist_ok=True)

    count = 0
    failed_urls = []
    log_path = os.path.join(save_dir, "batch_log.txt")

    with VideoDownloader() as downloader:
        with open(url_file, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        print(f"Found {len(lines)} lines in {url_file}")

        # Log to file
        with open(log_path, "a", encoding="utf-8") as log_file:

            def emit(msg):
                """Print *msg* and append it to the batch log immediately."""
                print(msg)
                log_file.write(msg + "\n")
                log_file.flush()

            log_file.write(f"\n--- Batch Download Started at {time.strftime('%Y-%m-%d %H:%M:%S')} ---\n")
            try:
                for i, line in enumerate(lines):
                    line = line.strip()
                    if not line:
                        continue
                    emit(f"[{i+1}/{len(lines)}] Processing line...")

                    # Extract URL using regex (matches https://v.douyin.com/...)
                    match = re.search(r'https?://v\.douyin\.com/[a-zA-Z0-9\-_]+/?', line)
                    if not match:
                        emit(f" No valid URL found in line: {line[:50]}...")
                        continue
                    url = match.group(0)
                    emit(f" Found URL: {url}")

                    success = False
                    for attempt in range(1, max_attempts + 1):
                        is_last = attempt == max_attempts
                        try:
                            success = downloader.download(url, save_dir)
                        except Exception as e:
                            emit(f" Error on attempt {attempt}: {e}")
                        else:
                            if success:
                                break
                            if is_last:
                                emit(f" Attempt {attempt} failed.")
                            else:
                                emit(f" Attempt {attempt} failed. Retrying in {retry_delay}s...")
                        # Only wait when another attempt will actually follow.
                        if not is_last:
                            time.sleep(retry_delay)

                    if success:
                        count += 1
                        log_file.write(f" SUCCESS: {url}\n")
                    else:
                        failed_urls.append(url)
                        emit(f" FAILED to download: {url}")
                    # Sleep to be nice
                    time.sleep(1)
            except Exception as e:
                emit(f"CRITICAL ERROR in batch loop: {e}")
            finally:
                log_file.write(f"--- Batch Download Ended at {time.strftime('%Y-%m-%d %H:%M:%S')} ---\n")

    print(f"Batch download completed. Successfully downloaded {count} videos.")
    if failed_urls:
        print(f"Failed to download {len(failed_urls)} videos:")
        for u in failed_urls:
            print(f" {u}")


if __name__ == "__main__":
    main()

38
DouYin/check_codecs.py Normal file
View File

@@ -0,0 +1,38 @@
import os
import subprocess
def check_codecs(directory):
    """Print the codec family of every ``.mp4`` file in *directory*.

    Each file is probed with ffprobe. HEVC files and files with neither HEVC
    nor H.264 are listed individually, followed by a summary count.
    """
    mp4_names = []
    for entry in os.listdir(directory):
        if entry.endswith(".mp4"):
            mp4_names.append(entry)
    print(f"Checking {len(mp4_names)} files in {directory}...")

    tally = {"hevc": 0, "h264": 0}
    probe_prefix = [
        "ffprobe", "-v", "error",
        "-show_entries", "stream=codec_name",
        "-of", "default=noprint_wrappers=1:nokey=1",
    ]
    for filename in mp4_names:
        filepath = os.path.join(directory, filename)
        try:
            probe = subprocess.run(probe_prefix + [filepath], capture_output=True, text=True)
            codecs = probe.stdout.strip().split('\n')
            if "hevc" in codecs:
                print(f"[HEVC] {filename}")
                tally["hevc"] += 1
                continue
            if "h264" in codecs:
                tally["h264"] += 1
                continue
            print(f"[UNKNOWN] {filename}: {codecs}")
        except Exception as e:
            print(f"Error checking {filename}: {e}")

    print(f"\nSummary: H.264: {tally['h264']}, HEVC: {tally['hevc']}")
# Runs at module load: scan the default download directory.
check_codecs(r"d:\dsWork\aiData\DouYin\DownloadedVideos")

15
DouYin/check_headers.py Normal file
View File

@@ -0,0 +1,15 @@
import os
def check_headers(directory):
    """Print the first 16 bytes of every ``.mp4`` file in *directory*."""
    for filename in os.listdir(directory):
        if not filename.endswith(".mp4"):
            continue
        filepath = os.path.join(directory, filename)
        try:
            with open(filepath, "rb") as stream:
                header = stream.read(16)
            print(f"{filename[:30]}... : {header}")
        except Exception as e:
            print(f"Error reading {filename}: {e}")
# Runs at module load: dump the file headers of the default download directory.
check_headers(r"d:\dsWork\aiData\DouYin\DownloadedVideos")

15
DouYin/check_path_len.py Normal file
View File

@@ -0,0 +1,15 @@
import os
# Report every entry of the download directory whose full path is longer
# than 200 characters.
directory = r"d:\dsWork\aiData\DouYin\DownloadedVideos"
files = os.listdir(directory)
print(f"Checking {len(files)} files in {directory}")
for f in files:
    path = os.path.join(directory, f)
    if len(path) <= 200:
        # Short enough; nothing to report.
        continue
    print(f"[LONG] ({len(path)}) {f}")

28
DouYin/check_streams.py Normal file
View File

@@ -0,0 +1,28 @@
import os
import subprocess
import logging
def check_streams():
    """Print ffprobe's stream report for two specific downloaded videos.

    A file that is missing from the download directory is reported and skipped.
    """
    directory = r"d:\dsWork\aiData\DouYin\DownloadedVideos"
    files = [
        "必须认真听的136号文详细解读#知识分享 #136号文#干货分享 @图钉 YCsolar @少帅 YCsolar @电交所的秦老师_bd190d35.mp4",
        "售电报价“35几”别高兴太早这可能是你踩过最贵的坑 “35几”、“36几”的售电报价别高兴太早这背后可能藏着违规价、阴阳合同、魔鬼条款三大“地雷阵”我那个化工客户就为一份3毛6的合同一_a496a291.mp4"
    ]
    for filename in files:
        filepath = os.path.join(directory, filename)
        if os.path.exists(filepath):
            print(f"Checking: {filename}")
            probe_cmd = ["ffprobe", "-v", "error", "-show_streams", filepath]
            try:
                probe = subprocess.run(probe_cmd, capture_output=True, text=True, encoding='utf-8')
                print(probe.stdout)
            except Exception as e:
                print(f"Error: {e}")
        else:
            print(f"File not found: {filename}")


if __name__ == "__main__":
    check_streams()

View File

@@ -0,0 +1,93 @@
import os
import subprocess
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("Cleanup")
def check_file(filepath):
    """Probe *filepath* with ffprobe and classify its streams.

    Returns:
        ``(has_video, is_hevc)``: whether the probe output mentions "video"
        and whether it mentions "hevc". A failed probe yields
        ``(True, False)`` so that callers never delete a file on error.
    """
    probe_cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "stream=codec_type,codec_name",
        "-of", "default=noprint_wrappers=1:nokey=1",
        filepath
    ]
    try:
        probe = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
    except Exception as e:
        logger.error(f"Error checking {filepath}: {e}")
        return True, False  # Assume ok to avoid deleting good files on error

    # The output lists one codec type or codec name per line; a substring
    # search over the whole report is enough for these two flags.
    report = probe.stdout
    return "video" in report, "hevc" in report
def convert_to_h264(filepath):
    """Re-encode the video at *filepath* to H.264 in place.

    ffmpeg writes to a sibling ``<name>_temp<ext>`` file, which replaces the
    original only after the conversion succeeds. The audio stream is copied
    unchanged.

    Returns:
        True on success, False when the conversion or the replacement fails.
    """
    # Bound before the try block so the error handler can always test it.
    temp_filepath = None
    try:
        directory = os.path.dirname(filepath)
        filename = os.path.basename(filepath)
        name, ext = os.path.splitext(filename)
        temp_filepath = os.path.join(directory, f"{name}_temp{ext}")
        logger.info(f"Converting {filename} to H.264...")
        cmd = [
            "ffmpeg", "-i", filepath,
            "-c:v", "libx264", "-c:a", "copy",
            "-y", temp_filepath
        ]
        subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        # os.replace overwrites the destination in one step, so the original
        # is never removed without its replacement being in place.
        os.replace(temp_filepath, filepath)
        logger.info(f"Converted: {filename}")
        return True
    except Exception as e:
        logger.error(f"Failed to convert {filepath}: {e}")
        if temp_filepath and os.path.exists(temp_filepath):
            try:
                os.remove(temp_filepath)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {temp_filepath}: {cleanup_error}")
        return False
def main():
    """Clean up the download directory.

    ``.mp4`` files without a video stream are deleted; files that contain
    HEVC are converted to H.264; everything else is left alone.
    """
    directory = r"d:\dsWork\aiData\DouYin\DownloadedVideos"
    mp4_names = [entry for entry in os.listdir(directory) if entry.endswith(".mp4")]
    for filename in mp4_names:
        filepath = os.path.join(directory, filename)
        has_video, is_hevc = check_file(filepath)
        if has_video:
            if is_hevc:
                logger.info(f"[CONVERT] HEVC detected: {filename}")
                convert_to_h264(filepath)
            continue
        logger.warning(f"[DELETE] Audio only (no video stream): {filename}")
        try:
            os.remove(filepath)
        except Exception as e:
            logger.error(f"Failed to delete {filename}: {e}")


if __name__ == "__main__":
    main()

77
DouYin/convert_to_mp4.py Normal file
View File

@@ -0,0 +1,77 @@
import os
import subprocess
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("VideoConverter")
def get_codec(filepath):
    """Return the codec names of all streams in *filepath*, as reported by ffprobe.

    Returns:
        A list with one codec name per output line, or an empty list when
        the probe fails (the error is logged).
    """
    probe_options = (
        "-v", "error",
        "-show_entries", "stream=codec_name",
        "-of", "default=noprint_wrappers=1:nokey=1",
    )
    try:
        completed = subprocess.run(
            ["ffprobe", *probe_options, filepath],
            capture_output=True,
            text=True,
            check=True,
        )
        codec_names = completed.stdout.strip().split('\n')
    except Exception as e:
        logger.error(f"Error checking codec for {filepath}: {e}")
        codec_names = []
    return codec_names
def convert_to_h264(filepath):
    """Re-encode the video at *filepath* to H.264 in place.

    ffmpeg writes to a sibling ``<name>_temp<ext>`` file, which replaces the
    original only after the conversion succeeds. The audio stream is copied
    unchanged.

    Returns:
        True on success, False when the conversion or the replacement fails.
    """
    # Bound before the try block so the error handler can always test it.
    temp_filepath = None
    try:
        directory = os.path.dirname(filepath)
        filename = os.path.basename(filepath)
        name, ext = os.path.splitext(filename)
        temp_filepath = os.path.join(directory, f"{name}_temp{ext}")
        logger.info(f"Converting {filename} to H.264...")
        cmd = [
            "ffmpeg",
            "-i", filepath,
            "-c:v", "libx264",
            "-c:a", "copy",
            "-y",
            temp_filepath
        ]
        subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        # Replace original file. os.replace overwrites the destination in one
        # step, so the original is never removed without its replacement.
        os.replace(temp_filepath, filepath)
        logger.info(f"Successfully converted: {filename}")
        return True
    except Exception as e:
        logger.error(f"Failed to convert {filepath}: {e}")
        if temp_filepath and os.path.exists(temp_filepath):
            try:
                os.remove(temp_filepath)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove temporary file {temp_filepath}: {cleanup_error}")
        return False
def main():
    """Convert every HEVC ``.mp4`` in the download directory to H.264.

    Files whose probe output does not list "hevc" are logged and skipped.
    """
    directory = r"d:\dsWork\aiData\DouYin\DownloadedVideos"
    if not os.path.exists(directory):
        logger.error(f"Directory not found: {directory}")
        return

    mp4_names = [entry for entry in os.listdir(directory) if entry.endswith(".mp4")]
    total = len(mp4_names)
    logger.info(f"Found {total} video files.")

    i = 0
    for filename in mp4_names:
        i += 1
        filepath = os.path.join(directory, filename)
        codecs = get_codec(filepath)
        if "hevc" not in codecs:
            logger.info(f"[{i}/{total}] Skipping (already compatible or unknown): {filename} ({codecs})")
            continue
        logger.info(f"[{i}/{total}] HEVC detected: {filename}")
        convert_to_h264(filepath)


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,45 @@
import os
import subprocess
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("CleanupSilent")
def has_audio_stream(filepath):
    """Tell whether ffprobe reports an audio stream in *filepath*.

    Returns:
        True when the probe output mentions "audio"; False otherwise,
        including when the probe itself fails (the error is logged).
    """
    probe_cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "stream=codec_type",
        "-of", "default=noprint_wrappers=1:nokey=1",
        filepath
    ]
    try:
        probe = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
    except Exception as e:
        logger.error(f"Error checking {filepath}: {e}")
        return False  # Assume bad if error
    stream_types = probe.stdout
    return "audio" in stream_types
def main():
    """Delete every ``.mp4`` in the download directory that has no audio stream.

    The run is aborted up front when ffprobe is unavailable or the directory
    is missing. ``has_audio_stream`` returns False on any probe error, so
    without ffprobe every single video would be classified as silent and
    deleted.
    """
    import shutil  # local import: not part of this script's module-level imports

    directory = r"d:\dsWork\aiData\DouYin\DownloadedVideos"
    if shutil.which("ffprobe") is None:
        logger.error("ffprobe not found on PATH; aborting so that no videos are deleted by mistake.")
        return
    if not os.path.isdir(directory):
        logger.error(f"Directory not found: {directory}")
        return

    files = [f for f in os.listdir(directory) if f.endswith(".mp4")]
    count = 0
    for filename in files:
        filepath = os.path.join(directory, filename)
        if has_audio_stream(filepath):
            continue
        logger.warning(f"[DELETE] No audio stream: {filename}")
        try:
            os.remove(filepath)
            count += 1
        except Exception as e:
            logger.error(f"Failed to delete {filename}: {e}")
    logger.info(f"Deleted {count} silent videos.")


if __name__ == "__main__":
    main()

97
DouYin/extract_audio.py Normal file
View File

@@ -0,0 +1,97 @@
import os
import subprocess
import logging
import shutil
import time
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("AudioExtractor")
def extract_audio(video_dir=r"d:\dsWork\aiData\DouYin\DownloadedVideos",
                  audio_dir=r"d:\dsWork\aiData\DouYin\Audios"):
    """Extract a 16 kHz mono MP3 from every ``.mp4`` in *video_dir*.

    Each video is first probed with ffprobe; videos without an audio stream,
    or that ffprobe cannot read, are skipped. The video is then copied to a
    fixed temporary name inside *audio_dir* (so ffmpeg never sees the
    original file name), converted, and the result is moved to
    ``<video name>.mp3``. Videos whose MP3 already exists are skipped.
    Failures are logged per file and do not stop the run.

    Args:
        video_dir: Directory containing the source ``.mp4`` files.
        audio_dir: Directory receiving the ``.mp3`` files; created if missing.
    """
    if not os.path.isdir(video_dir):
        logger.error(f"Directory not found: {video_dir}")
        return
    os.makedirs(audio_dir, exist_ok=True)

    files = [f for f in os.listdir(video_dir) if f.endswith(".mp4")]
    logger.info(f"Found {len(files)} videos to process.")

    # Use fixed temp names to avoid encoding issues with ffmpeg.
    temp_input = os.path.join(audio_dir, "temp_process_input.mp4")
    temp_output = os.path.join(audio_dir, "temp_process_output.mp3")

    for filename in files:
        video_path = os.path.join(video_dir, filename)
        name, _ = os.path.splitext(filename)
        audio_filename = f"{name}.mp3"
        audio_path = os.path.join(audio_dir, audio_filename)

        if os.path.exists(audio_path):
            logger.info(f"Skipping (already exists): {audio_filename}")
            continue

        logger.info(f"Processing: {filename}")
        try:
            # 1. Check that the video has an audio stream. ffprobe prints the
            #    codec name of the first audio stream, or nothing if there is none.
            probe_cmd = [
                "ffprobe", "-v", "error", "-select_streams", "a:0",
                "-show_entries", "stream=codec_name", "-of", "default=noprint_wrappers=1:nokey=1",
                video_path
            ]
            try:
                probe_output = subprocess.check_output(probe_cmd, stderr=subprocess.DEVNULL).decode().strip()
            except subprocess.CalledProcessError:
                logger.warning(f"Skipping {filename}: ffprobe failed (possibly corrupt).")
                continue
            if not probe_output:
                logger.warning(f"Skipping {filename}: No audio stream found.")
                continue

            # 2. Copy the video to the temp file (handles special characters
            #    in the file name that ffmpeg might dislike).
            shutil.copyfile(video_path, temp_input)

            # 3. Run ffmpeg on the temp file.
            #    -ar 16000: 16 kHz sample rate (required for ASR)
            #    -ac 1: mono (usually better for ASR)
            cmd = [
                "ffmpeg", "-i", temp_input,
                "-vn", "-acodec", "libmp3lame", "-q:a", "2",
                "-ar", "16000", "-ac", "1",
                "-y", temp_output
            ]
            subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)

            # 4. Move the output to its final name; os.replace overwrites an
            #    existing target in a single step on every platform.
            os.replace(temp_output, audio_path)
            logger.info(f"Success: {audio_filename}")
        except subprocess.CalledProcessError as e:
            logger.error(f"Failed to process {filename}: {e}")
            logger.error(f"FFmpeg stderr: {(e.stderr or b'').decode('utf-8', errors='ignore')}")
        except Exception as e:
            logger.error(f"Failed to process {filename}: {e}")
        finally:
            # Best-effort cleanup of the temp files; a missing file is normal.
            for temp_path in (temp_input, temp_output):
                try:
                    os.remove(temp_path)
                except FileNotFoundError:
                    pass
                except OSError as cleanup_err:
                    logger.warning(f"Could not remove temp file {temp_path}: {cleanup_err}")


if __name__ == "__main__":
    extract_audio()

20
DouYin/find_bad_url.py Normal file
View File

@@ -0,0 +1,20 @@
import hashlib
def find_url(target_hash, url_file=r"d:\dsWork\aiData\DouYin\Url.txt"):
    """Find the Douyin URL whose 8-character MD5 prefix equals *target_hash*.

    Every line of *url_file* is scanned for a ``v.douyin.com`` short link or
    a ``www.douyin.com`` link. The first 8 hex digits of the MD5 of the
    matched URL are compared with *target_hash*.

    Args:
        target_hash: 8-character lowercase hex prefix to look for.
        url_file: Text file with one share message per line.

    Returns:
        The first matching URL (also printed as ``Found URL: ...``), or
        ``None`` when no line matches.
    """
    import re  # function-local: the module header only imports hashlib

    # Extract the URL the same way as in VideoDownloader; compiled once
    # instead of on every line.
    url_pattern = re.compile(
        r'(https?://v\.douyin\.com/[a-zA-Z0-9\-_]+/?|https?://www\.douyin\.com/[^\s]+)'
    )

    with open(url_file, "r", encoding="utf-8") as f:
        for line in f:
            match = url_pattern.search(line)
            if not match:
                continue
            url = match.group(0)
            url_hash = hashlib.md5(url.encode()).hexdigest()[:8]
            if url_hash == target_hash:
                print(f"Found URL: {url}")
                return url
    return None
# Look up the URL whose 8-character MD5 prefix is "5ae04852".
# NOTE: this call is not behind an `if __name__ == "__main__":` guard, so it
# also runs whenever this module is imported.
find_url("5ae04852")

35
DouYin/test_html.py Normal file
View File

@@ -0,0 +1,35 @@
import requests
import re
import json
def test_html():
    """Fetch one Douyin video page and report whether it embeds RENDER_DATA.

    Prints the HTTP status code. On a 200 response the HTML is searched for
    the ``RENDER_DATA`` JSON script tag: when present, its URL-encoded
    payload is decoded and the top-level keys are printed; when absent, the
    raw HTML is written to ``debug_douyin.html`` for inspection. Any
    exception is printed rather than raised.
    """
    page_url = "https://www.douyin.com/video/7592981059516583202"
    request_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36',
        'Referer': 'https://www.douyin.com/',
        'Cookie': 's_v_web_id=verify_...; ttwid=...'  # placeholder values; real cookies may be needed
    }

    try:
        response = requests.get(page_url, headers=request_headers, timeout=10)
        print(f"Status Code: {response.status_code}")
        if response.status_code != 200:
            return

        # Look for the RENDER_DATA script tag.
        render_data = re.search(
            r'<script id="RENDER_DATA" type="application/json">(.+?)</script>',
            response.text,
        )
        if render_data:
            print("Found RENDER_DATA!")
            decoded_payload = requests.utils.unquote(render_data.group(1))
            data = json.loads(decoded_payload)
            print(data.keys())
            return

        print("RENDER_DATA not found.")
        # Keep the raw page on disk so it can be inspected.
        with open("debug_douyin.html", "w", encoding="utf-8") as dump:
            dump.write(response.text)
        print("Saved to debug_douyin.html")
    except Exception as err:
        print(f"Error: {err}")


if __name__ == "__main__":
    test_html()

37
DouYin/test_original.py Normal file
View File

@@ -0,0 +1,37 @@
import sys
import os
import logging
# Add project root to path
sys.path.append(r'd:\dsWork\aiData\DouYin')
from apiproxy.douyin.douyin import Douyin
from apiproxy.common import utils
# Configure logging
logging.basicConfig(level=logging.INFO)
def test():
    """Resolve one Douyin share link and print the title of the video behind it.

    The link is resolved to a ``(key_type, key)`` pair through
    ``Douyin.getKey``. Only an ``aweme`` key is looked up further with
    ``Douyin.getAwemeInfo``. Every step prints its outcome.
    """
    share_url = "https://v.douyin.com/gHWfWVgDVRo/"
    client = Douyin()
    print(f"Testing URL: {share_url}")

    # Resolve the share link to its key.
    key_type, key = client.getKey(share_url)
    print(f"Key Type: {key_type}, Key: {key}")
    if not key or key_type != 'aweme':
        print("Failed to get key.")
        return

    # Fetch the video metadata.
    print("Fetching info...")
    info = client.getAwemeInfo(key)
    if not info:
        print("Failed to get info.")
        return
    print(f"Success! Title: {info.get('desc')}")


if __name__ == "__main__":
    test()

44
DouYin/test_playwright.py Normal file
View File

@@ -0,0 +1,44 @@
from playwright.sync_api import sync_playwright
import time
def test_playwright():
    """Open a Douyin video page in Chromium and look for ``__ac_signature``.

    Launches Chromium through Playwright, navigates to a fixed video URL,
    prints the page title and reports whether a cookie named
    ``__ac_signature`` is present. Any failure is printed, not raised.
    """
    video_url = "https://www.douyin.com/video/7592981059516583202"
    print("Starting Playwright test...")
    try:
        with sync_playwright() as pw:
            print("Launching browser...")
            chromium = pw.chromium
            # Ask for a headed launch but force headless mode through the
            # argument, so the regular chromium build is used.
            try:
                browser = chromium.launch(headless=False, args=["--headless=new"])
            except Exception as launch_err:
                print(f"Failed to launch headless=False with args: {launch_err}")
                browser = chromium.launch(headless=False)
            print("Browser launched.")

            tab = browser.new_page()
            print(f"Navigating to {video_url}...")
            tab.goto(video_url)
            print("Page title:", tab.title())

            # Try to get the __ac_signature cookie.
            signature = next(
                (c for c in tab.context.cookies() if c['name'] == '__ac_signature'),
                None,
            )
            if signature is None:
                print("Cookie __ac_signature NOT found.")
            else:
                print(f"Found cookie: {signature['name']}")

            browser.close()
            print("Test finished successfully.")
    except Exception as err:
        print(f"Playwright failed: {err}")


if __name__ == "__main__":
    test_playwright()

135
DouYin/transcribe_videos.py Normal file
View File

@@ -0,0 +1,135 @@
import os
import asyncio
import logging
import sys
from dashscope import Files
# Ensure project root is in path
sys.path.append(r"d:\dsWork\aiData")
from Util.ASRClient import ASRClient
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("Transcriber")
def _read_field(obj, key):
    """Return ``obj.<key>`` or ``obj[key]``, whichever form *obj* supports.

    Upload responses are handled both as attribute-style objects and as
    plain dicts. ``None`` is returned when neither lookup applies.
    """
    if hasattr(obj, key):
        return getattr(obj, key)
    if isinstance(obj, dict):
        return obj.get(key)
    return None


async def transcribe_all():
    """Transcribe every ``.mp3`` in the audio directory to a ``.txt`` transcript.

    For each audio file without an existing transcript:
      1. upload the file through ``Files.upload``;
      2. pass the returned file id to ``ASRClient.transcribe_audio`` and, for
         every result carrying a ``transcription_url``, download it and save
         the timestamp-free text to ``<name>.txt``;
      3. delete the uploaded remote file, whether or not step 2 succeeded.

    Failures are logged per file and do not stop the run.
    """
    audio_dir = r"d:\dsWork\aiData\DouYin\Audios"
    transcript_dir = r"d:\dsWork\aiData\DouYin\Transcripts"

    if not os.path.isdir(audio_dir):
        logger.error(f"Directory not found: {audio_dir}")
        return
    os.makedirs(transcript_dir, exist_ok=True)

    client = ASRClient()

    files = [f for f in os.listdir(audio_dir) if f.endswith(".mp3")]
    logger.info(f"Found {len(files)} audio files.")

    for filename in files:
        audio_path = os.path.join(audio_dir, filename)
        name, _ = os.path.splitext(filename)
        txt_filename = f"{name}.txt"
        txt_path = os.path.join(transcript_dir, txt_filename)

        if os.path.exists(txt_path):
            logger.info(f"Skipping (already exists): {txt_filename}")
            continue

        logger.info(f"Processing: {filename}")
        # Set once the upload yields a file id; drives the cleanup in `finally`.
        remote_file_id = None
        try:
            # 1. Upload the file to DashScope.
            logger.info(f"Uploading {filename} to DashScope...")
            # Use purpose='assistants' to bypass the jsonl check.
            upload_resp = Files.upload(audio_path, purpose='assistants', description=filename)
            if upload_resp.status_code != 200:
                logger.error(f"Upload failed: {upload_resp}")
                continue

            output_data = upload_resp.output
            uploaded_files = _read_field(output_data, 'uploaded_files')
            if not uploaded_files:
                logger.error(f"No uploaded_files in response: {output_data}")
                continue

            uploaded_file = uploaded_files[0]
            logger.info(f"Uploaded file info: {uploaded_file}")

            file_id = _read_field(uploaded_file, 'file_id')
            if not file_id:
                logger.error(f"No file_id in uploaded file: {uploaded_file}")
                continue
            remote_file_id = file_id
            logger.info(f"Uploaded successfully. File ID: {file_id}")

            # 2. Transcribe. The file id is passed where a URL is expected;
            #    per the original author's note, file_urls=["file-xxx"] works.
            logger.info(f"Transcribing {file_id}...")
            output = await client.transcribe_audio(file_urls=[file_id])
            if not output or output.task_status != 'SUCCEEDED':
                logger.error(f"Transcription failed for {filename}")
                continue

            results = output.results
            if not results:
                logger.error(f"No results in output for {filename}")
                continue

            for res in results:
                transcription_url = res.get('transcription_url')
                if not transcription_url:
                    logger.error(f"No transcription_url in result for {filename}")
                    continue

                trans_data = await client.download_transcription_result(transcription_url)
                if not trans_data:
                    logger.error(f"Failed to download transcript for {filename}")
                    continue

                text_clean = await client.extract_transcript_without_timestamp(trans_data)
                # Check before opening the file: opening first would leave an
                # empty .txt behind on failure, and the "already exists" test
                # above would then skip this audio on every later run.
                if text_clean is None:
                    logger.error(f"No transcript text extracted for {filename}")
                    continue

                with open(txt_path, 'w', encoding='utf-8') as f:
                    f.write(text_clean)
                logger.info(f"Saved transcript to: {txt_filename}")
        except Exception as e:
            logger.error(f"Error processing {filename}: {e}")
        finally:
            # 3. Cleanup: delete the uploaded remote file.
            if remote_file_id:
                try:
                    Files.delete(remote_file_id)
                    logger.info(f"Deleted remote file {remote_file_id}")
                except Exception as e:
                    logger.warning(f"Failed to delete remote file: {e}")


if __name__ == "__main__":
    asyncio.run(transcribe_all())