"""Extract mono 16 kHz MP3 audio tracks from MP4 videos with ffmpeg/ffprobe."""

import logging
import os
import shutil
import subprocess
import time

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("AudioExtractor")

# Default locations used when extract_audio() is called without arguments.
DEFAULT_VIDEO_DIR = r"d:\dsWork\aiData\DouYin\DownloadedVideos"
DEFAULT_AUDIO_DIR = r"d:\dsWork\aiData\DouYin\Audios"


def _has_audio_stream(video_path, filename):
    """Return True if ffprobe reports a first audio stream in *video_path*.

    Logs a warning (mentioning *filename*) and returns False when ffprobe
    prints nothing (no audio stream) or exits with a non-zero status.
    Any other error (e.g. ffprobe not installed) propagates to the caller.
    """
    probe_cmd = [
        "ffprobe", "-v", "error",
        "-select_streams", "a:0",
        "-show_entries", "stream=codec_name",
        "-of", "default=noprint_wrappers=1:nokey=1",
        video_path,
    ]
    try:
        probe_output = subprocess.check_output(
            probe_cmd, stderr=subprocess.DEVNULL
        ).decode().strip()
    except subprocess.CalledProcessError:
        logger.warning("Skipping %s: ffprobe failed (possibly corrupt).", filename)
        return False

    # If ffprobe returns empty output, there is no audio stream.
    if not probe_output:
        logger.warning("Skipping %s: No audio stream found.", filename)
        return False
    return True


def _remove_quietly(path):
    """Delete *path* if it exists; log (instead of raising) on other OS errors."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass  # Nothing to clean up.
    except OSError as exc:
        logger.warning("Could not remove temp file %s: %s", path, exc)


def extract_audio(video_dir=DEFAULT_VIDEO_DIR, audio_dir=DEFAULT_AUDIO_DIR):
    """Convert every ``.mp4`` file in *video_dir* to an ``.mp3`` in *audio_dir*.

    For each video (extension matched case-insensitively) whose MP3 does not
    exist yet, the function checks with ffprobe that an audio stream is
    present, copies the video to a fixed-name temp file, runs ffmpeg on that
    copy, and moves the result to ``<video name>.mp3``. Failures for a single
    file are logged and do not stop the remaining files.

    Args:
        video_dir: Directory containing the source videos.
        audio_dir: Directory receiving the MP3 files; created if missing.
            The temp input/output files are also placed here.

    Returns:
        None.

    Raises:
        OSError: If *video_dir* cannot be listed or *audio_dir* cannot be
            created.
    """
    os.makedirs(audio_dir, exist_ok=True)

    files = [f for f in os.listdir(video_dir) if f.lower().endswith(".mp4")]
    logger.info("Found %s videos to process.", len(files))

    # Use a fixed temp name to avoid encoding issues with ffmpeg
    temp_input = os.path.join(audio_dir, "temp_process_input.mp4")
    temp_output = os.path.join(audio_dir, "temp_process_output.mp3")

    # The command only references the fixed temp paths, so build it once.
    # -ar 16000: set sample rate to 16k (required for ASR)
    # -ac 1: set to mono (usually better for ASR)
    ffmpeg_cmd = [
        "ffmpeg", "-i", temp_input,
        "-vn",
        "-acodec", "libmp3lame",
        "-q:a", "2",
        "-ar", "16000",
        "-ac", "1",
        "-y", temp_output,
    ]

    for filename in files:
        video_path = os.path.join(video_dir, filename)
        name, _ = os.path.splitext(filename)
        audio_filename = f"{name}.mp3"
        audio_path = os.path.join(audio_dir, audio_filename)

        if os.path.exists(audio_path):
            logger.info("Skipping (already exists): %s", audio_filename)
            continue

        logger.info("Processing: %s", filename)

        try:
            # Probe first so videos without audio are never copied.
            if not _has_audio_stream(video_path, filename):
                continue

            # 1. Copy video to temp file (to handle special chars in the
            #    filename that ffmpeg might dislike).
            shutil.copyfile(video_path, temp_input)

            # 2. Run ffmpeg on the temp file.
            subprocess.run(
                ffmpeg_cmd,
                check=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
            )

            # 3. Move output to its final name; os.replace overwrites an
            #    existing destination in a single step on all platforms.
            os.replace(temp_output, audio_path)

            logger.info("Success: %s", audio_filename)

        except subprocess.CalledProcessError as e:
            logger.error("Failed to process %s: %s", filename, e)
            stderr_text = (e.stderr or b"").decode("utf-8", errors="ignore")
            logger.error("FFmpeg stderr: %s", stderr_text)
        except Exception as e:
            # Per-file boundary: log and keep going with the next video.
            logger.error("Failed to process %s: %s", filename, e)
        finally:
            # Cleanup temp files
            _remove_quietly(temp_input)
            _remove_quietly(temp_output)


if __name__ == "__main__":
    extract_audio()