diff --git a/komga_cover_extractor.py b/komga_cover_extractor.py index 5bfc128..61b1df6 100644 --- a/komga_cover_extractor.py +++ b/komga_cover_extractor.py @@ -43,7 +43,7 @@ from settings import * # Version of the script -script_version = (2, 4, 14) +script_version = (2, 4, 15) script_version_text = "v{}.{}.{}".format(*script_version) # Paths = existing library @@ -187,6 +187,14 @@ # Type of file formats for manga and novels file_formats = ["chapter", "volume"] +# stores our folder path modification times +# used for skipping folders that haven't been modified +# when running extract_covers() with watchdog enabled +root_modification_times = {} + +# Stores all the new series paths for series that were added to an existing library +moved_folders = [] + class LibraryType: def __init__(self, name, extensions, must_contain, must_not_contain): @@ -429,6 +437,9 @@ def __str__(self): # Used for sending scan reqeusts after files have been moved over. komga_libraries = [] +# Will move new series that couldn't be matched to the library to the appropriate library. +# requires: '--watchdog "True"' and check_for_existing_series_toggle = True +move_new_series_to_library_toggle = False # Folder Class class Folder: @@ -583,12 +594,14 @@ def run(self): try: while True: time.sleep(sleep_timer) - except KeyboardInterrupt: + except Exception as e: + print(f"ERROR in Watcher.run(): {e}") for observer in self.observers: observer.stop() print("Observer Stopped") for observer in self.observers: observer.join() + print("Observer Joined") # Handles our embed object along with any associated file @@ -990,7 +1003,7 @@ def process_path(path, paths_with_types, paths, is_download_folders=False): COMMON_EXTENSION_THRESHOLD = 0.3 # 30% def process_auto_classification(): - nonlocal path_formats, path_extensions + nonlocal path_formats, path_extensions, path_library_types CHAPTER_THRESHOLD = 0.9 # 90% VOLUME_THRESHOLD = 0.9 # 90% @@ -1084,7 +1097,6 @@ def process_auto_classification(): print("\t\t\t\t\t- path types: " + str(path_formats)) - # Gets the common extensions from a list of extensions def get_common_extensions(all_extensions): nonlocal COMMON_EXTENSION_THRESHOLD @@ -2109,8 +2121,7 @@ def move_images( file, folder_name, group=False, - highest_num=None, - highest_part="", + highest_index_num="", is_chapter_dir=False, ): for extension in image_extensions: @@ -2140,16 +2151,15 @@ def move_images( shutil.move(cover_image_file_path, folder_name) elif ( use_latest_volume_cover_as_series_cover - and highest_num != None + and highest_index_num != "" and file.file_type == "volume" and ( - file.volume_number == highest_num + file.index_number == highest_index_num or ( - isinstance(file.volume_number, list) - and highest_num in file.volume_number + isinstance(file.index_number, list) + and highest_index_num in file.index_number ) ) - and file.volume_part == highest_part ): # get the cover image in the folder existing_cover_image_file_path = [ @@ -3307,8 +3317,7 @@ def move_file( new_location, silent=False, group=False, - highest_num=None, - highest_part="", + highest_index_num="", is_chapter_dir=False, ): try: @@ -3345,8 +3354,7 @@ def move_file( file, new_location, group=group, - highest_num=highest_num, - highest_part=highest_part, + highest_index_num=highest_index_num, is_chapter_dir=is_chapter_dir, ) return True @@ -3363,8 +3371,9 @@ def move_file( send_message(str(e), error=True) return False + # Replaces an old file. -def replace_file(old_file, new_file, group=False, highest_num=None, highest_part=""): +def replace_file(old_file, new_file, group=False, highest_index_num=""): result = False try: if os.path.isfile(old_file.path) and os.path.isfile(new_file.path): @@ -3374,8 +3383,7 @@ def replace_file(old_file, new_file, group=False, highest_num=None, highest_part new_file, old_file.root, silent=True, - highest_num=highest_num, - highest_part=highest_part, + highest_index_num=highest_index_num, ) if os.path.isfile(os.path.join(old_file.root, new_file.name)): result = True @@ -3477,9 +3485,26 @@ def remove_duplicate_releases_from_download( is_chapter_dir = ( chapter_percentage_download_folder >= required_matching_percentage ) - highest_num, highest_part = get_highest_release( - downloaded_releases + original_releases, - is_chapter_directory=is_chapter_dir, + highest_index_num = ( + get_highest_release( + tuple( + [ + item.index_number + if not isinstance(item.index_number, list) + else tuple(item.index_number) + for item in downloaded_releases + ] + + [ + item.index_number + if not isinstance(item.index_number, list) + else tuple(item.index_number) + for item in original_releases + ] + ), + is_chapter_directory=is_chapter_dir, + ) + if not is_chapter_dir + else "" ) for original in original_releases[:]: if not os.path.isfile(download.path): @@ -3670,8 +3695,7 @@ def remove_duplicate_releases_from_download( original, download, group=group, - highest_num=highest_num, - highest_part=highest_part, + highest_index_num=highest_index_num, ) if replace_file_status: # append the new path to moved_files @@ -4872,16 +4896,32 @@ def check_upgrade( is_chapter_dir = ( chapter_percentage_existing_folder ) >= required_matching_percentage - highest_num_and_part = get_highest_release( - clean_existing + download_dir_volumes, - is_chapter_directory=is_chapter_dir, + highest_index_num = ( + get_highest_release( + tuple( + [ + item.index_number + if not isinstance(item.index_number, list) + else tuple(item.index_number) + for item in clean_existing + ] + + [ + item.index_number + if not isinstance(item.index_number, list) + else tuple(item.index_number) + for item in download_dir_volumes + ] + ), + is_chapter_directory=is_chapter_dir, + ) + if not is_chapter_dir + else "" ) move_status = move_file( volume, existing_dir, group=group, - highest_num=highest_num_and_part[0], - highest_part=highest_num_and_part[1], + highest_index_num=highest_index_num, is_chapter_dir=is_chapter_dir, ) if move_status: @@ -5425,6 +5465,7 @@ def get_internal_file_hash(zip_file, file_name): # regex out underscore from passed string and return it +@lru_cache(maxsize=None) def replace_underscore_in_name(name): # Replace underscores that are preceded and followed by a number with a period name = re.sub(r"(?<=\d)_(?=\d)", ".", name) @@ -7970,6 +8011,7 @@ def rename_files_in_download_folders( replacement = re.sub( r"([A-Za-z])(\:)", r"\1 -", replacement ) + # remove dual spaces from dir_clean replacement = remove_dual_space(replacement) processed_files.append(replacement) @@ -8428,46 +8470,30 @@ def is_blank_image(image_data): # Returns the highest volume number and volume part number of a release in a list of volume releases +@lru_cache(maxsize=None) def get_highest_release(releases, is_chapter_directory=False): - highest_volume_number = None - highest_volume_part_number = "" + highest_index_number = "" if use_latest_volume_cover_as_series_cover and not is_chapter_directory: - for item in releases: - if not item.file_type == "volume": - continue - - if item.volume_number == "": - continue - - part = None - - if hasattr(item, "volume_part"): - part = item.volume_part - else: - part = get_file_part(item.name) + contains_empty_or_tuple_index_number = any( + isinstance(item, (tuple, list)) or item == "" for item in releases + ) + if contains_empty_or_tuple_index_number: + for item in releases: + if item == "" or item == [] or item == None: + continue - number = item.volume_number - if isinstance(number, (int, float)): - if highest_volume_number is None or number > highest_volume_number: - highest_volume_number = number - highest_volume_part_number = part - elif ( - number == highest_volume_number - and part > highest_volume_part_number - ): - highest_volume_part_number = part - elif isinstance(number, list): - if highest_volume_number is None or max(number) > highest_volume_number: - highest_volume_number = max(number) - highest_volume_part_number = part - elif ( - max(number) == highest_volume_number - and part > highest_volume_part_number - ): - highest_volume_part_number = part + number = item + if isinstance(number, (int, float)): + if highest_index_number == "" or number > highest_index_number: + highest_index_number = number + elif isinstance(number, (tuple, list)): + if highest_index_number == "" or max(number) > highest_index_number: + highest_index_number = max(number) + else: + highest_index_number = max(releases) - return highest_volume_number, highest_volume_part_number + return highest_index_number # Series covers that have been checked and can be skipped. @@ -8488,6 +8514,7 @@ def print_execution_time(start_time, function_name): # Extracts the covers out from our manga and novel files. def extract_covers(paths_to_process=paths): global checked_series + global root_modification_times print("\nLooking for covers to extract...") # Only volume defined paths in the paths_with_types list @@ -8495,6 +8522,16 @@ def extract_covers(paths_to_process=paths): # volume library to a chapter library volume_paths = [] + # contains cleaned basenames of folders that have been moved + moved_folder_names = ( + [ + remove_punctuation(os.path.basename(x).lower(), disable_lang=True).strip() + for x in moved_folders + ] + if moved_folders and copy_existing_volume_covers_toggle + else [] + ) + # Iterate over each path for path in paths_to_process: # Check if the path exists @@ -8508,6 +8545,25 @@ def extract_covers(paths_to_process=paths): # Traverse the directory tree rooted at the path for root, dirs, files in scandir.walk(path): + if watchdog_toggle: + if not moved_folder_names or ( + remove_punctuation( + os.path.basename(root).lower(), disable_lang=True + ).strip() + not in moved_folder_names + ): + root_mod_time = get_modification_date(root) + if root in root_modification_times: + # Modification time hasn't changed; continue to the next iteration + if root_modification_times[root] == root_mod_time: + continue + else: + # update the modification time for the root + root_modification_times[root] = root_mod_time + else: + # Store the modification time for the root + root_modification_times[root] = root_mod_time + clean = None # Clean and sort the files and directories @@ -8607,19 +8663,24 @@ def extract_covers(paths_to_process=paths): ) # Get the highest volume number and part number - highest_volume_number, highest_volume_part_number = ( - get_highest_release(folder_accessor.files, is_chapter_directory) + highest_index_number = ( + get_highest_release( + tuple( + [ + item.index_number + if not isinstance(item.index_number, list) + else tuple(item.index_number) + for item in folder_accessor.files + ] + ), + is_chapter_directory=is_chapter_directory, + ) if not is_chapter_directory - else (None, "") + else "" ) - if highest_volume_number: - print(f"\n\t\tHighest Volume Number: {highest_volume_number}") - - if highest_volume_part_number: - print( - f"\t\tHighest Volume Part Number: {highest_volume_part_number}" - ) + if highest_index_number: + print(f"\n\t\tHighest Index Number: {highest_index_number}") # Check if it contains multiple volume ones contains_multiple_volume_ones = False @@ -8628,14 +8689,14 @@ def extract_covers(paths_to_process=paths): for file in folder_accessor.files: if not file.is_one_shot and not file.volume_part: - if file.volume_number == 1: + if file.index_number == 1: volume_ones_counter += 1 if volume_ones_counter > 1: contains_multiple_volume_ones = True break elif ( - isinstance(file.volume_number, list) - and 1 in file.volume_number + isinstance(file.index_number, list) + and 1 in file.index_number ): volume_ones_counter += 1 if volume_ones_counter > 1: @@ -8647,8 +8708,7 @@ def extract_covers(paths_to_process=paths): process_cover_extraction( file, contains_multiple_volume_ones, - highest_volume_number, - highest_volume_part_number, + highest_index_number, is_chapter_directory, series_cover_path, volume_paths, @@ -8696,15 +8756,14 @@ def convert_webp_to_jpg(webp_file_path): # Retrieves the modification date of the passed file path. -def get_modification_date(file_path): - return os.path.getmtime(file_path) +def get_modification_date(path): + return os.path.getmtime(path) def process_cover_extraction( file, contains_multiple_volume_ones, - highest_volume_number, - highest_volume_part_number, + highest_index_number, is_chapter_directory, series_cover_path, volume_paths, @@ -8740,35 +8799,34 @@ def process_cover_extraction( print("\t\tFile does not have a cover.") result = find_and_extract_cover(file) - if result and result.endswith(".webp"): - # Cover image is a .webp file, attempt to convert to .jpg - print("\t\tCover is a .webp file. Converting to .jpg...") - conversion_result = convert_webp_to_jpg(result) - - if conversion_result: - # Cover image successfully converted to .jpg - print("\t\tCover successfully converted to .jpg") - result = conversion_result - else: - # Cover conversion failed, clean up the webp file - print("\t\tCover conversion failed.") - print("\t\tCleaning up webp file...") - remove_file(result, silent=True) - - # Verify that the webp file was deleted - if not os.path.isfile(result): - print("\t\tWebp file successfully deleted.") + if result: + if result.endswith(".webp"): + # Cover image is a .webp file, attempt to convert to .jpg + print("\t\tCover is a .webp file. Converting to .jpg...") + conversion_result = convert_webp_to_jpg(result) + + if conversion_result: + # Cover image successfully converted to .jpg + print("\t\tCover successfully converted to .jpg") + result = conversion_result else: - print("\t\tWebp file could not be deleted.") - - result = None + # Cover conversion failed, clean up the webp file + print("\t\tCover conversion failed.") + print("\t\tCleaning up webp file...") + remove_file(result, silent=True) + + # Verify that the webp file was deleted + if not os.path.isfile(result): + print("\t\tWebp file successfully deleted.") + else: + print("\t\tWebp file could not be deleted.") - # Cover image successfully extracted - if result: - print("\t\tCover successfully extracted.\n") - has_cover = True - cover = result - image_count += 1 + result = None + else: + print("\t\tCover successfully extracted.\n") + has_cover = True + cover = result + image_count += 1 else: print("\t\tCover not found.") @@ -8778,26 +8836,25 @@ def process_cover_extraction( and cover and series_cover_path and not contains_multiple_volume_ones - and file.volume_part == highest_volume_part_number and ( ( use_latest_volume_cover_as_series_cover - and highest_volume_number + and highest_index_number != "" and ( - file.volume_number == highest_volume_number + file.index_number == highest_index_number or ( - isinstance(file.volume_number, list) - and highest_volume_number in file.volume_number + isinstance(file.index_number, list) + and highest_index_number in file.index_number ) ) ) or ( not use_latest_volume_cover_as_series_cover and ( - file.volume_number == 1 + file.index_number == 1 or ( - isinstance(file.volume_number, list) - and 1 in file.volume_number + isinstance(file.index_number, list) + and 1 in file.index_number ) ) ) @@ -8838,9 +8895,9 @@ def process_cover_extraction( # Check the volume libaries for a matching series so the series cover can be copied # to the chapter library series folder if ( - volume_paths + file.root not in checked_series + and volume_paths and is_chapter_directory - and file.root not in checked_series and clean_basename ): for v_path in volume_paths: @@ -8851,21 +8908,38 @@ def process_cover_extraction( # Get the first letter of file.basename # and remove any that don't start with it - first_letter = re.search(r"^[A-Za-z]+", file.basename) + first_letter = re.search(r"^[A-Za-z]+", clean_basename) if first_letter: first_letter = first_letter.group(0) + # Filter filtered_series to only include folders that start with first_set_of_letters filtered_series = [ x for x in filtered_series - if re.search(r"^" + first_letter, x, re.IGNORECASE) + if re.search( + r"^" + first_letter, + replace_underscore_in_name( + remove_punctuation( + remove_bracketed_info_from_name(x), + disable_lang=True, + ) + ) + .lower() + .strip(), + re.IGNORECASE, + ) ] + else: + pass for folder in filtered_series: folder_path = os.path.join(v_path.path, folder) clean_folder = ( replace_underscore_in_name( - remove_punctuation(remove_bracketed_info_from_name(folder)) + remove_punctuation( + remove_bracketed_info_from_name(folder), + disable_lang=True, + ) ) .lower() .strip() @@ -8894,18 +8968,23 @@ def process_cover_extraction( ] if volume_one and len(volume_one) == 1: volume_one = volume_one[0] + # find the image cover - cover_path = None - for extension in image_extensions: - if os.path.isfile( + cover_path = next( + ( volume_one.extensionless_path + extension - ): - cover_path = ( + for extension in image_extensions + if os.path.isfile( volume_one.extensionless_path + extension ) - break - volume_one_modification_date = get_modification_date( - cover_path + ), + None, + ) + + volume_one_modification_date = ( + get_modification_date(cover_path) + if cover_path + else None ) if series_cover_path: cover_modification_date = ( @@ -8916,10 +8995,14 @@ def process_cover_extraction( if ( cover_modification_date and volume_one_modification_date - ) and ( - cover_modification_date - != volume_one_modification_date ): + if ( + cover_modification_date + == volume_one_modification_date + ): + checked_series.append(file.root) + return + cover_hash = ( get_file_hash(series_cover_path) if series_cover_path @@ -8957,9 +9040,12 @@ def process_cover_extraction( volume_one_modification_date, ) if not series_cover_path and cover_path: + cover_path_extension = get_file_extension( + cover_path + ) # copy the file to the series cover folder series_cover_path = os.path.join( - file.root, "cover" + extension + file.root, "cover" + cover_path_extension ) shutil.copy( cover_path, @@ -8990,7 +9076,7 @@ def process_cover_extraction( and cover and ( ( - file.volume_number == 1 + file.index_number == 1 and ( not use_latest_volume_cover_as_series_cover or is_chapter_directory @@ -9000,15 +9086,14 @@ def process_cover_extraction( file.file_type == "volume" and not is_chapter_directory and use_latest_volume_cover_as_series_cover - and highest_volume_number + and highest_index_number != "" and ( - file.volume_number == highest_volume_number + file.index_number == highest_index_number or ( - isinstance(file.volume_number, list) - and highest_volume_number in file.volume_number + isinstance(file.index_number, list) + and highest_index_number in file.index_number ) ) - and file.volume_part == highest_volume_part_number ) ) ): @@ -11510,6 +11595,7 @@ def correct_file_extensions(group=False): if group and grouped_notifications and not group_discord_notifications_until_max: send_discord_message(None, grouped_notifications) + # Optional features below, use at your own risk. # Activate them in settings.py def main(): @@ -11645,14 +11731,43 @@ def main(): # Extract the covers from the files in the download folders if extract_covers_toggle and paths and download_folder_in_paths: extract_covers() + print_stats() # Match the files in the download folders to the files in the library if check_for_existing_series_toggle and download_folders and paths: check_for_existing_series(group=True) + if watchdog_toggle: + if transferred_files: + # remove any deleted/renamed/moved files + transferred_files = [x for x in transferred_files if os.path.isfile(x)] + + # remove any deleted/renamed/moved directories + if transferred_dirs: + transferred_dirs = [x for x in transferred_dirs if os.path.isdir(x.root)] + # Extract the covers from the files in the library if extract_covers_toggle and paths and not download_folder_in_paths: - extract_covers() + if (watchdog_toggle and moved_files) or not watchdog_toggle: + if watchdog_toggle and not copy_existing_volume_covers_toggle: + paths_to_trigger = [] + for path in paths: + if moved_files: + if ( + any( + moved_file.startswith(path) + for moved_file in moved_files + ) + and path not in paths_to_trigger + ): + paths_to_trigger.append(path) + continue + + if paths_to_trigger: + extract_covers(paths_to_process=paths_to_trigger) + else: + extract_covers() + print_stats() # Check for missing volumes in the library (local solution) if check_for_missing_volumes_toggle and paths: @@ -11662,19 +11777,6 @@ def main(): if bookwalker_check and not watchdog_toggle: check_for_new_volumes_on_bookwalker() - # Print the number of files encountered, and their extensions - if extract_covers_toggle and paths: - print_stats() - - if watchdog_toggle: - if transferred_files: - # remove any deleted/renamed/moved files - transferred_files = [x for x in transferred_files if os.path.isfile(x)] - - # remove any deleted/renamed/moved directories - if transferred_dirs: - transferred_dirs = [x for x in transferred_dirs if os.path.isdir(x.root)] - # Sends a scan request to Komga for each library that had a file moved into it. if ( send_scan_request_to_komga_libraries_toggle @@ -11714,7 +11816,9 @@ def main(): parse_my_args() # parses the user's arguments if watchdog_toggle and download_folders: print("\nWatchdog is enabled, watching for changes...") - watch = Watcher() - watch.run() + while True: + print("\nWatchdog is enabled, watching for changes...") + watch = Watcher() + watch.run() else: main()