1
0
mirror of https://gist.github.com/6ba37e4d4084e858f917e271550ce5f6.git synced 2024-09-20 00:34:20 +03:00

Initial commit

This commit is contained in:
Victor 2021-04-14 23:23:56 +03:00
commit 40aac6203b
8 changed files with 312 additions and 0 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
__pycache__
input
library
logs
images.db

38
database.py Normal file
View File

@ -0,0 +1,38 @@
from datetime import datetime
import sqlite3
import logging
class Database:
    """SQLite-backed store of processed danbooru post ids and their tags."""

    def __init__(self):
        # Database file lives in the current working directory.
        self.db_name = 'images.db'
        self.__create_tables()

    def __create_tables(self):
        """Create the schema on first run (no-op when it already exists)."""
        conn = sqlite3.connect(self.db_name)
        try:
            conn.executescript("""
            CREATE TABLE IF NOT EXISTS danbooru (
                id INTEGER PRIMARY KEY NOT NULL UNIQUE,
                tags TEXT NOT NULL,
                created_at TIMESTAMP
            );
            """)
            conn.commit()
        finally:
            # Always release the connection, even if the DDL fails
            # (the original leaked it on exceptions).
            conn.close()

    def is_exists(self, id) -> bool:
        """Return True when a post with this id was already recorded."""
        conn = sqlite3.connect(self.db_name)
        try:
            cur = conn.execute(
                "SELECT EXISTS(SELECT 1 FROM danbooru WHERE id=?)", (id, ))
            return bool(cur.fetchone()[0])
        finally:
            conn.close()

    def add(self, id, tags):
        """Record a processed post.

        Raises sqlite3.IntegrityError when the id already exists.
        """
        conn = sqlite3.connect(self.db_name)
        try:
            sql = 'INSERT INTO danbooru(id, tags, created_at) VALUES (?,?,?)'
            conn.execute(sql, (id, tags, datetime.now()))
            conn.commit()
        finally:
            conn.close()

17
iqdb.py Normal file
View File

@ -0,0 +1,17 @@
from bs4 import BeautifulSoup
import logging
import requests
class Iqdb:
    """Reverse image search via iqdb.org, resolving hits to danbooru post URLs."""

    def search(self, file):
        """Upload *file* to iqdb.org and return the first matching danbooru
        post URL, or None when no danbooru match is found.

        Network errors / timeouts propagate as requests exceptions.
        """
        logging.info('Searching %s', file)
        # Close the upload handle deterministically (the original leaked it).
        with open(file, 'rb') as fh:
            resp = requests.post('https://iqdb.org/', files={'file': fh},
                                 timeout=10)
        doc = BeautifulSoup(resp.text, 'html.parser')
        for tag in doc.select(".image a"):
            url = tag.get("href")
            if "danbooru.donmai.us/posts" in url:
                # iqdb emits protocol-relative links ("//danbooru...").
                if url.startswith("//"):
                    url = "https:" + url
                return url
        return None

54
library.py Normal file
View File

@ -0,0 +1,54 @@
from tags import Tags
from pathlib import Path
import shutil
import logging
import os
class Library:
    """Files tagged images into a copyright/character folder hierarchy."""

    def __init__(self, dir_root: Path):
        self.dir_root = dir_root
        # Images that could not be identified end up here.
        self.dir_orphan = Path(dir_root, 'orphan')
        self.dir_orphan.mkdir(exist_ok=True, parents=True)

    def move_to_orphan(self, p: Path):
        """Move an unidentified file into the orphan folder."""
        logging.info("%s move to orphan", p)
        shutil.move(os.fspath(p), os.fspath(self.dir_orphan))

    def move(self, p: Path, tags: Tags):
        """Move *p* into the folder derived from its tags, creating it if needed."""
        new_path = self.__compute_path(tags)
        new_path.mkdir(exist_ok=True, parents=True)
        logging.info("%s move to %s", p.name, new_path)
        shutil.move(os.fspath(p), os.fspath(new_path))

    def __compute_path(self, tags: Tags) -> Path:
        """Pick a destination folder:

        - "original" works: <root>/_originals[/<first artist>]
        - otherwise:        <root>/<first copyright>[/<character> | /_multiple]
        """
        p = self.dir_root
        if tags.copyrights == 'original':
            # Original works are grouped by (first) artist instead of copyright.
            p = p / "_originals"
            if tags.artists != "":
                artist = tags.artists.split(" ")[0]
                p = p / self.__sanitize(artist)
            return p
        # Main section: group by the first copyright tag.
        copyright = ""
        if tags.copyrights != "":
            copyright = tags.copyrights.split(" ")[0]
            p = p / self.__sanitize(copyright)
        if tags.characters == "":
            return p
        # Characters section.
        characters = tags.characters.split(" ")
        if len(characters) == 1:
            # Strip the copyright disambiguation suffix from the character tag,
            # e.g. "artoria_pendragon_(fate)" -> "artoria_pendragon".
            # Remove the parenthesised form FIRST: in the original the bare-name
            # replace ran first, which made the "(copyright)" replace dead code.
            character = characters[0] \
                .replace("(" + copyright + ")", "") \
                .replace(copyright, "") \
                .replace("()", "") \
                .strip()
            p = p / self.__sanitize(character)
        else:
            p = p / "_multiple"
        return p

    def __sanitize(self, s: str) -> str:
        """Keep only filesystem-safe characters; underscores become spaces."""
        s = "".join(x for x in s if x.isalnum() or x in "._-()")
        return s.replace("_", " ").strip()

98
metadata.py Normal file
View File

@ -0,0 +1,98 @@
from typing import Optional, Union
from tags import Tags
from pathlib import Path
from datetime import datetime
import fluentpy as _
import logging
import json
import re
import requests
import subprocess
class Metadata:
    """Downloads a danbooru post's image and embeds its tags via exiftool."""

    def __init__(self, dir_tmp: Path):
        self.dir_tmp = dir_tmp
        # All work happens on this single scratch image, renamed at the end.
        self.tmp_image_file = Path(self.dir_tmp, "tmp.jpg")

    def process(self, url: str) -> Optional[tuple]:
        """Fetch the post's JSON, download and tag its image.

        Returns (result_file_path, Tags) on success, or None when the post
        has no downloadable image. (The original annotation
        Optional[Union[Path, Tags]] was wrong: a tuple is returned.)
        """
        logging.info("Retrieving metadata for %s", url)
        meta = self.__get_metadata(url)
        if not self.__download_file(meta):
            logging.warning("Download failed")
            return None
        return self.__write_tags(url, meta)

    def __get_metadata(self, url: str) -> dict:
        # Danbooru exposes a post's JSON at "<post url>.json".
        return requests.get(url + ".json").json()

    def __download_file(self, r: dict) -> bool:
        """Download the post's image; returns False for unsupported posts."""
        ext = r.get("file_ext", "")
        w = int(r.get("image_width", "0"))
        h = int(r.get("image_height", "0"))
        if (ext not in ["jpg", "jpeg", "png", "webp"]) or w == 0 or h == 0:
            return False
        file_url = r.get("file_url")
        if not file_url:
            # Restricted posts omit file_url; the original crashed in
            # subprocess with a None argument here.
            return False
        file_size_kb = int(r.get('file_size', "0")) / 1024
        logging.info("Downloading image")
        recompress = self.__need_recompress(ext, w, h, file_size_kb)
        return self.__download(file_url, recompress=recompress)

    def __need_recompress(self, ext, w, h, size_kb):
        # Only re-encode large jpegs that won't be shrunk by the resize anyway.
        return ext == 'jpg' and size_kb > 1400 and w < 2500 and h < 2500

    def __download(self, img_url: str, recompress: bool = False):
        """Fetch/convert via ImageMagick; True when magick exits with 0."""
        opt_args = []
        if recompress:
            # Must be a string: the original passed the int 80, which makes
            # subprocess.call raise TypeError whenever recompress is True.
            opt_args = ['-quality', '80']
        ret = subprocess.call([
            'magick', img_url,
            '-resize', '2500x2500>',  # shrink only; '>' never enlarges
            *opt_args, self.tmp_image_file
        ], stdout=subprocess.PIPE)
        return ret == 0

    def __write_tags(self, url: str, r: dict) -> tuple:
        """Embed keywords + source url into the image; returns (path, Tags)."""
        tags = Tags(
            r.get('tag_string_general', ""),
            r.get('tag_string_copyright', ""),
            r.get('tag_string_character', ""),
            r.get('tag_string_artist', ""),
        )
        tags_file = Path(self.dir_tmp, "tags.txt")
        # exiftool "-@ argfile": one "-IPTC:keywords=<tag>" line per tag,
        # then the source url in the description fields. Plain join replaces
        # the fluentpy chain (same output, no extra dependency needed).
        lines = ["-IPTC:keywords=" + s for s in tags.tags]
        lines.append("-Exif:ImageDescription=" + url)
        lines.append("-Iptc:Caption-Abstract=" + url)
        lines.append("-Xmp:Description=" + url)
        # utf-8 avoids UnicodeEncodeError on non-ASCII tags with a narrow
        # locale encoding. NOTE(review): exiftool may need "-charset utf8"
        # to read these back correctly — confirm.
        with open(tags_file, "w", encoding="utf-8") as f:
            f.write("\n".join(lines))
        logging.info("Writing tags")
        subprocess.call([
            'exiftool', '-q', '-overwrite_original',
            '-@', tags_file,
            self.tmp_image_file
        ], stdout=subprocess.PIPE)
        filename = self.__format_filename(tags)
        result_file = Path(self.tmp_image_file.parent, filename)
        self.tmp_image_file.rename(result_file)
        return result_file, tags

    def __format_filename(self, tags: Tags):
        """Build '<copyright> <chars> by <artist> at <timestamp>.jpg'."""
        filename = '{} {} by {} at {}.jpg'.format(
            tags.copyrights.split(" ")[0] or "",
            ", ".join(tags.characters.split(" ")[:2]),
            tags.artists.split(" ")[0] or "",
            datetime.now().strftime('%Y%m%d_%H%M%S')
        )
        # Collapse the double spaces left by empty fields.
        return re.sub(r'\s+', ' ', filename).strip()

68
picsorter.py Normal file
View File

@ -0,0 +1,68 @@
from iqdb import Iqdb
from library import Library
from metadata import Metadata
from database import Database
from datetime import datetime
from pathlib import Path
import re
import logging
import time
class PicSorter:
    """Pipeline entry point: reverse-search input images, tag and file them."""

    def __init__(self):
        # NOTE(review): 'R:/' looks like a Windows ramdisk used as scratch
        # space — confirm before running elsewhere.
        self.dir_tmp = Path('R:/')
        self.dir_input = Path('./input')
        self.dir_logs = Path('./logs')
        self.dir_library = Path('./library')
        self.setup_folders()
        self.setup_logging()

    def setup_folders(self):
        """Create the working directories if they do not exist yet."""
        self.dir_tmp.mkdir(exist_ok=True)
        self.dir_logs.mkdir(exist_ok=True)
        self.dir_library.mkdir(exist_ok=True)

    def setup_logging(self):
        """Log INFO+ to a per-day file under ./logs."""
        logfile = Path(self.dir_logs, datetime.now().strftime('%Y-%m-%d.log'))
        logging.basicConfig(
            filename=logfile,
            level=logging.INFO,
            format='%(asctime)s %(levelname)s %(module)s: %(message)s',
            datefmt='%H:%M:%S',
        )

    def process_folder(self):
        """Process every jpg under ./input: identify via iqdb, skip known
        posts, tag the downloaded image, and file it into the library."""
        iqdb = Iqdb()
        library = Library(self.dir_library)
        metadata = Metadata(self.dir_tmp)
        db = Database()
        for filename in self.dir_input.rglob('*.jpg'):
            print("Process ", filename)
            try:
                url = iqdb.search(filename)
                if url is None:
                    logging.warning("%s not found", filename)
                    library.move_to_orphan(Path(filename))
                    continue
                # Extract the numeric post id from ".../posts/<id>".
                m = re.search(r".*posts\/(\d{3,})", url)
                if not m:
                    continue
                post_id = int(m.group(1))
                if db.is_exists(post_id):
                    logging.info("Skipping exists post %d", post_id)
                    continue
                meta_result = metadata.process(url)
                if meta_result is None:
                    continue
                image_path, tags = meta_result
                library.move(image_path, tags)
                db.add(post_id, tags.tags_string)
                # Be polite to iqdb/danbooru between requests.
                time.sleep(5)
            except Exception:
                # Log with traceback, then propagate. Bare `raise` keeps the
                # original traceback; the original's `raise ex` was redundant
                # and logged nothing.
                logging.exception("Failed to process %s", filename)
                raise


if __name__ == '__main__':
    PicSorter().process_folder()

3
requirements.txt Normal file
View File

@ -0,0 +1,3 @@
beautifulsoup4==4.9.3
fluentpy==2.0
PyYAML==5.4.1
requests==2.24.0

29
tags.py Normal file
View File

@ -0,0 +1,29 @@
import fluentpy as _
from dataclasses import dataclass, field
@dataclass
class Tags:
    """Danbooru tag strings for one post, split by category.

    Each input field is the raw space-separated tag string from the API.
    `tags` unions them, prefixing copyright/character/artist tags; `tags_string`
    is that list joined back with spaces.
    """
    general: str
    copyrights: str
    characters: str
    artists: str
    # Derived in __post_init__; not constructor arguments.
    tags: list = field(init=False)
    tags_string: str = field(init=False)

    def __post_init__(self):
        self.tags = self.__union_tags()
        self.tags_string = " ".join(self.tags)

    def __union_tags(self):
        # Route `general` through the same filtering helper: the original's
        # bare split kept empty fragments, so an empty `general` produced a
        # "" tag and a leading space in tags_string.
        tags = self.__prefix_tags(self.general, '')
        tags += self.__prefix_tags(self.copyrights, 'copyright_')
        tags += self.__prefix_tags(self.characters, 'character_')
        tags += self.__prefix_tags(self.artists, 'artist_')
        return tags

    def __prefix_tags(self, tags, prefix):
        # Plain comprehension replaces the fluentpy chain: split on spaces,
        # drop empty fragments, prefix the rest. Same result, stdlib only.
        return [prefix + s.strip() for s in tags.split(" ") if s != ""]