1
0
mirror of https://gist.github.com/6ba37e4d4084e858f917e271550ce5f6.git synced 2024-09-20 00:34:20 +03:00
picsorter/picsorter.py

118 lines
3.8 KiB
Python

import argparse
import logging
import os
import re
import shutil
import time
from datetime import datetime
from pathlib import Path
from typing import Optional, Union

from config import Config
from database import Database
from iqdb import Iqdb
from library import Library
from metadata import Metadata
class PicSorter:
    """Tag pictures with danbooru metadata and file them into the library.

    Every input is either a danbooru post URL / post ID, a single image
    file, or a folder of images.  Local images are first looked up with
    ``Iqdb.search`` to obtain a post URL; the post is then handed to
    ``Metadata.process`` and the image it returns is moved into the library.
    Posts that the database already knows are skipped.
    """

    # Suffixes (compared case-insensitively) picked up when scanning a folder.
    IMAGE_SUFFIXES = (".jpg", ".png")

    # Pause between consecutive files of a folder.  Presumably this keeps the
    # reverse-search service from being hammered -- confirm before lowering.
    REQUEST_DELAY_SECONDS = 5

    # An explicit ``posts/<id>`` segment wins over any other digits in a URL.
    _POST_URL_RE = re.compile(r"posts/(\d+)")
    # Fallback: a bare run of at least three digits is taken as the post ID.
    _BARE_ID_RE = re.compile(r"(\d{3,})")

    @staticmethod
    def parse_args():
        """Parse the command line and process every positional input.

        When no input is given the usage help is printed instead of
        silently doing nothing.
        """
        parser = argparse.ArgumentParser(
            description='Finds an image on danbooru, writes tags as IPTC keywords, than places the image in the library'
        )
        parser.add_argument('-c', '--config',
                            type=Path,
                            default='config.yml',
                            help='config.yml file path')
        parser.add_argument('input', nargs=argparse.REMAINDER)
        args = parser.parse_args()
        if not args.input:
            parser.print_help()
            return
        PicSorter(args.config).process(args.input)

    def __init__(self, config_file: Union[str, Path] = 'config.yml'):
        """Load the configuration and create the collaborating services.

        Args:
            config_file: Configuration file handed to ``Config.load``.
        """
        config = Config.load(config_file)
        self.config = config
        self.__setup_logging(config.dir_logs)
        self.library = Library(config.dir_library)
        self.metadata = Metadata(config.dir_tmp)
        self.db = Database()

    @staticmethod
    def __setup_logging(dir_logs: Path):
        """Send INFO-and-above log records to ``<dir_logs>/<YYYY-MM-DD>.log``.

        The directory is created first: ``logging.basicConfig`` opens the
        file immediately and would fail if the directory were missing.
        """
        log_dir = Path(dir_logs)
        log_dir.mkdir(parents=True, exist_ok=True)
        logfile = log_dir / datetime.now().strftime('%Y-%m-%d.log')
        logging.basicConfig(
            filename=os.fspath(logfile),
            level=logging.INFO,
            format='%(asctime)s %(levelname)s %(module)s: %(message)s',
            datefmt='%H:%M:%S',
        )

    @classmethod
    def _extract_post_id(cls, url: str) -> Optional[int]:
        """Return the post ID found in *url*, or ``None`` if there is none.

        A ``posts/<id>`` segment is preferred, so digits earlier in the URL
        (an IP address, a port) are not mistaken for the ID.  Without such a
        segment the first run of three or more digits is used.
        """
        match = cls._POST_URL_RE.search(url) or cls._BARE_ID_RE.search(url)
        if match is None:
            return None
        return int(match.group(1))

    @classmethod
    def _has_image_suffix(cls, path: Path) -> bool:
        """Tell whether *path* ends in one of ``IMAGE_SUFFIXES`` (any case)."""
        return path.suffix.lower() in cls.IMAGE_SUFFIXES

    def process(self, inputs: list[str]) -> None:
        """Process each input as a URL, a folder, a file or a post ID.

        Existing files and folders are recognised *before* the "contains a
        number" heuristic, so that a path such as ``IMG_1234.jpg`` is not
        mistaken for post 1234.

        Args:
            inputs: URLs, post IDs, image files and/or folders.
        """
        for item in inputs:
            if item.startswith("http"):
                print("Processing url", item)
                self.__process_url(item)
                continue

            path = Path(item)
            if path.is_dir():
                self.__process_folder(path)
            elif path.is_file():
                print("Processing file", item)
                self.__process_file(item)
            elif self._extract_post_id(item) is not None:
                print("Processing url", item)
                self.__process_url(item)
            else:
                print("Skipping unrecognized input", item)
                logging.warning("Skipping unrecognized input %s", item)

    def __process_folder(self, dir_input: Path) -> None:
        """Process every supported image that sits directly in *dir_input*.

        Files are handled in sorted order with a pause of
        ``REQUEST_DELAY_SECONDS`` between them.  The first failure is logged
        and re-raised, which stops the run.
        """
        files = sorted(
            p for p in dir_input.iterdir()
            if p.is_file() and self._has_image_suffix(p)
        )
        for index, filename in enumerate(files):
            if index:
                # Pause between files only; no point waiting after the last.
                time.sleep(self.REQUEST_DELAY_SECONDS)
            print("Processing", filename)
            try:
                self.__process_file(filename)
            except Exception:
                logging.exception("Failed to process %s", filename)
                raise

    def __process_file(self, filename: Union[str, Path]) -> bool:
        """Look a local image up, process its post and archive the file.

        Args:
            filename: Path of the local image.

        Returns:
            ``True`` when the post was processed and the file was moved to
            ``config.dir_processed``; ``False`` when no post was found or
            the post was not processed.
        """
        url = self.__search_iqdb(filename)
        if url is None:
            return False
        if not self.__process_url(url):
            return False

        self.config.dir_processed.mkdir(exist_ok=True, parents=True)
        from_path = os.fspath(filename)
        to_path = os.fspath(self.config.dir_processed)
        shutil.move(from_path, to_path)
        print("Saved to", to_path)
        return True

    def __search_iqdb(self, filename: Union[str, Path]) -> Optional[str]:
        """Return the URL ``Iqdb.search`` finds for *filename*.

        When nothing is found a warning is logged, the file is handed to
        ``library.move_to_orphan`` and ``None`` is returned.
        """
        url = Iqdb.search(filename)
        if url is None:
            logging.warning("%s not found", filename)
            self.library.move_to_orphan(Path(filename))
            return None
        return url

    def __process_url(self, url: str) -> bool:
        """Fetch the post referenced by *url* and store it in the library.

        Args:
            url: A post URL or anything else containing the post ID.

        Returns:
            ``True`` when the image was moved into the library and recorded
            in the database; ``False`` when no ID could be extracted, the
            post is already recorded, or ``metadata.process`` returned
            ``None``.
        """
        post_id = self._extract_post_id(url)
        if post_id is None:
            return False
        if self.db.is_exists(post_id):
            logging.info("Skipping exists post %d", post_id)
            return False

        meta_result = self.metadata.process("https://danbooru.donmai.us/posts/" + str(post_id))
        if meta_result is None:
            return False

        image_path, tags = meta_result
        to_path = self.library.move(image_path, tags)
        self.db.add(post_id, tags.tags_string)
        print("Saved to", to_path)
        return True
# Script entry point: run the command-line interface only when this file is
# executed directly, not when it is imported as a module.
if __name__ == "__main__":
    PicSorter.parse_args()