Kokoro TTS自动化脚本编写:批量处理大量文件的完整方案

Kokoro TTS自动化脚本编写:批量处理大量文件的完整方案
Kokoro TTS自动化脚本编写批量处理大量文件的完整方案【免费下载链接】kokoro-ttsA CLI text-to-speech tool using the Kokoro model, supporting multiple languages, voices (with blending), and various input formats including EPUB books and PDF documents.项目地址: https://gitcode.com/gh_mirrors/ko/kokoro-ttsKokoro TTS是一款功能强大的命令行文本转语音工具支持多种语言、语音混合以及EPUB和PDF文档处理。对于需要处理大量文件的用户来说手动操作每个文件既耗时又容易出错。本文将为您提供一套完整的Kokoro TTS自动化脚本编写方案帮助您高效批量处理大量文件节省宝贵时间。为什么需要自动化脚本当您需要处理数十甚至数百个文本文件、电子书或文档时手动运行Kokoro TTS命令不仅繁琐还容易出错。自动化脚本可以帮助您批量处理一次性处理多个文件统一配置确保所有文件使用相同的语音、语速等参数错误处理自动处理失败的任务并重试进度跟踪实时监控处理进度结果整理自动组织输出文件结构准备工作与环境配置在开始编写自动化脚本之前确保您已经正确安装了Kokoro TTS# 使用uv安装推荐 uv tool install kokoro-tts # 或使用pip安装 pip install kokoro-tts下载必要的模型文件wget https://github.com/nazdridoy/kokoro-tts/releases/download/v1.0.0/voices-v1.0.bin wget https://github.com/nazdridoy/kokoro-tts/releases/download/v1.0.0/kokoro-v1.0.onnx基础批量处理脚本1. 文本文件批量转换脚本创建一个简单的Python脚本来自动处理多个文本文件import os import subprocess import sys def batch_process_text_files(input_dir, output_dir, voiceaf_sarah, speed1.0, langen-us): 批量处理文本文件 :param input_dir: 输入目录路径 :param output_dir: 输出目录路径 :param voice: 语音选择 :param speed: 语速 :param lang: 语言代码 # 确保输出目录存在 os.makedirs(output_dir, exist_okTrue) # 获取所有txt文件 txt_files [f for f in os.listdir(input_dir) if f.endswith(.txt)] print(f找到 {len(txt_files)} 个文本文件需要处理) for i, txt_file in enumerate(txt_files, 1): input_path os.path.join(input_dir, txt_file) output_name os.path.splitext(txt_file)[0] .wav output_path os.path.join(output_dir, output_name) print(f处理文件 {i}/{len(txt_files)}: {txt_file}) # 构建Kokoro TTS命令 cmd [ kokoro-tts, input_path, output_path, --voice, voice, --speed, str(speed), --lang, lang ] try: # 执行命令 result subprocess.run(cmd, capture_outputTrue, textTrue) if result.returncode 0: print(f✓ 成功处理: {txt_file}) else: print(f✗ 处理失败: {txt_file}) print(f错误信息: {result.stderr}) except Exception as e: print(f✗ 执行错误: {txt_file} - {e}) if __name__ __main__: # 示例用法 batch_process_text_files( input_dir./texts, output_dir./audio_output, voiceaf_sarah, speed1.2, langen-us )2. EPUB电子书批量处理脚本对于电子书爱好者这个脚本可以批量处理EPUB文件import os import subprocess import json def batch_process_epub_files(input_dir, output_base_dir, voiceaf_sarah, speed1.0, langen-us): 批量处理EPUB文件每个文件单独创建目录 :param input_dir: 输入目录路径 :param output_base_dir: 输出基础目录 :param voice: 语音选择 :param speed: 语速 :param lang: 语言代码 # 确保输出目录存在 os.makedirs(output_base_dir, exist_okTrue) # 获取所有epub文件 epub_files [f for f in os.listdir(input_dir) if f.endswith(.epub)] print(f找到 {len(epub_files)} 个EPUB文件需要处理) for i, epub_file in enumerate(epub_files, 1): input_path os.path.join(input_dir, epub_file) book_name os.path.splitext(epub_file)[0] output_dir os.path.join(output_base_dir, book_name) print(f处理电子书 {i}/{len(epub_files)}: {book_name}) # 构建Kokoro TTS命令分章节输出 cmd [ kokoro-tts, input_path, --split-output, output_dir, --voice, voice, --speed, str(speed), --lang, lang, --format, mp3 ] try: # 执行命令 result subprocess.run(cmd, capture_outputTrue, textTrue) if result.returncode 0: print(f✓ 成功处理: {book_name}) # 生成处理报告 generate_processing_report(output_dir, book_name) else: print(f✗ 处理失败: {book_name}) print(f错误信息: {result.stderr}) except Exception as e: print(f✗ 执行错误: {book_name} - {e}) def generate_processing_report(output_dir, book_name): 生成处理报告 report_path os.path.join(output_dir, processing_report.json) # 统计章节信息 chapters [] for item in os.listdir(output_dir): if item.startswith(chapter_) and os.path.isdir(os.path.join(output_dir, item)): chapter_dir os.path.join(output_dir, item) info_file os.path.join(chapter_dir, info.txt) chapter_info { chapter: item, chunks: len([f for f in os.listdir(chapter_dir) if f.startswith(chunk_) and f.endswith(.mp3)]) } if os.path.exists(info_file): with open(info_file, r, encodingutf-8) as f: chapter_info[title] f.read().strip() chapters.append(chapter_info) # 生成报告 report { book: book_name, total_chapters: len(chapters), chapters: chapters, total_audio_files: sum(c[chunks] for c in chapters) } with open(report_path, w, encodingutf-8) as f: json.dump(report, f, indent2, ensure_asciiFalse) print(f生成处理报告: {report_path}) if __name__ __main__: batch_process_epub_files( input_dir./ebooks, output_base_dir./audio_books, voiceaf_sarah, speed1.0, langen-us )高级自动化脚本方案3. 带错误重试和进度监控的脚本import os import subprocess import time import logging from datetime import datetime from concurrent.futures import ThreadPoolExecutor, as_completed # 配置日志 logging.basicConfig( levellogging.INFO, format%(asctime)s - %(levelname)s - %(message)s, handlers[ logging.FileHandler(kokoro_batch.log), logging.StreamHandler() ] ) class KokoroBatchProcessor: def __init__(self, config): self.config config self.success_count 0 self.failed_count 0 self.start_time None def process_single_file(self, input_file, output_file, retry_count3): 处理单个文件支持重试 cmd [ kokoro-tts, input_file, output_file, --voice, self.config.get(voice, af_sarah), --speed, str(self.config.get(speed, 1.0)), --lang, self.config.get(lang, en-us), --format, self.config.get(format, wav) ] for attempt in range(retry_count): try: logging.info(f尝试 {attempt1}/{retry_count}: {input_file}) result subprocess.run( cmd, capture_outputTrue, textTrue, timeoutself.config.get(timeout, 300) # 5分钟超时 ) if result.returncode 0: logging.info(f✓ 成功处理: {input_file}) return True else: logging.warning(f处理失败 (尝试 {attempt1}): {input_file}) logging.warning(f错误: {result.stderr}) if attempt retry_count - 1: wait_time 5 * (attempt 1) # 指数退避 logging.info(f等待 {wait_time}秒后重试...) time.sleep(wait_time) except subprocess.TimeoutExpired: logging.error(f超时: {input_file}) except Exception as e: logging.error(f异常: {input_file} - {e}) logging.error(f✗ 处理失败已重试{retry_count}次: {input_file}) return False def batch_process(self, file_list, max_workers2): 批量处理文件支持并发 self.start_time datetime.now() total_files len(file_list) logging.info(f开始批量处理 {total_files} 个文件) logging.info(f配置: {self.config}) with ThreadPoolExecutor(max_workersmax_workers) as executor: futures {} for input_file, output_file in file_list: future executor.submit( self.process_single_file, input_file, output_file, self.config.get(retry_count, 3) ) futures[future] (input_file, output_file) for future in as_completed(futures): input_file, output_file futures[future] try: success future.result() if success: self.success_count 1 else: self.failed_count 1 # 更新进度 processed self.success_count self.failed_count progress processed / total_files * 100 logging.info(f进度: {processed}/{total_files} ({progress:.1f}%)) except Exception as e: logging.error(f任务异常: {input_file} - {e}) self.failed_count 1 self.generate_summary_report() def generate_summary_report(self): 生成汇总报告 end_time datetime.now() duration (end_time - self.start_time).total_seconds() report { start_time: self.start_time.isoformat(), end_time: end_time.isoformat(), duration_seconds: duration, total_files: self.success_count self.failed_count, success_count: self.success_count, failed_count: self.failed_count, success_rate: self.success_count / (self.success_count self.failed_count) * 100, config: self.config } report_file fbatch_report_{self.start_time.strftime(%Y%m%d_%H%M%S)}.json import json with open(report_file, w, encodingutf-8) as f: json.dump(report, f, indent2, ensure_asciiFalse) logging.info(f批量处理完成报告已保存到: {report_file}) logging.info(f成功: {self.success_count}, 失败: {self.failed_count}) logging.info(f总耗时: {duration:.1f}秒) # 使用示例 if __name__ __main__: # 配置文件 config { voice: af_sarah, speed: 1.2, lang: en-us, format: mp3, retry_count: 3, timeout: 600 # 10分钟超时 } # 准备文件列表 input_dir ./documents output_dir ./audio_output os.makedirs(output_dir, exist_okTrue) file_list [] for filename in os.listdir(input_dir): if filename.endswith((.txt, .pdf)): input_path os.path.join(input_dir, filename) output_name os.path.splitext(filename)[0] .mp3 output_path os.path.join(output_dir, output_name) file_list.append((input_path, output_path)) # 创建处理器并执行 processor KokoroBatchProcessor(config) processor.batch_process(file_list, max_workers2)实用技巧与最佳实践4. 配置文件管理创建一个配置文件来管理不同的处理方案# config.yaml profiles: english_audiobook: voice: af_sarah speed: 1.0 lang: en-us format: mp3 output_dir: ./audiobooks/english chinese_document: voice: zf_xiaoxiao speed: 1.1 lang: cmn format: wav output_dir: ./audiobooks/chinese fast_processing: voice: am_adam speed: 1.5 lang: en-us format: mp3 output_dir: ./quick_results5. 监控脚本创建一个实时监控脚本跟踪处理进度import time import os from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler class AudioFileHandler(FileSystemEventHandler): def __init__(self, output_dir): self.output_dir output_dir self.processed_files set() def on_created(self, event): if not event.is_directory and event.src_path.endswith((.wav, .mp3)): filename os.path.basename(event.src_path) if filename not in self.processed_files: self.processed_files.add(filename) print(f✓ 新音频文件生成: {filename}) print(f 路径: {event.src_path}) print(f 大小: {os.path.getsize(event.src_path) / 1024 / 1024:.2f} MB) def monitor_directory(directory_to_watch): 监控目录中的新文件 print(f开始监控目录: {directory_to_watch}) print(按 CtrlC 停止监控) event_handler AudioFileHandler(directory_to_watch) observer Observer() observer.schedule(event_handler, directory_to_watch, recursiveTrue) observer.start() try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() print(\n监控已停止) observer.join() if __name__ __main__: monitor_directory(./audio_output)常见问题与解决方案Q1: 如何处理内存不足的问题解决方案使用--split-output参数分章节处理大型文件增加Python内存限制export PYTHONMALLOCmalloc分批处理文件不要一次性加载所有内容Q2: 如何处理处理中断的情况解决方案使用带断点续传的脚本检查已存在的输出文件跳过已处理的部分使用日志记录处理进度Q3: 如何优化处理速度解决方案使用并发处理但注意不要超过系统资源限制调整--speed参数提高语速使用更简单的语音模型如果质量可接受Q4: 如何确保语音质量一致性解决方案使用相同的语音和参数配置预处理文本确保格式统一使用--debug模式检查处理细节总结与建议Kokoro TTS的自动化脚本编写可以显著提高批量文件处理的效率。以下是一些关键建议从简单开始先实现基础功能再逐步添加高级特性做好错误处理确保脚本能够优雅地处理各种异常情况记录日志详细的日志有助于调试和监控测试充分在小规模数据集上测试脚本确保稳定后再处理重要文件资源管理监控系统资源使用避免过度消耗内存或CPU通过本文提供的完整方案您可以轻松构建适合自己需求的Kokoro TTS自动化处理系统无论是处理文档、电子书还是其他文本内容都能高效完成批量转换任务。记住自动化脚本的核心目标是让您专注于内容创作而不是重复的技术操作。现在就开始编写您的第一个Kokoro TTS自动化脚本吧【免费下载链接】kokoro-ttsA CLI text-to-speech tool using the Kokoro model, supporting multiple languages, voices (with blending), and various input formats including EPUB books and PDF documents.项目地址: https://gitcode.com/gh_mirrors/ko/kokoro-tts创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考