Fetcher authoring
This page describes how to create a new fetcher, or maintain an existing one.
The DBnomics core team is open to external contributions; please get in touch!
Create a new fetcher
Before starting, please check that the provider's terms of use allow data redistribution. If in doubt, ask on the forum.
DBnomics provides a template for fetchers: dbnomics-fetcher-template.
Install tools
This template is defined using copier. Install copier with pipx, as described in its documentation:
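For example, using the standard pipx invocation from the copier documentation:

```bash
pipx install copier
```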
The template also uses rye to manage the Python project; install it beforehand as well.
Initialize the template
For example, let's say we are writing the fetcher for the provider ABC:
```bash
copier copy https://git.nomics.world/dbnomics/dbnomics-fetcher-template.git my-fetcher
# [...] Answer the questions
cd my-fetcher
rye sync --pre
git init
git add -A
git commit -m "Add initial files"
```
Architecture
The required entry points are:
download.py
: downloads all the datasets, or a subset of them, from the provider infrastructure

convert.py
: converts the downloaded datasets to the DBnomics data model
Here is a recommended architecture for organizing the source code of a fetcher:
The `Downloader` class is responsible for fetching data from the provider infrastructure. It relies on the `DownloaderHelper` class from dbnomics-fetcher-toolbox to handle CLI arguments and options.

The `SourceDataLoader` class is responsible for parsing source-data files and exposing them as model objects from the provider domain.

The `Converter` class is responsible for loading source data via the `SourceDataLoader` class, converting the model objects from the provider domain to the DBnomics data model, and saving them via the `Storage` class.
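Concretely, the examples on this page assume a layout along these lines (a sketch; the template may place files differently):

```
my-fetcher/
├── download.py                # entry point calling Downloader
├── convert.py                 # entry point calling Converter
└── abc_fetcher/
    ├── constants.py           # PROVIDER_CODE
    ├── downloader.py          # Downloader
    ├── converter.py           # Converter
    ├── source_data_loader.py  # SourceDataLoader
    └── source_data_model.py   # CatalogItem, ...
```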
Download
The `download.py` script first creates a `DownloaderHelper` instance (from `dbnomics_fetcher_toolbox`) that parses CLI arguments and options, then creates a `Downloader` instance and starts the download process:
```python
from dbnomics_fetcher_toolbox import create_downloader_helper

import abc_fetcher
from abc_fetcher.downloader import Downloader


def main() -> None:
    with create_downloader_helper(package_name=abc_fetcher.__name__) as downloader_helper:
        Downloader(downloader_helper=downloader_helper).start()


if __name__ == "__main__":
    main()
```
The `Downloader` class defines methods that download files from the provider infrastructure. Those methods make use of functions exposed by the `dbnomics_fetcher_toolbox` package, such as `download_http_url` or `validate_mimetype`.
```python
from typing import TYPE_CHECKING

from dbnomics_fetcher_toolbox.http_utils import download_http_url
from dbnomics_fetcher_toolbox.mimetype_utils import validate_mimetype

from abc_fetcher.source_data_loader import SourceDataLoader

if TYPE_CHECKING:
    from dbnomics_fetcher_toolbox import DownloaderHelper

__all__ = ["Downloader"]


class Downloader:
    """Download source-data files from the provider infrastructure."""

    def __init__(self, *, downloader_helper: "DownloaderHelper") -> None:
        self._downloader_helper = downloader_helper
        self._source_data_loader = SourceDataLoader(base_dir=downloader_helper.target_dir)

    def download_dummy_file(self) -> None:
        file_base_name = "dummy"
        csv_filename = f"{file_base_name}.csv"
        with self._downloader_helper.start_file_section(csv_filename, id=file_base_name) as section:
            if not section.is_skipped:
                url = self.get_csv_file_api_url(filename=csv_filename)
                download_http_url(
                    url,
                    output_file=section.file,
                    response_dump_dir=self._downloader_helper.debug_dir,
                )
                validate_mimetype(section.file)

    def get_csv_file_api_url(self, *, filename: str) -> str:
        return f"https://data.example-provider.org/static/{filename}"

    def start(self) -> None:
        self.download_dummy_file()
```
The `dbnomics_fetcher_toolbox` functions log what happens, and the `DownloaderHelper` writes a JSON download report at the end of the process.

The `start_file_section` method returns a context manager that captures any error raised in the underlying code block, logging it instead of letting the whole script fail. It also allows skipping the file when the `--skip` CLI option is used, or when the file already exists and resume mode is enabled (which is the case by default).
Note: it is advised to build the URLs in dedicated methods.
To run the download script:
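A typical invocation could look like the following (an assumption: the exact command depends on how the template wires the entry points, and the script's `--help` lists the options handled by `DownloaderHelper`, such as `--skip`):

```bash
rye run python download.py
```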
Source data loader
The `SourceDataLoader` class depends heavily on the format of the actual source-data files.

For example, let's say that the ABC provider exposes its datasets in a catalog.json file such as:
```json
[
    {"dataset_id": "BOP", "dataset_name": "Balance of payments"},
    {"dataset_id": "GDP", "dataset_name": "Gross domestic product"}
]
```
Those catalog items can be modeled by a `CatalogItem` dataclass in the `abc_fetcher.source_data_model` module:
```python
from dataclasses import dataclass


@dataclass(frozen=True, kw_only=True)
class CatalogItem:
    dataset_id: str
    dataset_name: str
```
The `SourceDataLoader` class defines methods that parse source-data files and expose their content as model objects from the provider domain. To validate Python dicts and load them into dataclass instances, it is advised to use a data loading library such as typedload or pydantic.
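For instance, here is a minimal sketch with typedload (pydantic offers equivalent validation); the sample dict mirrors the catalog.json entries above:

```python
import typedload

from abc_fetcher.source_data_model import CatalogItem

raw_items = [{"dataset_id": "BOP", "dataset_name": "Balance of payments"}]
# typedload checks each dict against the dataclass fields and builds
# CatalogItem instances, raising an exception when the data does not match.
catalog_items = typedload.load(raw_items, list[CatalogItem])
```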
```python
from collections.abc import Iterator
from pathlib import Path
from typing import Final

from dbnomics_data_model.json_utils import load_json_file

from .source_data_model import CatalogItem

__all__ = ["SourceDataLoader"]

CATALOG_JSON_FILENAME: Final = "catalog.json"


class SourceDataLoader:
    """Load source-data files and expose them as Python values.

    This class is useful to the Downloader (e.g. to iterate available datasets) and to the Converter.
    """

    def __init__(self, *, base_dir: Path) -> None:
        self._base_dir = base_dir

    def get_catalog_file(self) -> Path:
        return self._base_dir / CATALOG_JSON_FILENAME

    def iter_catalog_items(self) -> Iterator[CatalogItem]:
        catalog_file = self.get_catalog_file()
        catalog_items = load_json_file(catalog_file, type_=list[CatalogItem])
        yield from catalog_items
```
Note: it is advised to build the file paths in dedicated methods.
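For example, once catalog.json has been downloaded, the loader can be used as follows (the base directory here is hypothetical):

```python
from pathlib import Path

from abc_fetcher.source_data_loader import SourceDataLoader

loader = SourceDataLoader(base_dir=Path("downloads"))
for catalog_item in loader.iter_catalog_items():
    print(catalog_item.dataset_id, catalog_item.dataset_name)
```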
Convert
The `convert.py` script is very similar to `download.py`. It passes the provider code to the `ConverterHelper`:
```python
from dbnomics_fetcher_toolbox import create_converter_helper

import abc_fetcher
from abc_fetcher.constants import PROVIDER_CODE
from abc_fetcher.converter import Converter


def main() -> None:
    with create_converter_helper(package_name=abc_fetcher.__name__, provider_code=PROVIDER_CODE) as converter_helper:
        Converter(converter_helper=converter_helper).start()


if __name__ == "__main__":
    main()
```
The `abc_fetcher.constants` module defines the provider code once and for all:
```python
from typing import Final

from dbnomics_data_model.model import ProviderCode

PROVIDER_CODE: Final = ProviderCode.parse("ABC")
```
The `Converter` class reuses the `SourceDataLoader` methods to consume the provider model objects in a high-level manner, produces objects from dbnomics-data-model, and saves them to the `Storage` provided by the `ConverterHelper`:
```python
from collections.abc import Iterator
from typing import TYPE_CHECKING

from dbnomics_data_model.model import (
    Category,
    CategoryTree,
    DatasetDimensions,
    DatasetId,
    DatasetMetadata,
    DatasetReference,
    Dimension,
    DimensionCode,
    DimensionRole,
    DimensionValue,
    Observation,
    ProviderMetadata,
    Series,
)

from abc_fetcher.source_data_loader import SourceDataLoader

if TYPE_CHECKING:
    from dbnomics_fetcher_toolbox.helpers import ConverterHelper

__all__ = ["Converter"]


class Converter:
    """Convert provider data to DBnomics data model."""

    def __init__(self, *, converter_helper: "ConverterHelper") -> None:
        self._converter_helper = converter_helper
        self._source_data_loader = SourceDataLoader(base_dir=self._converter_helper.source_dir)

    def convert_category_tree(self) -> None:
        with self._converter_helper.start_section("category_tree") as (section, session):
            if not section.is_skipped:
                category_tree = CategoryTree(
                    children=[
                        Category.create(
                            children=[
                                DatasetReference.create(code="D1", name="Dataset 1"),
                                DatasetReference.create(code="D2", name="Dataset 2"),
                            ],
                            code="foo",
                            name="Foo datasets",
                        )
                    ]
                )
                session.storage.save_category_tree(category_tree, provider_code=self._converter_helper.provider_code)
                session.commit()

    def convert_dataset(self, source_dataset_id: str) -> None:
        dataset_id = DatasetId.create(self._converter_helper.provider_code, source_dataset_id)
        with self._converter_helper.start_dataset_section(source_dataset_id) as (section, storage):
            if not section.is_skipped:
                dataset_dimensions = self._build_dataset_dimensions()
                dataset_metadata = self._build_dataset_metadata(source_dataset_id, dimensions=dataset_dimensions)
                storage.save_dataset_metadata(
                    dataset_metadata, provider_code=self._converter_helper.provider_code
                )
                storage.save_series(
                    self._iter_series(dataset_dimensions=dataset_dimensions),
                    dataset_id=dataset_id,
                )

    def convert_provider_metadata(self) -> None:
        with self._converter_helper.start_section("provider_metadata") as (section, session):
            if not section.is_skipped:
                provider_metadata = ProviderMetadata.create(
                    code=self._converter_helper.provider_code,
                    name="ABC",
                    # Cf https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2
                    # region="",
                    # terms_of_use="",
                    website="https://data.example-provider.org",
                )
                session.storage.save_provider_metadata(provider_metadata)
                session.commit()

    def start(self) -> None:
        self.convert_provider_metadata()
        self.convert_category_tree()
        self.convert_dataset("D1")

    def _build_dataset_dimensions(self) -> DatasetDimensions:
        return DatasetDimensions(
            dimensions=[
                Dimension.create(
                    "FREQ",
                    label="Frequency",
                    values=[
                        DimensionValue.create("A", label="Annual"),
                        DimensionValue.create("M", label="Monthly"),
                    ],
                ),
                Dimension.create(
                    "REF_AREA",
                    label="Reference area",
                    values=[
                        DimensionValue.create("DE", label="Germany"),
                        DimensionValue.create("FR", label="France"),
                    ],
                ),
            ],
            roles={DimensionRole.FREQUENCY: DimensionCode.parse("FREQ")},
        )

    def _build_dataset_metadata(self, source_dataset_id: str, *, dimensions: DatasetDimensions) -> DatasetMetadata:
        return DatasetMetadata.create(
            source_dataset_id,
            dimensions=dimensions,
            name="Dataset dummy name",
        )

    def _iter_series(self, *, dataset_dimensions: DatasetDimensions) -> Iterator[Series]:
        observations = list(self._iter_series_observations())
        yield Series.create(
            dataset_dimensions=dataset_dimensions,
            dimensions={"FREQ": "A", "REF_AREA": "DE"},
            name="Series 1",
            observations=observations,
        )

    def _iter_series_observations(self) -> Iterator[Observation]:
        yield Observation.create(period="2000", value=1)
        yield Observation.create(period="2001", value=4)
```
The `start_section` method, like the `DownloaderHelper.start_file_section` method, captures and logs errors, and sections are skipped according to the `--skip` CLI option.

The sections created by `start_dataset_section` are a particular case: they check that the dataset has indeed been written at the end of the section, and they are skipped if the dataset already exists.
Run the convert script:
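As with the download script, a typical invocation could be (again an assumption; check the script's `--help` for the exact options):

```bash
rye run python convert.py
```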
Code style
Do not abbreviate data model concepts
The DBnomics data model defines concepts such as provider, dataset, time series, dimension, or observation. Do not abbreviate those terms.
Examples:
- use `dataset_info` instead of `ds_info`
- use `dimensions` instead of `dim_dict`
- use `current_observation` instead of `current_obs`
Plural of time series
In English, the noun "series" is invariable: its singular and plural forms are identical.

In order to distinguish a single series from a list of series:

- name a single series `series`
- name a list of series `series_list`