What do we do at this Data Platform?
We collect, translate and enrich global food safety data. This data covers:
- food recalls and border rejections,
- price data on agricultural commodities and animal products,
- news items related to food safety,
- fraud cases,
- laboratory testing performed by Food Safety Authorities worldwide,
- inspections and warning letters on food companies’ plants and premises,
- country-level indicators concerning food safety.
Collecting and processing all this data is a challenge of its own, covered in a different post.
As you can imagine, though, a number of workflows are involved in the process: tasks trigger one another to collect, process and enrich each of the close to 200M data points (taking into account the hierarchical model employed) present in our infrastructure.
All this has already been covered, and will be covered in more detail in dedicated posts.
There is, though, one main challenge we have not covered: the overall orchestration.
How did we tackle this challenge and what tools did we enlist for help?
How can we synchronize all these flows?
Back in 2016, when we first started implementing and deploying our stack, due to the team’s previous experience in DevOps tasks, cronjobs were our initial choice.
What did we do?
Bash scripts, bash scripts, bash scripts and crontab -e commands, A LOT!
Long story short?
Every source we track has its dedicated directory on our backend/processing servers, and within each of these directories lies a run.sh script. This is the script that manages all the action: every task in a triggered workflow is managed by it, and it calls other scripts, each responsible for a single task.
And this run.sh is triggered by crontab.
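As a rough illustration of that chain of scripts, the sketch below runs a list of commands in order and stops at the first one that fails. It is written in Python rather than Bash and is not taken from the actual run.sh scripts; the function name run_steps and the three demo steps are made up for the example.

```python
import subprocess
import sys
from typing import Sequence


def run_steps(steps: Sequence[Sequence[str]], cwd: str = ".") -> int:
    """Run each command in order and stop at the first one that fails.

    Returns the number of steps that completed successfully.
    """
    completed = 0
    for command in steps:
        result = subprocess.run(list(command), cwd=cwd)
        if result.returncode != 0:
            # A failed step means later steps would work on incomplete data.
            break
        completed += 1
    return completed


if __name__ == "__main__":
    # Hypothetical three-step workflow; each command stands in for a task script.
    demo = [
        [sys.executable, "-c", "print('collect')"],
        [sys.executable, "-c", "print('transform')"],
        [sys.executable, "-c", "print('enrich')"],
    ]
    print(run_steps(demo))
```

Stopping at the first non-zero exit code keeps a later task from working on the output of a task that never finished.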
For those of you accustomed to cronjobs, the execution space of each cronjob may come naturally (cron does not start a job from the directory its script lives in); we had to learn it the hard way.
The lines of code depicted above accompany every single script on our (dedicated) servers.
- The first thing we need to do is switch over to the directory of our to-be-crawled source. (lines: 2–3)
- Then we need to check if the previously triggered script has completed its work; this is done by checking the respective lockfile. (lines: 5–10)
- Once we are done with the main work we should update the lockfile for the next trigger of our script. (line: 14)
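The three steps above can be sketched as follows. This is a minimal Python rendering of the pattern, not the Bash script itself: the lockfile name run.lock, the function run_guarded and the choice to delete the lockfile when the work is done are all assumptions made for the example.

```python
import os
import tempfile
from pathlib import Path
from typing import Callable

LOCK_NAME = "run.lock"  # hypothetical lockfile name


def run_guarded(source_dir: str, work: Callable[[], None]) -> bool:
    """Run `work` inside `source_dir` unless a previous run still holds the lock.

    Returns True if the work ran, False if it was skipped.
    """
    # Step 1: switch to the directory of the source to be crawled.
    os.chdir(source_dir)

    # Step 2: check whether the previous run has finished.
    lock = Path(LOCK_NAME)
    try:
        # O_EXCL makes creation fail if the file already exists, so the
        # check and the claim happen in a single operation.
        fd = os.open(lock, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    with os.fdopen(fd, "w") as handle:
        handle.write(str(os.getpid()))

    try:
        work()
    finally:
        # Step 3: update the lock (here: remove it) for the next trigger.
        lock.unlink()
    return True


if __name__ == "__main__":
    demo_dir = tempfile.mkdtemp()
    print(run_guarded(demo_dir, lambda: print("crawling...")))
```

One caveat of this sketch: if the process is killed outright, the lockfile stays behind and has to be cleaned up before the next run can start.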
Depending on the source, scripts that trigger translation endpoints may be present, and text mining or text classification workflows may take place through their own scripts, all of them initiating calls to the respective projects and endpoints.
Say we are done with the collection and processing of each data record. We now have to let our internal CMS (Drupal) know of the new data: time to sftp over there and upload the respective transformed and enriched records. That system will take care of the rest. But more on that end to follow in a dedicated post.
Is this enough?
You have most probably already guessed it. No!
What about data we have already processed?
We should not stress our (already working at maximum capacity) servers any more than we have to; only new data needs to be taken into account. This is where our MongoDB kicks in: the place where all the raw data is stored, along with a collected-on timestamp and a flag signifying whether or not a record has already been processed.
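A small sketch of that idea, using plain Python dictionaries in place of MongoDB documents so that it runs on its own. The field names collected_on and processed are hypothetical; with pymongo, the same selection would roughly be a find() call with a filter such as {"processed": False}.

```python
from datetime import datetime, timezone
from typing import Dict, List


def pending(records: List[Dict]) -> List[Dict]:
    """Return records not yet processed, oldest collection time first."""
    todo = [r for r in records if not r["processed"]]
    return sorted(todo, key=lambda r: r["collected_on"])


def mark_processed(record: Dict) -> None:
    """Flip the flag so that later runs skip this record."""
    record["processed"] = True


if __name__ == "__main__":
    raw = [
        {"id": 1, "collected_on": datetime(2020, 1, 3, tzinfo=timezone.utc), "processed": False},
        {"id": 2, "collected_on": datetime(2020, 1, 1, tzinfo=timezone.utc), "processed": True},
    ]
    for record in pending(raw):
        mark_processed(record)
    print(len(pending(raw)))
```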
Is this enough?
Again the (obvious) answer is no!
What about firewall limitations, fail2ban or overall crawler traffic restrictions?
Although it would make our life way easier, firing up crawlers every minute towards each of the sources tracked, regardless of the publishing rate, may prove fatal to our endeavor. We need to configure our ETL workflows to be triggered only when new data is likely to be present.
This, although easily configurable through cron expressions, requires some manual labor.
We need to dive into our data and identify the rate at which new records are published. Only then can we define an acceptable rate at which we can dispatch our workflows.
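That manual step can be approximated with a few lines of code. The sketch below takes the publication timestamps of a source, computes the median gap between consecutive records and maps it to a coarse cron expression. The thresholds and the three schedules are illustrative assumptions, not the values used on the platform.

```python
from datetime import datetime
from statistics import median
from typing import List


def median_gap_minutes(published: List[datetime]) -> float:
    """Median time between consecutive publications, in minutes."""
    ordered = sorted(published)
    gaps = [(b - a).total_seconds() / 60 for a, b in zip(ordered, ordered[1:])]
    if not gaps:
        raise ValueError("need at least two timestamps")
    return median(gaps)


def cron_for_gap(gap_minutes: float) -> str:
    """Map a typical publishing gap to a cron schedule (illustrative thresholds)."""
    if gap_minutes < 60:
        return "*/10 * * * *"  # every 10 minutes
    if gap_minutes < 24 * 60:
        return "0 * * * *"     # once an hour
    return "0 6 * * *"         # once a day, at 06:00


if __name__ == "__main__":
    history = [datetime(2020, 1, day) for day in (1, 2, 3, 4)]
    print(cron_for_gap(median_gap_minutes(history)))
```

The median is used instead of the mean so that a single long silence (a holiday, a source outage) does not distort the schedule.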
What about hardware limitations?
This is the trickiest question of all. Implementing and deploying a workflow capable of executing regardless of the stress levels of a server is really challenging. Our choice at this point was splitting our workflow into atomic operations. This ensures that even though a task or a workflow may not complete, no data loss will be observed, since each new workflow triggered will always check for previous workflows’ leftovers.
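To make the idea of atomic operations and leftovers concrete, here is a minimal sketch in which every record remembers the last stage it completed. A run that fails halfway leaves the record at that stage, and the next run resumes from there. The stage names and the functions advance and run_once are hypothetical.

```python
from typing import Callable, Dict, List

STAGES = ["collected", "translated", "enriched"]  # hypothetical stage names


def advance(record: Dict, handlers: Dict[str, Callable[[Dict], None]]) -> None:
    """Move one record through its remaining stages, one step at a time."""
    start = STAGES.index(record["stage"]) + 1
    for stage in STAGES[start:]:
        handlers[stage](record)   # may raise; the stage is then not recorded
        record["stage"] = stage   # recorded only after the step succeeded


def run_once(records: List[Dict], handlers: Dict[str, Callable[[Dict], None]]) -> int:
    """Pick up every unfinished record; return how many reached the last stage."""
    finished = 0
    for record in records:
        if record["stage"] == STAGES[-1]:
            continue  # nothing left to do for this record
        try:
            advance(record, handlers)
            finished += 1
        except Exception:
            # Leave the record at its last completed stage for the next run.
            continue
    return finished
```

Because a stage is written down only after its step has succeeded, an interrupted run can repeat a step but never skip one.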
And this is it! Or not?
- what about CPU/RAM intensive tasks?
- error logging?
- tools out there (Apache Airflow for instance!) that can make our life easier?
We will cover the above in another post, since we are currently switching to Apache Airflow for our ETL pipelines. Let us stress at this point, however, that all of the above are crucial: one cannot have a robust ETL pipeline if these remarks are not addressed. And researching and exploring new technologies and frameworks should always be part of the day-to-day tasks!
Just to give a quick overview in terms of numbers, in our current infrastructure:
- 113 ETL workflow cronjobs are present;
- on average, workflows are triggered once every 10 minutes;
- 9 dedicated servers are involved in this part of the infrastructure;
- 11 workflow jobs have been switched to Apache Airflow DAGs;
- 1 Elastic Stack instance (involving Elasticsearch & Kibana & Metricbeat) is employed to keep track of the health of our infrastructure.