import logging
import re
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Optional

import fluentpy as _
import requests

from tags import Tags


class Metadata:
    """Download an image described by a JSON metadata document and tag it.

    The workflow implemented by :meth:`process` is: fetch ``<url>.json``,
    download (and possibly resize/recompress) the referenced image with
    ImageMagick into ``<dir_tmp>/tmp.jpg``, embed the tags with ``exiftool``
    and finally rename the file to a descriptive name.
    """

    # File extensions accepted for download; anything else is rejected.
    _SUPPORTED_EXTENSIONS = ("jpg", "jpeg", "png", "webp")
    # Seconds to wait for the metadata server before giving up; without a
    # timeout ``requests`` may block forever on an unresponsive host.
    _REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, dir_tmp: Path):
        """Remember the working directory and the temporary image path.

        Args:
            dir_tmp: Directory in which the temporary image, the exiftool
                argument file and the final renamed image are created.
        """
        self.dir_tmp = dir_tmp
        self.tmp_image_file = Path(self.dir_tmp, "tmp.jpg")

    def process(self, url: str) -> Optional[tuple[Path, Tags]]:
        """Fetch metadata for *url*, download the image and write its tags.

        Args:
            url: Post URL; ``".json"`` is appended to obtain the metadata.

        Returns:
            ``(result_file, tags)`` on success, or ``None`` when the image
            could not be downloaded (unsupported format, missing dimensions
            or file URL, or a non-zero ImageMagick exit status).

        Raises:
            requests.RequestException: If the metadata request fails.
            ValueError: If the response body is not valid JSON.
        """
        logging.info("Retrieving metadata for %s", url)
        meta = self.__get_metadata(url)
        if not self.__download_file(meta):
            logging.warning("Download failed")
            return None
        return self.__write_tags(url, meta)

    @staticmethod
    def __get_metadata(url: str) -> dict:
        """Return the decoded JSON document served at ``url + ".json"``."""
        response = requests.get(
            url + ".json",
            timeout=Metadata._REQUEST_TIMEOUT_SECONDS,
        )
        return response.json()

    def __download_file(self, r: dict) -> bool:
        """Validate the metadata *r* and download the image it points to.

        Returns:
            ``True`` if the image was downloaded successfully; ``False`` if
            the metadata describes an unsupported or incomplete file, or if
            the download command failed.
        """
        # ``or`` fallbacks cover both a missing key and an explicit JSON null,
        # either of which would otherwise make ``int()`` raise TypeError.
        ext = r.get("file_ext") or ""
        w = int(r.get("image_width") or 0)
        h = int(r.get("image_height") or 0)
        file_url = r.get("file_url")

        if ext not in self._SUPPORTED_EXTENSIONS or w == 0 or h == 0:
            return False
        if not file_url:
            # Without a URL there is nothing to hand to ImageMagick; passing
            # None in the argument list would raise instead of failing cleanly.
            return False

        file_size_kb = int(r.get("file_size") or 0) / 1024

        logging.info("Downloading image")
        recompress = self.__need_recompress(ext, w, h, file_size_kb)
        return self.__download(file_url, recompress=recompress)

    @staticmethod
    def __need_recompress(ext, w, h, size_kb) -> bool:
        """Tell whether a JPEG is heavy enough to be re-encoded.

        True only for ``jpg`` files larger than 1400 KB whose width and
        height are both below 2500 pixels.
        """
        return ext == 'jpg' and size_kb > 1400 and w < 2500 and h < 2500

    def __download(self, img_url: str, recompress: bool = False) -> bool:
        """Fetch *img_url* with ImageMagick into ``self.tmp_image_file``.

        The image is shrunk to fit 2500x2500 (``>`` means "only if larger");
        when *recompress* is set it is additionally re-encoded at quality 80.

        Returns:
            ``True`` when ``magick`` exits with status 0.
        """
        opt_args = ['-quality', "80"] if recompress else []
        # DEVNULL rather than PIPE: nothing reads the pipe, so a chatty child
        # could fill the OS pipe buffer and block forever.
        ret = subprocess.call(
            [
                'magick', img_url,
                '-resize', '2500x2500>',
                *opt_args,
                str(self.tmp_image_file),
            ],
            stdout=subprocess.DEVNULL,
        )
        return ret == 0

    # noinspection PyCallingNonCallable
    # noinspection PyProtectedMember
    def __write_tags(self, url: str, r: dict) -> tuple[Path, Tags]:
        """Embed tags and the source *url* into the image, then rename it.

        An exiftool argument file (``tags.txt`` in ``dir_tmp``) is written
        with one ``-IPTC:keywords=`` line per tag plus three description
        fields holding *url*; exiftool is then run against the temporary
        image, which is finally renamed to the formatted file name.

        Returns:
            ``(result_file, tags)`` where ``result_file`` is the renamed image.
        """
        tag_general = r.get('tag_string_general', "")
        tag_copyrights = r.get('tag_string_copyright', "")
        tag_characters = r.get('tag_string_character', "")
        tag_artists = r.get('tag_string_artist', "")

        tags = Tags(tag_general, tag_copyrights, tag_characters, tag_artists)

        content = _(tags.tags) \
            .map(lambda s: "-IPTC:keywords=" + s) \
            .join("\n") \
            ._
        content += "\n-Exif:ImageDescription=" + url
        content += "\n-Iptc:Caption-Abstract=" + url
        content += "\n-Xmp:Description=" + url

        tags_file = Path(self.dir_tmp, "tags.txt")
        # Explicit UTF-8 so non-ASCII tags do not depend on the platform's
        # locale encoding (exiftool's default external charset is UTF-8).
        with open(tags_file, "w", encoding="utf-8") as f:
            f.write(content)

        logging.info("Writing tags")
        ret = subprocess.call(
            [
                'exiftool', '-q', '-overwrite_original',
                '-@', str(tags_file),
                str(self.tmp_image_file),
            ],
            stdout=subprocess.DEVNULL,
        )
        if ret != 0:
            # Best effort: the image is still usable without embedded tags,
            # so report the failure but carry on with the rename.
            logging.warning("exiftool exited with status %s", ret)

        filename = self.__format_filename(tags)
        result_file = Path(self.tmp_image_file.parent, filename)
        self.tmp_image_file.rename(result_file)
        return result_file, tags

    @staticmethod
    def __format_filename(tags: Tags):
        """Build ``"<copyright> <characters> by <artist> at <timestamp>.jpg"``.

        Uses the first copyright tag, up to two sanitized character names and
        the first artist tag. Path separators are replaced with ``_`` so a
        tag containing a slash cannot turn the name into a nested path, and
        runs of whitespace are collapsed to a single space.
        """
        filename = '{} {} by {} at {}.jpg'.format(
            tags.copyrights.split(" ")[0],
            ", ".join(tags.characters_sanitized()[:2]),
            tags.artists.split(" ")[0],
            datetime.now().strftime('%Y%m%d_%H%M%S')
        )
        filename = re.sub(r'[\\/]', '_', filename)
        return re.sub(r'\s+', ' ', filename).strip()