Skip to content

Fetcher authoring

This page describes how to create a new fetcher, or maintain an existing one.

The DBnomics core team is open to external contributions, please get in touch!

Create a new fetcher

Before starting, please check that the terms of the provider allow data redistribution. In case of doubt, ask on the forum.

DBnomics provides a template for fetchers: dbnomics-fetcher-template.

Install tools

This template is defined using copier. Install it with pipx as described by its documentation:

pipx install copier

The template also uses rye to manage the Python project, install it beforehand.

Initialize the template

For example let's say we are writing the fetcher for the provider ABC.

copier copy https://git.nomics.world/dbnomics/dbnomics-fetcher-template.git my-fetcher
# [...] Answer the questions

cd my-fetcher

rye sync --pre

git init
git add -A
git commit -m "Add initial files"

Architecture

The required entry points are:

  • download.py: downloads all the datasets or a subset from a provider infrastructure
  • convert.py: converts the downloaded datasets to DBnomics data model

Here is a recommended architecture for organizing the source code of a fetcher:

Architecture of a fetcher

The Downloader class is responsible for fetching data from the provider infrastructure. It relies on the DownloaderHelper class from dbnomics-fetcher-toolbox to handle CLI arguments and options.

The SourceDataLoader class is responsible for parsing source-data files and exposing them as model objects from the provider domain.

The Converter class is responsible for loading source-data via the SourceDataLoader class, converting the model objects from the provider domain to the DBnomics data model, and saving them via the Storage class.

Download

The download.py script first creates a DownloaderHelper class instance (from dbnomics_fetcher_toolbox) that parses CLI arguments and options, then creates a Downloader class instance and starts the download process:

from dbnomics_fetcher_toolbox import create_downloader_helper

import abc_fetcher
from abc_fetcher.downloader import Downloader


def main() -> None:
    with create_downloader_helper(package_name=abc_fetcher.__name__) as downloader_helper:
        Downloader(downloader_helper=downloader_helper).start()


if __name__ == "__main__":
    main()

The Downloader class defines methods that download files from the provider infrastructure. Those methods make use of functions exposed by the dbnomics_fetcher_toolbox package, such as download_http_url or validate_mimetype.

from typing import TYPE_CHECKING

from dbnomics_fetcher_toolbox.http_utils import download_http_url
from dbnomics_fetcher_toolbox.mimetype_utils import validate_mimetype

from abc_fetcher.source_data_loader import SourceDataLoader

if TYPE_CHECKING:
    from dbnomics_fetcher_toolbox import DownloaderHelper


__all__ = ["Downloader"]


class Downloader:
    """Download source-data files from the provider infrastructure."""

    def __init__(self, *, downloader_helper: "DownloaderHelper") -> None:
        self._downloader_helper = downloader_helper
        self._source_data_loader = SourceDataLoader(base_dir=downloader_helper.target_dir)

    def download_dummy_file(self) -> None:
        file_base_name = "dummy"
        csv_filename = f"{file_base_name}.csv"
        with self._downloader_helper.start_file_section(csv_filename, id=file_base_name) as section:
            if not section.is_skipped:
                url = self.get_csv_file_api_url(filename=csv_filename)
                download_http_url(
                    url,
                    output_file=section.file,
                    response_dump_dir=self._downloader_helper.debug_dir,
                )
                validate_mimetype(section.file)

    def get_csv_file_api_url(self, *, filename: str) -> str:
        return f"https://data.example-provider.org/static/{filename}"

    def start(self) -> None:
        self.download_dummy_file()

The dbnomics_fetcher_toolbox functions log what happens and the DownloaderHelper writes a JSON download report at the end of the process.

The start_file_section returns a context manager that captures any error happening in the underlying code block, to avoid letting the whole script fail, and produce an error in the logs instead. It also allows to skip the file if the CLI option --skip is used, or if the file already exists and the resume mode is enabled (which is the case by default).

Note: it is advised to build the URLs in dedicated methods.

To run the download script:

python download.py source-data

Source data loader

The SourceDataLoader class highly depends on the format of the actual source data files. For example, let's say that the ABC provider exposes its datasets in a catalog.json file such as:

[
  {"dataset_id": "BOP", "dataset_name": "Balance of payments"},
  {"dataset_id": "GDP", "dataset_name": "Gross domestic product"}
]

Those catalog items can be modeled by a CatalogItem dataclass in the abc_provider.source_data_model module:

from dataclasses import dataclass


@dataclass(frozen=True, kw_only=True)
class CatalogItem:
    dataset_id: str
    dataset_name: str

The SourceDataLoader class defines methods that parse source-data files and expose their content as model objects from the provider domain.

To validate Python dicts and load them to dataclass instances, it is advised to use a data loading library such as typedload or pydantic.

import json
from pathlib import Path
from typing import Final, Iterator

from dbnomics_data_model.json_utils import load_json_file

from .source_data_model import CatalogItem


__all__ = ["SourceDataLoader"]


CATALOG_JSON_FILENAME: Final = "catalog.json"


class SourceDataLoader:
    """Load source-data files and expose them as Python values.

    This class is useful to the Downloader (e.g. to iterate available datasets) and to the Converter.
    """

    def __init__(self, *, base_dir: Path) -> None:
        self._base_dir = base_dir

    def get_catalog_file(self) -> Path:
        return self._base_dir / CATALOG_JSON_FILENAME

    def iter_catalog_items(self) -> Iterator[CatalogItem]:
        catalog_file = self.get_catalog_file()
        catalog_items = load_json_file(catalog_file, type_=list[CatalogItem])
        yield from catalog_items

Note: it is advised to build the file paths in dedicated methods.

Convert

The convert.py script is very similar to download.py. It gives the provider code to the ConverterHelper.

from dbnomics_fetcher_toolbox import create_converter_helper

import abc_fetcher
from abc_fetcher.constants import PROVIDER_CODE
from abc_fetcher.converter import Converter


def main() -> None:
    with create_converter_helper(package_name=abc_fetcher.__name__, provider_code=PROVIDER_CODE) as converter_helper:
        Converter(converter_helper=converter_helper).start()


if __name__ == "__main__":
    main()

The abc_fetcher.constants module defines the provider code once and for all:

from typing import Final

from dbnomics_data_model.model import ProviderCode

PROVIDER_CODE: Final = ProviderCode.parse("ABC")

The Converter class reuses the SourceDataLoader methods to consume the provider model objects in a high-level manner, produces objects from dbnomics-data-model, and saves them to the Storage provided by the ConverterHelper:

from collections.abc import Iterator
from typing import TYPE_CHECKING

from dbnomics_data_model.model import (
    Category,
    CategoryTree,
    DatasetDimensions,
    DatasetId,
    DatasetMetadata,
    DatasetReference,
    Dimension,
    DimensionCode,
    DimensionRole,
    DimensionValue,
    Observation,
    ProviderMetadata,
    Series,
)

from abc_fetcher.source_data_loader import SourceDataLoader

if TYPE_CHECKING:
    from dbnomics_fetcher_toolbox.helpers import ConverterHelper


__all__ = ["Converter"]


class Converter:
    """Convert provider data to DBnomics data model."""

    def __init__(self, converter_helper: "ConverterHelper") -> None:
        self._converter_helper = converter_helper
        self._source_data_loader = SourceDataLoader(base_dir=self._converter_helper.source_dir)

    def convert_category_tree(self) -> None:
        with self._converter_helper.start_section("category_tree") as (section, session):
            if not section.is_skipped:
                category_tree = CategoryTree(
                    children=[
                        Category.create(
                            children=[
                                DatasetReference.create(code="D1", name="Dataset 1"),
                                DatasetReference.create(code="D2", name="Dataset 2"),
                            ],
                            code="foo",
                            name="Foo datasets",
                        )
                    ]
                )
                session.storage.save_category_tree(category_tree, provider_code=self._converter_helper.provider_code)
                session.commit()

    def convert_dataset(self, source_dataset_id: str) -> None:
        dataset_id = DatasetId.create(self._converter_helper.provider_code, source_dataset_id)
        with self._converter_helper.start_dataset_section(source_dataset_id) as (section, storage):
            if not section.is_skipped:
                dataset_dimensions = self._build_dataset_dimensions()
                dataset_metadata = self._build_dataset_metadata(source_dataset_id, dimensions=dataset_dimensions)
                session.storage.save_dataset_metadata(
                storage.save_dataset_metadata(
                    dataset_metadata, provider_code=self._converter_helper.provider_code
                )
                storage.save_series(
                    self._iter_series(dataset_dimensions=dataset_dimensions),
                    dataset_id=dataset_id,
                )

    def convert_provider_metadata(self) -> None:
        with self._converter_helper.start_section("provider_metadata") as (section, session):
            if not section.is_skipped:
                provider_metadata = ProviderMetadata.create(
                    code=self._converter_helper.provider_code,
                    name="{{ provider_name }}",
                    # Cf https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2
                    # region="",
                    # terms_of_use="",
                    website="{{ provider_url }}",
                )
                session.storage.save_provider_metadata(provider_metadata)
                session.commit()

    def start(self) -> None:
        self.convert_provider_metadata()
        self.convert_category_tree()
        self.convert_dataset("D1")

    def _build_dataset_dimensions(self) -> DatasetDimensions:
        return DatasetDimensions(
            dimensions=[
                Dimension.create(
                    "FREQ",
                    label="Frequency",
                    values=[
                        DimensionValue.create("A", label="Annual"),
                        DimensionValue.create("M", label="Monthly"),
                    ],
                ),
                Dimension.create(
                    "REF_AREA",
                    label="Reference area",
                    values=[
                        DimensionValue.create("DE", label="Germany"),
                        DimensionValue.create("FR", label="France"),
                    ],
                ),
            ],
            roles={DimensionRole.FREQUENCY: DimensionCode.parse("FREQ")},
        )

    def _build_dataset_metadata(self, source_dataset_id: str, *, dimensions: DatasetDimensions) -> DatasetMetadata:
        return DatasetMetadata.create(
            source_dataset_id,
            dimensions=dimensions,
            name="Dataset dummy name",
        )

    def _iter_series(self, *, dataset_dimensions: DatasetDimensions) -> Iterator[Series]:
        observations = list(self._iter_series_observations())
        yield Series.create(
            dataset_dimensions=dataset_dimensions,
            dimensions={"FREQ": "A", "REF_AREA": "DE"},
            name="Series 1",
            observations=observations,
        )

    def _iter_series_observations(self) -> Iterator[Observation]:
        yield Observation.create(period="2000", value=1)
        yield Observation.create(period="2001", value=4)

The start_section method, like the DownloaderHelper.start_file_section method, capture and log errors, and sections are skipped according to the --skip CLI option. The sections created by start_dataset_section are a particular case that check that the dataset has indeed been written at the end of the section, and are skipped if the dataset already exists.

Run the convert script:

python convert.py source-data json-data

Code style

Do not abbreviate data model concepts

DBnomics data model defines concepts such as provider, dataset, time series, dimension or observation. Do not abbreviate those terms.

Examples:

  • use dataset_info instead of ds_info
  • use dimensions instead of dim_dict
  • use current_observation instead of current_obs

Plural of time series

In English, a time series is invariable.

In order to distinguish a single series from a list of series:

  • name a single series series
  • name a list of series series_list