Source code for data_registry.process_manager

import datetime
import logging

from django.conf import settings
from django.db import transaction
from django.utils.timezone import now

from data_registry import models
from data_registry.exceptions import RecoverableError
from data_registry.process_manager.task.collect import Collect
from data_registry.process_manager.task.coverage import Coverage
from data_registry.process_manager.task.exporter import Exporter
from data_registry.process_manager.task.flattener import Flattener
from data_registry.process_manager.task.process import Process
from data_registry.process_manager.util import TaskManager

logger = logging.getLogger(__name__)

# How long a flattener task may stay waiting or running (measured from its start time)
# before process() runs it again.
FLATTENER_EXECUTION_LIMIT = datetime.timedelta(days=1)
# How old (by start time) a past unsuccessful job must be before delete_older_jobs() deletes it.
UNSUCCESSFUL_JOB_RETENTION = datetime.timedelta(days=180)


def get_task_manager(task: models.Task) -> TaskManager:
    """
    Instantiate and return a task manager for the task.

    :param task: The task for which to build a manager
    :returns: A manager of the class that corresponds to the task's type
    :raises NotImplementedError: if no manager class corresponds to the task's type
    """
    task_type = task.type
    kinds = models.Task.Type

    if task_type == kinds.COLLECT:
        return Collect(task)
    if task_type == kinds.PROCESS:
        return Process(task)
    if task_type == kinds.EXPORTER:
        return Exporter(task)
    if task_type == kinds.COVERAGE:
        return Coverage(task)
    if task_type == kinds.FLATTENER:
        return Flattener(task)

    # No known type matched.
    raise NotImplementedError(repr(task_type))
def delete_older_jobs(collection: models.Collection, job: models.Job, *, dry_run: bool = False) -> None:
    """
    Delete old jobs for a collection.

    -  Delete past unsuccessful jobs older than 180 days
    -  Delete past successful jobs with less than 10% more OCIDs, keeping the second-most recent job

    A past successful job whose OCID count exceeds the given job's count by more than 10% is kept, and a warning
    is logged instead.

    :param collection: The collection on which to operate
    :param job: A job to keep and to which to compare OCID counts
    :param dry_run: Show what would be done, without making changes
    """
    country = collection.country
    candidates = collection.job_set.exclude(pk=job.pk)

    # Newest first, skipping the first result: the most recent of the *other* successful jobs is kept.
    stale_successes = candidates.successful().order_by("-start")[1:]
    # Unsuccessful jobs are kept for the retention period (six months), for debugging.
    cutoff = now() - UNSUCCESSFUL_JOB_RETENTION
    stale_failures = candidates.unsuccessful().filter(start__lt=cutoff)

    # NOTE: Administrators must check incomplete jobs manually.

    # NOTE: The Collect task's wipe() method can be slow.
    for failed_job in stale_failures:
        if dry_run:
            logger.info("DRY RUN: Would delete old unsuccessful job %s (%s: %s)", failed_job, country, collection)
            continue
        failed_job.delete()
        logger.debug("Deleted old unsuccessful job %s (%s: %s)", failed_job, country, collection)

    reference_count = job.coverage.get("/ocid", 0)
    for past_job in stale_successes:
        past_count = past_job.coverage.get("/ocid", 0)
        if past_count <= reference_count * 1.1:
            if not dry_run:
                past_job.delete()
                logger.debug("Deleted old successful job %s (%s: %s)", past_job, country, collection)
            else:
                logger.info("DRY RUN: Would delete old successful job %s (%s: %s)", past_job, country, collection)
        else:
            # The past job has noticeably more OCIDs than the reference job: keep it and flag it.
            logger.warning(
                "Keeping old job %s with over 10%% more OCIDs than newest job %s (%s >> %s) (%s: %s)",
                past_job,
                job,
                past_count,
                reference_count,
                country,
                collection,
            )
def process(collection: models.Collection, *, dry_run: bool = False) -> None:
    """
    If the collection is :meth:`out-of-date<data_registry.models.Collection.is_out_of_date>`, create a job.

    For each of the collection's incomplete jobs:

    -  If the job is planned, start the job
    -  If the next task is planned, start the task with :meth:`~data_registry.process_manager.util.TaskManager.run`
    -  If the next task is waiting or running, recheck its status with
       :meth:`~data_registry.process_manager.util.TaskManager.get_status`:

       -  If it is completed, complete the task and start the next task
       -  If it failed temporarily, log the reason
       -  If it failed permanently, fail the task and end the job

    -  If all tasks succeeded:

       -  End the job
       -  Update the collection's active job and last retrieved date
       -  Delete past unsuccessful jobs older than 180 days
       -  Delete past successful jobs with less than 10% more OCIDs, keeping the active and next-most recent job

    In other words, this function advances each job by at most one task. As such, for all tasks of a job to succeed,
    this function needs to run at least as many times as there are tasks in the ``JOB_TASKS_PLAN`` setting.

    :param collection: The collection to process
    :param dry_run: Show what would be done, without making changes
    """
    country = collection.country

    if collection.is_out_of_date():
        if dry_run:
            logger.info("DRY RUN: Would create job for out-of-date collection %s: %s", country, collection)
        else:
            collection.job_set.create()  # see signals.py

    for job in collection.job_set.incomplete():
        # Walk the job's not-yet-completed tasks in order. A ``break`` means "stop working on this job for now"
        # (a task was started, is still in progress, or failed) and skips the loop's ``else`` clause below.
        for task in job.task_set.exclude(status=models.Task.Status.COMPLETED).order_by("order"):
            task_manager = get_task_manager(task)

            if dry_run:
                logger.info("DRY RUN: Would progress task %s for job %s (%s: %s)", task, job, country, collection)
                # NOTE(review): In a dry run, the loop never breaks, so the ``else`` clause below is reached for
                # every incomplete job, whether or not its tasks would have succeeded.
                continue

            try:
                # An exception raised inside the atomic block propagates to the ``except`` clauses below, which
                # record the failure outside the block.
                with transaction.atomic():
                    match task.status:
                        case models.Task.Status.PLANNED:
                            # If this is the first task...
                            if job.status == models.Job.Status.PLANNED:
                                job.initiate()
                                logger.debug("Job %s is starting (%s: %s)", job, country, collection)

                            task_manager.run()
                            task.initiate()
                            logger.debug("Task %s is starting (%s: %s)", task, country, collection)
                            # The task was only just started: leave later tasks for a future call.
                            break
                        case models.Task.Status.WAITING | models.Task.Status.RUNNING:
                            status, note = task_manager.get_status()
                            logger.debug("Task %s is %s (%s: %s)", task, status, country, collection)

                            match status:
                                case models.Task.Status.WAITING | models.Task.Status.RUNNING:
                                    task.progress()  # The application is responding (again). Reset any progress.

                                    # Check if the flattener task has been stuck for a day.
                                    if (
                                        task.type == models.Task.Type.FLATTENER
                                        and now() - task.start > FLATTENER_EXECUTION_LIMIT
                                    ):
                                        # Perform the same actions as the PLANNED branch.
                                        task_manager.run()
                                        task.initiate()
                                        logger.warning("Task %s is restarting (%s: %s)", task, country, collection)
                                    # The task is still in progress (or was restarted): do not start later tasks.
                                    break
                                case models.Task.Status.COMPLETED:
                                    # A non-empty note on a completed status is treated as a permanent failure.
                                    if note:
                                        logger.warning("Task %s failed (%s: %s): %s", task, country, collection, note)
                                        task.complete(result=models.Task.Result.FAILED, note=note)
                                        job.complete()
                                        logger.warning("Job %s has failed (%s: %s)", job, country, collection)
                                        break

                                    task.complete(result=models.Task.Result.OK)
                                    # Do not break! Go onto the next task.
            except RecoverableError as e:
                # Temporary failure: record it on the task, but leave the task and job incomplete, so that a later
                # call checks the task again.
                logger.exception("Recoverable exception during task %s (%s: %s)", task, country, collection)
                task.progress(result=models.Task.Result.FAILED, note=str(e))  # The application is not responding.
                break
            except Exception as e:
                # Permanent failure: fail the task and end the job.
                logger.exception("Unhandled exception during task %s (%s: %s)", task, country, collection)
                task.complete(result=models.Task.Result.FAILED, note=str(e))
                job.complete()
                logger.warning("Job %s has failed (%s: %s)", job, country, collection)
                break
        # All tasks completed successfully.
        else:
            if dry_run:
                logger.info("DRY RUN: Would complete job %s and update collection %s: %s", job, country, collection)
            else:
                # Complete the job and update the collection together.
                with transaction.atomic():
                    job.complete()

                    # The last retrieved date is the end time of the task whose type is first in the task plan.
                    collection.last_retrieved = job.task_set.get(type=settings.JOB_TASKS_PLAN[0]).end
                    collection.active_job = job
                    # Fill in the publication policy from the job only if the collection has none.
                    if not collection.publication_policy:
                        collection.publication_policy = job.publication_policy
                    collection.save()

                logger.debug("Job %s has succeeded (%s: %s)", job, country, collection)

            delete_older_jobs(collection, job, dry_run=dry_run)