1
0
mirror of https://gist.github.com/6ba37e4d4084e858f917e271550ce5f6.git synced 2024-09-19 16:24:21 +03:00
picsorter/picsorter.py
2023-07-29 18:00:12 +03:00

126 lines
4.2 KiB
Python

import argparse
import logging
import os
import re
import shutil
import time
from datetime import datetime
from pathlib import Path
from typing import Optional
from config import Config
from database import Database
from iqdb import Iqdb
from library import Library
from metadata import Metadata
class PicSorter:
    """Command-line driver that tags booru images and files them in a library.

    For every input (post URL, image file, or folder of images) the sorter
    resolves a post URL, lets ``Metadata`` process that post, moves the
    resulting image into the ``Library`` and records the post in the
    ``Database`` so it is skipped next time.
    """

    # Group 1: URL up to and including the post id, group 2: provider name,
    # group 3: numeric post id (at least three digits).
    _POST_URL_RE = re.compile(
        r"(https://((?:dan|ai)booru|yande).*?(?:post(?:s|/show)/)?(\d{3,}))"
    )
    # Providers that are actually processed; the pattern above also matches
    # "yande", which is rejected in __process_url.
    _SUPPORTED_PROVIDERS = ('danbooru', 'aibooru')
    # File suffixes (compared lower-cased) picked up when scanning a folder.
    _IMAGE_SUFFIXES = ('.jpg', '.png')

    @staticmethod
    def parse_args():
        """Parse the command line and process every positional input.

        ``-c/--config`` selects the config file (default ``config.yml``);
        all remaining arguments are handed to :meth:`process`. Nothing is
        done when no inputs are given.
        """
        parser = argparse.ArgumentParser(
            description='Finds an image on danbooru, writes tags as IPTC keywords, than places the image in the library'
        )
        parser.add_argument('-c', '--config',
                            type=Path,
                            default='config.yml',
                            help='config.yml file path')
        parser.add_argument('input', nargs=argparse.REMAINDER)
        args = parser.parse_args()
        if args.input:
            PicSorter(args.config).process(args.input)

    def __init__(self, config_file='config.yml'):
        """Load the configuration and create the collaborating services.

        Args:
            config_file: Path of the config file passed to ``Config.load``.
        """
        config = Config.load(config_file)
        self.config = config
        self.__setup_logging(config.dir_logs)
        self.library = Library(config.dir_library)
        self.metadata = Metadata(config.dir_tmp)
        self.db = Database()

    @staticmethod
    def __setup_logging(dir_logs: Path):
        """Send INFO-and-above log records to ``<dir_logs>/YYYY-MM-DD.log``.

        Args:
            dir_logs: Directory that holds the daily log files; it is
                created when missing so the file handler can open its file.
        """
        log_dir = Path(dir_logs)
        # FileHandler does not create parent directories on its own.
        log_dir.mkdir(parents=True, exist_ok=True)
        filename = datetime.now().strftime('%Y-%m-%d.log')
        logfile = log_dir / filename
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s %(levelname)s %(module)s: %(message)s',
            datefmt='%H:%M:%S',
            handlers=[
                logging.FileHandler(os.fspath(logfile))
            ]
        )

    def process(self, inputs: list[str]) -> None:
        """Process each input as a post URL, an image file or a folder.

        Inputs starting with ``http`` are always treated as URLs. Anything
        else is checked against the filesystem first; only when it is
        neither an existing folder nor an existing file is the "contains
        three or more digits" heuristic used to treat it as a URL. Inputs
        matching none of these are ignored.

        Args:
            inputs: URLs and/or filesystem paths given on the command line.
        """
        for item in inputs:
            if item.startswith("http"):
                print("Processing url", item)
                self.__process_url(item)
                continue

            path = Path(item)
            if path.is_dir():
                self.__process_folder(path)
            elif path.is_file():
                # Existing files win over the digit heuristic below, so a
                # name such as "IMG_12345.jpg" is not mistaken for a URL.
                print("Processing file", item)
                self.__process_file(item)
            elif re.search(r"(\d{3,})", item):
                print("Processing url", item)
                self.__process_url(item)

    def __process_folder(self, dir_input: Path) -> None:
        """Process every ``.jpg``/``.png`` file directly inside a folder.

        Suffixes are matched case-insensitively and files are handled in
        sorted order. The folder is not scanned recursively.

        Args:
            dir_input: Folder whose image files should be processed.

        Raises:
            Exception: Whatever processing a file raises; the failure is
                logged and re-raised, which stops the folder scan.
        """
        files = sorted(
            p for p in dir_input.iterdir()
            if p.is_file() and p.suffix.lower() in self._IMAGE_SUFFIXES
        )
        for filename in files:
            print("Processing", filename)
            try:
                self.__process_file(filename)
            except Exception:
                logging.exception("Failed to process %s", filename)
                raise
            # Pause between files; presumably to avoid hammering the lookup
            # service — TODO confirm the intended rate limit.
            time.sleep(5)

    def __process_file(self, filename: str) -> bool:
        """Look a local image up, process the found post, archive the file.

        Args:
            filename: Path of the local image (``str`` or path-like).

        Returns:
            True when the post was processed and the local file was moved
            into ``config.dir_processed``; False otherwise.
        """
        url = self.__search_iqdb(filename)
        if url is None:
            return False
        if not self.__process_url(url):
            return False

        self.config.dir_processed.mkdir(exist_ok=True, parents=True)
        from_path = os.fspath(filename)
        to_path = os.fspath(self.config.dir_processed)
        shutil.move(from_path, to_path)
        self.__show_path(to_path)
        return True

    def __search_iqdb(self, filename: str) -> Optional[str]:
        """Return the URL ``Iqdb.search`` finds for the file, if any.

        When nothing is found a warning is logged and the file is handed
        to ``Library.move_to_orphan``.

        Args:
            filename: Path of the local image to look up.

        Returns:
            The found URL, or None when the search had no result.
        """
        url = Iqdb.search(filename)
        if url is None:
            logging.warning("%s not found", filename)
            self.library.move_to_orphan(Path(filename))
            return None
        return url

    def __process_url(self, url: str) -> bool:
        """Process the post behind a booru URL and store it in the library.

        Args:
            url: Text containing a post URL of a known provider.

        Returns:
            True when the post was processed and stored. False when the
            URL is not recognised, the provider is unsupported, the post
            is already recorded in the database, or ``Metadata.process``
            returned None.
        """
        m = self._POST_URL_RE.search(url)
        if not m:
            return False
        provider = m.group(2)
        post_id = int(m.group(3))
        if provider not in self._SUPPORTED_PROVIDERS:
            return False
        if self.db.is_exists(provider, post_id):
            logging.info("Skipping exists post %s %d", provider, post_id)
            return False

        meta_result = self.metadata.process(m.group(1))
        if meta_result is None:
            return False
        image_path, tags = meta_result
        to_path = self.library.move(image_path, tags)
        # NOTE(review): argument order differs from is_exists(provider,
        # post_id) above — confirm against Database.add's signature.
        self.db.add(post_id, provider, tags.tags_string)
        self.__show_path(to_path)
        return True

    def __show_path(self, p: str) -> None:
        """Print a green ``file://`` link to the given location.

        Args:
            p: Destination path (``str`` or path-like); spaces are replaced
                with ``%20``.
        """
        # Normalise path-like objects so str.replace is what gets called.
        location = os.fspath(p)
        print("\033[92mSaved to", 'file://' + location.replace(' ', '%20'), "\033[0m")
# Script entry point: parse the command line and process the given inputs.
if __name__ == '__main__':
    PicSorter.parse_args()