diff --git a/app.py b/app.py index dac8c475..dad5f516 100644 --- a/app.py +++ b/app.py @@ -15,9 +15,9 @@ def check_python_version(): current_version = sys.version_info[:2] # (major, minor) if current_version < min_python_version or current_version > max_python_version: - error = f"""********** Error: Your OS Python version is not compatible! (current: {current_version[0]}.{current_version[1]}) + error = f'''********** Error: Your OS Python version is not compatible! (current: {current_version[0]}.{current_version[1]}) Please create a virtual python environment verrsion {min_python_version[0]}.{min_python_version[1]} or {max_python_version[0]}.{max_python_version[1]} - with conda or python -v venv **********""" + with conda or python -v venv **********''' print(error) return False else: @@ -25,7 +25,7 @@ def check_python_version(): def check_and_install_requirements(file_path): if not os.path.exists(file_path): - print(f"Warning: File {file_path} not found. Skipping package check.") + print(f'Warning: File {file_path} not found. Skipping package check.') try: from importlib.metadata import version, PackageNotFoundError with open(file_path, 'r') as f: @@ -39,42 +39,42 @@ def check_and_install_requirements(file_path): try: installed_version = version(pkg_name) except PackageNotFoundError: - print(f"{package} is missing.") + print(f'{package} is missing.') missing_packages.append(package) pass if missing_packages: - print("\nInstalling missing packages...") + print('\nInstalling missing packages...') try: - subprocess.check_call([sys.executable, "-m", "pip", "install", "--upgrade", "pip"] + missing_packages) + subprocess.check_call([sys.executable, '-m', 'pip', 'install', '--upgrade', 'pip'] + missing_packages) except subprocess.CalledProcessError as e: - print(f"Failed to install packages: {e}") + print(f'Failed to install packages: {e}') return False from lib.functions import check_missing_files, download_model for mod in models.keys(): - if mod == "xtts": - mod_exists, err, list = check_missing_files(models[mod]["local"], models[mod]["files"]) + if mod == 'xtts': + mod_exists, err, list = check_missing_files(models[mod]['local'], models[mod]['files']) if mod_exists: - print("All specified xtts base model files are present in the folder.") + print('All specified xtts base model files are present in the folder.') else: - print("The following files are missing:", list) - print(f"Downloading {mod} files . . .") - download_model(models[mod]["local"], models[mod]["url"]) + print('The following files are missing:', list) + print(f'Downloading {mod} files . . .') + download_model(models[mod]['local'], models[mod]['url']) return True except Exception as e: - raise(f"An error occurred: {e}") + raise(f'An error occurred: {e}') def check_dictionary(): unidic_path = unidic.DICDIR - dicrc = os.path.join(unidic_path, "dicrc") + dicrc = os.path.join(unidic_path, 'dicrc') if not os.path.exists(dicrc) or os.path.getsize(dicrc) == 0: try: - print("UniDic dictionary not found or incomplete. Downloading now...") - subprocess.run(["python", "-m", "unidic", "download"], check=True) + print('UniDic dictionary not found or incomplete. Downloading now...') + subprocess.run(['python', '-m', 'unidic', 'download'], check=True) except subprocess.CalledProcessError as e: - print(f"Failed to download UniDic dictionary. Error: {e}") - raise SystemExit("Unable to continue without UniDic. Exiting...") + print(f'Failed to download UniDic dictionary. Error: {e}') + raise SystemExit('Unable to continue without UniDic. 
Exiting...') return True def is_port_in_use(port): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: @@ -84,12 +84,12 @@ def main(): global script_mode, share, ebooks_dir # Convert the list of languages to a string to display in the help text - lang_list_str = ", ".join(list(language_mapping.keys())) + lang_list_str = ', '.join(list(language_mapping.keys())) # Argument parser to handle optional parameters with descriptions parser = argparse.ArgumentParser( - description="Convert eBooks to Audiobooks using a Text-to-Speech model. You can either launch the Gradio interface or run the script in headless mode for direct conversion.", - epilog="""\ + description='Convert eBooks to Audiobooks using a Text-to-Speech model. You can either launch the Gradio interface or run the script in headless mode for direct conversion.', + epilog='''\ Example usage: Windows: headless: @@ -101,66 +101,66 @@ def main(): ./ebook2audiobook.sh --headless --ebook 'path_to_ebook' --voice 'path_to_voice' --language en --custom_model 'model.zip' Graphic Interface: ./ebook2audiobook.sh -""", +''', formatter_class=argparse.RawTextHelpFormatter ) options = [ - "--script_mode", "--share", "--headless", "--ebook", "--ebooks_dir", - "--voice", "--language", "--device", "--custom_model", - "--custom_model_url", "--temperature", - "--length_penalty", "--repetition_penalty", "--top_k", "--top_p", "--speed", - "--enable_text_splitting", "--version", "--help" + '--script_mode', '--share', '--headless', '--ebook', '--ebooks_dir', + '--voice', '--language', '--device', '--custom_model', + '--custom_model_url', '--temperature', + '--length_penalty', '--repetition_penalty', '--top_k', '--top_p', '--speed', + '--enable_text_splitting', '--version', '--help' ] parser.add_argument(options[0], type=str, - help="Force the script to run in 'native' or 'docker_utils'") - parser.add_argument(options[1], action="store_true", - help="Enable a public shareable Gradio link. Defaults to False.") + help='Force the script to run in NATIVE or DOCKER_UTILS') + parser.add_argument(options[1], action='store_true', + help='Enable a public shareable Gradio link. Defaults to False.') parser.add_argument(options[2], nargs='?', const=True, default=False, - help="Run in headless mode. Defaults to True if the flag is present without a value, False otherwise.") + help='Run in headless mode. Defaults to True if the flag is present without a value, False otherwise.') parser.add_argument(options[3], type=str, - help="Path to the ebook file for conversion. Required in headless mode.") - parser.add_argument(options[4], nargs='?', const="default", type=str, - help=f"Path to the directory containing ebooks for batch conversion. Defaults to '{os.path.basename(ebooks_dir)}' if 'default' value is provided.") + help='Path to the ebook file for conversion. Required in headless mode.') + parser.add_argument(options[4], nargs='?', const='default', type=str, + help=f'Path to the directory containing ebooks for batch conversion. Defaults to "{os.path.basename(ebooks_dir)}" if "default" is provided.') parser.add_argument(options[5], type=str, - help="Path to the target voice file for TTS. Optional, uses a default voice if not provided.") + help='Path to the target voice file for TTS. Optional, uses a default voice if not provided.') parser.add_argument(options[6], type=str, default=default_language_code, - help=f"Language for the audiobook conversion. Options: {lang_list_str}. 
Default to English (eng).") - parser.add_argument(options[7], type=str, default="cpu", choices=["cpu", "gpu"], - help=f"Type of processor unit for the audiobook conversion. If not specified: check first if gpu available, if not cpu is selected.") + help=f'Language for the audiobook conversion. Options: {lang_list_str}. Default to English (eng).') + parser.add_argument(options[7], type=str, default='cpu', choices=['cpu', 'gpu'], + help=f'Type of processor unit for the audiobook conversion. If not specified: check first if gpu available, if not cpu is selected.') parser.add_argument(options[8], type=str, - help="Path to the custom model file (.pth). Required if using a custom model.") + help='Path to the custom model file (.pth). Required if using a custom model.') parser.add_argument(options[9], type=str, help=("URL to download the custom model as a zip file. Optional, but will be used if provided. " "Examples include David Attenborough's model: " "'https://huggingface.co/drewThomasson/xtts_David_Attenborough_fine_tune/resolve/main/Finished_model_files.zip?download=true'. " "More XTTS fine-tunes can be found on my Hugging Face at 'https://huggingface.co/drewThomasson'.")) parser.add_argument(options[10], type=float, default=0.65, - help="Temperature for the model. Defaults to 0.65. Higher temperatures lead to more creative outputs.") + help='Temperature for the model. Defaults to 0.65. Higher temperatures lead to more creative outputs.') parser.add_argument(options[11], type=float, default=1.0, - help="A length penalty applied to the autoregressive decoder. Defaults to 1.0. Not applied to custom models.") + help='A length penalty applied to the autoregressive decoder. Defaults to 1.0. Not applied to custom models.') parser.add_argument(options[12], type=float, default=2.0, - help="A penalty that prevents the autoregressive decoder from repeating itself. Defaults to 2.0.") + help='A penalty that prevents the autoregressive decoder from repeating itself. Defaults to 2.0') parser.add_argument(options[13], type=int, default=50, - help="Top-k sampling. Lower values mean more likely outputs and increased audio generation speed. Defaults to 50.") + help='Top-k sampling. Lower values mean more likely outputs and increased audio generation speed. Defaults to 50') parser.add_argument(options[14], type=float, default=0.8, - help="Top-p sampling. Lower values mean more likely outputs and increased audio generation speed. Defaults to 0.8.") + help='Top-p sampling. Lower values mean more likely outputs and increased audio generation speed. Defaults to 0.8') parser.add_argument(options[15], type=float, default=1.0, - help="Speed factor for the speech generation. Defaults to 1.0.") - parser.add_argument(options[16], action="store_true", - help="Enable splitting text into sentences. Defaults to False.") - parser.add_argument(options[17], action="version",version=f"ebook2audiobook version {version}", - help="Show the version of the script and exit") + help='Speed factor for the speech generation. Defaults to 1.0') + parser.add_argument(options[16], action='store_true', + help='Enable splitting text into sentences. 
Defaults to False.') + parser.add_argument(options[17], action='version',version=f'ebook2audiobook version {version}', + help='Show the version of the script and exit') for arg in sys.argv: - if arg.startswith("--") and arg not in options: - print(f"Error: Unrecognized option '{arg}'") + if arg.startswith('--') and arg not in options: + print(f'Error: Unrecognized option "{arg}"') sys.exit(1) args = parser.parse_args() # Check if the port is already in use to prevent multiple launches if not args.headless and is_port_in_use(gradio_interface_port): - print(f"Error: Port {gradio_interface_port} is already in use. The web interface may already be running.") + print(f'Error: Port {gradio_interface_port} is already in use. The web interface may already be running.') sys.exit(1) script_mode = args.script_mode if args.script_mode else script_mode @@ -169,13 +169,13 @@ def main(): if script_mode == NATIVE: check_pkg = check_and_install_requirements(requirements_file) if check_pkg: - print("Package requirements ok") + print('Package requirements ok') if check_dictionary(): - print ("Dictionary ok") + print ('Dictionary ok') else: sys.exit(1) else: - print("Some packages could not be installed") + print('Some packages could not be installed') sys.exit(1) from lib.functions import web_interface, convert_ebook @@ -184,22 +184,22 @@ def main(): if args.headless: # Condition to stop if both --ebook and --ebooks_dir are provided if args.ebook and args.ebooks_dir: - print("Error: You cannot specify both --ebook and --ebooks_dir in headless mode.") + print('Error: You cannot specify both --ebook and --ebooks_dir in headless mode.') sys.exit(1) args.session = None # Condition 1: If --ebooks_dir exists, check value and set 'ebooks_dir' if args.ebooks_dir: - if args.ebooks_dir == "default": - print(f"Using the default ebooks_dir: {ebooks_dir}") + if args.ebooks_dir == 'default': + print(f'Using the default ebooks_dir: {ebooks_dir}') ebooks_dir = os.path.abspath(ebooks_dir) else: # Check if the directory exists if os.path.exists(args.ebooks_dir): ebooks_dir = os.path.abspath(args.ebooks_dir) else: - print(f"Error: The provided --ebooks_dir '{args.ebooks_dir}' does not exist.") + print(f'Error: The provided --ebooks_dir "{args.ebooks_dir}" does not exist.') sys.exit(1) if os.path.exists(ebooks_dir): @@ -207,24 +207,24 @@ def main(): # Process files with supported ebook formats if any(file.endswith(ext) for ext in ebook_formats): full_path = os.path.join(ebooks_dir, file) - print(f"Processing eBook file: {full_path}") + print(f'Processing eBook file: {full_path}') args.ebook = full_path progress_status, audiobook_file = convert_ebook(args) if audiobook_file is None: - print(f"Conversion failed: {progress_status}") + print(f'Conversion failed: {progress_status}') sys.exit(1) else: - print(f"Error: The directory {ebooks_dir} does not exist.") + print(f'Error: The directory {ebooks_dir} does not exist.') sys.exit(1) elif args.ebook: progress_status, audiobook_file = convert_ebook(args) if audiobook_file is None: - print(f"Conversion failed: {progress_status}") + print(f'Conversion failed: {progress_status}') sys.exit(1) else: - print("Error: In headless mode, you must specify either an ebook file using --ebook or an ebook directory using --ebooks_dir.") + print('Error: In headless mode, you must specify either an ebook file using --ebook or an ebook directory using --ebooks_dir.') sys.exit(1) else: passed_arguments = sys.argv[1:] @@ -233,7 +233,7 @@ def main(): if passed_args_set.issubset(allowed_arguments): 
web_interface(script_mode, share) else: - print("Error: In non-headless mode, no option or only '--share' can be passed") + print('Error: In non-headless mode, no option or only --share can be passed') sys.exit(1) if __name__ == '__main__': diff --git a/lib/conf.py b/lib/conf.py index 570ea8ec..966bd95d 100644 --- a/lib/conf.py +++ b/lib/conf.py @@ -1,40 +1,40 @@ import os -NATIVE = "native" -DOCKER_UTILS = "docker_utils" -FULL_DOCKER = "full_docker" +NATIVE = 'native' +DOCKER_UTILS = 'docker_utils' +FULL_DOCKER = 'full_docker' -version = "2.0.0" +version = '2.0.0' min_python_version = (3,10) max_python_version = (3,12) -requirements_file = os.path.abspath(os.path.join(".","requirements.txt")) +requirements_file = os.path.abspath(os.path.join('.','requirements.txt')) docker_utils_image = 'utils' gradio_interface_port = 7860 gradio_shared_expire = 72 # hours concurrency_limit = 16 # or None for unlimited -python_env_dir = os.path.abspath(os.path.join(".","python_env")) -models_dir = os.path.abspath(os.path.join(".","models")) -ebooks_dir = os.path.abspath(os.path.join(".","ebooks")) -processes_dir = os.path.abspath(os.path.join(".","tmp")) -audiobooks_gradio_dir = os.path.abspath(os.path.join(".","audiobooks","gui","gradio")) -audiobooks_host_dir = os.path.abspath(os.path.join(".","audiobooks","gui","host")) -audiobooks_cli_dir = os.path.abspath(os.path.join(".","audiobooks","cli")) +python_env_dir = os.path.abspath(os.path.join('.','python_env')) +models_dir = os.path.abspath(os.path.join('.','models')) +ebooks_dir = os.path.abspath(os.path.join('.','ebooks')) +processes_dir = os.path.abspath(os.path.join('.','tmp')) +audiobooks_gradio_dir = os.path.abspath(os.path.join('.','audiobooks','gui','gradio')) +audiobooks_host_dir = os.path.abspath(os.path.join('.','audiobooks','gui','host')) +audiobooks_cli_dir = os.path.abspath(os.path.join('.','audiobooks','cli')) # <<<<<<< HEAD -os.environ["CALIBRE_TEMP_DIR"] = processes_dir -os.environ["CALIBRE_CACHE_DIRECTORY"] = processes_dir -os.environ["CALIBRE_NO_NATIVE_FILEDIALOGS"] = "1" -os.environ["DO_NOT_TRACK"] = "true" -os.environ["HUGGINGFACE_HUB_CACHE"] = models_dir -os.environ["HF_HOME"] = models_dir -os.environ["HF_DATASETS_CACHE"] = models_dir -os.environ["HF_TOKEN_PATH"] = os.path.join(os.path.expanduser("~"), ".huggingface_token") -os.environ["TTS_CACHE"] = models_dir -os.environ["TORCH_HOME"] = models_dir -os.environ["XDG_CACHE_HOME"] = models_dir +os.environ['CALIBRE_TEMP_DIR'] = processes_dir +os.environ['CALIBRE_CACHE_DIRECTORY'] = processes_dir +os.environ['CALIBRE_NO_NATIVE_FILEDIALOGS'] = '1' +os.environ['DO_NOT_TRACK'] = 'true' +os.environ['HUGGINGFACE_HUB_CACHE'] = models_dir +os.environ['HF_HOME'] = models_dir +os.environ['HF_DATASETS_CACHE'] = models_dir +os.environ['HF_TOKEN_PATH'] = os.path.join(os.path.expanduser('~'), '.huggingface_token') +os.environ['TTS_CACHE'] = models_dir +os.environ['TORCH_HOME'] = models_dir +os.environ['XDG_CACHE_HOME'] = models_dir models = { "xtts": { @@ -52,8 +52,8 @@ } ebook_formats = ['.epub', '.mobi', '.azw3', 'fb2', 'lrf', 'rb', 'snb', 'tcr', '.pdf', '.txt', '.rtf', '.docx', '.html', '.odt', '.azw'] -audiobook_format = "m4b" -audio_proc_format = "wav" # only "wav" is valid for now +audiobook_format = 'm4b' +audio_proc_format = 'wav' # only 'wav' is valid for now xtts_fine_tuned_voice_actors = { "David Attenborough": { diff --git a/lib/functions.py b/lib/functions.py index 339436a2..ffba0434 100644 --- a/lib/functions.py +++ b/lib/functions.py @@ -38,12 +38,12 @@ import lib.lang as lang # 
Automatically accept the non-commercial license -os.environ["COQUI_TOS_AGREED"] = "1" +os.environ['COQUI_TOS_AGREED'] = '1' def inject_configs(target_namespace): # Extract variables from both modules and inject them into the target namespace for module in (conf, lang): - target_namespace.update({k: v for k, v in vars(module).items() if not k.startswith("__")}) + target_namespace.update({k: v for k, v in vars(module).items() if not k.startswith('__')}) # Inject configurations into the global namespace of this module inject_configs(globals()) @@ -79,7 +79,7 @@ def handle_exception(self): traceback.print_exc() # Print the exception message - print(f"Caught DependencyError: {self}") + print(f'Caught DependencyError: {self}') # Exit the script if it's not a web process if not is_gui_process: @@ -87,34 +87,34 @@ def handle_exception(self): def check_missing_files(dir_path, f_list): if not os.path.exists(dir_path): - return False, "Folder does not exist", f_list + return False, 'Folder does not exist', f_list existing_files = os.listdir(dir_path) missing_files = [file for file in f_list if file not in existing_files] if missing_files: - return False, "Some files are missing", missing_files - return True, "All files are present", [] + return False, 'Some files are missing', missing_files + return True, 'All files are present', [] def download_model(dest_dir, url): try: if not os.path.exists(dest_dir): os.makedirs(dest_dir) - zip_path = os.path.join(dest_dir, models["xtts"]["zip"]) - print("Downloading the XTTS v2 model...") + zip_path = os.path.join(dest_dir, models['xtts']['zip']) + print('Downloading the XTTS v2 model...') response = requests.get(url, stream=True) response.raise_for_status() # Raise an error for bad status codes total_size = int(response.headers.get('content-length', 0)) chunk_size = 1024 # Download in chunks of 1KB - with open(zip_path, "wb") as file, tqdm( - total=total_size, unit='B', unit_scale=True, desc="Downloading" + with open(zip_path, 'wb') as file, tqdm( + total=total_size, unit='B', unit_scale=True, desc='Downloading' ) as progress_bar: for chunk in response.iter_content(chunk_size=chunk_size): file.write(chunk) progress_bar.update(len(chunk)) - print("Extracting the model files...") - with zipfile.ZipFile(zip_path, "r") as zip_ref: + print('Extracting the model files...') + with zipfile.ZipFile(zip_path, 'r') as zip_ref: zip_ref.extractall(dest_dir) os.remove(zip_path) - print("Model downloaded, extracted, and zip file removed successfully.") + print('Model downloaded, extracted, and zip file removed successfully.') except Exception as e: raise DependencyError(e) @@ -123,15 +123,15 @@ def prepare_dirs(src): resume = False os.makedirs(tmp_dir, exist_ok=True) os.makedirs(audiobooks_dir, exist_ok=True) - ebook["src"] = os.path.join(tmp_dir, os.path.basename(src)) - if os.path.exists(ebook["src"]): - if compare_files_by_hash(ebook["src"], src): + ebook['src'] = os.path.join(tmp_dir, os.path.basename(src)) + if os.path.exists(ebook['src']): + if compare_files_by_hash(ebook['src'], src): resume = True if not resume: - shutil.rmtree(ebook["chapters_dir"], ignore_errors=True) - os.makedirs(ebook["chapters_dir"], exist_ok=True) - os.makedirs(ebook["chapters_dir_sentences"], exist_ok=True) - shutil.copy(src, ebook["src"]) + shutil.rmtree(ebook['chapters_dir'], ignore_errors=True) + os.makedirs(ebook['chapters_dir'], exist_ok=True) + os.makedirs(ebook['chapters_dir_sentences'], exist_ok=True) + shutil.copy(src, ebook['src']) return True except Exception as e: raise 
DependencyError(e) @@ -141,11 +141,11 @@ def check_programs(prog_name, command, options): subprocess.run([command, options], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return True, None except FileNotFoundError: - e = f"""********** Error: {prog_name} is not installed! if your OS calibre package version - is not compatible you still can run ebook2audiobook.sh (linux/mac) or ebook2audiobook.cmd (windows) **********""" + e = f'''********** Error: {prog_name} is not installed! if your OS calibre package version + is not compatible you still can run ebook2audiobook.sh (linux/mac) or ebook2audiobook.cmd (windows) **********''' raise DependencyError(e) except subprocess.CalledProcessError: - e = f"Error: There was an issue running {prog_name}." + e = f'Error: There was an issue running {prog_name}.' raise DependencyError(e) def download_custom_model(url, dest_dir): @@ -157,34 +157,34 @@ def download_custom_model(url, dest_dir): response = requests.get(url, stream=True) response.raise_for_status() # Raise an error for bad responses f_path = os.path.join(dest_dir,fname) - with open(f_path, "wb") as file: + with open(f_path, 'wb') as file: for chunk in response.iter_content(chunk_size=8192): file.write(chunk) - print(f"File saved at: {f_path}") + print(f'File saved at: {f_path}') return extract_custom_model(f_path, dest_dir) except Exception as e: - raise RuntimeError(f"Error while downloading the file: {e}") + raise RuntimeError(f'Error while downloading the file: {e}') def extract_custom_model(f_path, dest_dir): try: with zipfile.ZipFile(f_path, 'r') as zip_ref: files = zip_ref.namelist() - with tqdm(total=len(files), unit="file", desc="Extracting Files") as t: + with tqdm(total=len(files), unit='file', desc='Extracting Files') as t: for file in files: if cancellation_requested.is_set(): - msg = "Cancel requested" + msg = 'Cancel requested' raise ValueError() if os.path.isfile(file): extracted_path = zip_ref.extract(file, dest_dir) t.update(1) os.remove(f_path) - print(f"Extracted files to {dest_dir}") - missing_files = [file for file in models["xtts"]["files"] if not os.path.exists(os.path.join(dest_dir, file))] + print(f'Extracted files to {dest_dir}') + missing_files = [file for file in models['xtts']['files'] if not os.path.exists(os.path.join(dest_dir, file))] if not missing_files: - print("All required model files found.") + print('All required model files found.') return dest_dir else: - print(f"Missing files: {', '.join(missing_files)}") + print(f'Missing files: {missing_files}') return False except Exception as e: raise DependencyError(e) @@ -214,18 +214,18 @@ def convert_to_epub(): if script_mode == DOCKER_UTILS: try: docker_dir = os.path.basename(tmp_dir) - docker_file_in = os.path.basename(ebook["src"]) - docker_file_out = os.path.basename(ebook["epub_path"]) + docker_file_in = os.path.basename(ebook['src']) + docker_file_out = os.path.basename(ebook['epub_path']) # Check if the input file is already an EPUB if docker_file_in.lower().endswith('.epub'): - shutil.copy(ebook["src"], ebook["epub_path"]) + shutil.copy(ebook['src'], ebook['epub_path']) return True # Convert the ebook to EPUB format using utils Docker image container = client.containers.run( docker_utils_image, - command=f"ebook-convert /files/{docker_dir}/{docker_file_in} /files/{docker_dir}/{docker_file_out}", + command=f'ebook-convert /files/{docker_dir}/{docker_file_in} /files/{docker_dir}/{docker_file_out}', volumes={tmp_dir: {'bind': f'/files/{docker_dir}', 'mode': 'rw'}}, remove=True, detach=False, @@ 
-242,8 +242,8 @@ def convert_to_epub(): raise DependencyError(e) else: try: - util_app = shutil.which("ebook-convert") - subprocess.run([util_app, ebook["src"], ebook["epub_path"]], check=True) + util_app = shutil.which('ebook-convert') + subprocess.run([util_app, ebook['src'], ebook['epub_path']], check=True) return True except subprocess.CalledProcessError as e: raise DependencyError(e) @@ -251,13 +251,13 @@ def convert_to_epub(): def get_cover(): try: cover_image = None - cover_path = os.path.join(tmp_dir, ebook["filename_noext"] + '.jpg') + cover_path = os.path.join(tmp_dir, ebook['filename_noext'] + '.jpg') cover_file = None - for item in ebook["epub"].get_items_of_type(ebooklib.ITEM_COVER): + for item in ebook['epub'].get_items_of_type(ebooklib.ITEM_COVER): cover_image = item.get_content() break if cover_image is None: - for item in ebook["epub"].get_items_of_type(ebooklib.ITEM_IMAGE): + for item in ebook['epub'].get_items_of_type(ebooklib.ITEM_IMAGE): if 'cover' in item.file_name.lower() or 'cover' in item.get_id().lower(): cover_image = item.get_content() break @@ -281,7 +281,7 @@ def assemble_audio(): # Process the chapter files in batches for i in range(0, len(chapter_files), batch_size): if cancellation_requested.is_set(): - msg = "Cancel requested" + msg = 'Cancel requested' raise ValueError(msg) batch_files = chapter_files[i:i + batch_size] @@ -290,7 +290,7 @@ def assemble_audio(): # Sequentially append each file in the current batch to the batch_audio for chapter_file in batch_files: if cancellation_requested.is_set(): - msg = "Cancel requested" + msg = 'Cancel requested' raise ValueError(msg) audio_segment = AudioSegment.from_wav(chapter_file) @@ -300,53 +300,53 @@ def assemble_audio(): combined_audio += batch_audio combined_audio.export(assembled_audio, format=audio_proc_format) - print(f"Combined audio saved to {assembled_audio}") + print(f'Combined audio saved to {assembled_audio}') return True except Exception as e: raise DependencyError(e) def generate_ffmpeg_metadata(): try: - ffmpeg_metadata = ";FFMETADATA1\n" - if ebook["metadata"].get("title"): - ffmpeg_metadata += f"title={ebook["metadata"]["title"]}\n" - if ebook["metadata"].get("creator"): - ffmpeg_metadata += f"artist={ebook["metadata"]["creator"]}\n" - if ebook["metadata"].get("language"): - ffmpeg_metadata += f"language={ebook["metadata"]["language"]}\n\n" - if ebook["metadata"].get("publisher"): - ffmpeg_metadata += f"publisher={ebook["metadata"]["publisher"]}\n" - if ebook["metadata"].get("description"): - ffmpeg_metadata += f"description={ebook["metadata"]["description"]}\n" # Description - if ebook["metadata"].get("published"): + ffmpeg_metadata = ';FFMETADATA1\n' + if ebook['metadata'].get('title'): + ffmpeg_metadata += f"title={ebook['metadata']['title']}\n" + if ebook['metadata'].get('creator'): + ffmpeg_metadata += f"artist={ebook['metadata']['creator']}\n" + if ebook['metadata'].get('language'): + ffmpeg_metadata += f"language={ebook['metadata']['language']}\n\n" + if ebook['metadata'].get('publisher'): + ffmpeg_metadata += f"publisher={ebook['metadata']['publisher']}\n" + if ebook['metadata'].get('description'): + ffmpeg_metadata += f"description={ebook['metadata']['description']}\n" + if ebook['metadata'].get('published'): # Check if the timestamp contains fractional seconds - if '.' in ebook["metadata"]["published"]: + if '.' 
in ebook['metadata']['published']: # Parse with fractional seconds - year = datetime.strptime(ebook["metadata"]["published"], "%Y-%m-%dT%H:%M:%S.%f%z").year + year = datetime.strptime(ebook['metadata']['published'], '%Y-%m-%dT%H:%M:%S.%f%z').year else: # Parse without fractional seconds - year = datetime.strptime(ebook["metadata"]["published"], "%Y-%m-%dT%H:%M:%S%z").year + year = datetime.strptime(ebook['metadata']['published'], '%Y-%m-%dT%H:%M:%S%z').year else: # If published is not provided, use the current year year = datetime.now().year - ffmpeg_metadata += f"year={year}\n" - if ebook["metadata"].get("identifiers") and isinstance(ebook["metadata"].get("identifiers"), dict): - isbn = ebook["metadata"]["identifiers"].get("isbn", None) + ffmpeg_metadata += f'year={year}\n' + if ebook['metadata'].get('identifiers') and isinstance(ebook['metadata'].get('identifiers'), dict): + isbn = ebook['metadata']['identifiers'].get('isbn', None) if isbn: - ffmpeg_metadata += f"isbn={isbn}\n" # ISBN - mobi_asin = ebook["metadata"]["identifiers"].get("mobi-asin", None) + ffmpeg_metadata += f'isbn={isbn}\n' # ISBN + mobi_asin = ebook['metadata']['identifiers'].get('mobi-asin', None) if mobi_asin: - ffmpeg_metadata += f"asin={mobi_asin}\n" # ASIN + ffmpeg_metadata += f'asin={mobi_asin}\n' # ASIN start_time = 0 for index, chapter_file in enumerate(chapter_files): if cancellation_requested.is_set(): - msg = "Cancel requested" + msg = 'Cancel requested' raise ValueError(msg) duration_ms = len(AudioSegment.from_wav(chapter_file)) - ffmpeg_metadata += f"[CHAPTER]\nTIMEBASE=1/1000\nSTART={start_time}\n" - ffmpeg_metadata += f"END={start_time + duration_ms}\ntitle=Chapter {index + 1}\n" + ffmpeg_metadata += f'[CHAPTER]\nTIMEBASE=1/1000\nSTART={start_time}\n' + ffmpeg_metadata += f'END={start_time + duration_ms}\ntitle=Chapter {index + 1}\n' start_time += duration_ms # Write the metadata to the file @@ -364,18 +364,18 @@ def export_audio(): ffmpeg_combined_audio = f'/files/{docker_dir}/' + os.path.basename(assembled_audio) ffmpeg_metadata_file = f'/files/{docker_dir}/' + os.path.basename(metadata_file) ffmpeg_final_file = f'/files/{docker_dir}/' + os.path.basename(docker_final_file) - if ebook["cover"] is not None: - ffmpeg_cover = f'/files/{docker_dir}/' + os.path.basename(ebook["cover"]) + if ebook['cover'] is not None: + ffmpeg_cover = f'/files/{docker_dir}/' + os.path.basename(ebook['cover']) - ffmpeg_cmd = ["ffmpeg", '-i', ffmpeg_combined_audio, '-i', ffmpeg_metadata_file] + ffmpeg_cmd = ['ffmpeg', '-i', ffmpeg_combined_audio, '-i', ffmpeg_metadata_file] else: ffmpeg_combined_audio = assembled_audio ffmpeg_metadata_file = metadata_file ffmpeg_final_file = final_file - if ebook["cover"] is not None: - ffmpeg_cover = ebook["cover"] + if ebook['cover'] is not None: + ffmpeg_cover = ebook['cover'] - ffmpeg_cmd = [shutil.which("ffmpeg"), '-i', ffmpeg_combined_audio, '-i', ffmpeg_metadata_file] + ffmpeg_cmd = [shutil.which('ffmpeg'), '-i', ffmpeg_combined_audio, '-i', ffmpeg_metadata_file] if ffmpeg_cover is not None: ffmpeg_cmd += ['-i', ffmpeg_cover, '-map', '0:a', '-map', '2:v'] @@ -428,13 +428,13 @@ def export_audio(): raise DependencyError(e) try: - chapter_files = sorted([os.path.join(ebook["chapters_dir"], f) for f in os.listdir(ebook["chapters_dir"]) if f.endswith('.' + audio_proc_format)], key=sort_key) + chapter_files = sorted([os.path.join(ebook['chapters_dir'], f) for f in os.listdir(ebook['chapters_dir']) if f.endswith('.' 
+ audio_proc_format)], key=sort_key) assembled_audio = os.path.join(tmp_dir, 'assembled.'+audio_proc_format) metadata_file = os.path.join(tmp_dir, 'metadata.txt') if assemble_audio(): if generate_ffmpeg_metadata(): - final_name = ebook["metadata"]["title"] + '.' + audiobook_format + final_name = ebook['metadata']['title'] + '.' + audiobook_format docker_final_file = os.path.join(tmp_dir, final_name) final_file = os.path.join(audiobooks_dir, final_name) if export_audio(): @@ -446,7 +446,7 @@ def export_audio(): def get_chapters(language): try: - all_docs = list(ebook["epub"].get_items_of_type(ebooklib.ITEM_DOCUMENT)) + all_docs = list(ebook['epub'].get_items_of_type(ebooklib.ITEM_DOCUMENT)) if all_docs: all_docs = all_docs[1:] doc_patterns = [filter_pattern(str(doc)) for doc in all_docs if filter_pattern(str(doc))] @@ -456,7 +456,7 @@ def get_chapters(language): return chapters return False except Exception as e: - raise DependencyError(f"Error extracting main content pages: {e}") + raise DependencyError(f'Error extracting main content pages: {e}') def filter_doc(doc_patterns): pattern_counter = Counter(doc_patterns) @@ -473,7 +473,7 @@ def filter_pattern(doc_identifier): elif re.match(r'^[a-zA-Z]+$', segment): return segment elif re.match(r'^\d+$', segment): - return "numbers" + return 'numbers' return None def filter_chapter(doc, language): @@ -485,26 +485,26 @@ def filter_chapter(doc, language): def combine_audio_sentences(chapter_audio_file): try: - output_file = os.path.join(ebook["chapters_dir"], chapter_audio_file) + output_file = os.path.join(ebook['chapters_dir'], chapter_audio_file) combined_audio = AudioSegment.empty() sentences_dir_ordered = sorted( - [os.path.join(ebook["chapters_dir_sentences"], f) for f in os.listdir(ebook["chapters_dir_sentences"]) if f.endswith(audio_proc_format)], + [os.path.join(ebook['chapters_dir_sentences'], f) for f in os.listdir(ebook['chapters_dir_sentences']) if f.endswith(audio_proc_format)], key=lambda f: int(''.join(filter(str.isdigit, f))) ) for file in sentences_dir_ordered: if cancellation_requested.is_set(): - msg = "Cancel requested" + msg = 'Cancel requested' raise ValueError(msg) audio_segment = AudioSegment.from_wav(file) combined_audio += audio_segment combined_audio.export(output_file, format=audio_proc_format) - print(f"Combined audio saved to {output_file}") + print(f'Combined audio saved to {output_file}') except Exception as e: raise DependencyError(e) def get_sentences(sentence, language, max_pauses=4): - max_length = language_mapping[language]["char_limit"] - punctuation = language_mapping[language]["punctuation"] + max_length = language_mapping[language]['char_limit'] + punctuation = language_mapping[language]['punctuation'] parts = [] while len(sentence) > max_length or sum(sentence.count(p) for p in punctuation) > max_pauses: possible_splits = [i for i, char in enumerate(sentence[:max_length]) if char in punctuation] @@ -527,116 +527,116 @@ def convert_chapters_to_audio(params): progress_bar = None if is_gui_process: progress_bar = gr.Progress(track_tqdm=True) - if params["clone_voice_file"] is None: - params["clone_voice_file"] = default_clone_voice_file - print("Loading the TTS model ...") - params["tts_model"] = None - if ebook["custom_model"] is not None or ebook["metadata"]["language"] in language_xtts: - params["tts_model"] = "xtts" - if ebook["custom_model"] is not None: - model_path = ebook["custom_model"] + if params['clone_voice_file'] is None: + params['clone_voice_file'] = default_clone_voice_file + print('Loading 
the TTS model ...') + params['tts_model'] = None + if ebook['custom_model'] is not None or ebook['metadata']['language'] in language_xtts: + params['tts_model'] = 'xtts' + if ebook['custom_model'] is not None: + model_path = ebook['custom_model'] else: - model_path = models["xtts"]["local"] + model_path = models['xtts']['local'] - config_path = os.path.join(models[params["tts_model"]]["local"],models[params["tts_model"]]["files"][0]) - vocab_path = os.path.join(models[params["tts_model"]]["local"],models[params["tts_model"]]["files"][2]) + config_path = os.path.join(models[params['tts_model']]['local'],models[params['tts_model']]['files'][0]) + vocab_path = os.path.join(models[params['tts_model']]['local'],models[params['tts_model']]['files'][2]) config = XttsConfig() config.models_dir = models_dir config.load_json(config_path) - params["tts"] = Xtts.init_from_config(config) - params["tts"].to(params["device"]) - params["tts"].load_checkpoint(config, checkpoint_dir=model_path, vocab_path=vocab_path) - print("Computing speaker latents...") - params["gpt_cond_latent"], params["speaker_embedding"] = params["tts"].get_conditioning_latents(audio_path=[params["clone_voice_file"]]) + params['tts'] = Xtts.init_from_config(config) + params['tts'].to(params['device']) + params['tts'].load_checkpoint(config, checkpoint_dir=model_path, vocab_path=vocab_path) + print('Computing speaker latents...') + params['gpt_cond_latent'], params['speaker_embedding'] = params['tts'].get_conditioning_latents(audio_path=[params['clone_voice_file']]) else: - params["tts_model"] = "mms" - mms_dir = os.path.join(models_dir,"mms") - local_model_path = os.path.join(mms_dir, f'tts_models/{ebook["metadata"]["language"]}/fairseq/vits') + params['tts_model'] = 'mms' + mms_dir = os.path.join(models_dir,'mms') + local_model_path = os.path.join(mms_dir, f'tts_models/{ebook['metadata']['language']}/fairseq/vits') if os.path.isdir(local_model_path): - params["tts"] = TTS(local_model_path) + params['tts'] = TTS(local_model_path) else: - params["tts"] = TTS(f'tts_models/{ebook["metadata"]["language"]}/fairseq/vits') - params["tts"].to(params["device"]) + params['tts'] = TTS(f'tts_models/{ebook['metadata']['language']}/fairseq/vits') + params['tts'].to(params['device']) - total_chapters = len(ebook["chapters"]) - total_sentences = sum(len(array) for array in ebook["chapters"]) + total_chapters = len(ebook['chapters']) + total_sentences = sum(len(array) for array in ebook['chapters']) resume_chapter = 0 resume_sentence = 0 current_sentence = 0 # Check existing files to resume the process if it was interrupted - existing_chapters = sorted([f for f in os.listdir(ebook["chapters_dir"]) if f.endswith(f".{audio_proc_format}")]) - existing_sentences = sorted([f for f in os.listdir(ebook["chapters_dir_sentences"]) if f.endswith(f".{audio_proc_format}")]) + existing_chapters = sorted([f for f in os.listdir(ebook['chapters_dir']) if f.endswith(f'.{audio_proc_format}')]) + existing_sentences = sorted([f for f in os.listdir(ebook['chapters_dir_sentences']) if f.endswith(f'.{audio_proc_format}')]) if existing_chapters: resume_chapter = len(existing_chapters) - print(f"Resuming from chapter {resume_chapter}") + print(f'Resuming from chapter {resume_chapter}') if existing_sentences: resume_sentence = len(existing_sentences) - 1 # Remove the last file (possibly incomplete or corrupted) - last_resume_sentence_file = os.path.join(ebook["chapters_dir_sentences"], existing_sentences[resume_sentence]) + last_resume_sentence_file = 
os.path.join(ebook['chapters_dir_sentences'], existing_sentences[resume_sentence]) os.remove(last_resume_sentence_file) - print(f"Resuming from sentence {resume_sentence}") + print(f'Resuming from sentence {resume_sentence}') - with tqdm(total=total_sentences, desc="Processing 0.00%", bar_format='{desc}: {n_fmt}/{total_fmt} ', unit="step") as t: - if ebook["metadata"].get("creator"): + with tqdm(total=total_sentences, desc='Processing 0.00%', bar_format='{desc}: {n_fmt}/{total_fmt} ', unit='step') as t: + if ebook['metadata'].get('creator'): if resume_sentence == 0: - params["sentence_audio_file"] = os.path.join(ebook["chapters_dir_sentences"], f"{current_sentence}.{audio_proc_format}") - params["sentence"] = f' {ebook["metadata"]["creator"]}, {ebook["metadata"]["title"]}. ' + params['sentence_audio_file'] = os.path.join(ebook['chapters_dir_sentences'], f'{current_sentence}.{audio_proc_format}') + params['sentence'] = f" {ebook['metadata']['creator']}, {ebook['metadata']['title']}. " if convert_sentence_to_audio(params): current_sentence = 1 else: - print("convert_sentence_to_audio() Author and Title failed!") + print('convert_sentence_to_audio() Author and Title failed!') return False for x in range(resume_chapter, total_chapters): chapter_num = x + 1 - chapter_audio_file = f"chapter_{chapter_num}.{audio_proc_format}" - sentences = ebook["chapters"][x] + chapter_audio_file = f'chapter_{chapter_num}.{audio_proc_format}' + sentences = ebook['chapters'][x] for i, sentence in enumerate(sentences): if current_sentence >= resume_sentence: if cancellation_requested.is_set(): - stop_and_detach_tts(params["tts"]) - raise ValueError("Cancel requested") + stop_and_detach_tts(params['tts']) + raise ValueError('Cancel requested') - print(f"Sentence: {sentence}...") - params["sentence"] = sentence - params["sentence_audio_file"] = os.path.join(ebook["chapters_dir_sentences"], f"{current_sentence}.{audio_proc_format}") + print(f'Sentence: {sentence}...') + params['sentence'] = sentence + params['sentence_audio_file'] = os.path.join(ebook['chapters_dir_sentences'], f'{current_sentence}.{audio_proc_format}') if not convert_sentence_to_audio(params): - print("convert_sentence_to_audio() failed!") + print('convert_sentence_to_audio() failed!') return False percentage = (current_sentence / total_sentences) * 100 - t.set_description(f"Processing {percentage:.2f}%") + t.set_description(f'Processing {percentage:.2f}%') t.update(1) if progress_bar is not None: progress_bar(current_sentence / total_sentences) current_sentence += 1 combine_audio_sentences(chapter_audio_file) - print(f"Converted chapter {chapter_num} to audio.") + print(f'Converted chapter {chapter_num} to audio.') return True except Exception as e: raise DependencyError(e) def convert_sentence_to_audio(params): try: - if params["tts_model"] == "xtts": - output = params["tts"].inference( - text=params["sentence"], language=ebook["metadata"]["language_iso1"], gpt_cond_latent=params["gpt_cond_latent"], speaker_embedding=params["speaker_embedding"], - temperature=params["temperature"], repetition_penalty=params["repetition_penalty"], top_k=params["top_k"], top_p=params["top_p"], - speed=params["speed"], enable_text_splitting=params["enable_text_splitting"], prosody=None + if params['tts_model'] == 'xtts': + output = params['tts'].inference( + text=params['sentence'], language=ebook['metadata']['language_iso1'], gpt_cond_latent=params['gpt_cond_latent'], speaker_embedding=params['speaker_embedding'], + temperature=params['temperature'], 
repetition_penalty=params['repetition_penalty'], top_k=params['top_k'], top_p=params['top_p'], + speed=params['speed'], enable_text_splitting=params['enable_text_splitting'], prosody=None ) - torchaudio.save(params["sentence_audio_file"], torch.tensor(output[audio_proc_format]).unsqueeze(0), 24000) + torchaudio.save(params['sentence_audio_file'], torch.tensor(output[audio_proc_format]).unsqueeze(0), 24000) else: - params["tts"].tts_with_vc_to_file( - text=params["sentence"], - #language=params["language"], # can be used only if multilingual model - speaker_wav=params["clone_voice_file"], - file_path=params["sentence_audio_file"], - split_sentences=params["enable_text_splitting"] + params['tts'].tts_with_vc_to_file( + text=params['sentence'], + #language=params['language'], # can be used only if multilingual model + speaker_wav=params['clone_voice_file'], + file_path=params['sentence_audio_file'], + split_sentences=params['enable_text_splitting'] ) return True except Exception as e: @@ -660,7 +660,7 @@ def romanToInt(s): return num def replace_roman_numbers(text): - # Regular expression to match "chapter xxx" (case insensitive) + # Regular expression to match 'chapter xxx' (case insensitive) roman_chapter_pattern = re.compile(r'\b(chapter|chapitre|capitolo|capítulo|Kapitel|глава|κεφάλαιο|capítulo|capitul|глава|poglavlje)\s(M{0,4}(CM|CD|D?C{0,3})(XC|XL|L?X{0,3})(IX|IV|V?I{0,3})|[IVXLCDM]+)\b', re.IGNORECASE) def replace_match(match): # Extract the Roman numeral part @@ -668,8 +668,8 @@ def replace_match(match): roman_numeral = match.group(2) # Convert to integer integer_value = romanToInt(roman_numeral) - # Replace with "chapter " - return f"{chapter_word.capitalize()} {integer_value}" + # Replace with 'chapter ' + return f'{chapter_word.capitalize()} {integer_value}' # Replace Roman numerals with their integer equivalents return roman_chapter_pattern.sub(replace_match, text) @@ -684,12 +684,12 @@ def delete_old_web_folders(root_dir): try: if not os.path.exists(root_dir): os.makedirs(root_dir) - print(f"Created missing directory: {root_dir}") + print(f'Created missing directory: {root_dir}') current_time = time.time() age_limit = current_time - gradio_shared_expire * 60 * 60 # 24 hours in seconds for folder_name in os.listdir(root_dir): dir_path = os.path.join(root_dir, folder_name) - if os.path.isdir(dir_path) and folder_name.startswith("web-"): + if os.path.isdir(dir_path) and folder_name.startswith('web-'): folder_creation_time = os.path.getctime(dir_path) if folder_creation_time < age_limit: shutil.rmtree(dir_path) @@ -707,7 +707,7 @@ def convert_ebook(args): try: global cancellation_requested, client, script_mode, audiobooks_dir, tmp_dir if cancellation_requested.is_set(): - msg = "Cancel requested" + msg = 'Cancel requested' raise ValueError() else: error = None @@ -727,8 +727,8 @@ def convert_ebook(args): pass if args.language is not None and args.language in language_mapping.keys(): - ebook["resume"] = False - ebook["id"] = args.session if args.session is not None else str(uuid.uuid4()) + ebook['resume'] = False + ebook['id'] = args.session if args.session is not None else str(uuid.uuid4()) script_mode = args.script_mode if args.script_mode is not None else NATIVE device = args.device.lower() clone_voice_file = args.voice @@ -744,93 +744,93 @@ def convert_ebook(args): custom_model_url = args.custom_model_url if custom_model_file is None else None if not os.path.splitext(args.ebook)[1]: - raise ValueError("The selected ebook file has no extension. 
Please select a valid file.") + raise ValueError('The selected ebook file has no extension. Please select a valid file.') if script_mode == NATIVE: - bool, e = check_programs("Calibre", "calibre", "--version") + bool, e = check_programs('Calibre', 'calibre', '--version') if not bool: raise DependencyError(e) - bool, e = check_programs("FFmpeg", "ffmpeg", "-version") + bool, e = check_programs('FFmpeg', 'ffmpeg', '-version') if not bool: raise DependencyError(e) elif script_mode == DOCKER_UTILS: client = docker.from_env() - tmp_dir = os.path.join(processes_dir, f"ebook-{ebook["id"]}") - ebook["chapters_dir"] = os.path.join(tmp_dir, "chapters") - ebook["chapters_dir_sentences"] = os.path.join(ebook["chapters_dir"], "sentences") + tmp_dir = os.path.join(processes_dir, f"ebook-{ebook['id']}") + ebook['chapters_dir'] = os.path.join(tmp_dir, 'chapters') + ebook['chapters_dir_sentences'] = os.path.join(ebook['chapters_dir'], 'sentences') if not is_gui_process: audiobooks_dir = audiobooks_cli_dir if prepare_dirs(args.ebook) : - ebook["filename_noext"] = os.path.splitext(os.path.basename(ebook["src"]))[0] - ebook["custom_model"] = None + ebook['filename_noext'] = os.path.splitext(os.path.basename(ebook['src']))[0] + ebook['custom_model'] = None if custom_model_file or custom_model_url: - custom_model_dir = os.path.join(models_dir,"__sessions",f'model-{ebook["id"]}') + custom_model_dir = os.path.join(models_dir,'__sessions',f"model-{ebook['id']}") if os.isdir(custom_model_dir): shutil.rmtree(custom_model_dir) if custom_model_url: - print(f"Get custom model: {custom_model_url}") - ebook["custom_model"] = download_custom_model(custom_model_url, custom_model_dir) + print(f'Get custom model: {custom_model_url}') + ebook['custom_model'] = download_custom_model(custom_model_url, custom_model_dir) else: - ebook["custom_model"] = extract_custom_model(custom_model_file, custom_model_dir) - if not torch.cuda.is_available() or device == "cpu": - if device == "gpu": - print("GPU is not available on your device!") - device = "cpu" + ebook['custom_model'] = extract_custom_model(custom_model_file, custom_model_dir) + if not torch.cuda.is_available() or device == 'cpu': + if device == 'gpu': + print('GPU is not available on your device!') + device = 'cpu' torch.device(device) - print(f"Available Processor Unit: {device}") - ebook["epub_path"] = os.path.join(tmp_dir, '__' + ebook["filename_noext"] + '.epub') - ebook["metadata"] = {} - has_src_metadata = has_metadata(ebook["src"]) + print(f'Available Processor Unit: {device}') + ebook['epub_path'] = os.path.join(tmp_dir, '__' + ebook['filename_noext'] + '.epub') + ebook['metadata'] = {} + has_src_metadata = has_metadata(ebook['src']) if convert_to_epub(): - ebook["epub"] = epub.read_epub(ebook["epub_path"], {'ignore_ncx': True}) + ebook['epub'] = epub.read_epub(ebook['epub_path'], {'ignore_ncx': True}) for field in metadata_fields: - data = ebook["epub"].get_metadata("DC", field) + data = ebook['epub'].get_metadata('DC', field) if data: for value, attributes in data: - if field == "language" and not has_src_metadata: - ebook["metadata"][field] = language + if field == 'language' and not has_src_metadata: + ebook['metadata'][field] = language else: - ebook["metadata"][field] = value + ebook['metadata'][field] = value language_array = languages.get(part3=language) if language_array and language_array.part1: - ebook["metadata"]["language_iso1"] = language_array.part1 - if ebook["metadata"]["language"] == language or ebook["metadata"]["language_iso1"] and 
ebook["metadata"]["language"] == ebook["metadata"]["language_iso1"]: - ebook["metadata"]["title"] = os.path.splitext(os.path.basename(ebook["src"]))[0] if not ebook["metadata"]["title"] else ebook["metadata"]["title"] - ebook["metadata"]["creator"] = False if not ebook["metadata"]["creator"] else ebook["metadata"]["creator"] - ebook["cover"] = get_cover() - ebook["chapters"] = get_chapters(language) - if ebook["chapters"]: + ebook['metadata']['language_iso1'] = language_array.part1 + if ebook['metadata']['language'] == language or ebook['metadata']['language_iso1'] and ebook['metadata']['language'] == ebook['metadata']['language_iso1']: + ebook['metadata']['title'] = os.path.splitext(os.path.basename(ebook['src']))[0] if not ebook['metadata']['title'] else ebook['metadata']['title'] + ebook['metadata']['creator'] = False if not ebook['metadata']['creator'] else ebook['metadata']['creator'] + ebook['cover'] = get_cover() + ebook['chapters'] = get_chapters(language) + if ebook['chapters']: params = {"device": device, "temperature": temperature, "length_penalty" : length_penalty, "repetition_penalty": repetition_penalty, "top_k" : top_k, "top_p": top_p, "speed": speed, "enable_text_splitting": enable_text_splitting, "clone_voice_file": clone_voice_file, "language": language} if convert_chapters_to_audio(params): final_file = combine_audio_chapters() if final_file is not None: - progress_status = f"Audiobook {os.path.basename(final_file)} created!" - print(f"Temporary directory {tmp_dir} removed successfully.") + progress_status = f'Audiobook {os.path.basename(final_file)} created!' + print(f'Temporary directory {tmp_dir} removed successfully.') return progress_status, final_file else: - error = "combine_audio_chapters() error: final_file not created!" + error = 'combine_audio_chapters() error: final_file not created!' else: - error = "convert_chapters_to_audio() failed!" + error = 'convert_chapters_to_audio() failed!' else: - error = "get_chapters() failed!" + error = 'get_chapters() failed!' else: - error = f'WARNING: Ebook language: {ebook["metadata"]["language"]}, language selected: {language}' + error = f"WARNING: Ebook language: {ebook['metadata']['language']}, language selected: {language}" else: - error = "get_chapters() failed!" + error = 'get_chapters() failed!' else: - error = f"Temporary directory {tmp_dir} not removed due to failure." + error = f'Temporary directory {tmp_dir} not removed due to failure.' else: - error = f"Language {args.language} is not supported." + error = f'Language {args.language} is not supported.' 
print(error) return error, None except Exception as e: - print(f"Exception: {e}") + print(f'Exception: {e}') return e, None def web_interface(mode, share): @@ -842,7 +842,7 @@ def web_interface(mode, share): audiobook_file = None language_options = [ ( - f'{details["name"]} - {details["native_name"]}' if details["name"] != details["native_name"] else details["name"], + f"{details['name']} - {details['native_name']}" if details['name'] != details['native_name'] else details['name'], lang ) for lang, details in language_mapping.items() @@ -850,16 +850,16 @@ def web_interface(mode, share): default_language_name = next((name for name, key in language_options if key == default_language_code), None) theme = gr.themes.Origin( - primary_hue="amber", - secondary_hue="green", - neutral_hue="gray", - radius_size="lg", + primary_hue='amber', + secondary_hue='green', + neutral_hue='gray', + radius_size='lg', font_mono=['JetBrains Mono', 'monospace', 'Consolas', 'Menlo', 'Liberation Mono'] ) with gr.Blocks(theme=theme) as interface: gr.HTML( - """ + ''' - """ + ''' ) gr.Markdown( - f""" + f''' # Ebook2Audiobook v{version}
https://github.com/DrewThomasson/ebook2audiobook
Convert eBooks into immersive audiobooks with realistic voice TTS models. - """ + ''' ) with gr.Tabs(): - with gr.TabItem("Input Options"): + with gr.TabItem('Input Options'): with gr.Row(): with gr.Column(scale=3): - gr_ebook_file = gr.File(label="eBook File") - gr_device = gr.Radio(label="Processor Unit", choices=["CPU", "GPU"], value="CPU") - gr_language = gr.Dropdown(label="Language", choices=[name for name, _ in language_options], value=default_language_name) + gr_ebook_file = gr.File(label='eBook File') + gr_device = gr.Radio(label='Processor Unit', choices=['CPU', 'GPU'], value='CPU') + gr_language = gr.Dropdown(label='Language', choices=[name for name, _ in language_options], value=default_language_name) with gr.Column(scale=3): with gr.Group(): - gr_clone_voice_file = gr.File(label="Cloning Voice* (a .wav or .mp3 no more than 12sec)", file_types=[".wav", ".mp3"]) - gr_custom_model_file = gr.File(label="Model* (a .zip containing config.json, vocab.json, model.pth)", file_types=[".zip"], visible=True) - gr_custom_model_url = gr.Textbox(placeholder="https://www.example.com/model.zip", label="Model from URL*", visible=True) + gr_clone_voice_file = gr.File(label='Cloning Voice* (a .wav or .mp3 no more than 12sec)', file_types=['.wav', '.mp3']) + gr_custom_model_file = gr.File(label='Model* (a .zip containing config.json, vocab.json, model.pth)', file_types=['.zip'], visible=True) + gr_custom_model_url = gr.Textbox(placeholder='https://www.example.com/model.zip', label='Model from URL*', visible=True) gr.Markdown('

  * Optional

') - with gr.TabItem("Audio Generation Preferences"): + with gr.TabItem('Audio Generation Preferences'): gr.Markdown( - """ + ''' ### Customize Audio Generation Parameters Adjust the settings below to influence how the audio is generated. You can control the creativity, speed, repetition, and more. - """ + ''' ) gr_temperature = gr.Slider( - label="Temperature", + label='Temperature', minimum=0.1, maximum=10.0, step=0.1, value=0.65, - info="Higher values lead to more creative, unpredictable outputs. Lower values make it more monotone." + info='Higher values lead to more creative, unpredictable outputs. Lower values make it more monotone.' ) gr_length_penalty = gr.Slider( - label="Length Penalty", + label='Length Penalty', minimum=0.5, maximum=10.0, step=0.1, value=1.0, - info="Penalize longer sequences. Higher values produce shorter outputs. Not applied to custom models." + info='Penalize longer sequences. Higher values produce shorter outputs. Not applied to custom models.' ) gr_repetition_penalty = gr.Slider( - label="Repetition Penalty", + label='Repetition Penalty', minimum=1.0, maximum=10.0, step=0.1, value=3.0, - info="Penalizes repeated phrases. Higher values reduce repetition." + info='Penalizes repeated phrases. Higher values reduce repetition.' ) gr_top_k = gr.Slider( - label="Top-k Sampling", + label='Top-k Sampling', minimum=10, maximum=100, step=1, value=50, - info="Lower values restrict outputs to more likely words and increase speed at which audio generates." + info='Lower values restrict outputs to more likely words and increase speed at which audio generates.' ) gr_top_p = gr.Slider( - label="Top-p Sampling", + label='Top-p Sampling', minimum=0.1, maximum=1.0, step=.01, value=0.8, - info="Controls cumulative probability for word selection. Lower values make the output more predictable and increase speed at which audio generates." + info='Controls cumulative probability for word selection. Lower values make the output more predictable and increase speed at which audio generates.' ) gr_speed = gr.Slider( - label="Speed", + label='Speed', minimum=0.5, maximum=3.0, step=0.1, value=1.0, - info="Adjusts how fast the narrator will speak." + info='Adjusts how fast the narrator will speak.' ) gr_enable_text_splitting = gr.Checkbox( - label="Enable Text Splitting", + label='Enable Text Splitting', value=True, - info="Splits long texts into sentences to generate audio in chunks. Useful for very long inputs." + info='Splits long texts into sentences to generate audio in chunks. Useful for very long inputs.' 
) - gr_session_status = gr.Textbox(label="Session") - gr_session = gr.Textbox(label="Session", visible=False) - gr_conversion_progress = gr.Textbox(label="Progress") - gr_convert_btn = gr.Button("Convert", variant="primary", interactive=False) - gr_audio_player = gr.Audio(label="Listen", type="filepath", show_download_button=False, container=True, visible=False) - gr_audiobooks_ddn = gr.Dropdown(choices=[], label="Audiobooks") - gr_audiobook_link = gr.File(label="Download") + gr_session_status = gr.Textbox(label='Session') + gr_session = gr.Textbox(label='Session', visible=False) + gr_conversion_progress = gr.Textbox(label='Progress') + gr_convert_btn = gr.Button('Convert', variant='primary', interactive=False) + gr_audio_player = gr.Audio(label='Listen', type='filepath', show_download_button=False, container=True, visible=False) + gr_audiobooks_ddn = gr.Dropdown(choices=[], label='Audiobooks') + gr_audiobook_link = gr.File(label='Download') gr_write_data = gr.JSON(visible=False) gr_read_data = gr.JSON(visible=False) gr_data = gr.State({}) gr_modal_html = gr.HTML() def show_modal(message): - return f""" + return f'''