# 1
# 0
# mirror of https://gist.github.com/6ba37e4d4084e858f917e271550ce5f6.git synced 2024-09-20 00:34:20 +03:00
# picsorter/metadata.py
# 2023-07-29 18:00:12 +03:00

# 109 lines
# 3.9 KiB
# Python

import logging
import re
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Optional
import fluentpy as _
import requests
from tags import Tags
class Metadata:
    """Download a post's image and embed the post's tags into it.

    For a post URL the class fetches ``<url>.json``, lets ImageMagick
    (``magick``) download/convert the referenced image into ``tmp.jpg`` inside
    ``dir_tmp``, writes the tags and the source URL into the image with
    ``exiftool``, and finally renames the image to a descriptive file name.

    The JSON keys read are ``file_ext``, ``image_width``, ``image_height``,
    ``file_url``, ``file_size`` and the ``tag_string_*`` fields.
    NOTE(review): this looks like a Danbooru-style post schema -- confirm
    against the actual service.
    """

    # Extensions accepted for download; everything else is skipped.
    _SUPPORTED_EXTENSIONS = ("jpg", "jpeg", "png", "webp")
    # Extensions treated as JPEG when deciding whether to recompress.
    _JPEG_EXTENSIONS = ("jpg", "jpeg")
    # Seconds to wait for the metadata request before giving up.
    _REQUEST_TIMEOUT = 30

    def __init__(self, dir_tmp: Path):
        """Remember the working directory and derive the temp file paths.

        Args:
            dir_tmp: Directory used for the temporary image and tag files.
        """
        self.dir_tmp = dir_tmp
        self.tmp_image_file = Path(self.dir_tmp, "tmp.jpg")
        # Not referenced by any other method of this class.
        self.tmp_fallback_download_file = Path(self.dir_tmp, "dl.jpg")

    def process(self, url: str) -> "Optional[tuple[Path, Tags]]":
        """Fetch metadata for ``url``, download the image and tag it.

        Args:
            url: Post URL; ``.json`` is appended to obtain the metadata.

        Returns:
            ``(path_to_tagged_image, tags)`` on success, or ``None`` when the
            post was skipped or the download failed.
        """
        logging.info("Retrieving metadata for %s", url)
        meta = self.__get_metadata(url)
        if not self.__download_file(meta):
            logging.warning("Download failed")
            return None
        return self.__write_tags(url, meta)

    @staticmethod
    def __get_metadata(url: str) -> dict:
        """Return the decoded JSON document served at ``url + ".json"``."""
        # Without a timeout ``requests`` may wait forever on a stalled server.
        response = requests.get(url + ".json", timeout=Metadata._REQUEST_TIMEOUT)
        return response.json()

    def __download_file(self, r: dict) -> bool:
        """Validate the post metadata and download its image.

        Args:
            r: Post metadata as returned by ``__get_metadata``.

        Returns:
            ``True`` when the image was downloaded, ``False`` when the post
            was skipped (unsupported extension, missing dimensions, missing
            file URL) or the download command failed.
        """
        # ``or`` also covers keys that are present but ``null`` in the JSON,
        # which would otherwise make ``int(None)`` raise ``TypeError``.
        ext = r.get("file_ext") or ""
        w = int(r.get("image_width") or 0)
        h = int(r.get("image_height") or 0)

        if ext not in self._SUPPORTED_EXTENSIONS:
            logging.warning("Skipping due to unsupported extension: %s", ext)
            print("\033[93mSkipping due to unsupported extension:", ext, "\033[0m")
            return False

        # Reported separately so the message names the real cause.
        if w == 0 or h == 0:
            logging.warning("Skipping due to missing image dimensions")
            print("\033[93mSkipping due to missing image dimensions\033[0m")
            return False

        file_url = r.get("file_url")
        if not file_url:
            logging.warning("Skipping due to an empty file url")
            print("\033[93mSkipping due to an empty file url\033[0m")
            return False

        file_size_kb = int(r.get("file_size") or 0) / 1024
        logging.info("Downloading image")
        recompress = self.__need_recompress(ext, w, h, file_size_kb)
        return self.__download(file_url, recompress=recompress)

    @staticmethod
    def __need_recompress(ext, w, h, size_kb) -> bool:
        """Tell whether a JPEG is large on disk yet below 2500px per side.

        Args:
            ext: File extension without the dot.
            w: Image width in pixels.
            h: Image height in pixels.
            size_kb: File size in KiB.
        """
        return (
            ext in Metadata._JPEG_EXTENSIONS
            and size_kb > 1400
            and w < 2500
            and h < 2500
        )

    def __download(self, img_url: str, recompress: bool = False) -> bool:
        """Let ImageMagick fetch ``img_url`` and write it to ``tmp_image_file``.

        Args:
            img_url: Image location handed to ``magick`` as its input.
            recompress: When true, ``-quality 80`` is added to the command.

        Returns:
            ``True`` when ``magick`` exited with status 0.
        """
        opt_args = ["-quality", "80"] if recompress else []
        # NOTE(review): img_url comes from remote JSON and is interpreted by
        # ImageMagick itself; consider validating that it is an http(s) URL.
        ret = subprocess.call(
            [
                "magick", img_url,
                "-resize", "2500x2500>",  # '>' only shrinks larger images
                *opt_args, str(self.tmp_image_file),
            ],
            # DEVNULL instead of PIPE: the pipe was never read, so a chatty
            # child could block forever once the pipe buffer filled up.
            stdout=subprocess.DEVNULL,
        )
        return ret == 0

    def __write_tags(self, url: str, r: dict) -> "tuple[Path, Tags]":
        """Embed tags and the source URL into the image, then rename it.

        Args:
            url: Post URL, stored as the image description/caption.
            r: Post metadata providing the ``tag_string_*`` fields.

        Returns:
            ``(renamed_image_path, tags)``.
        """
        tags = Tags(
            r.get("tag_string_general", ""),
            r.get("tag_string_copyright", ""),
            r.get("tag_string_character", ""),
            r.get("tag_string_artist", ""),
        )

        # exiftool argument file: one ``-TAG=value`` assignment per line.
        content = "\n".join("-IPTC:keywords=" + s for s in tags.tags)
        content += "\n-Exif:ImageDescription=" + url
        content += "\n-Iptc:Caption-Abstract=" + url
        content += "\n-Xmp:Description=" + url

        tags_file = Path(self.dir_tmp, "tags.txt")
        # Explicit UTF-8 so non-ASCII tags do not depend on the locale; the
        # file is fully written and closed before exiftool reads it.
        tags_file.write_text(content, encoding="utf-8")

        logging.info("Writing tags")
        ret = subprocess.call(
            [
                "exiftool", "-q", "-overwrite_original",
                "-@", str(tags_file),
                str(self.tmp_image_file),
            ],
            stdout=subprocess.DEVNULL,  # see __download for why not PIPE
        )
        if ret != 0:
            # Tagging stays best-effort: the image is still renamed/returned.
            logging.warning("exiftool exited with status %d", ret)

        result_file = Path(self.tmp_image_file.parent, self.__format_filename(tags))
        self.tmp_image_file.rename(result_file)
        return result_file, tags

    @staticmethod
    def __format_filename(tags: "Tags") -> str:
        """Build ``"<copyright> <characters> by <artist> at <timestamp>.jpg"``.

        Only the first copyright, the first two sanitized characters and the
        first artist are used. Characters other than alphanumerics and
        `` ._-()`` are dropped and runs of whitespace collapse to one space.
        """
        filename = "{} {} by {} at {}.jpg".format(
            tags.copyrights.split(" ")[0],
            ", ".join(tags.characters_sanitized()[:2]),
            tags.artists.split(" ")[0],
            datetime.now().strftime("%Y%m%d_%H%M%S"),
        )
        filename = "".join(x for x in filename if x.isalnum() or x in " ._-()")
        return re.sub(r"\s+", " ", filename).strip()