API reference
Process manager
- data_registry.process_manager.get_task_manager(task)[source]¶
Instantiate and return a task manager for the task.
- Parameters:
task (Task)
- Return type:
- data_registry.process_manager.delete_older_jobs(collection, job, *, dry_run=False)[source]¶
Delete old jobs for a collection.
Delete past unsuccessful jobs older than 180 days
Delete past successful jobs with less than 10% more OCIDs, keeping the second-most recent job
- Parameters:
collection (Collection) – The collection on which to operate
job (Job) – A job to keep and to which to compare OCID counts
dry_run (bool) – Show what would be done, without making changes
- Return type:
None
- data_registry.process_manager.process(collection, *, dry_run=False)[source]¶
If the collection is out-of-date, create a job.
For each of the collection’s incomplete jobs:
  If the job is planned, start the job
  If the next task is planned, start the task with run()
  If the next task is waiting or running, recheck its status with get_status():
    If it is completed, complete the task and start the next task
    If it failed temporarily, log the reason
    If it failed permanently, fail the task and end the job
  If all tasks succeeded:
    End the job
    Update the collection’s active job and last retrieved date
    Delete past unsuccessful jobs older than 180 days
    Delete past successful jobs with less than 10% more OCIDs, keeping the active and next-most recent job
In other words, this function advances each job by at most one task. As such, for all tasks of a job to succeed, this function needs to run at least as many times as there are tasks in the JOB_TASKS_PLAN setting.
- Parameters:
collection (Collection) – The collection to process
dry_run (bool) – Show what would be done, without making changes
- Return type:
None
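The "at most one task per call" behavior can be illustrated with a toy model. This is a minimal sketch, not the project's implementation: `FakeTask`, `FakeJob`, `advance` and `count_calls` are hypothetical names, the task names are placeholders, and every running task is assumed to have completed by the time it is rechecked.

```python
from dataclasses import dataclass
from enum import Enum


class Status(Enum):
    PLANNED = "PLANNED"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"


@dataclass
class FakeTask:
    name: str
    status: Status = Status.PLANNED


@dataclass
class FakeJob:
    tasks: list
    ended: bool = False


def advance(job):
    """Advance the job by at most one task, like a single call to process()."""
    for index, task in enumerate(job.tasks):
        if task.status is Status.COMPLETED:
            continue
        if task.status is Status.PLANNED:
            # Stands in for run(): start the next planned task.
            task.status = Status.RUNNING
        else:
            # Stands in for get_status() reporting completion (an assumption
            # of this sketch): complete the task, then start the next one.
            task.status = Status.COMPLETED
            if index + 1 < len(job.tasks):
                job.tasks[index + 1].status = Status.RUNNING
            else:
                job.ended = True
        return


def count_calls(task_names):
    """Return how many advance() calls are needed to end a job."""
    job = FakeJob([FakeTask(name) for name in task_names])
    calls = 0
    while not job.ended:
        advance(job)
        calls += 1
    return calls


print(count_calls(["collect", "process", "exporter"]))
```

In this simplified model, a job needs one more call than it has tasks, which is consistent with the "at least as many times" lower bound above. In practice, a task that is still running when rechecked adds further calls.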
Process manager utilities
- data_registry.process_manager.util.skip_if_not_started(method)[source]¶
Decorate a wipe() method to return early if the task hasn’t started.
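A decorator of this kind might look like the following. This is a minimal sketch under stated assumptions, not the library's code: it assumes the manager exposes the task as `self.task` and that an unstarted task has `start` set to `None`. `skip_if_not_started_sketch` and `DemoManager` are hypothetical names.

```python
import functools
from types import SimpleNamespace


def skip_if_not_started_sketch(method):
    """Wrap a wipe() method so that it does nothing for an unstarted task."""

    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        # Assumption: a task that hasn't started has no start timestamp.
        if self.task.start is None:
            return None
        return method(self, *args, **kwargs)

    return wrapper


class DemoManager:
    """A hypothetical manager that records whether wipe() did any work."""

    def __init__(self, task):
        self.task = task
        self.wiped = False

    @skip_if_not_started_sketch
    def wipe(self):
        self.wiped = True


not_started = DemoManager(SimpleNamespace(start=None))
not_started.wipe()  # Returns early: there is nothing to delete.

started = DemoManager(SimpleNamespace(start="2024-01-01T00:00:00"))
started.wipe()  # Runs the wrapped method.
```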
- class data_registry.process_manager.util.TaskManager(task)[source]¶
Task managers should only update the Job context and metadata fields.
- Parameters:
task (Task)
- property collection: Collection¶
The publication on which the task is performed.
- property job_url: str¶
The URL to the job’s admin change page.
- abstract property final_output: bool¶
Whether the task produces a final output, like a bulk download.
If False, then once a job is complete, the “manageprocess” management command calls wipe() (unless temporary data is to be preserved).
- request(method, url, *, error_message, timeout=10, **kwargs)[source]¶
Send a request to an application.
If the application returns an error response or is temporarily unavailable, raise RecoverableError.
- Raises:
- abstractmethod run()[source]¶
Start the task.
This method is called once.
- Raises:
RecoverableError – if the task can’t be started, temporarily
- Return type:
None
- abstractmethod get_status()[source]¶
Return the status of the task, and an optional failure reason.
This method must be called only after run() is called.
This method can write metadata about the task to the job. Since this method can be called many times, write metadata only when the metadata is missing or when the task is complete.
This method is expected to be called within a transaction.
- Raises:
- Return type:
tuple[Status, str | None]
- abstractmethod wipe()[source]¶
Delete any side effects of (for example, data written by) the task.
This method can be called even when the task hasn’t started.
This method must be idempotent. It is retried if any task failed to be wiped.
- Raises:
RecoverableError – if the task can’t be wiped, temporarily
- Return type:
None
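The contract described above (run() once, get_status() repeatedly, an idempotent wipe() that is safe before the task starts) can be sketched with a stand-in base class. Everything here is hypothetical: `TaskManagerSketch` only mirrors the documented interface and is not the real `TaskManager`, and `InMemoryManager` writes to a plain dictionary instead of calling an application.

```python
from abc import ABC, abstractmethod
from enum import Enum
from types import SimpleNamespace
from typing import Optional, Tuple


class Status(Enum):
    WAITING = "WAITING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"


class TaskManagerSketch(ABC):
    """A stand-in for the documented interface (not the real base class)."""

    def __init__(self, task):
        self.task = task

    @property
    @abstractmethod
    def final_output(self) -> bool: ...

    @abstractmethod
    def run(self) -> None: ...

    @abstractmethod
    def get_status(self) -> Tuple[Status, Optional[str]]: ...

    @abstractmethod
    def wipe(self) -> None: ...


class InMemoryManager(TaskManagerSketch):
    """A hypothetical manager whose only side effect is an entry in a dict."""

    def __init__(self, task, store):
        super().__init__(task)
        self.store = store

    @property
    def final_output(self):
        # The data is temporary, so it is eligible to be wiped.
        return False

    def run(self):
        self.store[self.task.id] = ["some data"]

    def get_status(self):
        if self.task.id in self.store:
            return Status.COMPLETED, None
        return Status.WAITING, None

    def wipe(self):
        # pop() with a default is idempotent, and it is safe to call even
        # if run() was never called.
        self.store.pop(self.task.id, None)


store = {}
manager = InMemoryManager(SimpleNamespace(id=1), store)
manager.wipe()  # Safe before the task has started.
manager.run()
status = manager.get_status()
manager.wipe()
manager.wipe()  # Safe to repeat.
```

Using `dict.pop` with a default is one simple way to make the cleanup idempotent. A real manager would more likely need to treat "already deleted" responses from the application as success.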
Exporter utilities
- exporter.util.decorator(decode, callback, state, channel, method, properties, body)[source]¶
Close the database connections opened by the callback, before returning.
If the callback raises an exception, shut down the client in the main thread, without acknowledgment. For some exceptions, assume that the same message was delivered twice, log an error, and nack the message.
- class exporter.util.TaskStatus(*values)[source]¶
- WAITING = 'WAITING'¶
Processing hasn’t started.
- RUNNING = 'RUNNING'¶
Processing has started.
- COMPLETED = 'COMPLETED'¶
Processing has ended.
- class exporter.util.Export(*components, basename=None)[source]¶
- Parameters:
basename (str | None)
- property path¶
- property lockfile: Path¶
- property locked: bool¶
Return whether the output file is being written.
- property status: TaskStatus¶
Return the status of the export.
- property files: dict¶
Return all the available file formats and segments (by year or full).
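The relationship between `lockfile`, `locked` and `status` is not spelled out above. One plausible reading, offered only as an assumption, is that a present lock file means the export is running, a present output file without a lock means it is completed, and otherwise it is waiting. The sketch below illustrates that reading with hypothetical file names; `export_status` and `demo` are not part of the documented API.

```python
import tempfile
from enum import Enum
from pathlib import Path


class TaskStatus(str, Enum):
    WAITING = "WAITING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"


def export_status(path: Path, lockfile: Path) -> TaskStatus:
    """Derive a status from the filesystem (an assumed rule, see above)."""
    if lockfile.exists():
        return TaskStatus.RUNNING
    if path.exists():
        return TaskStatus.COMPLETED
    return TaskStatus.WAITING


def demo():
    """Walk one export through the three statuses in a temporary directory."""
    statuses = []
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "full.jsonl.gz"  # hypothetical output file name
        lockfile = Path(tmp) / "full.jsonl.gz.lock"  # hypothetical lock file name
        statuses.append(export_status(path, lockfile))
        lockfile.touch()  # The writer takes the lock.
        statuses.append(export_status(path, lockfile))
        path.touch()  # The writer finishes the output...
        lockfile.unlink()  # ...and releases the lock.
        statuses.append(export_status(path, lockfile))
    return statuses


print([status.value for status in demo()])
```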
Models
- class data_registry.models.Job(id, collection, start, end, status, context, archived, keep_all_data, coverage, tenders_count, tenderers_count, tenders_items_count, parties_count, awards_count, awards_items_count, awards_suppliers_count, contracts_count, contracts_items_count, contracts_transactions_count, documents_count, plannings_count, milestones_count, amendments_count, date_from, date_to, ocid_prefix, license, publication_policy, created, modified)[source]¶
- class data_registry.models.CollectionQuerySet(model=None, query=None, using=None, hints=None)[source]¶
- class data_registry.models.Collection(id, title, description, description_long, country, country_flag, region, additional_data, language, update_frequency, summary, last_reviewed, license_custom, publication_policy, source_url, source_id, retrieval_frequency, active_job, last_retrieved, frozen, no_data_rationale, public, created, modified, date_from, date_to, ocid_prefix)[source]¶
- class Region(*values)[source]¶
- MEA = 'MEA'¶
Africa and Middle East
- AS = 'AS'¶
Asia
- EECA = 'EECA'¶
Eastern Europe & Central Asia
- EU = 'EU'¶
Europe
- LAC = 'LAC'¶
Latin America & Caribbean
- NA = 'NA'¶
North America
- OC = 'OC'¶
Oceania
- class RetrievalFrequency(*values)[source]¶
- WEEKLY = 'WEEKLY'¶
Weekly
- BIWEEKLY = 'BIWEEKLY'¶
Every 2 weeks
- MONTHLY = 'MONTHLY'¶
Monthly
- HALF_YEARLY = 'HALF_YEARLY'¶
Every 6 months
- ANNUALLY = 'ANNUALLY'¶
Annually
- NEVER = 'NEVER'¶
This dataset is no longer updated by the publisher
- class UpdateFrequency(*values)[source]¶
- UNKNOWN = 'UNKNOWN'¶
Unknown
- REAL_TIME = 'REAL_TIME'¶
Real time
- HOURLY = 'HOURLY'¶
Hourly
- DAILY = 'DAILY'¶
Daily
- WEEKLY = 'WEEKLY'¶
Weekly
- MONTHLY = 'MONTHLY'¶
Monthly
- QUARTERLY = 'QUARTERLY'¶
Every 3 months
- HALF_YEARLY = 'HALF_YEARLY'¶
Every 6 months
- ANNUALLY = 'ANNUALLY'¶
Annually
- is_out_of_date()[source]¶
Return whether the publication is out-of-date.
A publication is out-of-date if it isn’t frozen, has a retrieval frequency other than “never”, has no incomplete jobs, and either has never been scheduled or was last scheduled longer ago than the retrieval frequency. If the most recent job is unsuccessful, the minimum retrieval frequency is used instead.
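The rule can be restated as a pure function. This is a minimal sketch, not the model's method. The day counts per frequency are illustrative assumptions, "minimum retrieval frequency" is assumed to mean the shortest interval, and the keyword arguments are hypothetical stand-ins for values the real method would read from the database.

```python
from datetime import date, timedelta

# Assumption: illustrative intervals, in days, for each retrieval frequency.
FREQUENCY_DAYS = {
    "WEEKLY": 7,
    "BIWEEKLY": 14,
    "MONTHLY": 30,
    "HALF_YEARLY": 180,
    "ANNUALLY": 365,
}


def is_out_of_date_sketch(
    *,
    frozen,
    retrieval_frequency,
    has_incomplete_jobs,
    last_scheduled,
    last_job_unsuccessful,
    today,
):
    """Return whether a publication is due for a new job."""
    if frozen or retrieval_frequency == "NEVER" or has_incomplete_jobs:
        return False
    if last_scheduled is None:
        # Never scheduled, so it is due.
        return True
    if last_job_unsuccessful:
        # Assumption: retry after the shortest interval.
        days = min(FREQUENCY_DAYS.values())
    else:
        days = FREQUENCY_DAYS[retrieval_frequency]
    return today - last_scheduled > timedelta(days=days)


today = date(2024, 6, 30)
ten_days_ago = today - timedelta(days=10)
common = {
    "frozen": False,
    "retrieval_frequency": "MONTHLY",
    "has_incomplete_jobs": False,
    "today": today,
}

after_success = is_out_of_date_sketch(
    last_scheduled=ten_days_ago, last_job_unsuccessful=False, **common
)
after_failure = is_out_of_date_sketch(
    last_scheduled=ten_days_ago, last_job_unsuccessful=True, **common
)
never_scheduled = is_out_of_date_sketch(
    last_scheduled=None, last_job_unsuccessful=False, **common
)
```

Under these assumptions, a monthly publication retrieved 10 days ago is not yet due after a successful job, but is due after an unsuccessful one.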
- class data_registry.models.Task(id, job, start, end, status, result, note, type, order, created, modified)[source]¶
- class Status(*values)[source]¶
- WAITING = 'WAITING'¶
The task has started, but work has not yet started in the application. (This status is never saved to the database.)
- PLANNED = 'PLANNED'¶
The task is planned.
- RUNNING = 'RUNNING'¶
The task has started.
- COMPLETED = 'COMPLETED'¶
The task has ended (either successfully or unsuccessfully).
- class Result(*values)[source]¶
- OK = 'OK'¶
The task ended successfully.
- FAILED = 'FAILED'¶
The task ended unsuccessfully.
- class Type(*values)[source]¶
- COLLECT = 'collect'¶
Kingfisher Collect
- PROCESS = 'process'¶
Kingfisher Process
- EXPORTER = 'exporter'¶
Exporter
- COVERAGE = 'coverage'¶
Coverage
- FLATTENER = 'flattener'¶
Flattener
Exceptions
- exception data_registry.exceptions.DataRegistryError[source]¶
Base class for exceptions from within this module.
- exception data_registry.exceptions.ConfigurationError[source]¶
Raised if the project is misconfigured.