2021-04-14 23:23:56 +03:00
|
|
|
import logging
|
|
|
|
import re
|
|
|
|
import subprocess
|
2021-04-15 12:30:26 +03:00
|
|
|
from datetime import datetime
|
|
|
|
from pathlib import Path
|
|
|
|
from typing import Optional
|
|
|
|
|
|
|
|
import fluentpy as _
|
|
|
|
import requests
|
|
|
|
|
|
|
|
from tags import Tags
|
|
|
|
|
2021-04-14 23:23:56 +03:00
|
|
|
|
|
|
|
class Metadata:
    """Fetch a post's JSON metadata, download its image and embed its tags.

    The image is written to ``tmp.jpg`` and the exiftool argument file to
    ``tags.txt``, both inside ``dir_tmp``. The external programs ``magick``
    and ``exiftool`` are run as subprocesses.
    """

    # File extensions for which a download is attempted.
    _SUPPORTED_EXTENSIONS = ("jpg", "jpeg", "png", "webp")
    # Extensions treated as JPEG when deciding whether to recompress.
    _JPEG_EXTENSIONS = ("jpg", "jpeg")
    # Images are shrunk to fit a square of this many pixels per side.
    _MAX_DIMENSION = 2500
    # JPEGs larger than this many KiB (and below the size cap) are recompressed.
    _RECOMPRESS_MIN_KB = 1400
    # JPEG quality passed to ImageMagick when recompressing.
    _RECOMPRESS_QUALITY = "80"
    # Seconds to wait for the metadata request before giving up.
    _REQUEST_TIMEOUT = 30

    def __init__(self, dir_tmp: Path):
        """Remember the working directory and derive the temp image path.

        Args:
            dir_tmp: Directory that receives ``tmp.jpg`` and ``tags.txt``.
        """
        self.dir_tmp = dir_tmp
        self.tmp_image_file = Path(self.dir_tmp, "tmp.jpg")

    def process(self, url: str) -> "Optional[tuple[Path, Tags]]":
        """Download the image described by ``url`` and tag it.

        Args:
            url: Post URL; ``".json"`` is appended to fetch the metadata.

        Returns:
            ``(result_file, tags)`` on success, or ``None`` when the image
            could not be downloaded.
        """
        logging.info("Retrieving metadata for %s", url)
        meta = self.__get_metadata(url)
        if not self.__download_file(meta):
            logging.warning("Download failed")
            return None
        return self.__write_tags(url, meta)

    @staticmethod
    def __get_metadata(url: str) -> dict:
        """Return the decoded JSON document found at ``url + ".json"``."""
        # Without a timeout requests may wait forever on a stalled server.
        response = requests.get(url + ".json", timeout=Metadata._REQUEST_TIMEOUT)
        return response.json()

    def __download_file(self, r: dict) -> bool:
        """Validate the metadata and download the referenced image.

        Args:
            r: Metadata mapping; reads ``file_ext``, ``image_width``,
                ``image_height``, ``file_size`` and ``file_url``.

        Returns:
            ``True`` if the image was fetched and converted, ``False`` if the
            metadata is unusable or the conversion failed.
        """
        ext = r.get("file_ext") or ""
        try:
            # ``or 0`` covers keys that are present but null.
            w = int(r.get("image_width") or 0)
            h = int(r.get("image_height") or 0)
            file_size_kb = int(r.get("file_size") or 0) / 1024
        except (TypeError, ValueError):
            logging.warning("Malformed image metadata")
            return False

        if ext not in self._SUPPORTED_EXTENSIONS or w == 0 or h == 0:
            return False

        file_url = r.get("file_url")
        # The URL comes from remote JSON and is handed to ImageMagick, which
        # gives special meaning to many non-URL inputs; accept only http(s).
        if not isinstance(file_url, str) or not file_url.startswith(
            ("http://", "https://")
        ):
            logging.warning("Missing or unsupported file_url: %r", file_url)
            return False

        logging.info("Downloading image")
        recompress = self.__need_recompress(ext, w, h, file_size_kb)
        return self.__download(file_url, recompress=recompress)

    @staticmethod
    def __need_recompress(ext, w, h, size_kb) -> bool:
        """Tell whether a JPEG is heavy enough to be re-encoded.

        True for a JPEG (``jpg`` or ``jpeg``) whose size exceeds
        ``_RECOMPRESS_MIN_KB`` while both sides are below ``_MAX_DIMENSION``.
        """
        return (
            ext in Metadata._JPEG_EXTENSIONS
            and size_kb > Metadata._RECOMPRESS_MIN_KB
            and w < Metadata._MAX_DIMENSION
            and h < Metadata._MAX_DIMENSION
        )

    def __download(self, img_url: str, recompress: bool = False) -> bool:
        """Fetch ``img_url`` with ImageMagick and store it as the temp image.

        Args:
            img_url: Image location passed to ``magick`` as its input.
            recompress: When true, re-encode with ``_RECOMPRESS_QUALITY``.

        Returns:
            ``True`` if ``magick`` exited with status 0.
        """
        opt_args = ["-quality", self._RECOMPRESS_QUALITY] if recompress else []
        size = self._MAX_DIMENSION
        ret = subprocess.call(
            [
                "magick", img_url,
                # The trailing ">" makes ImageMagick only shrink, never enlarge.
                "-resize", f"{size}x{size}>",
                *opt_args, str(self.tmp_image_file),
            ],
            # Output is not needed; an unread PIPE could block the child.
            stdout=subprocess.DEVNULL,
        )
        return ret == 0

    def __write_tags(self, url: str, r: dict) -> "tuple[Path, Tags]":
        """Embed tags and the source URL into the image, then rename it.

        Args:
            url: Post URL stored in the description/caption fields.
            r: Metadata mapping; reads the ``tag_string_*`` keys.

        Returns:
            The final image path and the ``Tags`` object built from ``r``.
        """
        tags = Tags(
            r.get("tag_string_general", ""),
            r.get("tag_string_copyright", ""),
            r.get("tag_string_character", ""),
            r.get("tag_string_artist", ""),
        )

        tags_file = Path(self.dir_tmp, "tags.txt")
        keywords = "\n".join("-IPTC:keywords=" + tag for tag in tags.tags)
        content = "\n".join([
            keywords,
            "-Exif:ImageDescription=" + url,
            "-Iptc:Caption-Abstract=" + url,
            "-Xmp:Description=" + url,
        ])
        # Explicit encoding so non-ASCII tags do not depend on the locale.
        with open(tags_file, "w", encoding="utf-8") as f:
            f.write(content)

        logging.info("Writing tags")
        ret = subprocess.call(
            [
                "exiftool", "-q", "-overwrite_original",
                "-@", str(tags_file),
                str(self.tmp_image_file),
            ],
            stdout=subprocess.DEVNULL,
        )
        if ret != 0:
            # Best effort: the image is still returned, but say what happened.
            logging.warning("exiftool exited with status %s", ret)

        filename = self.__format_filename(tags)
        result_file = Path(self.tmp_image_file.parent, filename)
        self.tmp_image_file.rename(result_file)
        return result_file, tags

    @staticmethod
    def __format_filename(tags: "Tags") -> str:
        """Build ``"<copyright> <characters> by <artist> at <timestamp>.jpg"``.

        Uses the first copyright, up to two sanitized characters and the first
        artist. Path separators are replaced with ``_`` so the result is always
        a single path component, and runs of whitespace are collapsed.
        """
        filename = "{} {} by {} at {}.jpg".format(
            tags.copyrights.split(" ")[0],
            ", ".join(tags.characters_sanitized()[:2]),
            tags.artists.split(" ")[0],
            datetime.now().strftime("%Y%m%d_%H%M%S"),
        )
        # A tag containing "/" would otherwise point into a missing directory.
        filename = re.sub(r"[\\/]", "_", filename)
        return re.sub(r"\s+", " ", filename).strip()
|