Source code for data_registry.models

import logging
from datetime import date, timedelta

from django.db import models
from django.db.models import Q
from django.db.models.functions import Now
from django.utils.translation import gettext_lazy as _

logger = logging.getLogger(__name__)


def format_datetime(dt):
    if isinstance(dt, Now):
        return "now"
    return dt.strftime("%d-%b-%y") if dt else ""


[docs] class JobQuerySet(models.QuerySet):
[docs] def successful(self): """Return a query set of successfully completed jobs.""" return self.complete().exclude(models.Exists(Task.objects.failed()))
[docs] def unsuccessful(self): """Return a query set of unsuccessfully completed jobs.""" return self.complete().filter(models.Exists(Task.objects.failed()))
[docs] def complete(self): """Return a query set of complete jobs.""" return self.filter(status=Job.Status.COMPLETED)
[docs] def incomplete(self): """Return a query set of incomplete jobs.""" return self.exclude(status=Job.Status.COMPLETED)
[docs] class Job(models.Model):
[docs] class Status(models.TextChoices): #: The job is planned. PLANNED = "PLANNED", "PLANNED" #: The job has started. RUNNING = "RUNNING", "RUNNING" #: The job has ended (either successfully or unsuccessfully). COMPLETED = "COMPLETED", "COMPLETED"
collection = models.ForeignKey("Collection", on_delete=models.CASCADE, db_index=True, verbose_name="publication") # Job metadata start = models.DateTimeField(blank=True, null=True, verbose_name="job started at") end = models.DateTimeField(blank=True, null=True, verbose_name="job ended at") status = models.TextField(choices=Status, blank=True, default=Status.PLANNED) context = models.JSONField( blank=True, default=dict, help_text="<dl>" "<dt><code>spider</code></dt>" "<dd>The name of the spider in Kingfisher Collect</dd>" "<dt><code>data_version</code></dt>" "<dd>The data version of the crawl in Kingfisher Collect</dd>" "<dt><code>job_id</code></dt>" "<dd>The ID of the job in Scrapyd</dd>" "<dt><code>scrapy_log</code></dt>" "<dd>A local URL to the log file of the crawl in Scrapyd</dd>" "<dt><code>process_id</code></dt>" "<dd>The ID of the base collection in Kingfisher Process</dd>" "<dt><code>process_compiled_collection_id</code></dt>" "<dd>The ID of the compiled collection in Kingfisher Process</dd>" "</dl>", ) # Job logic archived = models.BooleanField( default=False, verbose_name="temporary data deleted", help_text="Whether the temporary data created by job tasks has been deleted.", ) keep_all_data = models.BooleanField( default=False, verbose_name="preserve temporary data", help_text="By default, temporary data created by job tasks is deleted after the job " "is completed. Only the data registry's models' data and JSON exports are " "retained. To preserve temporary data for debugging, check this box. Then, " 'when ready, uncheck this box and run the "manageprocess" management command.', ) # Field coverage coverage = models.JSONField( blank=True, default=dict, help_text="The collection compiled releases coverage from Cardinal.", ) tenders_count = models.IntegerField(default=0) tenderers_count = models.IntegerField(default=0) tenders_items_count = models.IntegerField(default=0) parties_count = models.IntegerField(default=0) awards_count = models.IntegerField(default=0) awards_items_count = models.IntegerField(default=0) awards_suppliers_count = models.IntegerField(default=0) contracts_count = models.IntegerField(default=0) contracts_items_count = models.IntegerField(default=0) contracts_transactions_count = models.IntegerField(default=0) documents_count = models.IntegerField(default=0) plannings_count = models.IntegerField(default=0) milestones_count = models.IntegerField(default=0) amendments_count = models.IntegerField(default=0) # Collection metadata date_from = models.DateField(blank=True, null=True, verbose_name="minimum release date") date_to = models.DateField(blank=True, null=True, verbose_name="maximum release date") ocid_prefix = models.TextField(blank=True, verbose_name="OCID prefix") license = models.TextField(blank=True) publication_policy = models.TextField(blank=True) # Timestamps created = models.DateTimeField(auto_now_add=True) modified = models.DateTimeField(auto_now=True) objects = JobQuerySet.as_manager() def __str__(self): return f"{format_datetime(self.start)} .. {format_datetime(self.end)} ({self.pk})" def __repr__(self): return f"{self.collection!r}: {self}"
[docs] def initiate(self): """Mark the job as started.""" self.start = Now() self.status = Job.Status.RUNNING self.save()
[docs] def complete(self): """Mark the job as ended.""" self.end = Now() self.status = Job.Status.COMPLETED self.save()
[docs] def is_successful(self): """Return whether all tasks ended successfully.""" return all( task.status == Task.Status.COMPLETED and task.result == Task.Result.OK for task in self.task_set.all() )
[docs] def is_unsuccessful(self): """Return whether any tasks ended unsuccessfully.""" return any( task.status == Task.Status.COMPLETED and task.result == Task.Result.FAILED for task in self.task_set.all() )
[docs] class CollectionQuerySet(models.QuerySet):
[docs] def visible(self): """Return a query set of public collections, excluding those without an active job for no reason.""" return self.filter(public=True).exclude( Q(active_job__isnull=True) & (Q(no_data_rationale__isnull=True) | Q(no_data_rationale="")) )
[docs] class Collection(models.Model):
[docs] class Region(models.TextChoices): #: Africa and Middle East MEA = "MEA", _("Africa and Middle East") #: Asia AS = "AS", _("Asia") #: Eastern Europe & Central Asia EECA = "EECA", _("Eastern Europe & Central Asia") #: Europe EU = "EU", _("Europe") #: Latin America & Caribbean LAC = "LAC", _("Latin America & Caribbean") #: North America NA = "NA", _("North America") #: Oceania OC = "OC", _("Oceania")
[docs] class RetrievalFrequency(models.TextChoices): #: Weekly WEEKLY = "WEEKLY", _("Weekly") #: Every 2 weeks BIWEEKLY = "BIWEEKLY", _("Every 2 weeks") #: Monthly MONTHLY = "MONTHLY", _("Monthly") #: Every 6 months HALF_YEARLY = "HALF_YEARLY", _("Every 6 months") #: Annually ANNUALLY = "ANNUALLY", _("Annually") #: This dataset is no longer updated by the publisher NEVER = "NEVER", _("This dataset is no longer updated by the publisher")
[docs] class UpdateFrequency(models.TextChoices): #: Unknown UNKNOWN = "UNKNOWN", _("Unknown") #: Real time REAL_TIME = "REAL_TIME", _("Real time") #: Hourly HOURLY = "HOURLY", _("Hourly") #: Daily DAILY = "DAILY", _("Daily") #: Weekly WEEKLY = "WEEKLY", _("Weekly") #: Monthly MONTHLY = "MONTHLY", _("Monthly") #: Every 3 months QUARTERLY = "QUARTERLY", _("Every 3 months") #: Every 6 months HALF_YEARLY = "HALF_YEARLY", _("Every 6 months") #: Annually ANNUALLY = "ANNUALLY", _("Annually")
# Identification title = models.TextField( help_text='The name of the publication, following the <a href="https://docs.google.com/document/d/' '14ZXlAB6GWeK4xwDUt9HGi0fTew4BahjZQ2owdLLVp6I/edit#heading=h.t81hzvffylry">naming conventions</a>, ' "and omitting the country name." ) # Description description = models.TextField( blank=True, help_text="The first paragraph of the description of the publication, as Markdown text, following the " '<a href="https://docs.google.com/document/d/1Pr87zDrs9YY7BEvr_e6QjOy0gexs06dU9ES2_-V7Lzw/edit#heading=' 'h.fksp8fxgoi7v">template and guidance</a>.', ) description_long = models.TextField( blank=True, help_text="The remaining paragraphs of the description of the publication, as Markdown text, " 'which will appear under "Show more".', ) # Spatial coverage country = models.TextField( blank=True, help_text="The official name of the country from which the data originates." ) country_flag = models.TextField(blank=True) region = models.TextField( choices=Region, blank=True, help_text="The name of the region to which the country belongs." ) # Field coverage additional_data = models.TextField( blank=True, verbose_name="data availability", help_text="Any notable highlights about the available data, such as extensions used or additional fields," " as Markdown text.", ) # Language language = models.TextField(blank=True, help_text='The languages used within data fields: for example, "Spanish".') # Accrual periodicity update_frequency = models.TextField( choices=UpdateFrequency, blank=True, default=UpdateFrequency.UNKNOWN, help_text="The frequency at which the source updates the publication.", ) # Data quality summary = models.TextField( blank=True, verbose_name="quality summary", help_text="A short summary of quality issues, as Markdown text.", ) last_reviewed = models.DateField( blank=True, null=True, verbose_name="last reviewed", help_text="The date on which the quality summary was last confirmed to be correct. " "Only the year and month are published.", ) # License license_custom = models.ForeignKey( "License", on_delete=models.CASCADE, blank=True, null=True, db_index=True, verbose_name="data license", help_text="If not set, the license URL within the OCDS package is displayed.", ) # Documentation publication_policy = models.TextField(blank=True, verbose_name="publication policy") # Provenance source_url = models.TextField(blank=True, verbose_name="source URL", help_text="The URL of the publication.") # Job logic source_id = models.TextField( verbose_name="source ID", help_text="The name of the spider in Kingfisher Collect. If a new spider is not listed, " "Kingfisher Collect needs to be re-deployed to the registry's server.", ) retrieval_frequency = models.TextField( choices=RetrievalFrequency, blank=True, help_text="The frequency at which the registry updates the publication, based on the frequency at which " "the publication is updated.", ) active_job = models.ForeignKey( "Job", on_delete=models.RESTRICT, blank=True, null=True, db_index=True, verbose_name="active job", related_name="active_collection", help_text="A job is a set of tasks to collect and process data from a publication. " "A job can be selected once it is completed. If a new job completes, it becomes the active job.", ) last_retrieved = models.DateField( blank=True, null=True, help_text="The date on which the most recent 'collect' job task completed." ) frozen = models.BooleanField( default=False, help_text="If the spider is broken, check this box to prevent the scheduling of new jobs." ) # Visibility logic no_data_rationale = models.TextField( blank=True, verbose_name="no data rationale", help_text="The short reason why the publication has no active job. If set, freeze the publication.", ) public = models.BooleanField( default=False, help_text="If the active job's tasks completed without errors or 'No data rationale' is set, and all fields " "below in all languages are filled in, check this box to make the publication visible to anonymous users. " "Otherwise, it is visible to administrators only.", ) # Timestamps created = models.DateTimeField(auto_now_add=True) modified = models.DateTimeField(auto_now=True) # Collection metadata, to override the active job (or as backup when there's no active job). date_from = models.DateField( blank=True, null=True, verbose_name="minimum release date", help_text="If not set, the value calculated from the OCDS data is displayed.", ) date_to = models.DateField( blank=True, null=True, verbose_name="maximum release date", help_text="If not set, the value calculated from the OCDS data is displayed.", ) ocid_prefix = models.TextField( blank=True, verbose_name="OCID prefix", help_text="If not set, the value calculated from the OCDS data is displayed.", ) objects = CollectionQuerySet.as_manager() class Meta: verbose_name = "publication" def __str__(self): return f"{self.title} ({self.pk})" def __repr__(self): return f"{self.country}: {self}"
[docs] def is_out_of_date(self): """ Return whether the publication is out-of-date. A publication is out-of-date if it isn't frozen, has a retrieval frequency other than "never", has no incomplete jobs, and either has never been scheduled or was last scheduled longer ago than the retrieval frequency. If the most recent job is unsuccessful, the minimum retrieval frequency is used, instead. """ if self.frozen: return False # It has no retrieval frequency or the retrieval frequency is "never". if not self.retrieval_frequency or self.retrieval_frequency == self.RetrievalFrequency.NEVER: return False most_recent_job = self.job_set.order_by("-start").first() # It has never been scheduled. if not most_recent_job: return True # It has been scheduled, but not yet initiated. if not most_recent_job.start: return False # If the most recent job is unsuccessful, use the minimum retrieval frequency. frequency = self.RetrievalFrequency.WEEKLY if most_recent_job.is_unsuccessful() else self.retrieval_frequency match frequency: case self.RetrievalFrequency.WEEKLY: days = 7 case self.RetrievalFrequency.BIWEEKLY: days = 14 case self.RetrievalFrequency.MONTHLY: days = 30 case self.RetrievalFrequency.HALF_YEARLY: days = 180 case self.RetrievalFrequency.ANNUALLY: days = 365 case _: raise NotImplementedError out_of_date = date.today() >= (most_recent_job.start + timedelta(days=days)).date() # Avoid a query if not out-of-date. if not out_of_date: return False # It has an incomplete job (likely the most recent job). Don't run concurrent jobs for a publication. if incomplete_jobs := list(self.job_set.incomplete()): logger.warning("%s has incomplete job(s): %s", self, incomplete_jobs) return False return out_of_date
[docs] class License(models.Model): name = models.TextField(blank=True, help_text="The official name of the license.") description = models.TextField( blank=True, help_text="A brief description of the permissions, conditions and limitations, as Markdown text." ) url = models.TextField(blank=True, verbose_name="URL", help_text="The canonical URL of the license.") # Timestamps created = models.DateTimeField(auto_now_add=True) modified = models.DateTimeField(auto_now=True) class Meta: verbose_name = "data license" def __str__(self): return f"{self.name} ({self.pk})"
[docs] class TaskQuerySet(models.QuerySet): def failed(self): # https://docs.djangoproject.com/en/5.2/ref/models/expressions/#some-examples return Task.objects.filter( job=models.OuterRef("pk"), status=Task.Status.COMPLETED, result=Task.Result.FAILED, )
[docs] class Task(models.Model):
[docs] class Status(models.TextChoices): #: The task has started, but work has not yet started in the application. #: (This status is never saved to the database.) WAITING = "WAITING", "WAITING" #: The task is planned. PLANNED = "PLANNED", "PLANNED" #: The task has started. RUNNING = "RUNNING", "RUNNING" #: The task has ended (either successfully or unsuccessfully). COMPLETED = "COMPLETED", "COMPLETED"
[docs] class Result(models.TextChoices): #: The task ended successfully. OK = "OK", "OK" #: The task ended unsuccessfully. FAILED = "FAILED", "FAILED"
[docs] class Type(models.TextChoices): #: Kingfisher Collect COLLECT = "collect" #: Kingfisher Process PROCESS = "process" #: Exporter EXPORTER = "exporter" #: Coverage COVERAGE = "coverage" #: Flattener FLATTENER = "flattener"
job = models.ForeignKey("Job", on_delete=models.CASCADE, db_index=True) # Task metadata start = models.DateTimeField(blank=True, null=True) end = models.DateTimeField(blank=True, null=True) status = models.TextField(choices=Status, blank=True, default=Status.PLANNED) # Task result result = models.TextField(choices=Result, blank=True) note = models.TextField(blank=True, help_text="Metadata about any failure.") # Job logic (see `create_tasks`) type = models.TextField(choices=Type, blank=True) order = models.IntegerField(blank=True, null=True) # Timestamps created = models.DateTimeField(auto_now_add=True) modified = models.DateTimeField(auto_now=True) objects = TaskQuerySet.as_manager() class Meta: verbose_name = "job task" def __str__(self): return f"#{self.pk}({self.type})"
[docs] def initiate(self): """Mark the task as started.""" self.start = Now() self.status = Task.Status.RUNNING self.save()
[docs] def progress(self, *, result="", note=""): """Update the task's progress. If called without arguments, reset the task's progress.""" self.result = result self.note = note self.save()
[docs] def complete(self, *, result, note=""): """Mark the task as ended.""" self.end = Now() self.status = Task.Status.COMPLETED self.result = result self.note = note self.save()
[docs] class TaskNote(models.Model):
[docs] class Level(models.TextChoices): WARNING = "WARNING", "WARNING" ERROR = "ERROR", "ERROR" CRITICAL = "CRITICAL", "CRITICAL" UNKNOWN = "UNKNOWN", "UNKNOWN"
task = models.ForeignKey("Task", on_delete=models.CASCADE) level = models.TextField(choices=Level) note = models.TextField() data = models.JSONField(blank=True, default=dict) # Timestamps created = models.DateTimeField(auto_now_add=True) modified = models.DateTimeField(auto_now=True) class Meta: verbose_name = "task note" indexes = [ models.Index(fields=["task", "level"]), ] def __str__(self): return f"{self.level}: {self.note[:50]}"