mirror of
https://gist.github.com/6ba37e4d4084e858f917e271550ce5f6.git
synced 2024-09-20 00:34:20 +03:00
Fix code style and deprecations
This commit is contained in:
parent
ee9bab992d
commit
00834d0e23
27
config.py
Normal file
27
config.py
Normal file
@ -0,0 +1,27 @@
|
||||
from pathlib import Path
|
||||
|
||||
import yaml
|
||||
|
||||
|
||||
class Config:
    """Directory layout used by the sorter.

    Every path is read from the ``dirs`` mapping of the parsed configuration
    and falls back to a built-in default when the key is absent. Creating an
    instance also creates the tmp, processed, logs and library directories.
    """

    @staticmethod
    def load(path='config.yml'):
        """Parse the YAML file at ``path`` and return a ``Config`` built from it."""
        with open(path, 'rt', encoding='utf8') as f:
            # NOTE(review): yaml.safe_load is the more restrictive choice;
            # FullLoader is kept on the assumption that the config file is a
            # trusted local file.
            config = yaml.load(f.read(), Loader=yaml.FullLoader)
        return Config(config)

    def __init__(self, config):
        """Resolve the directory paths from ``config`` and create them.

        ``config`` is the parsed YAML document. ``None`` (an empty file) or a
        ``dirs`` key without children is treated as "use every default".
        """
        dirs = (config or {}).get('dirs') or {}
        self.dir_tmp = Path(dirs.get('tmp', '/tmp/'))
        self.dir_input = Path(dirs.get('input', './input'))
        self.dir_processed = Path(dirs.get('processed', './input/processed'))
        self.dir_logs = Path(dirs.get('logs', './logs'))
        self.dir_library = Path(dirs.get('library', './library'))
        self.__setup_folders()

    def __setup_folders(self):
        """Create the working directories that do not exist yet."""
        for directory in (self.dir_tmp, self.dir_processed,
                          self.dir_logs, self.dir_library):
            # parents=True: nested locations such as ./input/processed must
            # not fail just because the parent directory is still missing.
            directory.mkdir(parents=True, exist_ok=True)
|
||||
|
7
config.yml
Normal file
7
config.yml
Normal file
@ -0,0 +1,7 @@
|
||||
dirs:
|
||||
tmp: /tmp/
|
||||
input: ./input
|
||||
processed: ./input/processed
|
||||
logs: ./logs
|
||||
library: ./library
|
||||
|
13
database.py
13
database.py
@ -1,6 +1,6 @@
|
||||
from datetime import datetime
|
||||
import sqlite3
|
||||
import logging
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class Database:
|
||||
def __init__(self):
|
||||
@ -20,19 +20,18 @@ class Database:
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
def is_exists(self, id) -> bool:
|
||||
def is_exists(self, _id) -> bool:
|
||||
conn = sqlite3.connect(self.db_name)
|
||||
c = conn.cursor()
|
||||
c.execute("SELECT EXISTS(SELECT 1 FROM danbooru WHERE id=?)", (id, ))
|
||||
c.execute("SELECT EXISTS(SELECT 1 FROM danbooru WHERE id=?)", (_id, ))
|
||||
result = c.fetchone()[0]
|
||||
conn.close()
|
||||
return bool(result)
|
||||
|
||||
def add(self, id, tags):
|
||||
def add(self, _id, tags):
|
||||
conn = sqlite3.connect(self.db_name)
|
||||
c = conn.cursor()
|
||||
sql = 'INSERT INTO danbooru(id, tags, created_at) VALUES (?,?,?)'
|
||||
c.execute(sql, (id, tags, datetime.now()))
|
||||
c.execute(sql, (_id, tags, datetime.now()))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
8
iqdb.py
8
iqdb.py
@ -1,9 +1,13 @@
|
||||
from bs4 import BeautifulSoup
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
|
||||
class Iqdb:
|
||||
def search(self, file):
|
||||
@staticmethod
|
||||
def search(file: str) -> Optional[str]:
|
||||
logging.info('Searching %s', file)
|
||||
files = {'file': open(file, 'rb')}
|
||||
resp = requests.post('https://iqdb.org/', files=files, timeout=10)
|
||||
|
20
library.py
20
library.py
@ -1,8 +1,10 @@
|
||||
from tags import Tags
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
from tags import Tags
|
||||
|
||||
|
||||
class Library:
|
||||
def __init__(self, dir_root: Path):
|
||||
@ -10,11 +12,11 @@ class Library:
|
||||
self.dir_orphan = Path(dir_root, '_orphan')
|
||||
self.dir_orphan.mkdir(exist_ok=True, parents=True)
|
||||
|
||||
def move_to_orphan(self, p: Path):
|
||||
def move_to_orphan(self, p: Path) -> None:
|
||||
logging.info("%s move to orphan", p)
|
||||
shutil.move(os.fspath(p), os.fspath(self.dir_orphan))
|
||||
|
||||
def move(self, p: Path, tags: Tags):
|
||||
def move(self, p: Path, tags: Tags) -> None:
|
||||
new_path = self.__compute_path(tags)
|
||||
new_path.mkdir(exist_ok=True, parents=True)
|
||||
logging.info("%s move to %s", p.name, new_path)
|
||||
@ -30,10 +32,9 @@ class Library:
|
||||
p = p / self.__sanitize(artist)
|
||||
return p
|
||||
# Main section
|
||||
copyright = ""
|
||||
if tags.copyrights != "":
|
||||
copyright = tags.copyrights.split(" ")[0]
|
||||
p = p / self.__sanitize(copyright)
|
||||
_copyright = tags.copyrights.split(" ")[0]
|
||||
p = p / self.__sanitize(_copyright)
|
||||
if tags.characters == "":
|
||||
return p
|
||||
# Characters section
|
||||
@ -44,6 +45,7 @@ class Library:
|
||||
p = p / "_multiple"
|
||||
return p
|
||||
|
||||
def __sanitize(self, s: str) -> str:
|
||||
@staticmethod
|
||||
def __sanitize(s: str) -> str:
|
||||
s = "".join(x for x in s if x.isalnum() or x in "._-()")
|
||||
return s.replace("_", " ").strip()
|
||||
|
40
metadata.py
40
metadata.py
@ -1,33 +1,33 @@
|
||||
from typing import Optional, Union
|
||||
from tags import Tags
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
import fluentpy as _
|
||||
import logging
|
||||
import json
|
||||
import re
|
||||
import requests
|
||||
import subprocess
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import fluentpy as _
|
||||
import requests
|
||||
|
||||
from tags import Tags
|
||||
|
||||
|
||||
class Metadata:
|
||||
def __init__(self, dir_tmp: Path):
|
||||
self.dir_tmp = dir_tmp
|
||||
self.tmp_image_file = Path(self.dir_tmp, "tmp.jpg")
|
||||
|
||||
def process(self, url: str) -> Optional[tuple[Path, Tags]]:
    """Fetch the metadata for ``url``, download its file and write the tags.

    Returns whatever ``__write_tags`` produces (annotated as the result file
    path and its ``Tags``), or ``None`` when the download step reports failure.
    """
    logging.info("Retrieving metadata for %s", url)
    meta = self.__get_metadata(url)
    if self.__download_file(meta):
        return self.__write_tags(url, meta)
    logging.warning("Download failed")
    return None
|
||||
|
||||
@staticmethod
def __get_metadata(url: str) -> dict:
    """Request ``url + ".json"`` and return the decoded JSON body."""
    # requests has no default timeout, so a stalled connection would block
    # the whole run; 10 seconds matches the IQDB lookup.
    response = requests.get(url + ".json", timeout=10)
    return response.json()
|
||||
|
||||
def __download_file(self, r: dict) -> bool:
|
||||
ext = r.get("file_ext", "")
|
||||
@ -42,10 +42,11 @@ class Metadata:
|
||||
recompress = self.__need_recompress(ext, w, h, file_size_kb)
|
||||
return self.__download(file_url, recompress=recompress)
|
||||
|
||||
def __need_recompress(self, ext, w, h, size_kb):
|
||||
@staticmethod
|
||||
def __need_recompress(ext, w, h, size_kb) -> bool:
|
||||
return ext == 'jpg' and size_kb > 1400 and w < 2500 and h < 2500
|
||||
|
||||
def __download(self, img_url: str, recompress: bool = False):
|
||||
def __download(self, img_url: str, recompress: bool = False) -> bool:
|
||||
opt_args = []
|
||||
if recompress:
|
||||
opt_args = ['-quality', "80"]
|
||||
@ -56,8 +57,9 @@ class Metadata:
|
||||
], stdout=subprocess.PIPE)
|
||||
return ret == 0
|
||||
|
||||
|
||||
def __write_tags(self, url: str, r: dict) -> tuple:
|
||||
# noinspection PyCallingNonCallable
|
||||
# noinspection PyProtectedMember
|
||||
def __write_tags(self, url: str, r: dict) -> tuple[Path, Tags]:
|
||||
tag_general = r.get('tag_string_general', "")
|
||||
tag_copyrights = r.get('tag_string_copyright', "")
|
||||
tag_characters = r.get('tag_string_character', "")
|
||||
@ -87,8 +89,8 @@ class Metadata:
|
||||
self.tmp_image_file.rename(result_file)
|
||||
return result_file, tags
|
||||
|
||||
|
||||
def __format_filename(self, tags: Tags):
|
||||
@staticmethod
|
||||
def __format_filename(tags: Tags):
|
||||
filename = '{} {} by {} at {}.jpg'.format(
|
||||
tags.copyrights.split(" ")[0] or "",
|
||||
", ".join(tags.characters_sanitized()[:2]),
|
||||
|
56
picsorter.py
56
picsorter.py
@ -1,57 +1,50 @@
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
from config import Config
|
||||
from database import Database
|
||||
from iqdb import Iqdb
|
||||
from library import Library
|
||||
from metadata import Metadata
|
||||
from database import Database
|
||||
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
import re
|
||||
import logging
|
||||
import time
|
||||
import shutil
|
||||
import os
|
||||
|
||||
class PicSorter:
|
||||
def __init__(self):
    """Load ``config.yml`` and configure logging from it."""
    self.config = Config.load('config.yml')
    self.setup_logging()
|
||||
|
||||
def setup_folders(self):
|
||||
self.dir_tmp.mkdir(exist_ok=True)
|
||||
self.dir_processed.mkdir(exist_ok=True)
|
||||
self.dir_logs.mkdir(exist_ok=True)
|
||||
self.dir_library.mkdir(exist_ok=True)
|
||||
|
||||
def setup_logging(self):
    """Point the root logger at a per-day file inside the logs directory.

    The file is named after the current date (``YYYY-MM-DD.log``) and records
    INFO and above with time, level and module.
    """
    todays_name = datetime.now().strftime('%Y-%m-%d.log')
    log_path = os.fspath(Path(self.config.dir_logs, todays_name))
    logging.basicConfig(
        filename=log_path,
        level=logging.INFO,
        format='%(asctime)s %(levelname)s %(module)s: %(message)s',
        datefmt='%H:%M:%S',
    )
|
||||
|
||||
def process_folder(self):
|
||||
iqdb = Iqdb()
|
||||
library = Library(self.dir_library)
|
||||
metadata = Metadata(self.dir_tmp)
|
||||
config = self.config
|
||||
library = Library(config.dir_library)
|
||||
metadata = Metadata(config.dir_tmp)
|
||||
db = Database()
|
||||
files = {p for p in self.dir_input.iterdir() if p.suffix in [".jpg", ".png"]}
|
||||
files = {p for p in config.dir_input.iterdir()
|
||||
if p.suffix in [".jpg", ".png"]}
|
||||
for filename in files:
|
||||
print("Process ", filename)
|
||||
print("Process", filename)
|
||||
try:
|
||||
url = iqdb.search(filename)
|
||||
url = Iqdb.search(filename)
|
||||
if url is None:
|
||||
logging.warn("%s not found", filename)
|
||||
logging.warning("%s not found", filename)
|
||||
library.move_to_orphan(Path(filename))
|
||||
continue
|
||||
|
||||
m = re.search(r".*posts\/(\d{3,})", url)
|
||||
m = re.search(r".*posts/(\d{3,})", url)
|
||||
if not m:
|
||||
continue
|
||||
post_id = int(m.group(1))
|
||||
@ -65,10 +58,11 @@ class PicSorter:
|
||||
image_path, tags = meta_result
|
||||
library.move(image_path, tags)
|
||||
db.add(post_id, tags.tags_string)
|
||||
shutil.move(os.fspath(filename), os.fspath(self.dir_processed))
|
||||
shutil.move(os.fspath(filename), os.fspath(config.dir_processed))
|
||||
time.sleep(5)
|
||||
except Exception as ex:
|
||||
raise ex
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
PicSorter().process_folder()
|
||||
|
@ -1,3 +1,4 @@
|
||||
fluentpy==2.0
|
||||
beautifulsoup4==4.9.3
|
||||
fluentpy>=2.0
|
||||
PyYAML==5.4.1
|
||||
requests==2.24.0
|
||||
requests>=2.24
|
||||
|
34
tags.py
34
tags.py
@ -1,47 +1,55 @@
|
||||
import fluentpy as _
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
import fluentpy as _
|
||||
|
||||
|
||||
@dataclass
class Tags:
    """Space-separated tag strings of one post, grouped by category.

    ``tags`` and ``tags_string`` are derived in ``__post_init__``: the general
    tags come first, followed by the copyright, character and artist tags,
    each carrying a ``copyright_`` / ``character_`` / ``artist_`` prefix.
    """

    general: str
    copyrights: str
    characters: str
    artists: str
    tags: list[str] = field(init=False)
    tags_string: str = field(init=False)

    def __post_init__(self):
        self.tags = self.__union_tags()
        self.tags_string = " ".join(self.tags)

    def characters_sanitized(self) -> list:
        """Return the character tags with copyright suffixes removed.

        For each copyright tag ``c`` the fragments ``_(c)`` and ``(c)`` are
        stripped from every character tag. Without copyrights the character
        tags are returned unchanged. Empty fragments are never returned.
        """
        names = self.__split(self.characters)
        if self.copyrights == "":
            # No need to sanitize tags
            return names
        copyrights = self.copyrights.split(" ")
        return [self.__rename(name, copyrights) for name in names]

    @staticmethod
    def __split(tags: str) -> list[str]:
        """Split on single spaces and drop the empty fragments.

        ``"".split(" ")`` yields ``[""]``; dropping it keeps blank tags out
        of the results.
        """
        return [tag for tag in tags.split(" ") if tag != ""]

    @staticmethod
    def __rename(s: str, substrings: list[str]) -> str:
        """Remove ``_(sub)`` and ``(sub)`` from ``s`` for every substring."""
        for substring in substrings:
            s = s.replace("_(" + substring + ")", "") \
                .replace("(" + substring + ")", "") \
                .strip()
        return s

    def __union_tags(self) -> list[str]:
        """Build the combined tag list: general first, then prefixed groups."""
        tags = self.__split(self.general)
        tags += self.__prefix_tags(self.copyrights, 'copyright_')
        tags += self.__prefix_tags(self.characters, 'character_')
        tags += self.__prefix_tags(self.artists, 'artist_')
        return tags

    @staticmethod
    def __prefix_tags(tags: str, prefix: str) -> list[str]:
        """Return every non-empty tag of ``tags`` with ``prefix`` prepended."""
        # A plain comprehension guarantees a concrete list, as annotated,
        # independent of what the fluent wrapper would hand back.
        return [prefix + tag.strip() for tag in Tags.__split(tags)]
|
||||
|
Loading…
Reference in New Issue
Block a user