import argparse
import logging
import os
import re
import shutil
import time
from datetime import datetime
from pathlib import Path
from typing import Optional, Union

from config import Config
from database import Database
from iqdb import Iqdb
from library import Library
from metadata import Metadata

# Matches a run of three or more digits, optionally preceded by "posts/".
# Used both to recognise a post reference and to extract the post id.
_POST_ID_RE = re.compile(r"(?:posts/)?(\d{3,})")

# File suffixes (compared case-insensitively) picked up when scanning a folder.
_IMAGE_SUFFIXES = frozenset({".jpg", ".png"})

# Pause after each file processed from a folder.
# NOTE(review): presumably a courtesy rate limit for the IQDB lookup - confirm.
_FOLDER_DELAY_SECONDS = 5


class PicSorter:
    """Command-line driver that looks images up and files them in the library.

    Per the CLI description: finds an image on danbooru, writes tags as IPTC
    keywords, then places the image in the library. The actual lookup,
    tagging, storage and bookkeeping are delegated to ``Iqdb``, ``Metadata``,
    ``Library`` and ``Database``.
    """

    @staticmethod
    def parse_args():
        """Parse the command line and process every positional input.

        Nothing is done (and no ``PicSorter`` is constructed) when no inputs
        are given.
        """
        parser = argparse.ArgumentParser(
            description='Finds an image on danbooru, writes tags as IPTC keywords, than places the image in the library'
        )
        parser.add_argument('-c', '--config', type=Path, default='config.yml', help='config.yml file path')
        parser.add_argument('input', nargs=argparse.REMAINDER)
        args = parser.parse_args()
        if len(args.input) >= 1:
            PicSorter(args.config).process(args.input)

    def __init__(self, config_file='config.yml'):
        """Load the configuration and build the collaborating services.

        Args:
            config_file: Path of the configuration file handed to
                ``Config.load``.
        """
        config = Config.load(config_file)
        self.config = config
        self.__setup_logging(config.dir_logs)
        self.library = Library(config.dir_library)
        self.metadata = Metadata(config.dir_tmp)
        self.db = Database()

    @staticmethod
    def __setup_logging(dir_logs: Path):
        """Configure root logging to write to ``<dir_logs>/<YYYY-MM-DD>.log``.

        Args:
            dir_logs: Directory that receives the log file. It is created
                (including parents) when missing, because the file handler
                cannot open a file inside a directory that does not exist.
        """
        log_dir = Path(dir_logs)
        log_dir.mkdir(parents=True, exist_ok=True)
        filename = datetime.now().strftime('%Y-%m-%d.log')
        logfile = log_dir / filename
        logging.basicConfig(
            filename=os.fspath(logfile),
            level=logging.INFO,
            format='%(asctime)s %(levelname)s %(module)s: %(message)s',
            datefmt='%H:%M:%S',
        )

    def process(self, inputs: list[str]) -> None:
        """Dispatch every input to the URL, folder or file handler.

        Resolution order for each input:

        1. Anything starting with ``http`` is treated as a post URL.
        2. An existing directory is scanned for images.
        3. An existing file is processed as a single image.
        4. Otherwise, anything containing three or more consecutive digits
           is treated as a post reference (for example a bare post id).

        Existing paths are checked before the digit fallback so that a file
        or folder whose name merely contains digits is not mistaken for a
        post id.

        Args:
            inputs: Raw command-line arguments.
        """
        for item in inputs:
            if item.startswith("http"):
                print("Processing url", item)
                self.__process_url(item)
                continue

            path = Path(item)
            if path.is_dir():
                self.__process_folder(path)
            elif path.is_file():
                print("Processing file", item)
                self.__process_file(item)
            elif _POST_ID_RE.search(item):
                print("Processing url", item)
                self.__process_url(item)
            else:
                logging.warning("Skipping unrecognised input %s", item)

    def __process_folder(self, dir_input: Path) -> None:
        """Process every ``.jpg``/``.png`` entry directly inside a folder.

        The suffix comparison is case-insensitive and entries are handled in
        sorted order. The listing is materialised up front because processed
        files are moved out of the folder while the loop is running.

        Args:
            dir_input: Folder to scan (not recursive).

        Raises:
            Exception: Whatever processing a file raises; the failure is
                logged and then propagated, aborting the remaining files.
        """
        images = sorted(
            entry for entry in dir_input.iterdir()
            if entry.suffix.lower() in _IMAGE_SUFFIXES
        )
        for image in images:
            print("Processing", image)
            try:
                self.__process_file(image)
            except Exception:
                logging.exception("Failed to process %s", image)
                raise
            time.sleep(_FOLDER_DELAY_SECONDS)

    def __process_file(self, filename: Union[str, Path]) -> bool:
        """Look a local image up, import the matching post, archive the file.

        Args:
            filename: Path of the local image.

        Returns:
            ``True`` when a matching post was imported and the local file was
            moved into ``config.dir_processed``; ``False`` when no match was
            found or the post was not imported.
        """
        url = self.__search_iqdb(filename)
        if url is None:
            return False
        if not self.__process_url(url):
            return False

        processed_dir = self.config.dir_processed
        processed_dir.mkdir(exist_ok=True, parents=True)
        to_path = os.fspath(processed_dir)
        shutil.move(os.fspath(filename), to_path)
        self.__show_path(to_path)
        return True

    def __search_iqdb(self, filename: Union[str, Path]) -> Optional[str]:
        """Search for the image via ``Iqdb.search``.

        When the search yields nothing, a warning is logged and the file is
        handed to ``Library.move_to_orphan``.

        Args:
            filename: Path of the local image.

        Returns:
            The URL returned by the search, or ``None`` when there is none.
        """
        url = Iqdb.search(filename)
        if url is None:
            logging.warning("%s not found", filename)
            self.library.move_to_orphan(Path(filename))
            return None
        return url

    def __process_url(self, url: str) -> bool:
        """Import the post referenced by ``url`` into the library.

        The post id is the first run of three or more digits in ``url``.
        Posts already recorded in the database are skipped.

        Args:
            url: Post URL, or any string containing the post id.

        Returns:
            ``True`` when the post was stored in the library and recorded in
            the database; ``False`` when no id could be extracted, the post
            is already known, or ``Metadata.process`` returned ``None``.
        """
        match = _POST_ID_RE.search(url)
        if not match:
            return False
        post_id = int(match.group(1))

        if self.db.is_exists(post_id):
            logging.info("Skipping exists post %d", post_id)
            return False

        meta_result = self.metadata.process("https://danbooru.donmai.us/posts/" + str(post_id))
        if meta_result is None:
            return False

        image_path, tags = meta_result
        to_path = self.library.move(image_path, tags)
        self.db.add(post_id, tags.tags_string)
        self.__show_path(to_path)
        return True

    def __show_path(self, p: Union[str, "os.PathLike[str]"]) -> None:
        """Print ``p`` as a ``file://`` link with spaces percent-encoded.

        Args:
            p: Destination path. Path-like objects are converted with
                ``os.fspath`` first, since ``Path.replace`` renames a file
                rather than substituting text.
        """
        print("Saved to", 'file://' + os.fspath(p).replace(' ', '%20'))


if __name__ == '__main__':
    PicSorter.parse_args()