API reference

Process manager

data_registry.process_manager.get_task_manager(task)[source]

Instantiate and return a task manager for the task.

Parameters:

task (Task)

Return type:

TaskManager

data_registry.process_manager.delete_older_jobs(collection, job, *, dry_run=False)[source]

Delete old jobs for a collection.

  • Delete past unsuccessful jobs older than 180 days

  • Delete past successful jobs that have less than 10% more OCIDs than the given job, keeping the second-most recent job

Parameters:
  • collection (Collection) – The collection on which to operate

  • job (Job) – A job to keep and to which to compare OCID counts

  • dry_run (bool) – Show what would be done, without making changes

Return type:

None
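The two retention rules can be sketched as a pure function over plain records. This is a minimal sketch under stated assumptions, not the project's implementation: JobRecord and jobs_to_delete are hypothetical, "the second-most recent job" is read as the most recent successful job other than job, and a past successful job is assumed to be deleted unless it has at least 10% more OCIDs than job.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

# Thresholds taken from the description above.
MAX_UNSUCCESSFUL_AGE = timedelta(days=180)
OCID_GROWTH_THRESHOLD = 0.10


@dataclass
class JobRecord:
    """Hypothetical stand-in for the Job model."""

    id: int
    end: datetime
    successful: bool
    ocid_count: int


def jobs_to_delete(jobs, keep, now):
    """Return the ids of past jobs that the retention rules (as read here) would delete."""
    past = [j for j in jobs if j.id != keep.id]
    doomed = set()

    # Rule 1: unsuccessful jobs that ended more than 180 days ago.
    for job in past:
        if not job.successful and now - job.end > MAX_UNSUCCESSFUL_AGE:
            doomed.add(job.id)

    # Rule 2: successful jobs, newest first. The first one is assumed to be
    # "the second-most recent job", and is always kept.
    successful = sorted((j for j in past if j.successful), key=lambda j: j.end, reverse=True)
    for job in successful[1:]:
        # Assumed reading: delete unless the past job has at least 10% more OCIDs.
        if job.ocid_count < keep.ocid_count * (1 + OCID_GROWTH_THRESHOLD):
            doomed.add(job.id)

    return doomed
```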

data_registry.process_manager.process(collection, *, dry_run=False)[source]

If the collection is out-of-date, create a job.

For each of the collection’s incomplete jobs:

  • If the job is planned, start the job

  • If the next task is planned, start the task with run()

  • If the next task is waiting or running, recheck its status with get_status():

    • If it is completed, complete the task and start the next task

    • If it failed temporarily, log the reason

    • If it failed permanently, fail the task and end the job

  • If all tasks succeeded:

    • End the job

    • Update the collection’s active job and last retrieved date

    • Delete past unsuccessful jobs older than 180 days

    • Delete past successful jobs with less than 10% more OCIDs, keeping the active job and the next-most recent job

In other words, this function advances each job by at most one task. As such, for all tasks of a job to succeed, this function needs to run at least as many times as there are tasks in the JOB_TASKS_PLAN setting.

Parameters:
  • collection (Collection) – The collection to process

  • dry_run (bool) – Show what would be done, without making changes

Return type:

None
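Why repeated runs are needed can be seen in a toy model of the task loop described above. This is a simplified sketch, not the real process(): SimJob, process_once and runs_until_complete are hypothetical, dry runs and failures are ignored, and every task is assumed to report completion on its first status check.

```python
from enum import Enum


class Status(Enum):
    PLANNED = "PLANNED"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"


class SimJob:
    """Hypothetical job: its own status plus one status per task, in plan order."""

    def __init__(self, n_tasks):
        self.status = Status.PLANNED
        self.tasks = [Status.PLANNED] * n_tasks


def process_once(job):
    """Advance a simulated job by at most one task."""
    if job.status is Status.COMPLETED:
        return
    if job.status is Status.PLANNED:
        job.status = Status.RUNNING  # start the job
    # Index of the next incomplete task, or None if there is none.
    i = next((k for k, s in enumerate(job.tasks) if s is not Status.COMPLETED), None)
    if i is None:
        job.status = Status.COMPLETED  # no tasks left: end the job
    elif job.tasks[i] is Status.PLANNED:
        job.tasks[i] = Status.RUNNING  # start the task, like run()
    else:
        # Assumption: the status check reports completion on the first try.
        job.tasks[i] = Status.COMPLETED
        if i + 1 < len(job.tasks):
            job.tasks[i + 1] = Status.RUNNING  # start the next task
        else:
            job.status = Status.COMPLETED  # all tasks succeeded: end the job


def runs_until_complete(n_tasks):
    """Count how many calls to process_once() a job with n_tasks tasks needs."""
    job = SimJob(n_tasks)
    runs = 0
    while job.status is not Status.COMPLETED:
        process_once(job)
        runs += 1
    return runs
```

In this model, runs_until_complete(3) returns 4: the first run starts the job and its first task, and each later run completes one task and starts the next.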

Process manager utilities

data_registry.process_manager.util.export_to_task_status(export)[source]
Parameters:

export (Export)

Return type:

Status

data_registry.process_manager.util.skip_if_not_started(method)[source]

Decorate a wipe() method to return early if the task hasn’t started.
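A decorator of this kind can be sketched with functools.wraps. This is a minimal sketch, assuming the manager exposes the task as self.task and that an unstarted task has start set to None; DemoManager is hypothetical and exists only to exercise the decorator.

```python
import functools
from types import SimpleNamespace


def skip_if_not_started(method):
    """Sketch: make a wipe() method return early if the task has not started."""

    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        # Assumption: the manager exposes self.task, and an unstarted task has start=None.
        if self.task.start is None:
            return None
        return method(self, *args, **kwargs)

    return wrapper


class DemoManager:
    """Hypothetical task manager used only to exercise the decorator."""

    def __init__(self, task):
        self.task = task
        self.wiped = False

    @skip_if_not_started
    def wipe(self):
        self.wiped = True


unstarted = DemoManager(SimpleNamespace(start=None))
unstarted.wipe()  # returns early, so unstarted.wiped stays False
```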

class data_registry.process_manager.util.TaskManager(task)[source]

Task managers should only update the Job context and metadata fields.

Parameters:

task (Task)

property job: Job

The job of which the task is a part.

property collection: Collection

The publication on which the task is performed.

property job_url: str

The URL to the job’s admin change page.

abstract property final_output: bool

Whether the task produces a final output, like a bulk download.

If False, then once a job is complete, the “manageprocess” management command calls wipe() (unless temporary data is to be preserved).

request(method, url, *, error_message, timeout=10, **kwargs)[source]

Send a request to an application.

If the application returns an error response or is temporarily unavailable, raise RecoverableError.

Raises:

RecoverableError
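The error-wrapping pattern can be sketched with the standard library's urllib.request. This is an illustration under assumptions, not the project's implementation (which accepts arbitrary keyword arguments): the data and headers parameters are choices made for the sketch, and the local RecoverableError class stands in for data_registry.exceptions.RecoverableError.

```python
import urllib.error
import urllib.request


class RecoverableError(Exception):
    """Stand-in for data_registry.exceptions.RecoverableError."""


def request(method, url, *, error_message, timeout=10, data=None, headers=None):
    """Send a request; turn error responses and connection failures into RecoverableError."""
    req = urllib.request.Request(url, data=data, headers=headers or {}, method=method)
    try:
        with urllib.request.urlopen(req, timeout=timeout) as response:
            return response.status, response.read()
    except urllib.error.HTTPError as e:
        # The application returned an error response (4xx or 5xx).
        raise RecoverableError(f"{error_message} (HTTP {e.code})") from e
    except (urllib.error.URLError, TimeoutError) as e:
        # The application is unreachable, or did not answer in time.
        raise RecoverableError(f"{error_message} ({e})") from e
```

Raising with "from e" keeps the underlying network error available as __cause__ for logging.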

abstractmethod run()[source]

Start the task.

This method is called once.

Raises:

RecoverableError – if the task can’t be started, temporarily

Return type:

None

abstractmethod get_status()[source]

Return the status of the task, and an optional failure reason.

This method must be called only after run() is called.

This method can write metadata about the task to the job. Since this method can be called many times, write metadata only when the metadata is missing or when the task is complete.

This method is expected to be called within a transaction.

Raises:

RecoverableError

Return type:

tuple[Status, str | None]

abstractmethod wipe()[source]

Delete any side effects of (for example, data written by) the task.

This method can be called even when the task hasn’t started.

This method must be idempotent. It is retried if any task failed to be wiped.

Raises:

RecoverableError – if the task can’t be wiped, temporarily

Return type:

None
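The contract above can be illustrated with a toy subclass. This is a minimal sketch: TaskManagerSketch is a hypothetical mirror of the abstract interface (the real base class also takes a task and provides request(), job and collection), and InMemoryTask "writes" to a dict instead of calling an application. It shows get_status() writing metadata only when missing, and an idempotent wipe().

```python
from __future__ import annotations

import abc
from enum import Enum


class Status(Enum):
    """Stand-in for the task statuses a manager reports."""

    WAITING = "WAITING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"


class TaskManagerSketch(abc.ABC):
    """Hypothetical mirror of the abstract interface documented above."""

    @property
    @abc.abstractmethod
    def final_output(self) -> bool: ...

    @abc.abstractmethod
    def run(self) -> None: ...

    @abc.abstractmethod
    def get_status(self) -> tuple[Status, str | None]: ...

    @abc.abstractmethod
    def wipe(self) -> None: ...


class InMemoryTask(TaskManagerSketch):
    """Toy manager: writes to a dict and completes after a fixed number of status checks."""

    def __init__(self, store: dict, key: str, checks_needed: int = 2):
        self.store = store
        self.key = key
        self.checks_needed = checks_needed
        self.checks = 0
        self.metadata: dict = {}

    @property
    def final_output(self) -> bool:
        return False  # intermediate data, so it is eligible for wipe() after the job completes

    def run(self) -> None:
        self.store[self.key] = "partial"

    def get_status(self) -> tuple[Status, str | None]:
        self.checks += 1
        if self.checks < self.checks_needed:
            return Status.RUNNING, None
        self.store[self.key] = "done"
        # get_status() may be called many times: write metadata only if it is missing.
        self.metadata.setdefault("status_checks", self.checks)
        return Status.COMPLETED, None

    def wipe(self) -> None:
        # Idempotent: popping a missing key is a no-op.
        self.store.pop(self.key, None)
```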

Exporter utilities

exporter.util.get_client_kwargs(rabbit_params=None)[source]
exporter.util.publish(*args, **kwargs)[source]
exporter.util.consume(*args, rabbit_params=None, **kwargs)[source]
exporter.util.decorator(decode, callback, state, channel, method, properties, body)[source]

Close the database connections opened by the callback, before returning.

If the callback raises an exception, shut down the client in the main thread, without acknowledgment. For some exceptions, assume that the same message was delivered twice, log an error, and nack the message.
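The acknowledgment logic can be sketched with plain callables. This is a hypothetical sketch, not the real decorator(): handle, FakeChannel and DuplicateDelivery are invented for illustration, which exceptions signal a duplicate delivery is an assumption, nacking with requeue=False is a guess, and the shutdown callable stands in for shutting down the client in the main thread.

```python
import logging

logger = logging.getLogger(__name__)


class DuplicateDelivery(Exception):
    """Hypothetical exception type treated as evidence of a redelivered message."""


class FakeChannel:
    """Records nacks; a stand-in for a real AMQP channel."""

    def __init__(self):
        self.nacked = []

    def basic_nack(self, delivery_tag, requeue=False):
        self.nacked.append((delivery_tag, requeue))


def handle(callback, channel, delivery_tag, body, *, close_connections, shutdown):
    """Run callback, then always close database connections before returning."""
    try:
        callback(channel, delivery_tag, body)
    except DuplicateDelivery:
        # Assume the same message was delivered twice: log an error and nack it.
        logger.error("Message %s was probably delivered twice", delivery_tag)
        channel.basic_nack(delivery_tag, requeue=False)
    except Exception:
        # Any other failure: stop the client without acknowledging the message.
        logger.exception("Unhandled exception in callback")
        shutdown()
    finally:
        close_connections()
```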

class exporter.util.TaskStatus(*values)[source]
WAITING = 'WAITING'

Processing hasn’t started.

RUNNING = 'RUNNING'

Processing has started.

COMPLETED = 'COMPLETED'

Processing has ended.

class exporter.util.Export(*components, basename=None)[source]
Parameters:

basename (str | None)

classmethod get_files(*components, **kwargs)[source]
classmethod default_files()[source]
property path
property lockfile: Path
lock()[source]

Create the lock file.

Return type:

None

unlock()[source]

Delete the lock file.

Return type:

None

remove()[source]

Delete the export directory recursively.

property locked: bool

Return whether the output file is being written.

property status: TaskStatus

Return the status of the export.

property files: dict

Return all the available file formats and segments (by year or full).

iterdir()[source]

Yield path objects of the directory contents.

get_convertible_paths()[source]

Yield paths to .jsonl.gz files.

Models

class data_registry.models.JobQuerySet(model=None, query=None, using=None, hints=None)[source]
successful()[source]

Return a query set of successfully completed jobs.

unsuccessful()[source]

Return a query set of unsuccessfully completed jobs.

complete()[source]

Return a query set of complete jobs.

incomplete()[source]

Return a query set of incomplete jobs.

class data_registry.models.Job(id, collection, start, end, status, context, archived, keep_all_data, coverage, tenders_count, tenderers_count, tenders_items_count, parties_count, awards_count, awards_items_count, awards_suppliers_count, contracts_count, contracts_items_count, contracts_transactions_count, documents_count, plannings_count, milestones_count, amendments_count, date_from, date_to, ocid_prefix, license, publication_policy, created, modified)[source]
class Status(*values)[source]
PLANNED = 'PLANNED'

The job is planned.

RUNNING = 'RUNNING'

The job has started.

COMPLETED = 'COMPLETED'

The job has ended (either successfully or unsuccessfully).

initiate()[source]

Mark the job as started.

complete()[source]

Mark the job as ended.

is_successful()[source]

Return whether all tasks ended successfully.

is_unsuccessful()[source]

Return whether any tasks ended unsuccessfully.
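Read literally, the two predicates are not complements: a job whose tasks have not all ended can be neither successful nor unsuccessful. A minimal sketch over plain values, assuming each task's result is Result.OK, Result.FAILED, or None while the task has not ended; these functions are hypothetical, not the model methods themselves.

```python
from enum import Enum
from typing import Optional, Sequence


class Result(Enum):
    OK = "OK"
    FAILED = "FAILED"


def is_successful(results: Sequence[Optional[Result]]) -> bool:
    """True if every task ended with Result.OK (None marks a task that has not ended)."""
    return all(r is Result.OK for r in results)


def is_unsuccessful(results: Sequence[Optional[Result]]) -> bool:
    """True if any task ended with Result.FAILED."""
    return any(r is Result.FAILED for r in results)
```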

class data_registry.models.CollectionQuerySet(model=None, query=None, using=None, hints=None)[source]
visible()[source]

Return a query set of public collections, excluding those that have neither an active job nor a stated reason (a no-data rationale) for lacking one.
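Reading "for no reason" as "with an empty no_data_rationale" (an assumption based on the Collection fields), the filter can be sketched over plain records. CollectionRecord and visible are hypothetical stand-ins, not the query set method.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CollectionRecord:
    """Hypothetical stand-in for the relevant Collection fields."""

    title: str
    public: bool
    active_job: Optional[int] = None
    no_data_rationale: str = ""


def visible(collections):
    """Keep public collections that have an active job or a stated no-data rationale."""
    return [
        c for c in collections
        if c.public and (c.active_job is not None or c.no_data_rationale != "")
    ]
```

In Django ORM terms, this reading corresponds roughly to filter(public=True).exclude(active_job__isnull=True, no_data_rationale=""), assuming an absent rationale is stored as an empty string.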

class data_registry.models.Collection(id, title, description, description_long, country, country_flag, region, additional_data, language, update_frequency, summary, last_reviewed, license_custom, publication_policy, source_url, source_id, retrieval_frequency, active_job, last_retrieved, frozen, no_data_rationale, public, created, modified, date_from, date_to, ocid_prefix)[source]
class Region(*values)[source]
MEA = 'MEA'

Africa and Middle East

AS = 'AS'

Asia

EECA = 'EECA'

Eastern Europe & Central Asia

EU = 'EU'

Europe

LAC = 'LAC'

Latin America & Caribbean

NA = 'NA'

North America

OC = 'OC'

Oceania

class RetrievalFrequency(*values)[source]
WEEKLY = 'WEEKLY'

Weekly

BIWEEKLY = 'BIWEEKLY'

Every 2 weeks

MONTHLY = 'MONTHLY'

Monthly

HALF_YEARLY = 'HALF_YEARLY'

Every 6 months

ANNUALLY = 'ANNUALLY'

Annually

NEVER = 'NEVER'

This dataset is no longer updated by the publisher

class UpdateFrequency(*values)[source]
UNKNOWN = 'UNKNOWN'

Unknown

REAL_TIME = 'REAL_TIME'

Real time

HOURLY = 'HOURLY'

Hourly

DAILY = 'DAILY'

Daily

WEEKLY = 'WEEKLY'

Weekly

MONTHLY = 'MONTHLY'

Monthly

QUARTERLY = 'QUARTERLY'

Every 3 months

HALF_YEARLY = 'HALF_YEARLY'

Every 6 months

ANNUALLY = 'ANNUALLY'

Annually

is_out_of_date()[source]

Return whether the publication is out-of-date.

A publication is out-of-date if it isn’t frozen, has a retrieval frequency other than “never”, has no incomplete jobs, and either has never been scheduled or was last scheduled longer ago than its retrieval frequency. If the most recent job is unsuccessful, the minimum retrieval frequency is used instead.
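The rule can be sketched as a predicate over a snapshot of the relevant fields. This sketch makes several assumptions: CollectionState and INTERVALS are hypothetical, the day counts per frequency are approximations, "last scheduled" is taken to be the start of the most recent job, and "minimum retrieval frequency" is read as the shortest interval (weekly).

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

# Assumed interval for each RetrievalFrequency value; NEVER has no interval.
INTERVALS = {
    "WEEKLY": timedelta(days=7),
    "BIWEEKLY": timedelta(days=14),
    "MONTHLY": timedelta(days=30),
    "HALF_YEARLY": timedelta(days=182),
    "ANNUALLY": timedelta(days=365),
}
MINIMUM_INTERVAL = min(INTERVALS.values())  # assumption: "minimum" means shortest


@dataclass
class CollectionState:
    """Hypothetical snapshot of the fields the rule depends on."""

    frozen: bool
    retrieval_frequency: str
    has_incomplete_jobs: bool
    last_scheduled: Optional[datetime]  # assumed: start of the most recent job
    most_recent_job_unsuccessful: bool = False


def is_out_of_date(c: CollectionState, now: datetime) -> bool:
    if c.frozen or c.retrieval_frequency == "NEVER" or c.has_incomplete_jobs:
        return False
    if c.last_scheduled is None:  # never scheduled
        return True
    if c.most_recent_job_unsuccessful:
        interval = MINIMUM_INTERVAL  # retry sooner after a failed job
    else:
        interval = INTERVALS[c.retrieval_frequency]
    return now - c.last_scheduled > interval
```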

class data_registry.models.License(id, name, description, url, created, modified)[source]
class data_registry.models.TaskQuerySet(model=None, query=None, using=None, hints=None)[source]
class data_registry.models.Task(id, job, start, end, status, result, note, type, order, created, modified)[source]
class Status(*values)[source]
WAITING = 'WAITING'

The task has started, but work has not yet started in the application. (This status is never saved to the database.)

PLANNED = 'PLANNED'

The task is planned.

RUNNING = 'RUNNING'

The task has started.

COMPLETED = 'COMPLETED'

The task has ended (either successfully or unsuccessfully).

class Result(*values)[source]
OK = 'OK'

The task ended successfully.

FAILED = 'FAILED'

The task ended unsuccessfully.

class Type(*values)[source]
COLLECT = 'collect'

Kingfisher Collect

PROCESS = 'process'

Kingfisher Process

EXPORTER = 'exporter'

Exporter

COVERAGE = 'coverage'

Coverage

FLATTENER = 'flattener'

Flattener

initiate()[source]

Mark the task as started.

progress(*, result='', note='')[source]

Update the task’s progress. If called without arguments, reset the task’s progress.

complete(*, result, note='')[source]

Mark the task as ended.

class data_registry.models.TaskNote(id, task, level, note, data, created, modified)[source]
class Level(*values)[source]

Template tags

Set the link to open in a new tab.

Return type:

str

data_registry.templatetags.registry.catalog_str()[source]
data_registry.templatetags.registry.canonical_url(context, language_code=None)[source]
data_registry.templatetags.registry.translate_url(context, language_code)[source]
data_registry.templatetags.registry.redirect_to(context)[source]

Remove the letter query string parameter, to avoid zero results in the new language.

data_registry.templatetags.registry.remove_query_string_value(context, param, value)[source]
data_registry.templatetags.registry.feedback_query_string_parameters(context)[source]
data_registry.templatetags.registry.tojson(obj)[source]
Return type:

str

data_registry.templatetags.registry.markdownify(value)[source]
Parameters:

value (str) – Markdown text

Returns:

HTML text, with smart quotes and with all links set to open in new tabs

Return type:

str

data_registry.templatetags.registry.nonempty(query_dict)[source]
data_registry.templatetags.registry.getitem(dictionary, key)[source]
data_registry.templatetags.registry.getlist(query_dict, key)[source]
data_registry.templatetags.registry.sortreversed(sequence)[source]
data_registry.templatetags.registry.humanfilesize(size)[source]

Exceptions

exception data_registry.exceptions.DataRegistryError[source]

Base class for exceptions from within this module.

exception data_registry.exceptions.ConfigurationError[source]

Raised if the project is misconfigured.

exception data_registry.exceptions.UnexpectedError[source]

Raised if the failure is unexpected.

exception data_registry.exceptions.RecoverableError[source]

Raised if the failure is expected to be temporary.

exception data_registry.exceptions.LockFileError[source]

Raised if a lock file exists.
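A caller can treat RecoverableError as "try again on the next run" and let other errors propagate. This is a minimal sketch: the classes below mirror the documented names, their subclassing of DataRegistryError is an assumption based on its description as the base class, and attempt is a hypothetical helper.

```python
import logging

logger = logging.getLogger(__name__)


class DataRegistryError(Exception):
    """Base class for exceptions from within this module."""


class ConfigurationError(DataRegistryError):
    """Raised if the project is misconfigured."""


class UnexpectedError(DataRegistryError):
    """Raised if the failure is unexpected."""


class RecoverableError(DataRegistryError):
    """Raised if the failure is expected to be temporary."""


class LockFileError(DataRegistryError):
    """Raised if a lock file exists."""


def attempt(step):
    """Run one step; return True on success, False if it should be retried later."""
    try:
        step()
    except RecoverableError as e:
        # Temporary failure: log the reason and leave the work for the next run.
        logger.warning("Temporary failure, will retry: %s", e)
        return False
    return True
```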