mirror of
https://gist.github.com/6ba37e4d4084e858f917e271550ce5f6.git
synced 2024-09-20 00:34:20 +03:00
126 lines
4.2 KiB
Python
126 lines
4.2 KiB
Python
import argparse
|
|
import logging
|
|
import os
|
|
import re
|
|
import shutil
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from config import Config
|
|
from database import Database
|
|
from iqdb import Iqdb
|
|
from library import Library
|
|
from metadata import Metadata
|
|
|
|
|
|
class PicSorter:
    """Command-line tool that tags images from booru posts and files them.

    For each input (post URL, image file, or folder of images) the sorter
    resolves a booru post, lets ``Metadata`` fetch the image and its tags,
    moves the result into the ``Library`` and records the post in the
    ``Database`` so it is not processed twice.
    """

    # Captures: (1) the URL up to and including the post id, (2) the provider
    # name, (3) the numeric post id (at least three digits).
    _URL_PATTERN = re.compile(
        r"(https://((?:dan|ai)booru|yande).*?(?:post(?:s|/show)/)?(\d{3,}))"
    )
    # Providers that are actually processed; anything else the pattern
    # matches (e.g. "yande") is rejected.
    _SUPPORTED_PROVIDERS = ('danbooru', 'aibooru')
    # Lower-cased suffixes picked up when scanning a folder.
    _IMAGE_SUFFIXES = frozenset({".jpg", ".png"})
    # Pause between consecutive files of a folder (seconds).
    _FOLDER_DELAY_SECONDS = 5

    @staticmethod
    def parse_args():
        """Parse the command line and process every positional input.

        Prints the help text when no inputs are given instead of exiting
        silently.
        """
        parser = argparse.ArgumentParser(
            description='Finds an image on danbooru, writes tags as IPTC keywords, than places the image in the library'
        )
        parser.add_argument('-c', '--config',
                            type=Path,
                            default='config.yml',
                            help='config.yml file path')
        parser.add_argument('input', nargs=argparse.REMAINDER)
        args = parser.parse_args()
        if args.input:
            PicSorter(args.config).process(args.input)
        else:
            parser.print_help()

    def __init__(self, config_file='config.yml'):
        """Load the configuration and create the collaborating services.

        Args:
            config_file: Path of the YAML configuration passed to
                ``Config.load``.
        """
        config = Config.load(config_file)
        self.config = config
        self.__setup_logging(config.dir_logs)
        self.library = Library(config.dir_library)
        self.metadata = Metadata(config.dir_tmp)
        self.db = Database()

    @staticmethod
    def __setup_logging(dir_logs: Path):
        """Configure root logging to write to ``<dir_logs>/<YYYY-MM-DD>.log``.

        Args:
            dir_logs: Directory that receives the daily log file. It is
                created when missing so that ``FileHandler`` cannot fail on
                a fresh installation.
        """
        Path(dir_logs).mkdir(parents=True, exist_ok=True)
        filename = datetime.now().strftime('%Y-%m-%d.log')
        logfile = Path(dir_logs, filename)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s %(levelname)s %(module)s: %(message)s',
            datefmt='%H:%M:%S',
            handlers=[
                logging.FileHandler(os.fspath(logfile))
            ]
        )

    def process(self, inputs: list[str]) -> None:
        """Dispatch every input to the URL, folder or file handler.

        Existing filesystem paths are checked *before* the "contains a
        number" heuristic; otherwise any file or folder whose name holds
        three or more digits would be mistaken for a post reference and
        never be processed.

        Args:
            inputs: Post URLs, image file paths and/or folder paths.
        """
        for item in inputs:
            if item.startswith("http"):
                print("Processing url", item)
                self.__process_url(item)
                continue

            path = Path(item)
            if path.is_dir():
                self.__process_folder(path)
            elif path.is_file():
                print("Processing file", item)
                self.__process_file(item)
            elif re.search(r"\d{3,}", item):
                # Not a path on disk: keep treating numeric-looking input
                # as a post reference, as before.
                print("Processing url", item)
                self.__process_url(item)
            else:
                logging.warning("Skipping unsupported input %s", item)

    def __process_folder(self, dir_input: Path) -> None:
        """Process every ``.jpg``/``.png`` file directly inside a folder.

        Suffixes are matched case-insensitively and files are handled in
        sorted order. A pause is inserted between consecutive files. Any
        exception is logged with its traceback and then re-raised, so the
        caller still sees the original error.

        Args:
            dir_input: Folder to scan (not recursive).
        """
        images = sorted(
            p for p in dir_input.iterdir()
            if p.is_file() and p.suffix.lower() in self._IMAGE_SUFFIXES
        )
        for index, image in enumerate(images):
            if index:
                # Only wait between lookups, not after the last one.
                time.sleep(self._FOLDER_DELAY_SECONDS)
            print("Processing", image)
            try:
                self.__process_file(image)
            except Exception:
                logging.exception("Failed to process %s", image)
                raise

    def __process_file(self, filename: str) -> bool:
        """Look a local image up on IQDB and process the post it resolves to.

        On success the source image is moved into ``config.dir_processed``
        (created on demand) and the path of the moved file is printed.

        Args:
            filename: Path of the local image (``str`` or path-like).

        Returns:
            ``True`` when the post was processed and the source file moved,
            ``False`` when no match was found or the post was rejected.
        """
        url = self.__search_iqdb(filename)
        if url is None:
            return False
        if not self.__process_url(url):
            return False

        dir_processed = self.config.dir_processed
        dir_processed.mkdir(exist_ok=True, parents=True)
        # shutil.move returns the final destination, i.e. the file inside
        # the directory, which is more useful to show than the directory.
        moved_to = shutil.move(os.fspath(filename), os.fspath(dir_processed))
        self.__show_path(moved_to)
        return True

    def __search_iqdb(self, filename: str) -> Optional[str]:
        """Return the post URL ``Iqdb.search`` finds for an image.

        When nothing is found a warning is logged and the file is handed to
        ``Library.move_to_orphan``.

        Args:
            filename: Path of the local image.

        Returns:
            The URL reported by ``Iqdb.search`` or ``None``.
        """
        url = Iqdb.search(filename)
        if url is None:
            logging.warning("%s not found", filename)
            self.library.move_to_orphan(Path(filename))
            return None
        return url

    def __process_url(self, url: str) -> bool:
        """Fetch, tag and file the image behind a booru post URL.

        Args:
            url: Text containing an ``https://`` danbooru/aibooru post URL.

        Returns:
            ``True`` when the image was moved into the library and recorded
            in the database. ``False`` when the URL is not recognised, the
            provider is unsupported, the post is already known, or
            ``Metadata.process`` yields nothing.
        """
        match = self._URL_PATTERN.search(url)
        if not match:
            return False

        post_url, provider, raw_id = match.groups()
        post_id = int(raw_id)
        if provider not in self._SUPPORTED_PROVIDERS:
            return False
        if self.db.is_exists(provider, post_id):
            logging.info("Skipping exists post %s %d", provider, post_id)
            return False

        meta_result = self.metadata.process(post_url)
        if meta_result is None:
            return False

        image_path, tags = meta_result
        to_path = self.library.move(image_path, tags)
        self.db.add(post_id, provider, tags.tags_string)
        self.__show_path(to_path)
        return True

    def __show_path(self, p: str) -> None:
        """Print a green "Saved to file://..." line for a saved path.

        Args:
            p: Destination path; path-like objects are accepted as well and
                converted with ``os.fspath`` before spaces are escaped.
        """
        location = os.fspath(p).replace(' ', '%20')
        print("\033[92mSaved to", 'file://' + location, "\033[0m")
|
|
|
|
|
|
# Script entry point: only run the command-line interface when this file is
# executed directly, not when it is imported as a module.
if __name__ == '__main__':
    PicSorter.parse_args()
|