Skip to content

Creating a SCDF task ✅ - Full example

This tutorial explores in depth and from scratch, the creation of a Spring Cloud Data Flow (SCDF) task, using the example of harvesting a web API, the Hub'Eau API providing access to water data.

Development of the application

Initialize a working directory. To do this, go to the terminal of the 🔗 IDE and then create the directory for the new project:

mkdir -p $HOME/projects/hubeau_scrapper

and open this folder in the IDE (menu File, then Open folder or CTRL + K CTRL + O).

Code of Hubeau Scrapper

Install the required dependencies:

python3 -m pip install tsdbhelper scdfhelper \
    --extra-index-url https://gitlab+deploy-token-941113:kY__o3SKYzMjiQEmmhdn@gitlab.com/api/v4/projects/35140779/packages/pypi/simple

Create the file python_task.py, develop the code to retrieve the info from Hubeau, and format them to be able to ingest them in a database.

import string
import requests
import csv
import logging
import time
from tsdbhelper import DBManager

LOGGER = logging.getLogger(__name__)


class HubeauScrapper:
    PK = "code_station"

    STATION_FIELDS = [  # Name, Type
        ["code_site", "character varying"],
        ["code_station", "character varying"],
        ["libelle_site", "character varying"],
        ["code_commune_station", "character varying"],
        ["libelle_commune", "character varying"],
        ["code_departement", "character varying"],
        ["libelle_departement", "character varying"],
        ["libelle_region", "character varying"],
        ["code_cours_eau", "character varying"],
        ["libelle_cours_eau", "character varying"],
        ["wkb_geometry", "geometry(Point,4326)"],
    ]

    TS_FIELDS = [
        ["grandeur_hydro", "character varying NULL"],
        ["resultat_obs", "double precision NULL"],
        ["code_methode_obs", "integer NULL"],
    ]

    STATIONS_URL = "https://hubeau.eaufrance.fr/api/v1/hydrometrie/referentiel/stations?format=json&size=10000"
    TS_URL = "https://hubeau.eaufrance.fr/api/v1/hydrometrie/observations_tr.csv?fields=code_station%2Cdate_obs%2Cgrandeur_hydro%2Cresultat_obs%2Ccode_methode_obs&grandeur_hydro=H&sort=asc&size=20000&date_debut_obs={isodate}"

    def __init__(self, dsn: string, table: string = "hubeau"):
        LOGGER.info("Initializing tables...")
        self.dbmanager = DBManager(
            dsn, table, self.PK, self.STATION_FIELDS, self.TS_FIELDS, True
        )

    def populate_stations(self):
        LOGGER.info(f"Populate Hubeau stations in {self.dbmanager.table_name}...")

        res = requests.get(self.STATIONS_URL).json()
        for row in res["data"]:
            # Some Cleanings
            geom = row.pop("geometry")
            row = {k: v[0] if isinstance(v, list) else v for k, v in row.items()}

            # Populate
            self.dbmanager.insert_feature({"properties": row, "geometry": geom})

        LOGGER.info(f"Hubeau stations populated in {self.dbmanager.table_name}.")

    def populate_ts(self, last_ts):
        LOGGER.info(f"Import Hubeau values from {last_ts}...")

        reqstart = time.time()
        r = requests.get(self.TS_URL.format(isodate=last_ts.isoformat()))
        csv_response = r.content.decode("utf-8")
        reqend = time.time()
        LOGGER.info(f"Time series retrieved in {reqend -reqstart} s.")

        csvreader = csv.DictReader(csv_response.splitlines(), delimiter=";")
        vals = []
        for row in csvreader:
            if row["code_station"]:
                vals.append(
                    [
                        row["code_station"],
                        row["date_obs"],
                        row["grandeur_hydro"],
                        row["resultat_obs"],
                        row["code_methode_obs"],
                    ]
                )
        self.dbmanager.insert_tsvalues(vals)
        LOGGER.info(f"Time series inserted in db in {time.time() - reqend} s.")

Definition of the function main

Next, add the code to launch the previously defined class. For ease of use, we will put this code after the previous code, in the same file.

from datetime import datetime, timedelta


def main():
    hs = HubeauScrapper(
        "postgresql://postgres:postgres@timescaledb.timescaledb.svc.cluster.local/hydro",
        "test_table")
    if not hs.dbmanager.isinit:
        hs.dbmanager.create_tables()
        hs.populate_stations()
        hs.populate_ts(datetime.now() - timedelta(weeks=1))

    while hs.dbmanager.get_last_ts() + timedelta(minutes=6) < datetime.now():
        hs.populate_ts(hs.dbmanager.get_last_ts() - timedelta(minutes=6))


if __name__ == "__main__":
    main()

You can run main to test the function locally.

Preparation for SCDF

We will now add some code to be able to interact with SCDF. This is done through:

  • The get_cmd_arg function, which retrieves an attribute configured in SCDF.
  • The scdf_tasked annotation, which adds the necessary environment to be able to retrieve and follow the state of the tasks in SCDF.

Modify the main method as follows:

from datetime import datetime, timedelta
from scdfhelper.task_args import get_cmd_arg
from scdfhelper.task_helper import scdf_tasked


@scdf_tasked
def main():
    hs = HubeauScrapper(get_cmd_arg("scrapper.dburi"), get_cmd_arg("scrapper.table"))
    if not hs.dbmanager.isinit:
        hs.dbmanager.create_tables()
        hs.populate_stations()
        hs.populate_ts(datetime.now() - timedelta(weeks=1))

    while hs.dbmanager.get_last_ts() + timedelta(minutes=6) < datetime.now():
        hs.populate_ts(hs.dbmanager.get_last_ts() - timedelta(minutes=6))


if __name__ == "__main__":
    main()

Dockerfile

In the same directory, create a file named Dockerfile with the following content:

FROM 643vlk6z.gra7.container-registry.ovh.net/metis-demo/scdf-tsdbhelper:0.1.1
ADD . /opt/
ENTRYPOINT ["python3","/opt/python_task.py"]
CMD []

That's it for the coding!

Deployment of the application

If necessary, run the following command:

docker login 643vlk6z.gra7.container-registry.ovh.net

then launch the publication:

scdf_publish task hubeau-scrapper 0.1.5

Creation of the task

The hubeau-scrapper is now available in 🔗 SCDF applications. We are going to create a task that will allow to launch it: under "DataFlow", in the Tasks/Jobs tab, then Task, click on Create Task.

Take the hubeau-scrapper task, connect it with inputs and outputs. The other parameters are left as they are.

Click on Create task and give a unique name to the task. Be careful, use only alphabetical characters and the -. In our case, let's use the same name: hubeau-scrapper.

Launching the task

Click on the newly created hubeau-scrapper task, and then on Launch Task.

In the Applications Properties, add the following two entries. These are the connection URL to the database that will store the timeseries and the destination table.

Remember to use a unique table name!

scrapper.dburi = postgresql://postgres:postgres@timescaledb.timescaledb.svc.cluster.local/hydro
scrapper.table = hubeauXYZ

Then press Lauch Task.

The stations can be viewed via a standardized 🔗 OGC API. Click on View the collections and the collection with the name of the previously defined table is visible. It is then possible to display the data in a cartigraphic view. Schedule the execution of the task

Schedule the execution of the task

The task execution can then be 🔗 scheduled for execution at regular intervals.

Back to the page: orchestration of processing...


🗑️ Reset

  • Under SCDF, remove the hubeau-scrapper application and the hubeau-scrapper task.
  • Delete the project code:
rm -rf $HOME/projects/hubeau_scrapper