Source code for data_registry.process_manager.util

import functools
import logging
from abc import ABC, abstractmethod

import requests
from django.contrib.contenttypes.models import ContentType
from django.contrib.sites.models import Site
from django.urls import reverse

from data_registry import models
from data_registry.exceptions import RecoverableError
from data_registry.util import CHANGE
from exporter.util import Export, TaskStatus

# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)


def export_to_task_status(export: Export) -> models.Task.Status:
    """
    Translate the status of an exporter :class:`~exporter.util.Export` into the equivalent task status.

    :param export: the export whose ``status`` attribute is read
    :returns: the matching :class:`~data_registry.models.Task.Status`, or ``None`` if the export's status equals
        none of ``TaskStatus.WAITING``, ``TaskStatus.RUNNING`` or ``TaskStatus.COMPLETED``
    """
    # Read the status once, then compare with ``==`` (the same comparison a ``match`` value pattern performs).
    status = export.status
    if status == TaskStatus.WAITING:
        return models.Task.Status.WAITING
    if status == TaskStatus.RUNNING:
        return models.Task.Status.RUNNING
    if status == TaskStatus.COMPLETED:
        return models.Task.Status.COMPLETED
    return None
def skip_if_not_started(method):
    """
    Decorate a :meth:`~data_registry.process_manager.util.TaskManager.wipe` method to return early if the task hadn't
    started.

    If ``self.task.start`` is falsy, a debug message is logged, the wrapped method is NOT called, and ``None`` is
    returned. Otherwise, the wrapped method is called with the original arguments and its result is returned.

    :param method: a method whose instance has a ``task`` attribute with a ``start`` attribute
    :returns: the wrapping method, with the wrapped method's metadata (name, docstring) preserved
    """

    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        if not self.task.start:
            logger.debug("%s has nothing to wipe (task didn't start)", self)
            # Return early: the task never started, so there is nothing to wipe and the method must not run.
            return None
        return method(self, *args, **kwargs)

    return wrapper
class TaskManager(ABC):
    """
    Abstract base class for managing one task of a job.

    Task managers should only update the :class:`~data_registry.models.Job` context and metadata fields.
    """

    def __init__(self, task: models.Task):
        """
        Store the task to be managed.

        This method must not assume that any previous task succeeded. For example, a task manager might be created
        only to call :meth:`~data_registry.process_manager.util.TaskManager.wipe`.
        """
        self.task = task

    def __str__(self):
        return f"Publication {self.collection}: Job #{self.job.pk}: {type(self).__name__}"

    @property
    def job(self) -> models.Job:
        """Return the job that the task belongs to."""
        return self.task.job

    @property
    def collection(self) -> models.Collection:
        """Return the publication that the task is performed on."""
        task_job = self.task.job
        return task_job.collection

    @property
    def job_url(self) -> str:
        """Return the absolute URL of the job's admin change page."""
        content_type = ContentType.objects.get_for_model(models.Job)
        view_name = CHANGE.format(content_type=content_type)
        path = reverse(view_name, args=[self.job.pk])
        domain = Site.objects.get_current().domain
        return f"https://{domain}{path}"

    @property
    @abstractmethod
    def final_output(self) -> bool:
        """
        Whether the task produces a final output, such as a bulk download.

        When this is ``False``, the "manageprocess" management command calls
        :meth:`~data_registry.process_manager.util.TaskManager.wipe` once the job is complete, unless temporary data
        is to be preserved.
        """

    def request(self, method, url, *, error_message, timeout=10, **kwargs):
        """
        Send a request to an application and return the response.

        :param method: the HTTP method, passed to :func:`requests.request`
        :param url: the URL to request
        :param error_message: a description of the failure, included in the raised error's message
        :param timeout: passed to :func:`requests.request`
        :param kwargs: any other keyword arguments for :func:`requests.request`
        :raises RecoverableError: if sending the request raises a :class:`requests.RequestException`, including when
            the response has an error status code
        """
        try:
            response = requests.request(method, url, **kwargs, timeout=timeout)
            response.raise_for_status()
        except requests.RequestException as exc:
            raise RecoverableError(f"{self}: {error_message} ({url})") from exc
        else:
            return response

    @abstractmethod
    def run(self) -> None:
        """
        Start the task. This method is called once.

        :raises RecoverableError: if the task can't be started, temporarily
        """

    @abstractmethod
    def get_status(self) -> tuple[models.Task.Status, str | None]:
        """
        Return the task's status, together with an optional failure reason.

        Call this method only after :meth:`~data_registry.process_manager.util.TaskManager.run` has been called.

        This method may write metadata about the task to the job. Because it can be called many times, it should
        write metadata only when the metadata is missing or when the task is complete.

        This method is expected to be called within a transaction.

        :raises RecoverableError:
        """

    @abstractmethod
    def wipe(self) -> None:
        """
        Delete any side effects of the task, such as data written by it.

        This method can be called even if the task hasn't started. It must be idempotent, because it is retried if
        any task failed to be wiped.

        :raises RecoverableError: if the task can't be wiped, temporarily
        """