Files
aiData/DouYin/downloader.py
HuangHai 733ff659fd 'commit'
2026-01-20 14:21:50 +08:00

1505 lines
58 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
pip install playwright
抖音下载器 - 统一增强版
支持视频、图文、用户主页、合集等多种内容的批量下载
"""
import asyncio
import json
import logging
import os
import re
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
from urllib.parse import urlparse
import argparse
import yaml
# 第三方库
try:
import aiohttp
import requests
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeRemainingColumn
from rich.table import Table
from rich.panel import Panel
from rich.live import Live
from rich import print as rprint
except ImportError as e:
print(f"请安装必要的依赖: pip install aiohttp requests rich pyyaml")
sys.exit(1)
# 添加项目路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# 导入项目模块
from apiproxy.douyin import douyin_headers
from apiproxy.douyin.urls import Urls
from apiproxy.douyin.result import Result
from apiproxy.common.utils import Utils
from apiproxy.douyin.auth.cookie_manager import AutoCookieManager
from apiproxy.douyin.database import DataBase
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('downloader.log', encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Rich console
console = Console()
class ContentType:
"""内容类型枚举"""
VIDEO = "video"
IMAGE = "image"
USER = "user"
MIX = "mix"
MUSIC = "music"
LIVE = "live"
class DownloadStats:
"""下载统计"""
def __init__(self):
self.total = 0
self.success = 0
self.failed = 0
self.skipped = 0
self.start_time = time.time()
@property
def success_rate(self):
return (self.success / self.total * 100) if self.total > 0 else 0
@property
def elapsed_time(self):
return time.time() - self.start_time
def to_dict(self):
return {
'total': self.total,
'success': self.success,
'failed': self.failed,
'skipped': self.skipped,
'success_rate': f"{self.success_rate:.1f}%",
'elapsed_time': f"{self.elapsed_time:.1f}s"
}
class RateLimiter:
"""速率限制器"""
def __init__(self, max_per_second: float = 2):
self.max_per_second = max_per_second
self.min_interval = 1.0 / max_per_second
self.last_request = 0
async def acquire(self):
"""获取许可"""
current = time.time()
time_since_last = current - self.last_request
if time_since_last < self.min_interval:
await asyncio.sleep(self.min_interval - time_since_last)
self.last_request = time.time()
class RetryManager:
"""重试管理器"""
def __init__(self, max_retries: int = 3):
self.max_retries = max_retries
self.retry_delays = [1, 2, 5] # 重试延迟
async def execute_with_retry(self, func, *args, **kwargs):
"""执行函数并自动重试"""
last_error = None
for attempt in range(self.max_retries):
try:
return await func(*args, **kwargs)
except Exception as e:
last_error = e
if attempt < self.max_retries - 1:
delay = self.retry_delays[min(attempt, len(self.retry_delays) - 1)]
logger.warning(f"{attempt + 1} 次尝试失败: {e}, {delay}秒后重试...")
await asyncio.sleep(delay)
raise last_error
class UnifiedDownloader:
"""统一下载器"""
def __init__(self, config_path: str = "config.yml"):
self.config = self._load_config(config_path)
self.urls_helper = Urls()
self.result_helper = Result()
self.utils = Utils()
# 组件初始化
self.stats = DownloadStats()
self.rate_limiter = RateLimiter(max_per_second=2)
self.retry_manager = RetryManager(max_retries=self.config.get('retry_times', 3))
# Cookie与请求头延迟初始化支持自动获取
self.cookies = self.config.get('cookies') if 'cookies' in self.config else self.config.get('cookie')
self.auto_cookie = bool(self.config.get('auto_cookie')) or (isinstance(self.config.get('cookie'), str) and self.config.get('cookie') == 'auto') or (isinstance(self.config.get('cookies'), str) and self.config.get('cookies') == 'auto')
self.headers = {**douyin_headers}
# 避免服务端使用brotli导致aiohttp无法解压未安装brotli库时会出现空响应
self.headers['accept-encoding'] = 'gzip, deflate'
# 增量下载与数据库
self.increase_cfg: Dict[str, Any] = self.config.get('increase', {}) or {}
self.enable_database: bool = bool(self.config.get('database', True))
self.db: Optional[DataBase] = DataBase() if self.enable_database else None
# 保存路径
self.save_path = Path(self.config.get('path', './Downloaded'))
self.save_path.mkdir(parents=True, exist_ok=True)
def _load_config(self, config_path: str) -> Dict:
"""加载配置文件"""
if not os.path.exists(config_path):
# 兼容配置文件命名:优先 config.yml其次 config_simple.yml
alt_path = 'config_simple.yml'
if os.path.exists(alt_path):
config_path = alt_path
else:
# 返回一个空配置,由命令行参数决定
return {}
with open(config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
# 简化配置兼容links/link, output_dir/path, cookie/cookies
if 'links' in config and 'link' not in config:
config['link'] = config['links']
if 'output_dir' in config and 'path' not in config:
config['path'] = config['output_dir']
if 'cookie' in config and 'cookies' not in config:
config['cookies'] = config['cookie']
if isinstance(config.get('cookies'), str) and config.get('cookies') == 'auto':
config['auto_cookie'] = True
# 允许无 link通过命令行传入
# 如果两者都没有,后续会在运行时提示
return config
def _build_cookie_string(self) -> str:
"""构建Cookie字符串"""
if isinstance(self.cookies, str):
return self.cookies
elif isinstance(self.cookies, dict):
return '; '.join([f'{k}={v}' for k, v in self.cookies.items()])
elif isinstance(self.cookies, list):
# 支持来自AutoCookieManager的cookies列表
try:
kv = {c.get('name'): c.get('value') for c in self.cookies if c.get('name') and c.get('value')}
return '; '.join([f'{k}={v}' for k, v in kv.items()])
except Exception:
return ''
return ''
async def _initialize_cookies_and_headers(self):
"""初始化Cookie与请求头支持自动获取"""
# 若配置为字符串 'auto',视为未提供,触发自动获取
if isinstance(self.cookies, str) and self.cookies.strip().lower() == 'auto':
self.cookies = None
# 若已显式提供cookies则直接使用
cookie_str = self._build_cookie_string()
if cookie_str:
self.headers['Cookie'] = cookie_str
# 同时设置到全局 douyin_headers确保所有 API 请求都能使用
from apiproxy.douyin import douyin_headers
douyin_headers['Cookie'] = cookie_str
return
# 自动获取Cookie
if self.auto_cookie:
try:
console.print("[cyan]🔐 正在自动获取Cookie...[/cyan]")
async with AutoCookieManager(cookie_file='cookies.pkl', headless=False) as cm:
cookies_list = await cm.get_cookies()
if cookies_list:
self.cookies = cookies_list
cookie_str = self._build_cookie_string()
if cookie_str:
self.headers['Cookie'] = cookie_str
# 同时设置到全局 douyin_headers确保所有 API 请求都能使用
from apiproxy.douyin import douyin_headers
douyin_headers['Cookie'] = cookie_str
console.print("[green]✅ Cookie获取成功[/green]")
return
console.print("[yellow]⚠️ 自动获取Cookie失败或为空继续尝试无Cookie模式[/yellow]")
except Exception as e:
logger.warning(f"自动获取Cookie失败: {e}")
console.print("[yellow]⚠️ 自动获取Cookie失败继续尝试无Cookie模式[/yellow]")
# 未能获取Cookie则不设置使用默认headers
def detect_content_type(self, url: str) -> ContentType:
"""检测URL内容类型"""
if '/user/' in url:
return ContentType.USER
elif '/video/' in url or 'v.douyin.com' in url:
return ContentType.VIDEO
elif '/note/' in url:
return ContentType.IMAGE
elif '/collection/' in url or '/mix/' in url:
return ContentType.MIX
elif '/music/' in url:
return ContentType.MUSIC
elif 'live.douyin.com' in url:
return ContentType.LIVE
else:
return ContentType.VIDEO # 默认当作视频
async def resolve_short_url(self, url: str) -> str:
"""解析短链接"""
if 'v.douyin.com' in url:
try:
# 使用同步请求获取重定向
response = requests.get(url, headers=self.headers, allow_redirects=True, timeout=10)
final_url = response.url
logger.info(f"解析短链接: {url} -> {final_url}")
return final_url
except Exception as e:
logger.warning(f"解析短链接失败: {e}")
return url
def extract_id_from_url(self, url: str, content_type: ContentType = None) -> Optional[str]:
"""从URL提取ID
Args:
url: 要解析的URL
content_type: 内容类型(可选,用于指导提取)
"""
# 如果已知是用户页面直接提取用户ID
if content_type == ContentType.USER or '/user/' in url:
user_patterns = [
r'/user/([\w-]+)',
r'sec_uid=([\w-]+)'
]
for pattern in user_patterns:
match = re.search(pattern, url)
if match:
user_id = match.group(1)
logger.info(f"提取到用户ID: {user_id}")
return user_id
# 视频ID模式优先
video_patterns = [
r'/video/(\d+)',
r'/note/(\d+)',
r'modal_id=(\d+)',
r'aweme_id=(\d+)',
r'item_id=(\d+)'
]
for pattern in video_patterns:
match = re.search(pattern, url)
if match:
video_id = match.group(1)
logger.info(f"提取到视频ID: {video_id}")
return video_id
# 其他模式
other_patterns = [
r'/collection/(\d+)',
r'/music/(\d+)'
]
for pattern in other_patterns:
match = re.search(pattern, url)
if match:
return match.group(1)
# 尝试从URL中提取数字ID
number_match = re.search(r'(\d{15,20})', url)
if number_match:
video_id = number_match.group(1)
logger.info(f"从URL提取到数字ID: {video_id}")
return video_id
logger.error(f"无法从URL提取ID: {url}")
return None
def _get_aweme_id_from_info(self, info: Dict) -> Optional[str]:
"""从 aweme 信息中提取 aweme_id"""
try:
if 'aweme_id' in info:
return str(info.get('aweme_id'))
# aweme_detail 结构
return str(info.get('aweme', {}).get('aweme_id') or info.get('aweme_id'))
except Exception:
return None
def _get_sec_uid_from_info(self, info: Dict) -> Optional[str]:
"""从 aweme 信息中提取作者 sec_uid"""
try:
return info.get('author', {}).get('sec_uid')
except Exception:
return None
def _should_skip_increment(self, context: str, info: Dict, mix_id: Optional[str] = None, music_id: Optional[str] = None, sec_uid: Optional[str] = None) -> bool:
"""根据增量配置与数据库记录判断是否跳过下载"""
if not self.db:
return False
aweme_id = self._get_aweme_id_from_info(info)
if not aweme_id:
return False
try:
if context == 'post' and self.increase_cfg.get('post', False):
sec = sec_uid or self._get_sec_uid_from_info(info) or ''
return bool(self.db.get_user_post(sec, int(aweme_id)) if aweme_id.isdigit() else None)
if context == 'like' and self.increase_cfg.get('like', False):
sec = sec_uid or self._get_sec_uid_from_info(info) or ''
return bool(self.db.get_user_like(sec, int(aweme_id)) if aweme_id.isdigit() else None)
if context == 'mix' and self.increase_cfg.get('mix', False):
sec = sec_uid or self._get_sec_uid_from_info(info) or ''
mid = mix_id or ''
return bool(self.db.get_mix(sec, mid, int(aweme_id)) if aweme_id.isdigit() else None)
if context == 'music' and self.increase_cfg.get('music', False):
mid = music_id or ''
return bool(self.db.get_music(mid, int(aweme_id)) if aweme_id.isdigit() else None)
except Exception:
return False
return False
def _record_increment(self, context: str, info: Dict, mix_id: Optional[str] = None, music_id: Optional[str] = None, sec_uid: Optional[str] = None):
"""下载成功后写入数据库记录"""
if not self.db:
return
aweme_id = self._get_aweme_id_from_info(info)
if not aweme_id or not aweme_id.isdigit():
return
try:
if context == 'post':
sec = sec_uid or self._get_sec_uid_from_info(info) or ''
self.db.insert_user_post(sec, int(aweme_id), info)
elif context == 'like':
sec = sec_uid or self._get_sec_uid_from_info(info) or ''
self.db.insert_user_like(sec, int(aweme_id), info)
elif context == 'mix':
sec = sec_uid or self._get_sec_uid_from_info(info) or ''
mid = mix_id or ''
self.db.insert_mix(sec, mid, int(aweme_id), info)
elif context == 'music':
mid = music_id or ''
self.db.insert_music(mid, int(aweme_id), info)
except Exception:
pass
async def download_single_video(self, url: str, progress=None) -> bool:
"""下载单个视频/图文"""
try:
# 解析短链接
url = await self.resolve_short_url(url)
# 提取ID
video_id = self.extract_id_from_url(url, ContentType.VIDEO)
if not video_id:
logger.error(f"无法从URL提取ID: {url}")
return False
# 如果没有提取到视频ID尝试作为视频ID直接使用
if not video_id and '/user/' not in url:
# 可能短链接直接包含了视频ID
video_id = url.split('/')[-2] if url.endswith('/') else url.split('/')[-1]
logger.info(f"尝试从短链接路径提取ID: {video_id}")
if not video_id:
logger.error(f"无法从URL提取视频ID: {url}")
return False
# 限速
await self.rate_limiter.acquire()
# 获取视频信息
if progress:
progress.update(task_id=progress.task_ids[-1], description="获取视频信息...")
video_info = await self.retry_manager.execute_with_retry(
self._fetch_video_info, video_id
)
if not video_info:
logger.error(f"无法获取视频信息: {video_id}")
self.stats.failed += 1
return False
# 下载视频文件
if progress:
progress.update(task_id=progress.task_ids[-1], description="下载视频文件...")
success = await self._download_media_files(video_info, progress)
if success:
self.stats.success += 1
logger.info(f"✅ 下载成功: {url}")
else:
self.stats.failed += 1
logger.error(f"❌ 下载失败: {url}")
return success
except Exception as e:
logger.error(f"下载视频异常 {url}: {e}")
self.stats.failed += 1
return False
finally:
self.stats.total += 1
async def _fetch_video_info(self, video_id: str) -> Optional[Dict]:
"""获取视频信息"""
try:
# 直接使用 DouYinCommand.py 中成功的 Douyin 类
from apiproxy.douyin.douyin import Douyin
# 创建 Douyin 实例
dy = Douyin(database=False)
# 设置我们的 cookies 到 douyin_headers
if hasattr(self, 'cookies') and self.cookies:
cookie_str = self._build_cookie_string()
if cookie_str:
from apiproxy.douyin import douyin_headers
douyin_headers['Cookie'] = cookie_str
logger.info(f"设置 Cookie 到 Douyin 类: {cookie_str[:100]}...")
try:
# 使用现有的成功实现
result = dy.getAwemeInfo(video_id)
if result:
logger.info(f"Douyin 类成功获取视频信息: {result.get('desc', '')[:30]}")
return result
else:
logger.error("Douyin 类返回空结果")
except Exception as e:
logger.error(f"Douyin 类获取视频信息失败: {e}")
except Exception as e:
logger.error(f"导入或使用 Douyin 类失败: {e}")
import traceback
traceback.print_exc()
# 如果 Douyin 类失败尝试备用接口iesdouyin无需X-Bogus
try:
fallback_url = f"https://www.iesdouyin.com/web/api/v2/aweme/iteminfo/?item_ids={video_id}"
logger.info(f"尝试备用接口获取视频信息: {fallback_url}")
# 设置更通用的请求头
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Referer': 'https://www.douyin.com/',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive'
}
async with aiohttp.ClientSession() as session:
async with session.get(fallback_url, headers=headers, timeout=15) as response:
logger.info(f"备用接口响应状态: {response.status}")
if response.status != 200:
logger.error(f"备用接口请求失败,状态码: {response.status}")
return None
text = await response.text()
logger.info(f"备用接口响应内容长度: {len(text)}")
if not text:
logger.error("备用接口响应为空")
return None
try:
data = json.loads(text)
logger.info(f"备用接口返回数据: {data}")
item_list = (data or {}).get('item_list') or []
if item_list:
aweme_detail = item_list[0]
logger.info("备用接口成功获取视频信息")
return aweme_detail
else:
logger.error("备用接口返回的数据中没有 item_list")
except json.JSONDecodeError as e:
logger.error(f"备用接口JSON解析失败: {e}")
logger.error(f"原始响应内容: {text}")
return None
except Exception as e:
logger.error(f"备用接口获取视频信息失败: {e}")
return None
def _build_detail_params(self, aweme_id: str) -> str:
"""构建详情API参数"""
# 使用与现有 douyinapi.py 相同的参数格式
params = [
f'aweme_id={aweme_id}',
'device_platform=webapp',
'aid=6383'
]
return '&'.join(params)
async def _download_media_files(self, video_info: Dict, progress=None) -> bool:
"""下载媒体文件"""
try:
# 判断类型
is_image = bool(video_info.get('images'))
# 构建保存路径
author_name = video_info.get('author', {}).get('nickname', 'unknown')
desc = video_info.get('desc', '')[:50].replace('/', '_')
# 兼容 create_time 为时间戳或格式化字符串
raw_create_time = video_info.get('create_time')
dt_obj = None
if isinstance(raw_create_time, (int, float)):
dt_obj = datetime.fromtimestamp(raw_create_time)
elif isinstance(raw_create_time, str) and raw_create_time:
for fmt in ('%Y-%m-%d %H.%M.%S', '%Y-%m-%d_%H-%M-%S', '%Y-%m-%d %H:%M:%S'):
try:
dt_obj = datetime.strptime(raw_create_time, fmt)
break
except Exception:
pass
if dt_obj is None:
dt_obj = datetime.fromtimestamp(time.time())
create_time = dt_obj.strftime('%Y-%m-%d_%H-%M-%S')
folder_name = f"{create_time}_{desc}" if desc else create_time
save_dir = self.save_path / author_name / folder_name
save_dir.mkdir(parents=True, exist_ok=True)
success = True
if is_image:
# 下载图文(无水印)
images = video_info.get('images', [])
for i, img in enumerate(images):
img_url = self._get_best_quality_url(img.get('url_list', []))
if img_url:
file_path = save_dir / f"image_{i+1}.jpg"
if await self._download_file(img_url, file_path):
logger.info(f"下载图片 {i+1}/{len(images)}: {file_path.name}")
else:
success = False
else:
# 下载视频(无水印)
video_url = self._get_no_watermark_url(video_info)
if video_url:
file_path = save_dir / f"{folder_name}.mp4"
if await self._download_file(video_url, file_path):
logger.info(f"下载视频: {file_path.name}")
else:
success = False
# 下载音频
if self.config.get('music', True):
music_url = self._get_music_url(video_info)
if music_url:
file_path = save_dir / f"{folder_name}_music.mp3"
await self._download_file(music_url, file_path)
# 下载封面
if self.config.get('cover', True):
cover_url = self._get_cover_url(video_info)
if cover_url:
file_path = save_dir / f"{folder_name}_cover.jpg"
await self._download_file(cover_url, file_path)
# 保存JSON数据
if self.config.get('json', True):
json_path = save_dir / f"{folder_name}_data.json"
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(video_info, f, ensure_ascii=False, indent=2)
return success
except Exception as e:
logger.error(f"下载媒体文件失败: {e}")
return False
def _get_no_watermark_url(self, video_info: Dict) -> Optional[str]:
"""获取无水印视频URL"""
try:
# 优先使用play_addr_h264
play_addr = video_info.get('video', {}).get('play_addr_h264') or \
video_info.get('video', {}).get('play_addr')
if play_addr:
url_list = play_addr.get('url_list', [])
if url_list:
# 替换URL以获取无水印版本
url = url_list[0]
url = url.replace('playwm', 'play')
url = url.replace('720p', '1080p')
return url
# 备用download_addr
download_addr = video_info.get('video', {}).get('download_addr')
if download_addr:
url_list = download_addr.get('url_list', [])
if url_list:
return url_list[0]
except Exception as e:
logger.error(f"获取无水印URL失败: {e}")
return None
def _get_best_quality_url(self, url_list: List[str]) -> Optional[str]:
"""获取最高质量的URL"""
if not url_list:
return None
# 优先选择包含特定关键词的URL
for keyword in ['1080', 'origin', 'high']:
for url in url_list:
if keyword in url:
return url
# 返回第一个
return url_list[0]
def _get_music_url(self, video_info: Dict) -> Optional[str]:
"""获取音乐URL"""
try:
music = video_info.get('music', {})
play_url = music.get('play_url', {})
url_list = play_url.get('url_list', [])
return url_list[0] if url_list else None
except:
return None
def _get_cover_url(self, video_info: Dict) -> Optional[str]:
"""获取封面URL"""
try:
cover = video_info.get('video', {}).get('cover', {})
url_list = cover.get('url_list', [])
return self._get_best_quality_url(url_list)
except:
return None
async def _download_file(self, url: str, save_path: Path) -> bool:
"""下载文件"""
try:
if save_path.exists():
logger.info(f"文件已存在,跳过: {save_path.name}")
return True
async with aiohttp.ClientSession() as session:
async with session.get(url, headers=self.headers) as response:
if response.status == 200:
content = await response.read()
with open(save_path, 'wb') as f:
f.write(content)
return True
else:
logger.error(f"下载失败,状态码: {response.status}")
return False
except Exception as e:
logger.error(f"下载文件失败 {url}: {e}")
return False
async def download_user_page(self, url: str) -> bool:
"""下载用户主页内容"""
try:
# 提取用户ID
user_id = self.extract_id_from_url(url, ContentType.USER)
if not user_id:
logger.error(f"无法从URL提取用户ID: {url}")
return False
console.print(f"\n[cyan]正在获取用户 {user_id} 的作品列表...[/cyan]")
# 根据配置下载不同类型的内容
mode = self.config.get('mode', ['post'])
if isinstance(mode, str):
mode = [mode]
# 增加总任务数统计
total_posts = 0
if 'post' in mode:
total_posts += self.config.get('number', {}).get('post', 0) or 1
if 'like' in mode:
total_posts += self.config.get('number', {}).get('like', 0) or 1
if 'mix' in mode:
total_posts += self.config.get('number', {}).get('allmix', 0) or 1
self.stats.total += total_posts
for m in mode:
if m == 'post':
await self._download_user_posts(user_id)
elif m == 'like':
await self._download_user_likes(user_id)
elif m == 'mix':
await self._download_user_mixes(user_id)
return True
except Exception as e:
logger.error(f"下载用户主页失败: {e}")
return False
async def _download_user_posts(self, user_id: str):
"""下载用户发布的作品"""
max_count = self.config.get('number', {}).get('post', 0)
cursor = 0
downloaded = 0
console.print(f"\n[green]开始下载用户发布的作品...[/green]")
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TimeRemainingColumn(),
console=console
) as progress:
while True:
# 限速
await self.rate_limiter.acquire()
# 获取作品列表
posts_data = await self._fetch_user_posts(user_id, cursor)
if not posts_data:
break
aweme_list = posts_data.get('aweme_list', [])
if not aweme_list:
break
# 下载作品
for aweme in aweme_list:
if max_count > 0 and downloaded >= max_count:
console.print(f"[yellow]已达到下载数量限制: {max_count}[/yellow]")
return
# 时间过滤
if not self._check_time_filter(aweme):
continue
# 创建下载任务
task_id = progress.add_task(
f"下载作品 {downloaded + 1}",
total=100
)
# 增量判断
if self._should_skip_increment('post', aweme, sec_uid=user_id):
continue
# 下载
success = await self._download_media_files(aweme, progress)
if success:
downloaded += 1
self.stats.success += 1 # 增加成功计数
progress.update(task_id, completed=100)
self._record_increment('post', aweme, sec_uid=user_id)
else:
self.stats.failed += 1 # 增加失败计数
progress.update(task_id, description="[red]下载失败[/red]")
# 检查是否有更多
if not posts_data.get('has_more'):
break
cursor = posts_data.get('max_cursor', 0)
console.print(f"[green]✅ 用户作品下载完成,共下载 {downloaded} 个[/green]")
async def _fetch_user_posts(self, user_id: str, cursor: int = 0) -> Optional[Dict]:
"""获取用户作品列表"""
try:
# 直接使用 Douyin 类的 getUserInfo 方法,就像 DouYinCommand.py 那样
from apiproxy.douyin.douyin import Douyin
# 创建 Douyin 实例
dy = Douyin(database=False)
# 获取用户作品列表
result = dy.getUserInfo(
user_id,
"post",
35,
0, # 不限制数量
False, # 不启用增量
"", # start_time
"" # end_time
)
if result:
logger.info(f"Douyin 类成功获取用户作品列表,共 {len(result)} 个作品")
# 转换为期望的格式
return {
'status_code': 0,
'aweme_list': result,
'max_cursor': cursor,
'has_more': False
}
else:
logger.error("Douyin 类返回空结果")
return None
except Exception as e:
logger.error(f"获取用户作品列表失败: {e}")
import traceback
traceback.print_exc()
return None
async def _download_user_likes(self, user_id: str):
"""下载用户喜欢的作品"""
max_count = 0
try:
max_count = int(self.config.get('number', {}).get('like', 0))
except Exception:
max_count = 0
cursor = 0
downloaded = 0
console.print(f"\n[green]开始下载用户喜欢的作品...[/green]")
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TimeRemainingColumn(),
console=console
) as progress:
while True:
# 限速
await self.rate_limiter.acquire()
# 获取喜欢列表
likes_data = await self._fetch_user_likes(user_id, cursor)
if not likes_data:
break
aweme_list = likes_data.get('aweme_list', [])
if not aweme_list:
break
# 下载作品
for aweme in aweme_list:
if max_count > 0 and downloaded >= max_count:
console.print(f"[yellow]已达到下载数量限制: {max_count}[/yellow]")
return
if not self._check_time_filter(aweme):
continue
task_id = progress.add_task(
f"下载喜欢 {downloaded + 1}",
total=100
)
# 增量判断
if self._should_skip_increment('like', aweme, sec_uid=user_id):
continue
success = await self._download_media_files(aweme, progress)
if success:
downloaded += 1
progress.update(task_id, completed=100)
self._record_increment('like', aweme, sec_uid=user_id)
else:
progress.update(task_id, description="[red]下载失败[/red]")
# 翻页
if not likes_data.get('has_more'):
break
cursor = likes_data.get('max_cursor', 0)
console.print(f"[green]✅ 喜欢作品下载完成,共下载 {downloaded} 个[/green]")
async def _fetch_user_likes(self, user_id: str, cursor: int = 0) -> Optional[Dict]:
"""获取用户喜欢的作品列表"""
try:
params_list = [
f'sec_user_id={user_id}',
f'max_cursor={cursor}',
'count=35',
'aid=6383',
'device_platform=webapp',
'channel=channel_pc_web',
'pc_client_type=1',
'version_code=170400',
'version_name=17.4.0',
'cookie_enabled=true',
'screen_width=1920',
'screen_height=1080',
'browser_language=zh-CN',
'browser_platform=MacIntel',
'browser_name=Chrome',
'browser_version=122.0.0.0',
'browser_online=true'
]
params = '&'.join(params_list)
api_url = self.urls_helper.USER_FAVORITE_A
try:
xbogus = self.utils.getXbogus(params)
full_url = f"{api_url}{params}&X-Bogus={xbogus}"
except Exception as e:
logger.warning(f"获取X-Bogus失败: {e}, 尝试不带X-Bogus")
full_url = f"{api_url}{params}"
logger.info(f"请求用户喜欢列表: {full_url[:100]}...")
async with aiohttp.ClientSession() as session:
async with session.get(full_url, headers=self.headers, timeout=10) as response:
if response.status != 200:
logger.error(f"请求失败,状态码: {response.status}")
return None
text = await response.text()
if not text:
logger.error("响应内容为空")
return None
data = json.loads(text)
if data.get('status_code') == 0:
return data
else:
logger.error(f"API返回错误: {data.get('status_msg', '未知错误')}")
return None
except Exception as e:
logger.error(f"获取用户喜欢列表失败: {e}")
return None
async def _download_user_mixes(self, user_id: str):
"""下载用户的所有合集(按配置可限制数量)"""
max_allmix = 0
try:
# 兼容旧键名 allmix 或 mix
number_cfg = self.config.get('number', {}) or {}
max_allmix = int(number_cfg.get('allmix', number_cfg.get('mix', 0)) or 0)
except Exception:
max_allmix = 0
cursor = 0
fetched = 0
console.print(f"\n[green]开始获取用户合集列表...[/green]")
while True:
await self.rate_limiter.acquire()
mix_list_data = await self._fetch_user_mix_list(user_id, cursor)
if not mix_list_data:
break
mix_infos = mix_list_data.get('mix_infos') or []
if not mix_infos:
break
for mix in mix_infos:
if max_allmix > 0 and fetched >= max_allmix:
console.print(f"[yellow]已达到合集数量限制: {max_allmix}[/yellow]")
return
mix_id = mix.get('mix_id')
mix_name = mix.get('mix_name', '')
console.print(f"[cyan]下载合集[/cyan]: {mix_name} ({mix_id})")
await self._download_mix_by_id(mix_id)
fetched += 1
if not mix_list_data.get('has_more'):
break
cursor = mix_list_data.get('cursor', 0)
console.print(f"[green]✅ 用户合集下载完成,共处理 {fetched} 个[/green]")
async def _fetch_user_mix_list(self, user_id: str, cursor: int = 0) -> Optional[Dict]:
"""获取用户合集列表"""
try:
params_list = [
f'sec_user_id={user_id}',
f'cursor={cursor}',
'count=35',
'aid=6383',
'device_platform=webapp',
'channel=channel_pc_web',
'pc_client_type=1',
'version_code=170400',
'version_name=17.4.0',
'cookie_enabled=true',
'screen_width=1920',
'screen_height=1080',
'browser_language=zh-CN',
'browser_platform=MacIntel',
'browser_name=Chrome',
'browser_version=122.0.0.0',
'browser_online=true'
]
params = '&'.join(params_list)
api_url = self.urls_helper.USER_MIX_LIST
try:
xbogus = self.utils.getXbogus(params)
full_url = f"{api_url}{params}&X-Bogus={xbogus}"
except Exception as e:
logger.warning(f"获取X-Bogus失败: {e}, 尝试不带X-Bogus")
full_url = f"{api_url}{params}"
logger.info(f"请求用户合集列表: {full_url[:100]}...")
async with aiohttp.ClientSession() as session:
async with session.get(full_url, headers=self.headers, timeout=10) as response:
if response.status != 200:
logger.error(f"请求失败,状态码: {response.status}")
return None
text = await response.text()
if not text:
logger.error("响应内容为空")
return None
data = json.loads(text)
if data.get('status_code') == 0:
return data
else:
logger.error(f"API返回错误: {data.get('status_msg', '未知错误')}")
return None
except Exception as e:
logger.error(f"获取用户合集列表失败: {e}")
return None
async def download_mix(self, url: str) -> bool:
"""根据合集链接下载合集内所有作品"""
try:
mix_id = None
for pattern in [r'/collection/(\d+)', r'/mix/detail/(\d+)']:
m = re.search(pattern, url)
if m:
mix_id = m.group(1)
break
if not mix_id:
logger.error(f"无法从合集链接提取ID: {url}")
return False
await self._download_mix_by_id(mix_id)
return True
except Exception as e:
logger.error(f"下载合集失败: {e}")
return False
async def _download_mix_by_id(self, mix_id: str):
"""按合集ID下载全部作品"""
cursor = 0
downloaded = 0
console.print(f"\n[green]开始下载合集 {mix_id} ...[/green]")
while True:
await self.rate_limiter.acquire()
data = await self._fetch_mix_awemes(mix_id, cursor)
if not data:
break
aweme_list = data.get('aweme_list') or []
if not aweme_list:
break
for aweme in aweme_list:
success = await self._download_media_files(aweme)
if success:
downloaded += 1
if not data.get('has_more'):
break
cursor = data.get('cursor', 0)
console.print(f"[green]✅ 合集下载完成,共下载 {downloaded} 个[/green]")
async def _fetch_mix_awemes(self, mix_id: str, cursor: int = 0) -> Optional[Dict]:
"""获取合集下作品列表"""
try:
params_list = [
f'mix_id={mix_id}',
f'cursor={cursor}',
'count=35',
'aid=6383',
'device_platform=webapp',
'channel=channel_pc_web',
'pc_client_type=1',
'version_code=170400',
'version_name=17.4.0',
'cookie_enabled=true',
'screen_width=1920',
'screen_height=1080',
'browser_language=zh-CN',
'browser_platform=MacIntel',
'browser_name=Chrome',
'browser_version=122.0.0.0',
'browser_online=true'
]
params = '&'.join(params_list)
api_url = self.urls_helper.USER_MIX
try:
xbogus = self.utils.getXbogus(params)
full_url = f"{api_url}{params}&X-Bogus={xbogus}"
except Exception as e:
logger.warning(f"获取X-Bogus失败: {e}, 尝试不带X-Bogus")
full_url = f"{api_url}{params}"
logger.info(f"请求合集作品列表: {full_url[:100]}...")
async with aiohttp.ClientSession() as session:
async with session.get(full_url, headers=self.headers, timeout=10) as response:
if response.status != 200:
logger.error(f"请求失败,状态码: {response.status}")
return None
text = await response.text()
if not text:
logger.error("响应内容为空")
return None
data = json.loads(text)
# USER_MIX 返回没有统一的 status_code这里直接返回
return data
except Exception as e:
logger.error(f"获取合集作品失败: {e}")
return None
async def download_music(self, url: str) -> bool:
"""根据音乐页链接下载音乐下的所有作品(支持增量)"""
try:
# 提取 music_id
music_id = None
m = re.search(r'/music/(\d+)', url)
if m:
music_id = m.group(1)
if not music_id:
logger.error(f"无法从音乐链接提取ID: {url}")
return False
cursor = 0
downloaded = 0
limit_num = 0
try:
limit_num = int((self.config.get('number', {}) or {}).get('music', 0))
except Exception:
limit_num = 0
console.print(f"\n[green]开始下载音乐 {music_id} 下的作品...[/green]")
while True:
await self.rate_limiter.acquire()
data = await self._fetch_music_awemes(music_id, cursor)
if not data:
break
aweme_list = data.get('aweme_list') or []
if not aweme_list:
break
for aweme in aweme_list:
if limit_num > 0 and downloaded >= limit_num:
console.print(f"[yellow]已达到音乐下载数量限制: {limit_num}[/yellow]")
return True
if self._should_skip_increment('music', aweme, music_id=music_id):
continue
success = await self._download_media_files(aweme)
if success:
downloaded += 1
self._record_increment('music', aweme, music_id=music_id)
if not data.get('has_more'):
break
cursor = data.get('cursor', 0)
console.print(f"[green]✅ 音乐作品下载完成,共下载 {downloaded} 个[/green]")
return True
except Exception as e:
logger.error(f"下载音乐页失败: {e}")
return False
async def _fetch_music_awemes(self, music_id: str, cursor: int = 0) -> Optional[Dict]:
"""获取音乐下作品列表"""
try:
params_list = [
f'music_id={music_id}',
f'cursor={cursor}',
'count=35',
'aid=6383',
'device_platform=webapp',
'channel=channel_pc_web',
'pc_client_type=1',
'version_code=170400',
'version_name=17.4.0',
'cookie_enabled=true',
'screen_width=1920',
'screen_height=1080',
'browser_language=zh-CN',
'browser_platform=MacIntel',
'browser_name=Chrome',
'browser_version=122.0.0.0',
'browser_online=true'
]
params = '&'.join(params_list)
api_url = self.urls_helper.MUSIC
try:
xbogus = self.utils.getXbogus(params)
full_url = f"{api_url}{params}&X-Bogus={xbogus}"
except Exception as e:
logger.warning(f"获取X-Bogus失败: {e}, 尝试不带X-Bogus")
full_url = f"{api_url}{params}"
logger.info(f"请求音乐作品列表: {full_url[:100]}...")
async with aiohttp.ClientSession() as session:
async with session.get(full_url, headers=self.headers, timeout=10) as response:
if response.status != 200:
logger.error(f"请求失败,状态码: {response.status}")
return None
text = await response.text()
if not text:
logger.error("响应内容为空")
return None
data = json.loads(text)
return data
except Exception as e:
logger.error(f"获取音乐作品失败: {e}")
return None
def _check_time_filter(self, aweme: Dict) -> bool:
"""检查时间过滤"""
start_time = self.config.get('start_time')
end_time = self.config.get('end_time')
if not start_time and not end_time:
return True
raw_create_time = aweme.get('create_time')
if not raw_create_time:
return True
create_date = None
if isinstance(raw_create_time, (int, float)):
try:
create_date = datetime.fromtimestamp(raw_create_time)
except Exception:
create_date = None
elif isinstance(raw_create_time, str):
for fmt in ('%Y-%m-%d %H.%M.%S', '%Y-%m-%d_%H-%M-%S', '%Y-%m-%d %H:%M:%S'):
try:
create_date = datetime.strptime(raw_create_time, fmt)
break
except Exception:
pass
if create_date is None:
return True
if start_time:
start_date = datetime.strptime(start_time, '%Y-%m-%d')
if create_date < start_date:
return False
if end_time:
end_date = datetime.strptime(end_time, '%Y-%m-%d')
if create_date > end_date:
return False
return True
async def run(self):
"""运行下载器"""
# 显示启动信息
console.print(Panel.fit(
"[bold cyan]抖音下载器 v3.0 - 统一增强版[/bold cyan]\n"
"[dim]支持视频、图文、用户主页、合集批量下载[/dim]",
border_style="cyan"
))
# 初始化Cookie与请求头
await self._initialize_cookies_and_headers()
# 获取URL列表
urls = self.config.get('link', [])
# 兼容:单条字符串
if isinstance(urls, str):
urls = [urls]
if not urls:
console.print("[red]没有找到要下载的链接![/red]")
return
# 分析URL类型
console.print(f"\n[cyan]📊 链接分析[/cyan]")
url_types = {}
for url in urls:
content_type = self.detect_content_type(url)
url_types[url] = content_type
console.print(f"{content_type.upper()}: {url[:50]}...")
# 开始下载
console.print(f"\n[green]⏳ 开始下载 {len(urls)} 个链接...[/green]\n")
for i, url in enumerate(urls, 1):
content_type = url_types[url]
console.print(f"[{i}/{len(urls)}] 处理: {url}")
if content_type == ContentType.VIDEO or content_type == ContentType.IMAGE:
await self.download_single_video(url)
elif content_type == ContentType.USER:
await self.download_user_page(url)
# 若配置包含 like 或 mix顺带处理
modes = self.config.get('mode', ['post'])
if 'like' in modes:
user_id = self.extract_id_from_url(url, ContentType.USER)
if user_id:
await self._download_user_likes(user_id)
if 'mix' in modes:
user_id = self.extract_id_from_url(url, ContentType.USER)
if user_id:
await self._download_user_mixes(user_id)
elif content_type == ContentType.MIX:
await self.download_mix(url)
elif content_type == ContentType.MUSIC:
await self.download_music(url)
else:
console.print(f"[yellow]不支持的内容类型: {content_type}[/yellow]")
# 显示进度
console.print(f"进度: {i}/{len(urls)} | 成功: {self.stats.success} | 失败: {self.stats.failed}")
console.print("-" * 60)
# 显示统计
self._show_stats()
def _show_stats(self):
"""显示下载统计"""
console.print("\n" + "=" * 60)
# 创建统计表格
table = Table(title="📊 下载统计", show_header=True, header_style="bold magenta")
table.add_column("项目", style="cyan", width=12)
table.add_column("数值", style="green")
stats = self.stats.to_dict()
table.add_row("总任务数", str(stats['total']))
table.add_row("成功", str(stats['success']))
table.add_row("失败", str(stats['failed']))
table.add_row("跳过", str(stats['skipped']))
table.add_row("成功率", stats['success_rate'])
table.add_row("用时", stats['elapsed_time'])
console.print(table)
console.print("\n[bold green]✅ 下载任务完成![/bold green]")
def main():
"""主函数"""
parser = argparse.ArgumentParser(
description='抖音下载器 - 统一增强版',
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument(
'-c', '--config',
default='config.yml',
help='配置文件路径 (默认: config.yml自动兼容 config_simple.yml)'
)
parser.add_argument(
'-u', '--url',
nargs='+',
help='直接指定要下载的URL'
)
parser.add_argument(
'-p', '--path',
default=None,
help='保存路径 (覆盖配置文件)'
)
parser.add_argument(
'--auto-cookie',
action='store_true',
help='自动获取Cookie需要已安装Playwright'
)
parser.add_argument(
'--cookie',
help='手动指定Cookie字符串例如 "msToken=xxx; ttwid=yyy"'
)
args = parser.parse_args()
# 组合配置来源:优先命令行
temp_config = {}
if args.url:
temp_config['link'] = args.url
# 覆盖保存路径
if args.path:
temp_config['path'] = args.path
# Cookie配置
if args.auto_cookie:
temp_config['auto_cookie'] = True
temp_config['cookies'] = 'auto'
if args.cookie:
temp_config['cookies'] = args.cookie
temp_config['auto_cookie'] = False
# 如果存在临时配置,则生成一个临时文件供现有构造函数使用
if temp_config:
# 合并文件配置(如存在)
file_config = {}
if os.path.exists(args.config):
try:
with open(args.config, 'r', encoding='utf-8') as f:
file_config = yaml.safe_load(f) or {}
except Exception:
file_config = {}
# 兼容简化键名
if 'links' in file_config and 'link' not in file_config:
file_config['link'] = file_config['links']
if 'output_dir' in file_config and 'path' not in file_config:
file_config['path'] = file_config['output_dir']
if 'cookie' in file_config and 'cookies' not in file_config:
file_config['cookies'] = file_config['cookie']
merged = {**(file_config or {}), **temp_config}
with open('temp_config.yml', 'w', encoding='utf-8') as f:
yaml.dump(merged, f, allow_unicode=True)
config_path = 'temp_config.yml'
else:
config_path = args.config
# 运行下载器
try:
downloader = UnifiedDownloader(config_path)
asyncio.run(downloader.run())
except KeyboardInterrupt:
console.print("\n[yellow]⚠️ 用户中断下载[/yellow]")
except Exception as e:
console.print(f"\n[red]❌ 程序异常: {e}[/red]")
logger.exception("程序异常")
finally:
# 清理临时配置
if args.url and os.path.exists('temp_config.yml'):
os.remove('temp_config.yml')
if __name__ == '__main__':
main()