from typing import Optional, Union from tags import Tags from pathlib import Path from datetime import datetime import fluentpy as _ import logging import json import re import requests import subprocess class Metadata: def __init__(self, dir_tmp: Path): self.dir_tmp = dir_tmp self.tmp_image_file = Path(self.dir_tmp, "tmp.jpg") def process(self, url: str) -> Optional[Union[Path, Tags]]: logging.info("Retrieving metadata for %s", url) meta = self.__get_metadata(url) status = self.__download_file(meta) if not status: logging.warn("Download failed") return None return self.__write_tags(url, meta) def __get_metadata(self, url: str) -> dict: return requests.get(url + ".json").json() # with open('test/test.json', 'rt', encoding='utf8') as f: # r = json.load(f) # return r def __download_file(self, r: dict) -> bool: ext = r.get("file_ext", "") w = int(r.get("image_width", "0")) h = int(r.get("image_height", "0")) if (ext not in ["jpg", "jpeg", "png", "webp"]) or w == 0 or h == 0: return False file_url = r.get("file_url") file_size_kb = int(r.get('file_size', "0")) / 1024 logging.info("Downloading image") recompress = self.__need_recompress(ext, w, h, file_size_kb) return self.__download(file_url, recompress=recompress) def __need_recompress(self, ext, w, h, size_kb): return ext == 'jpg' and size_kb > 1400 and w < 2500 and h < 2500 def __download(self, img_url: str, recompress: bool = False): opt_args = [] if recompress: opt_args = ['-quality', 80] ret = subprocess.call([ 'magick', img_url, '-resize', '2500x2500>', *opt_args, self.tmp_image_file ], stdout=subprocess.PIPE) return ret == 0 def __write_tags(self, url: str, r: dict) -> tuple: tag_general = r.get('tag_string_general', "") tag_copyrights = r.get('tag_string_copyright', "") tag_characters = r.get('tag_string_character', "") tag_artists = r.get('tag_string_artist', "") tags = Tags(tag_general, tag_copyrights, tag_characters, tag_artists) tags_file = Path(self.dir_tmp, "tags.txt") with open(tags_file, "w") as f: content = _(tags.tags) \ .map(lambda s: "-IPTC:keywords=" + s) \ .join("\n") \ ._ content += "\n-Exif:ImageDescription=" + url content += "\n-Iptc:Caption-Abstract=" + url content += "\n-Xmp:Description=" + url f.write(content) logging.info("Writing tags") subprocess.call([ 'exiftool', '-q', '-overwrite_original', '-@', tags_file, self.tmp_image_file ], stdout=subprocess.PIPE) filename = self.__format_filename(tags) result_file = Path(self.tmp_image_file.parent, filename) self.tmp_image_file.rename(result_file) return result_file, tags def __format_filename(self, tags: Tags): filename = '{} {} by {} at {}.jpg'.format( tags.copyrights.split(" ")[0] or "", ", ".join(tags.characters_sanitized()[:2]), tags.artists.split(" ")[0] or "", datetime.now().strftime('%Y%m%d_%H%M%S') ) return re.sub(r'\s+', ' ', filename).strip()