Creating an SCDF task ✅ - Full example
This tutorial explores, in depth and from scratch, the creation of a Spring Cloud Data Flow (SCDF) task, using as an example the harvesting of a web API: the Hub'Eau API, which provides access to water data.
Development of the application
Initialize a working directory. To do this, go to the terminal of the 🔗 IDE and create the directory for the new project:
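For example (the directory name hubeau-scrapper is a suggestion, chosen to match the task name used later in this tutorial):
mkdir hubeau-scrapper
cd hubeau-scrapper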
Then open this folder in the IDE (menu File, then Open folder, or CTRL + K CTRL + O).
Code of the Hubeau scrapper
Install the required dependencies:
python3 -m pip install tsdbhelper scdfhelper \
--extra-index-url https://gitlab+deploy-token-941113:kY__o3SKYzMjiQEmmhdn@gitlab.com/api/v4/projects/35140779/packages/pypi/simple
Create the file python_task.py and develop the code to retrieve the data from Hubeau and format it for ingestion into a database.
import csv
import logging
import time

import requests

from tsdbhelper import DBManager

LOGGER = logging.getLogger(__name__)


class HubeauScrapper:
    PK = "code_station"

    STATION_FIELDS = [  # Name, Type
        ["code_site", "character varying"],
        ["code_station", "character varying"],
        ["libelle_site", "character varying"],
        ["code_commune_station", "character varying"],
        ["libelle_commune", "character varying"],
        ["code_departement", "character varying"],
        ["libelle_departement", "character varying"],
        ["libelle_region", "character varying"],
        ["code_cours_eau", "character varying"],
        ["libelle_cours_eau", "character varying"],
        ["wkb_geometry", "geometry(Point,4326)"],
    ]

    TS_FIELDS = [
        ["grandeur_hydro", "character varying NULL"],
        ["resultat_obs", "double precision NULL"],
        ["code_methode_obs", "integer NULL"],
    ]

    STATIONS_URL = "https://hubeau.eaufrance.fr/api/v1/hydrometrie/referentiel/stations?format=json&size=10000"
    TS_URL = "https://hubeau.eaufrance.fr/api/v1/hydrometrie/observations_tr.csv?fields=code_station%2Cdate_obs%2Cgrandeur_hydro%2Cresultat_obs%2Ccode_methode_obs&grandeur_hydro=H&sort=asc&size=20000&date_debut_obs={isodate}"

    def __init__(self, dsn: str, table: str = "hubeau"):
        LOGGER.info("Initializing tables...")
        self.dbmanager = DBManager(
            dsn, table, self.PK, self.STATION_FIELDS, self.TS_FIELDS, True
        )

    def populate_stations(self):
        """Fetch the station referential and insert each station into the database."""
        LOGGER.info(f"Populate Hubeau stations in {self.dbmanager.table_name}...")
        res = requests.get(self.STATIONS_URL).json()
        for row in res["data"]:
            # Clean up: extract the geometry and unwrap single-element lists
            geom = row.pop("geometry")
            row = {k: v[0] if isinstance(v, list) else v for k, v in row.items()}
            # Populate
            self.dbmanager.insert_feature({"properties": row, "geometry": geom})
        LOGGER.info(f"Hubeau stations populated in {self.dbmanager.table_name}.")

    def populate_ts(self, last_ts):
        """Fetch the observations recorded since last_ts (as CSV) and insert them."""
        LOGGER.info(f"Import Hubeau values from {last_ts}...")
        reqstart = time.time()
        r = requests.get(self.TS_URL.format(isodate=last_ts.isoformat()))
        csv_response = r.content.decode("utf-8")
        reqend = time.time()
        LOGGER.info(f"Time series retrieved in {reqend - reqstart} s.")
        csvreader = csv.DictReader(csv_response.splitlines(), delimiter=";")
        vals = []
        for row in csvreader:
            if row["code_station"]:
                vals.append(
                    [
                        row["code_station"],
                        row["date_obs"],
                        row["grandeur_hydro"],
                        row["resultat_obs"],
                        row["code_methode_obs"],
                    ]
                )
        self.dbmanager.insert_tsvalues(vals)
        LOGGER.info(f"Time series inserted in db in {time.time() - reqend} s.")
Definition of the main function
Next, add the code that launches the class defined above. For convenience, we will put this code in the same file, after the previous code.
from datetime import datetime, timedelta


def main():
    hs = HubeauScrapper(
        "postgresql://postgres:postgres@timescaledb.timescaledb.svc.cluster.local/hydro",
        "test_table",
    )
    # On the first run, create the tables, load the station referential,
    # and bootstrap the time series with one week of history
    if not hs.dbmanager.isinit:
        hs.dbmanager.create_tables()
        hs.populate_stations()
        hs.populate_ts(datetime.now() - timedelta(weeks=1))
    # Catch up until the database is less than 6 minutes behind,
    # re-reading a 6-minute overlap to avoid gaps between requests
    while hs.dbmanager.get_last_ts() + timedelta(minutes=6) < datetime.now():
        hs.populate_ts(hs.dbmanager.get_last_ts() - timedelta(minutes=6))


if __name__ == "__main__":
    main()
You can run main to test the function locally.
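For example, from the project directory (this assumes the database referenced in main is reachable from your environment):
python3 python_task.py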
Preparation for SCDF
We will now add some code to be able to interact with SCDF. This is done through:
- The get_cmd_arg function, which retrieves an argument configured in SCDF.
- The scdf_tasked decorator, which adds the environment needed to retrieve and track the state of tasks in SCDF.
Modify the main function as follows:
from datetime import datetime, timedelta

from scdfhelper.task_args import get_cmd_arg
from scdfhelper.task_helper import scdf_tasked


@scdf_tasked
def main():
    # The connection URL and table name are now provided by SCDF at launch time
    hs = HubeauScrapper(get_cmd_arg("scrapper.dburi"), get_cmd_arg("scrapper.table"))
    if not hs.dbmanager.isinit:
        hs.dbmanager.create_tables()
        hs.populate_stations()
        hs.populate_ts(datetime.now() - timedelta(weeks=1))
    while hs.dbmanager.get_last_ts() + timedelta(minutes=6) < datetime.now():
        hs.populate_ts(hs.dbmanager.get_last_ts() - timedelta(minutes=6))


if __name__ == "__main__":
    main()
Dockerfile
In the same directory, create a file named Dockerfile with the following content:
# Base image providing the Python environment for SCDF tasks
FROM 643vlk6z.gra7.container-registry.ovh.net/metis-demo/scdf-tsdbhelper:0.1.1
# Copy the project sources into the image
ADD . /opt/
# Run the task on container start
ENTRYPOINT ["python3","/opt/python_task.py"]
CMD []
That's it for the coding!
Deployment of the application
If necessary, authenticate with the container registry, then build and publish the image:
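The exact commands depend on your registry and credentials; a plausible sketch using a standard Docker build-and-push workflow (the image name and tag are illustrative, reusing the registry host from the Dockerfile):
docker login 643vlk6z.gra7.container-registry.ovh.net
docker build -t 643vlk6z.gra7.container-registry.ovh.net/metis-demo/hubeau-scrapper:0.1.0 .
docker push 643vlk6z.gra7.container-registry.ovh.net/metis-demo/hubeau-scrapper:0.1.0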
Creation of the task
The hubeau-scrapper application is now available in the 🔗 SCDF applications. We are going to create a task that will allow us to launch it: under "DataFlow", in the Tasks/Jobs tab, then Task, click on Create Task.
Take the hubeau-scrapper application and connect it to the inputs and outputs. Leave the other parameters as they are.
Click on Create Task and give the task a unique name. Be careful: use only alphabetical characters and the - character. In our case, let's use the same name: hubeau-scrapper.
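Alternatively, assuming the SCDF shell is available in your environment, an equivalent definition can be created from the command line:
task create hubeau-scrapper --definition "hubeau-scrapper"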
Launching the task
Click on the newly created hubeau-scrapper task, then on Launch Task.
In the Applications Properties, add the following two entries: the connection URL of the database that will store the time series, and the destination table. Remember to use a unique table name!
scrapper.dburi = postgresql://postgres:postgres@timescaledb.timescaledb.svc.cluster.local/hydro
scrapper.table = hubeauXYZ
Then press Launch Task.
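For reference, an equivalent launch from the SCDF shell might look like this (the app.hubeau-scrapper. prefix targets the application's properties; adapt it if your setup differs):
task launch hubeau-scrapper --properties "app.hubeau-scrapper.scrapper.dburi=postgresql://postgres:postgres@timescaledb.timescaledb.svc.cluster.local/hydro,app.hubeau-scrapper.scrapper.table=hubeauXYZ"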
The stations can be viewed via a standardized 🔗 OGC API. Click on View the collections: the collection named after the previously defined table is visible. It is then possible to display the data in a cartographic view.
Schedule the execution of the task
The task can then be 🔗 scheduled for execution at regular intervals.
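Assuming the scheduler accepts standard cron expressions (as SCDF on Kubernetes does), an expression such as the following would run the scrapper every 10 minutes:
*/10 * * * *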
Back to the page: orchestration of processing...
🗑️ Reset
- Under SCDF, remove the hubeau-scrapper application and the hubeau-scrapper task.
- Delete the project code:
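For example, from the parent directory (assuming the directory name suggested at the beginning of this tutorial):
rm -rf hubeau-scrapper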