API reference

Process manager

data_registry.process_manager.get_task_manager(task)[source]

Instantiate and return a task manager for the task.

Parameters:

task (Task)

Return type:

TaskManager

data_registry.process_manager.process(collection)[source]

If the collection is out-of-date, create a job.

For each of the collection’s incomplete jobs:

  • If the job is planned, start the job

  • If the next task is planned, start the task with run()

  • If the next task is waiting or running, recheck its status with get_status():

    • If it is completed, complete the task and start the next task

    • If it failed temporarily, log the reason

    • If it failed permanently, fail the task and end the job

  • If all tasks succeeded, end the job and update the collection’s active job and last retrieved date.

In other words, this function advances each job by at most one task. As such, for all tasks of a job to succeed, this function needs to run at least as many times as there are tasks in the JOB_TASKS_PLAN setting.
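The stepping behavior described above can be illustrated with a small simulation. This is a minimal sketch, not the implementation of process(): TaskStub, JobStub, advance, count_calls and polls_left are hypothetical names, failure handling is omitted, and each stub task is assumed to complete on its first status check.

```python
from dataclasses import dataclass, field
from enum import Enum


class Status(Enum):
    PLANNED = "PLANNED"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"


@dataclass
class TaskStub:
    """Hypothetical stand-in for a task; not the Task model."""

    name: str
    status: Status = Status.PLANNED
    polls_left: int = 1  # assumed: status checks needed before completion


@dataclass
class JobStub:
    """Hypothetical stand-in for a job; not the Job model."""

    tasks: list = field(default_factory=list)
    status: Status = Status.PLANNED


def advance(job):
    """Advance the job by at most one task per call."""
    if job.status is Status.PLANNED:
        job.status = Status.RUNNING
    pending = [t for t in job.tasks if t.status is not Status.COMPLETED]
    if pending:
        task = pending[0]
        if task.status is Status.PLANNED:
            task.status = Status.RUNNING  # stands in for run()
        else:
            task.polls_left -= 1  # stands in for get_status()
            if task.polls_left <= 0:
                task.status = Status.COMPLETED
                if len(pending) > 1:
                    pending[1].status = Status.RUNNING  # start the next task
    if all(t.status is Status.COMPLETED for t in job.tasks):
        job.status = Status.COMPLETED


def count_calls(job):
    """Call advance() until the job ends, and return the number of calls."""
    calls = 0
    while job.status is not Status.COMPLETED:
        advance(job)
        calls += 1
    return calls


job = JobStub(tasks=[TaskStub("collect"), TaskStub("process"), TaskStub("exporter")])
print(count_calls(job))  # 4
```

With three stub tasks, the job needs four calls: one to start the first task, and one more for each task to complete. Tasks that need several status checks would require more calls, which is why the number of tasks is only a lower bound.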

Parameters:

collection (Collection)

Return type:

None

Process manager utilities

data_registry.process_manager.util.exporter_status_to_task_status(status)[source]
Parameters:

status (TaskStatus)

Return type:

Status

data_registry.process_manager.util.skip_if_not_started(method)[source]

A decorator to return early from a wipe() method if the task hasn’t started.
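A decorator of this kind can be sketched as follows. This is an illustration only, not the library's code: it assumes that a task that hasn't started has an empty start attribute, and DemoManager is a hypothetical class used to exercise the decorator.

```python
import functools
from types import SimpleNamespace


def skip_if_not_started(method):
    """Return early from the wrapped method if the task hasn't started."""

    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        # Assumption: an unstarted task has no start time.
        if not self.task.start:
            return None
        return method(self, *args, **kwargs)

    return wrapper


class DemoManager:
    """Hypothetical manager that records whether wipe() did any work."""

    def __init__(self, task):
        self.task = task
        self.wiped = False

    @skip_if_not_started
    def wipe(self):
        self.wiped = True


unstarted = DemoManager(SimpleNamespace(start=None))
unstarted.wipe()
print(unstarted.wiped)  # False

started = DemoManager(SimpleNamespace(start="2024-01-01T00:00:00Z"))
started.wipe()
print(started.wiped)  # True
```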

class data_registry.process_manager.util.TaskManager(task)[source]

Task managers should only update the Job context and metadata fields.

Parameters:

task (Task)

property job: Job

The job of which the task is a part.

property collection: Collection

The publication on which the task is performed.

abstract property final_output: bool

Whether the task produces a final output, like a bulk download.

request(method, url, *, error_message, **kwargs)[source]

Send a request to an application. If the application returns an error response or is temporarily unavailable, raise RecoverableException.

Raises:

RecoverableException

abstract run()[source]

Start the task.

This method is called once.

Raises:

RecoverableException

Return type:

None

abstract get_status()[source]

Return the status of the task.

This method must be called only after run() is called.

This method can write metadata about the task to the job. Since this method can be called many times, write metadata only when the metadata is missing or when the task is complete.

Raises:

RecoverableException

Return type:

Status

abstract wipe()[source]

Delete any side effects of (for example, data written by) the task.

This method can be called even when the task hasn’t started.

This method must be idempotent. It is retried if any task failed to be wiped.

Raises:

RecoverableException

Return type:

None
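The contract above (run() once, get_status() repeatedly, an idempotent wipe()) can be sketched with a stand-in base class. TaskManagerSketch mirrors only the documented abstract members and is not the real TaskManager. InMemoryManager and its store argument are hypothetical, and status values are plain strings for brevity.

```python
from abc import ABC, abstractmethod


class TaskManagerSketch(ABC):
    """Stand-in that mirrors the documented abstract interface."""

    def __init__(self, task):
        self.task = task

    @property
    @abstractmethod
    def final_output(self) -> bool:
        """Whether the task produces a final output."""

    @abstractmethod
    def run(self) -> None:
        """Start the task."""

    @abstractmethod
    def get_status(self) -> str:
        """Return the status of the task."""

    @abstractmethod
    def wipe(self) -> None:
        """Delete any side effects of the task."""


class InMemoryManager(TaskManagerSketch):
    """Hypothetical manager that writes to a dict instead of an application."""

    def __init__(self, task, store):
        super().__init__(task)
        self.store = store

    @property
    def final_output(self):
        return False

    def run(self):
        self.store[self.task] = "RUNNING"

    def get_status(self):
        # Pretend the work is finished by the time of the first status check.
        self.store[self.task] = "COMPLETED"
        return self.store[self.task]

    def wipe(self):
        # pop() with a default is idempotent, and safe if run() was never called.
        self.store.pop(self.task, None)


store = {}
manager = InMemoryManager("task-1", store)
manager.wipe()  # safe before the task has started
manager.run()
print(manager.get_status())  # COMPLETED
manager.wipe()
manager.wipe()  # safe to repeat
print(store)  # {}
```

Using dict.pop() with a default is one simple way to meet both wipe() requirements: it succeeds whether or not there is anything to delete.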

Exporter utilities

exporter.util.get_client_kwargs(rabbit_params=None)[source]
exporter.util.publish(*args, **kwargs)[source]
exporter.util.consume(*args, rabbit_params=None, **kwargs)[source]
exporter.util.decorator(decode, callback, state, channel, method, properties, body)[source]

Close the database connections opened by the callback, before returning.

If the callback raises an exception, shut down the client in the main thread, without acknowledgment. For some exceptions, assume that the same message was delivered twice, log an error, and nack the message.

class exporter.util.TaskStatus[source]
WAITING = 'WAITING'

Processing hasn’t started.

RUNNING = 'RUNNING'

Processing has started.

COMPLETED = 'COMPLETED'

Processing has ended.

class exporter.util.Export(*components, basename=None)[source]
Parameters:

basename (str | None)

classmethod get_files(*components, **kwargs)[source]
classmethod default_files()[source]
property path
property lockfile: str
lock()[source]

Create the lock file.

Return type:

None

unlock()[source]

Delete the lock file.

Return type:

None

remove()[source]

Delete the export directory recursively.

property running: bool

Return whether the output file is being written.

property completed: bool

Return whether the output file has been written.

property status: Literal['RUNNING', 'COMPLETED', 'WAITING']

Return the status of the export.
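The relationship between the lock file and the three statuses can be sketched as below. This is a guess at the logic, not the Export class itself: ExportSketch, the ".lock" suffix, and the rule that a locked export counts as running even if the output file exists are assumptions, as is raising a LockFileError stand-in when the lock file already exists.

```python
import tempfile
from pathlib import Path


class LockFileError(Exception):
    """Stand-in for data_registry.exceptions.LockFileError."""


class ExportSketch:
    """Hypothetical export whose status is derived from files on disk."""

    def __init__(self, directory, basename):
        self.directory = Path(directory)
        self.path = self.directory / basename
        self.lockfile = self.directory / f"{basename}.lock"  # assumed naming

    def lock(self):
        self.directory.mkdir(parents=True, exist_ok=True)
        try:
            # Mode "x" fails if the file exists, so only one writer can lock.
            with open(self.lockfile, "x"):
                pass
        except FileExistsError as e:
            raise LockFileError(str(self.lockfile)) from e

    def unlock(self):
        self.lockfile.unlink(missing_ok=True)

    @property
    def running(self):
        return self.lockfile.exists()

    @property
    def completed(self):
        return self.path.exists() and not self.running

    @property
    def status(self):
        if self.running:
            return "RUNNING"
        if self.completed:
            return "COMPLETED"
        return "WAITING"


with tempfile.TemporaryDirectory() as tmp:
    export = ExportSketch(tmp, "full.jsonl.gz")
    states = [export.status]
    export.lock()
    states.append(export.status)
    export.path.write_bytes(b"")  # stands in for writing the output file
    export.unlock()
    states.append(export.status)

print(states)  # ['WAITING', 'RUNNING', 'COMPLETED']
```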

property files: dict

Return all the available file formats and segments (by year or full).

iterdir()[source]

Yield path objects of the directory contents.

get_convertible_paths()[source]

Yield paths to .jsonl.gz files.

Models

class data_registry.models.JobQuerySet(model=None, query=None, using=None, hints=None)[source]
active()[source]

Return a query set of active jobs.

complete()[source]

Return a query set of complete jobs.

incomplete()[source]

Return a query set of incomplete jobs.

class data_registry.models.Job(id, collection, start, end, status, context, active, archived, keep_all_data, tenders_count, tenderers_count, tenders_items_count, parties_count, awards_count, awards_items_count, awards_suppliers_count, contracts_count, contracts_items_count, contracts_transactions_count, documents_count, plannings_count, milestones_count, amendments_count, date_from, date_to, ocid_prefix, license, created, modified)[source]
class Status(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
WAITING = 'WAITING'

Not in use.

PLANNED = 'PLANNED'

The job is planned.

RUNNING = 'RUNNING'

The job has started.

COMPLETED = 'COMPLETED'

The job has ended (either successfully or unsuccessfully).

initiate()[source]

Mark the job as started.

complete()[source]

Mark the job as ended.

class data_registry.models.CollectionQuerySet(model=None, query=None, using=None, hints=None)[source]
visible()[source]

Return a query set of public collections with active jobs.

class data_registry.models.Collection(id, title, country, country_flag, region, language, description, description_long, last_retrieved, license_custom, source_id, source_url, retrieval_frequency, update_frequency, summary, last_reviewed, additional_data, public, frozen, created, modified)[source]
class Region(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
MEA = 'MEA'

Africa and Middle East

AS = 'AS'

Asia

EECA = 'EECA'

Eastern Europe & Central Asia

EU = 'EU'

Europe

LAC = 'LAC'

Latin America & Caribbean

NA = 'NA'

North America

OC = 'OC'

Oceania

class RetrievalFrequency(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
MONTHLY = 'MONTHLY'

Monthly

HALF_YEARLY = 'HALF_YEARLY'

Every 6 months

ANNUALLY = 'ANNUALLY'

Annually

NEVER = 'NEVER'

This dataset is no longer updated by the publisher

class UpdateFrequency(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
UNKNOWN = 'UNKNOWN'

Unknown

REAL_TIME = 'REAL_TIME'

Real time

HOURLY = 'HOURLY'

Hourly

DAILY = 'DAILY'

Daily

WEEKLY = 'WEEKLY'

Weekly

MONTHLY = 'MONTHLY'

Monthly

QUARTERLY = 'QUARTERLY'

Every 3 months

HALF_YEARLY = 'HALF_YEARLY'

Every 6 months

ANNUALLY = 'ANNUALLY'

Annually

is_out_of_date()[source]

A publication is out-of-date if it isn’t frozen and has a retrieval frequency other than “never”, and either has never been scheduled or was last scheduled longer ago than the retrieval frequency.
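The rule above can be written as a standalone function for illustration. This is a sketch, not the model method: the function signature and the FREQUENCY_DAYS day counts are assumptions, and the real method reads its inputs from the collection and its jobs.

```python
from datetime import date, timedelta

# Assumed day counts for each retrieval frequency.
FREQUENCY_DAYS = {"MONTHLY": 30, "HALF_YEARLY": 180, "ANNUALLY": 365}


def is_out_of_date(frozen, retrieval_frequency, last_scheduled, today):
    """Apply the documented rule to plain values."""
    if frozen or retrieval_frequency == "NEVER":
        return False
    if last_scheduled is None:
        return True  # never scheduled
    allowed = timedelta(days=FREQUENCY_DAYS[retrieval_frequency])
    return today - last_scheduled > allowed


today = date(2024, 6, 1)
print(is_out_of_date(False, "MONTHLY", None, today))  # True
print(is_out_of_date(False, "MONTHLY", date(2024, 5, 25), today))  # False
print(is_out_of_date(False, "MONTHLY", date(2024, 4, 1), today))  # True
print(is_out_of_date(True, "MONTHLY", date(2024, 4, 1), today))  # False
print(is_out_of_date(False, "NEVER", None, today))  # False
```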

class data_registry.models.License(id, name, description, url, created, modified)[source]
class data_registry.models.Issue(id, description, collection, created, modified)[source]
class data_registry.models.Task(id, job, start, end, status, result, note, context, type, order, created, modified)[source]
class Status(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
WAITING = 'WAITING'

The task has started, but work has not yet started in the application. (This status is never saved to the database.)

PLANNED = 'PLANNED'

The task is planned.

RUNNING = 'RUNNING'

The task has started.

COMPLETED = 'COMPLETED'

The task has ended (either successfully or unsuccessfully).

class Result(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
OK = 'OK'

The task ended successfully.

FAILED = 'FAILED'

The task ended unsuccessfully.

class Type(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
COLLECT = 'collect'

Kingfisher Collect

PROCESS = 'process'

Kingfisher Process

PELICAN = 'pelican'

Pelican

EXPORTER = 'exporter'

Exporter

FLATTENER = 'flattener'

Flattener

initiate()[source]

Mark the task as started.

progress(*, result='', note='')[source]

Update the task’s progress. If called without arguments, reset the task’s progress.

complete(*, result, note='')[source]

Mark the task as ended.
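The three methods above suggest a simple lifecycle, sketched here with a dataclass instead of the Django model. TaskSketch is hypothetical, and which fields each method sets is an assumption based on the field names and descriptions in this reference.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class TaskSketch:
    """Hypothetical stand-in for the Task model's lifecycle fields."""

    status: str = "PLANNED"
    result: str = ""
    note: str = ""
    start: Optional[datetime] = None
    end: Optional[datetime] = None

    def initiate(self):
        """Mark the task as started."""
        self.start = datetime.now(timezone.utc)
        self.status = "RUNNING"

    def progress(self, *, result="", note=""):
        """Update the progress, or reset it if called without arguments."""
        self.result = result
        self.note = note

    def complete(self, *, result, note=""):
        """Mark the task as ended."""
        self.end = datetime.now(timezone.utc)
        self.status = "COMPLETED"
        self.result = result
        self.note = note


task = TaskSketch()
task.initiate()
task.progress(result="FAILED", note="application temporarily unavailable")
task.progress()  # the next check succeeded, so reset the progress
task.complete(result="OK")
print(task.status, task.result, repr(task.note))  # COMPLETED OK ''
```

In this sketch, a temporary failure is recorded with progress() while the task keeps its RUNNING status, so a later check can clear it.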

Template tags

class data_registry.templatetags.registry.CaptureNode(nodelist, varname)[source]
render(context)[source]

Return the node rendered as a string.

data_registry.templatetags.registry.do_capture(parser, token)[source]
data_registry.templatetags.registry.catalog_str()[source]
data_registry.templatetags.registry.canonical_url(context)[source]
data_registry.templatetags.registry.translate_url(context, language)[source]
data_registry.templatetags.registry.redirect_to(context)[source]

Remove the letter query string parameter, to avoid zero results in the new language.
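Removing one query string parameter from a URL can be sketched with the standard library. The function below is a hypothetical helper that operates on a plain URL string. The template tags in this module take a template context instead, and their implementation may differ.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def remove_query_parameter(url, param):
    """Return the URL without any occurrences of the query string parameter."""
    parts = urlsplit(url)
    query = [
        (key, value)
        for key, value in parse_qsl(parts.query, keep_blank_values=True)
        if key != param
    ]
    return urlunsplit(parts._replace(query=urlencode(query)))


print(remove_query_parameter("/search/?letter=A&region=EU", "letter"))
# /search/?region=EU
```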

data_registry.templatetags.registry.replace_query_string_parameter(context, param, value)[source]
data_registry.templatetags.registry.remove_query_string_parameter(context, param)[source]
data_registry.templatetags.registry.feedback_query_string_parameters(context)[source]
data_registry.templatetags.registry.markdownify(value)[source]
data_registry.templatetags.registry.get_item(dictionary, key)[source]
data_registry.templatetags.registry.getlist(query_dict, key)[source]
data_registry.templatetags.registry.sortreversed(sequence)[source]
data_registry.templatetags.registry.humanfilesize(size)[source]

Exceptions

exception data_registry.exceptions.DataRegistryError[source]

Base class for exceptions from within this module.

exception data_registry.exceptions.RecoverableException[source]

Raised if the failure is expected to be temporary.

exception data_registry.exceptions.LockFileError[source]

Raised if a lock file exists.