# Source code for exporter.util

import logging
import os
import shutil
from enum import StrEnum
from pathlib import Path
from urllib.parse import parse_qs, urlencode, urlsplit

from django.conf import settings
from django.db import connections
from yapw.clients import AsyncConsumer, Blocking
from yapw.decorators import decorate
from yapw.methods import add_callback_threadsafe, nack

from data_registry.exceptions import LockFileError

# Logger named after this module, shared by the functions below.
logger = logging.getLogger(name=__name__)


def get_client_kwargs(rabbit_params=None):
    """
    Return the connection keyword arguments for a yapw client.

    :param rabbit_params: query string parameters to add to ``settings.RABBIT_URL``, replacing any parameters of the
        same name already in its query string
    :returns: a dict with the RabbitMQ ``url`` and the ``exchange`` name, both read from Django settings
    """
    rabbit_url = settings.RABBIT_URL
    if rabbit_params:
        url_parts = urlsplit(rabbit_url)
        # NOTE: parse_qs drops parameters with blank values (keep_blank_values defaults to False).
        merged_query = parse_qs(url_parts.query)
        merged_query.update(rabbit_params)
        # doseq=True expands the list values produced by parse_qs into repeated parameters.
        rabbit_url = url_parts._replace(query=urlencode(merged_query, doseq=True)).geturl()
    return {"url": rabbit_url, "exchange": settings.RABBIT_EXCHANGE_NAME}
def publish(*args, **kwargs):
    """
    Publish a message using a short-lived blocking client, which is closed even if publishing fails.

    :param args: positional arguments for the client's ``publish`` method
    :param kwargs: keyword arguments for the client's ``publish`` method
    """
    blocking_client = Blocking(**get_client_kwargs())
    try:
        blocking_client.publish(*args, **kwargs)
    finally:
        blocking_client.close()
def consume(*args, rabbit_params=None, **kwargs):
    """
    Create an asynchronous consumer and start it.

    :param args: positional arguments for ``yapw.clients.AsyncConsumer``
    :param rabbit_params: query parameters to merge into the RabbitMQ URL (see ``get_client_kwargs``)
    :param kwargs: keyword arguments for ``yapw.clients.AsyncConsumer``; these must not include ``url`` or
        ``exchange``, which are supplied here (a duplicate raises ``TypeError``)
    """
    connection_kwargs = get_client_kwargs(rabbit_params)
    consumer = AsyncConsumer(*args, **kwargs, **connection_kwargs)
    consumer.start()
def decorator(decode, callback, state, channel, method, properties, body):
    """
    Run a consumer callback through yapw's ``decorate``, with this project's error handling and cleanup.

    - If the callback raises ``LockFileError``, assume the same message was delivered twice. Log an error, and nack the
      message without requeueing it.
    - If the callback raises any other exception, log it and schedule ``state.interrupt`` on the connection in a
      thread-safe way, so the client shuts down from the main thread without acknowledging the message.
    - Afterwards, close all of Django's database connections, including any the callback opened.
    """

    def errback(exception):
        if not isinstance(exception, LockFileError):
            logger.exception("Unhandled exception when consuming %r, shutting down gracefully", body)
            add_callback_threadsafe(state.connection, state.interrupt)
            return
        logger.exception("Locked since %s, maybe caused by duplicate message %r, skipping", exception, body)
        nack(state, channel, method.delivery_tag, requeue=False)

    def finalback():
        # Don't leave database connections open after the callback finishes.
        for connection in connections.all():
            connection.close()

    decorate(decode, callback, state, channel, method, properties, body, errback, finalback)
[docs] class TaskStatus(StrEnum): #: Processing hasn't started. WAITING = "WAITING" #: Processing has started. RUNNING = "RUNNING" #: Processing has ended. COMPLETED = "COMPLETED"
class Export:
    """
    An export directory under ``settings.EXPORTER_DIR``, optionally with one output file (``basename``) in it.
    """

    @classmethod
    def get_files(cls, *components, **kwargs):
        """
        Return the available files in the export directory identified by ``components``.

        Falsy components are discarded. If none remain, return :meth:`default_files` without reading the filesystem.

        :param components: the path components of the export directory
        :param kwargs: keyword arguments passed to the constructor
        """
        components = [c for c in components if c]
        if components:
            return cls(*components, **kwargs).files
        return cls.default_files()

    @classmethod
    def default_files(cls):
        """Return a new mapping that reports, for each format, no full file and no yearly files."""
        return {
            "csv": {"full": False, "by_year": []},
            "jsonl": {"full": False, "by_year": []},
            "xlsx": {"full": False, "by_year": []},
        }

    def __init__(self, *components, basename: str | None = None):
        """
        ``basename`` is required to use ``lock()``, ``unlock()``, ``locked`` and ``status``.

        :param components: the path components of the export directory
        :param basename: the basename of the output file of the export operation
        """
        self.directory = Path(settings.EXPORTER_DIR).joinpath(*map(str, components))
        self.spoonbill_directory = Path(settings.SPOONBILL_EXPORTER_DIR).joinpath(*map(str, components))
        # Cause methods that require `basename` to error if the instance is improperly initialized.
        if basename:
            self.basename = basename

    def __str__(self):
        return f"{self.directory}/{self.basename}"

    def __repr__(self):
        # Unlike other methods, don't raise if `basename` is unset: repr() is used when debugging and logging.
        return f"Export(directory={self.directory}, basename={getattr(self, 'basename', None)})"

    @property
    def path(self):
        """Return the path to the output file."""
        return self.directory / self.basename

    # This method's logic must match the workers' logic, so that views can get the status of an export task, by
    # providing only the desired filename.
    @property
    def lockfile(self) -> Path:
        """Return the path to the lock file that guards the creation of the output file."""
        # All JSONL files are exported at once.
        if self.basename.endswith(".jsonl.gz"):
            return self.directory / "exporter_full.jsonl.gz.lock"
        # Each XLSX file is exported with a CSV file.
        if self.basename.endswith(".xlsx"):
            return self.directory / f"exporter_{self.basename.removesuffix('.xlsx')}.csv.tar.gz.lock"
        return self.directory / f"exporter_{self.basename}.lock"

    def lock(self) -> None:
        """
        Create the lock file.

        :raises LockFileError: if the lock file already exists (with the lock file's modification time)
        """
        while True:
            try:
                # Mode "x" creates the file exclusively: it fails if the file already exists.
                with self.lockfile.open("x"):
                    return
            except FileExistsError as e:
                try:
                    mtime = self.lockfile.stat().st_mtime
                except FileNotFoundError:
                    # The lock was released between the two calls. Try to acquire it again, instead of letting an
                    # unexpected error escape.
                    continue
                raise LockFileError(mtime) from e

    def unlock(self) -> None:
        """Delete the lock file, if it exists."""
        self.lockfile.unlink(missing_ok=True)

    def remove(self):
        """Delete the export directory recursively, if it exists."""
        if self.directory.exists():
            shutil.rmtree(self.directory)

    @property
    def locked(self) -> bool:
        """Return whether the output file is being written."""
        return self.lockfile.exists()

    @property
    def status(self) -> TaskStatus:
        """Return the status of the export."""
        if self.locked:  # the output file is being written
            return TaskStatus.RUNNING
        if self.path.exists():  # the output file has been written
            return TaskStatus.COMPLETED
        return TaskStatus.WAITING

    @property
    def files(self) -> dict:
        """
        Return all the available file formats and segments (by year or full).

        :returns: a mapping shaped like :meth:`default_files`, in which ``"full"`` is the size in bytes of the full
            file (or ``False`` if absent), and ``"by_year"`` lists ``{"year": int, "size": int}`` entries sorted by year
        """
        files = self.default_files()
        for path in self.iterdir():
            # The second dot-separated part is the format: works for .xlsx .jsonl.gz .csv.tar.gz
            parts = path.name.split(".", 2)
            # Skip entries without a dot (e.g. subdirectories), which have no format.
            if len(parts) < 2 or parts[1] not in files:
                continue
            suffix = parts[1]
            prefix = path.name[:4]  # year or "full"
            if prefix.isdigit():
                files[suffix]["by_year"].append({"year": int(prefix), "size": os.path.getsize(path)})
            elif prefix == "full":
                files[suffix]["full"] = os.path.getsize(path)
        # Directory iteration order is arbitrary; sort, for stable output.
        for segments in files.values():
            segments["by_year"].sort(key=lambda item: item["year"])
        return files

    def iterdir(self):
        """Yield path objects of the directory contents, or nothing if the directory doesn't exist."""
        if self.directory.exists():
            yield from self.directory.iterdir()

    def get_convertible_paths(self):
        """Yield paths to ``.jsonl.gz`` files."""
        for path in self.iterdir():
            if path.name.endswith(".jsonl.gz"):
                yield path