Enabling teams access to their data with a low-code ETL tool

Nahara Calcidoni Pacheco
Published in HelloTech
5 min read · Apr 3, 2023

Context

At HelloFresh we have many different sources of data available in our data lake, from domain databases to microservices and APIs. Each of these sources comes in a different format and schema.

When a team of analysts needs to consume this data, it is a challenge to avoid ending up with multiple ETL (Extract, Transform and Load) implementations that all perform essentially the same process.

So how do we centralize this process? The Data Platform Team, which I am part of, came up with a low-code ETL tool called Data Curation Engine (DCE).

Data Curation Engine (DCE)

What is it?

Data Curator Engine is a fully managed, configuration-based service that allows Engineers and Analysts to easily build ETLs without worrying about infrastructure or coding. It also ensures regulatory compliance (GDPR) by providing an anonymization process out of the box.

What does it do?

Transforming the data and making it accessible through DCE takes up to four steps, which form one of two ETL patterns depending on the type of load required.
An incremental load reads and processes only the new data that has arrived on the S3 bucket path, while a full load reads all data from the path on every execution.

Let’s describe each of the steps:

  • Sensor:

Checks whether data is present at the provided input path on S3. For incremental loads of data that is partitioned by date, it checks only for the period of the pipeline execution.
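
As a rough illustration of how the load type could change what the sensor looks for, here is a minimal sketch in Python. The `dt=YYYY-MM-DD` partition layout and the helper name `paths_to_check` are assumptions made for this example; the actual partition naming and sensor code used by DCE are not described here.

```python
from datetime import date


def paths_to_check(input_path: str, load_type: str, execution_date: date) -> list[str]:
    """Return the S3 prefixes a sensor would need to find data under.

    Hypothetical helper: the `dt=` partition naming is an assumed layout.
    """
    base = input_path.rstrip("/")
    if load_type == "full":
        # A full load reads everything under the input path on every run.
        return [base]
    if load_type == "incremental":
        # An incremental load only needs the partition for the execution period.
        return [f"{base}/dt={execution_date.isoformat()}"]
    raise ValueError(f"unknown load_type: {load_type!r}")
```

A real sensor would then list each returned prefix on S3 and succeed only once objects are found there.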

  • Data Formatting:

As stated, data comes in multiple formats and schemas, so to make the downstream steps work seamlessly, data is formatted into a predefined schema:

 |-- event: struct (nullable = false)
 |    |-- source_data
 |-- operation: string (nullable = true)
 |-- meta: struct (nullable = false)

event.source_data: contains the event’s consumable data
operation: represents the kind of event (c — create, u — update, d — delete, etc)
meta: metadata about the source of the event, e.g. the time the event landed on the data lake and the type of source
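
To make the envelope more concrete, the sketch below wraps a raw record into the three top-level fields described above, using plain Python dictionaries. The function name `format_event` and the field names inside `meta` (`source_type`, `landed_at`) are assumptions for illustration only; the article does not list the actual metadata fields.

```python
from datetime import datetime, timezone
from typing import Any, Optional


def format_event(record: dict[str, Any], source_type: str,
                 operation: Optional[str] = None,
                 landed_at: Optional[datetime] = None) -> dict[str, Any]:
    """Wrap a raw record in the event / operation / meta envelope."""
    landed_at = landed_at or datetime.now(timezone.utc)
    return {
        "event": {"source_data": record},  # the consumable payload
        "operation": operation,            # e.g. "c", "u", "d"; may be None
        "meta": {                          # assumed field names
            "source_type": source_type,
            "landed_at": landed_at.isoformat(),
        },
    }
```

Because every source ends up in the same shape, the steps that follow only need to know about `event.source_data`, `operation` and `meta`, whatever the original format was.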

  • Anonymization (optional — if dealing with personally identifiable information (PII)):

Performs certain operations on PII columns as defined by the user. There are three different types of anonymization transformations on DCE:

  • hash: applies sha2
  • start_of_year: transforms date to January 1st of that date’s year
  • delete: drops the column

The output of this step has the same schema as the output of the data formatting step.
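
The three transformations can be sketched on a single flat row as follows. This is only an illustration: it assumes SHA-256 for the hash (the text above only says sha2), it works on a plain dictionary rather than on the nested `event.source_data` structure, and the function name `anonymize` is hypothetical. The rule keys `col_name` and `anonymization_type` mirror the ones used in the configuration file.

```python
import hashlib
from datetime import date
from typing import Any


def anonymize(row: dict[str, Any], rules: list[dict[str, str]]) -> dict[str, Any]:
    """Apply hash / start_of_year / delete rules to a copy of one row."""
    out = dict(row)
    for rule in rules:
        col, kind = rule["col_name"], rule["anonymization_type"]
        if col not in out:
            continue
        if kind == "delete":
            del out[col]                       # drop the column entirely
        elif out[col] is None:
            continue                           # nothing to anonymize
        elif kind == "hash":
            # SHA-256 is an assumption; the article only says "sha2".
            out[col] = hashlib.sha256(str(out[col]).encode("utf-8")).hexdigest()
        elif kind == "start_of_year":
            out[col] = date(out[col].year, 1, 1)  # keep the year, drop day and month
        else:
            raise ValueError(f"unknown anonymization_type: {kind!r}")
    return out
```

The input row is copied rather than modified, so the same rules can be re-applied safely if a task is retried.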

  • Snapshot (Table Creation):

So far everything looks good, but the schema is not really nice to read, right? The Snapshot step takes care of transforming it into a more readable format and making it accessible by creating a table in the Glue Catalog.

Also, when dealing with CDC (change data capture) events, Snapshot processes the create/update/delete operations according to the chosen table type: SCD1 or SCD2.
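
For intuition, an SCD1-style snapshot can be sketched as replaying the CDC events in timestamp order and keeping only the latest row per identifier. The code below is a simplified, in-memory illustration under that assumption and not the DCE implementation; the function name `scd1_snapshot` is hypothetical, while `identifiers` and `timestamp_column` mirror the keys in the configuration file.

```python
from typing import Any


def scd1_snapshot(events: list[dict[str, Any]], identifiers: list[str],
                  timestamp_column: str) -> list[dict[str, Any]]:
    """Collapse CDC events into the latest row per key (SCD type 1)."""
    table: dict[tuple, dict[str, Any]] = {}
    # Replay events oldest-first so that later operations win.
    ordered = sorted(events, key=lambda e: e["event"]["source_data"][timestamp_column])
    for ev in ordered:
        row = ev["event"]["source_data"]
        key = tuple(row[c] for c in identifiers)
        if ev["operation"] == "d":
            table.pop(key, None)   # a delete removes the current row
        else:
            table[key] = row       # create and update overwrite in place
    return list(table.values())
```

An SCD2 table would instead keep the previous versions of each row, typically with validity timestamps, rather than overwriting them.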

Each of these steps is executed as a task inside a scheduled pipeline orchestrated by Airflow.

How can it be used?

To generate the pipeline, all the user needs to do is create a .json configuration file with a few inputs, which configure each of the steps.

The file looks like this:

{
  "dag_name": "hellofresh.data-curator-engine",
  "type": "mysql", -- type of the source event, e.g. mysql, postgres, generic, protobuf
  "start_date": "2023-03-09",
  "schedule": "0 8 * * *",
  "tribe": "hellofresh-tribe",
  "squad": "hellodata-squad",
  "email": "hellofresh-tribe@hellofresh.com",
  "load_type": "full", -- can be either full or incremental
  "event_formatter": {
    "input": {
      "path": "s3://path-to-input-events/sample_events",
      "format": "parquet" -- csv, avro and json are also supported
    },
    "output": {
      "path": "s3://path-to-output-formatted-events/sample_events",
      "format": "parquet"
    }
  },
  "anonymization": {
    "input": {
      "path": "s3://path-to-output-formatted-events/sample_events", -- input for anonymization should be formatted data
      "format": "parquet"
    },
    "output": {
      "path": "s3://path-to-output-anonymized-events/sample_events",
      "format": "parquet"
    },
    "rules": [
      {
        "col_name": "name",
        "type": "string",
        "anonymization_type": "hash" -- applies sha2 to column value
      }
    ]
  },
  "snapshot": {
    "type": "scd1",
    "config": {
      "identifiers": ["sample_id"],
      "timestamp_column": "created_at"
    },
    "input": {
      "path": "s3://path-to-output-anonymized-events/sample_events",
      "format": "parquet"
    },
    "output": {
      "path": "s3://path-to-output-table/sample_schema/",
      "format": "delta"
    },
    "table_name": "sample_schema.sample_table"
  }
}

This will create our “hellofresh.data-curator-engine” pipeline, which will read raw data from the source input, anonymize the “name” column and create a table named sample_schema.sample_table in the Glue Catalog.
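
One way to picture how such a file maps to pipeline tasks is the small sketch below, which derives the ordered list of steps from the top-level sections of the configuration. It assumes that the sensor and the formatter always run and that a section left out of the file means the step is skipped; the function name `planned_steps` is hypothetical and this is not the actual DCE generator.

```python
from typing import Any


def planned_steps(config: dict[str, Any]) -> list[str]:
    """List the pipeline tasks, in order, implied by a DCE-style config."""
    steps = ["sensor", "event_formatter"]
    if "anonymization" in config:   # optional, only needed for PII data
        steps.append("anonymization")
    if "snapshot" in config:        # assumed to be skippable as well
        steps.append("snapshot")
    return steps
```

For the file shown above, this would give the four tasks sensor, event_formatter, anonymization and snapshot, in that order.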

When a configuration file like the one shown is submitted to the project repository, automation takes care of validating that the configuration is set as expected and then deploying the pipeline to Airflow.
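
The kind of checks such a validation could run can be sketched as follows. The set of required keys and the `scd2` spelling of the second snapshot type are assumptions made for this example; the allowed load types and anonymization types come from the descriptions earlier in this post. The function name `validate_config` is hypothetical.

```python
from typing import Any

# Assumed minimal set of mandatory top-level keys.
REQUIRED_KEYS = {"dag_name", "type", "start_date", "schedule", "load_type", "event_formatter"}
LOAD_TYPES = {"full", "incremental"}
ANONYMIZATION_TYPES = {"hash", "start_of_year", "delete"}
SNAPSHOT_TYPES = {"scd1", "scd2"}  # "scd2" spelling is an assumption


def validate_config(config: dict[str, Any]) -> list[str]:
    """Return a list of human-readable problems; empty means the config passed."""
    errors = [f"missing key: {k}" for k in sorted(REQUIRED_KEYS - set(config))]
    if "load_type" in config and config["load_type"] not in LOAD_TYPES:
        errors.append(f"invalid load_type: {config['load_type']}")
    for rule in config.get("anonymization", {}).get("rules", []):
        if rule.get("anonymization_type") not in ANONYMIZATION_TYPES:
            errors.append(f"invalid anonymization_type for column {rule.get('col_name')}")
    snapshot = config.get("snapshot")
    if snapshot is not None and snapshot.get("type") not in SNAPSHOT_TYPES:
        errors.append(f"invalid snapshot type: {snapshot.get('type')}")
    return errors
```

Returning every problem at once, instead of failing on the first one, lets the person submitting the file fix everything in a single round.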

This is what a DCE pipeline looks like on Airflow: one task per configured step, run on the configured schedule.

Conclusions

Data Curator Engine has enabled multiple teams to deploy their pipelines within 30 minutes, compared with a process that could take up to months. As of now, over 50 pipelines are running on the Airflow community instance provided. It is also being used as a library: teams install the package on their own Airflow instance and benefit from the same configuration-based pipeline generation.

In the future, the Data Platform team aims to provide an expanded version of the configuration-based process to its users, including capturing lineage, building enrichment pipelines, syncing data to other sources such as Snowflake and so on, all in a centralized tool. This will bring standardization and easier governance, and will enable teams to focus on the business logic of their data rather than on writing code and maintaining multiple infrastructures. This project, named Tardis, should soon be described in another blog post like this one.
