Evolution of our data pipelines
As data engineers at the biggest online grocery company in India, one of our major challenges is to democratize data across the organization. But when we began evaluating how well we were doing on this goal, we realized that frequent errors in our data pipelines had caused our business teams to lose confidence in the data. As a result, they were never sure which data source was correct and appropriate for their analyses, and they had to consult the data platform team at every step. This was not what we had in mind when we set out to give teams the tools to make correct, data-based decisions as independently as possible, without getting slowed down.
Because modern data platforms gather data from many disparate, disconnected, and diverse systems, they are prone to data collection issues such as duplicate records and missed updates. To resolve these problems, we conducted a thorough study of our data platform and realized that the architectural debt accumulated over time caused most cases of incorrect data. All major functions of our data platform (extraction, transformation, and storage) had issues that led to these quality concerns.
Let’s first list these issues:
1. Lack of separation between raw and processed data
When we started our data journey 5 years ago, we did not have the foresight to separate our source tables from our derived tables. The application tables were dumped into the same warehouse, along with their schemas. While this was okay when we only had 20 tables, it became a huge problem once that number crossed the 1,000 mark.
Not only were source tables placed right next to the data marts built on top of these tables, oftentimes we would make modifications to the source tables themselves. As a result, data consumers were often unsure about the meaning of data contained in different tables and found it hard to determine which table or column to use as the source of truth.
From an engineering perspective, it had become difficult for us to trace the lineage of data points during troubleshooting, which kept our mean time to recovery (MTTR) high and caused frequent disruption for end users. This lack of separation also took a toll on our infrastructure bills, as we were keeping raw tables along with marts in the same storage.
2. Limitations of batched SQL-based loading
In the beginning, our source tables were replicated using scheduled, batched, SQL-based pulls from our production databases.
These loading jobs had certain problems which were inherent to batched jobs:
- These operations needed to rely on one fixed column, e.g. the primary key or a “created_at” or “updated_at” field, which served as a marker of which rows had already been replicated, so that the subsequent job could begin where the previous one had left off. However, this failed to replicate changes that were not visible through the tracking column. For example, if you use the “updated_at” column as your tracker and DELETE a row in your source table, the deleted row simply stops appearing in query results, so the deletion never reaches the replica.
- You had to be precise when specifying boundary conditions, such as the next marker time from which data has to be queried. Oftentimes, analysts failed to consider this while defining their jobs, and you would see duplicate or missing entries around the edges of your conditions.
- We had some business-critical tables that received a high frequency of updates, so much so that the tables mutated multiple times while the replication was still in progress. This would either cause the jobs to fail because of row conflicts, or let them succeed but produce data that differed from the source, since Redshift did not have a native merge command at the time. To circumvent this problem, we had to add more tools to the platform, which in turn made it fragmented, increasing the complexity and adding more points of failure.
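To make the DELETE problem concrete, here is a minimal sketch of a marker-based pull. It is illustrative only and not the actual replication job: the `orders` table, its columns, and the `incremental_pull` helper are hypothetical, and an in-memory SQLite database stands in for the production source.

```python
import sqlite3


def incremental_pull(source, replica, last_marker):
    """Copy rows whose updated_at is newer than the marker; return the new marker."""
    rows = source.execute(
        "SELECT id, qty, updated_at FROM orders "
        "WHERE updated_at > ? ORDER BY updated_at",
        (last_marker,),
    ).fetchall()
    for row_id, qty, updated_at in rows:
        replica[row_id] = (qty, updated_at)
        last_marker = max(last_marker, updated_at)
    return last_marker


# Hypothetical source table standing in for a production database.
source = sqlite3.connect(":memory:")
source.execute(
    "CREATE TABLE orders (id INTEGER PRIMARY KEY, qty INTEGER, updated_at INTEGER)"
)
source.executemany("INSERT INTO orders VALUES (?, ?, ?)", [(1, 5, 100), (2, 3, 110)])

replica = {}
marker = incremental_pull(source, replica, 0)  # first run copies both rows

# Between two runs, one row is updated and another is deleted.
source.execute("UPDATE orders SET qty = 9, updated_at = 120 WHERE id = 1")
source.execute("DELETE FROM orders WHERE id = 2")
marker = incremental_pull(source, replica, marker)  # only sees the update

# Rows that still exist in the replica but no longer exist in the source.
source_ids = {row[0] for row in source.execute("SELECT id FROM orders")}
stale_ids = set(replica) - source_ids
print(stale_ids)
```

The update to row 1 is picked up because its `updated_at` moved past the marker, while row 2 stays in the replica indefinitely: a deleted row has no `updated_at` left to query.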
3. Components that would not scale
To allow data analysts to program their replication operations without any help from the data engineering team, we introduced a collection of visual drag-and-drop tools. While this allowed us to scale up our data operations in the beginning, it quickly turned into an unmanageable nightmare. A couple of years down the line, finding the job that populated a given table had gone from difficult to impossible.
Another drawback was that some of the tools we had been using to run the bulk of our jobs had little to no ability to raise alerts when something went wrong. This led to a situation where we were often unaware of problems or inconsistencies in our data and only found out when a data analyst raised a concern. Add to that the fact that these tools took longer to execute as scale increased, and we had a very expensive problem on our hands.
4. No support for real-time pipelines
As the organization grew, we had started seeing more and more real-time use cases come up and there was no way to extend the existing data platform to cater to these scenarios.
With so many issues plaguing our warehouse, we realized that we had come to the end of the road with the first generation of our data warehouse. It was at this point that we decided to take a step back and think about what we needed out of our data platform. We were not afraid to build a system from the ground up if we had to.
We listed the following as the core capabilities we wanted our data infrastructure to have:
- Overcome the limitations of batched jobs as listed above.
- Separate storage and compute as much as possible so they can scale independently.
- Reduce the number of points of failure.
- Maintain an audit of changes and have pipelines that can be readily deployed in case of failure.
- Make it easy to track changes across systems and maintain data lineages to ease troubleshooting.
With these considerations in mind, we finally landed on the decision to construct a data platform that builds Domain-Separated Data Lakes using Change Data Capture to replicate source tables.
Data Lake & CDC
Let’s begin by defining the terms ‘data lake’ and ‘data warehouse’. A lot of organizations make the mistake of using these terms interchangeably, but a data lake is not the same thing as a data warehouse. A data warehouse is a strategic store of information that can readily be used by data analysts and business consumers. A data lake, on the other hand, is a large repository intended to store raw data that can be processed and served through the data warehouse when needed.
From an engineering perspective, a data lake needs to replicate and store raw application data from your production databases. This means that any time there’s a change in your production database, the data lake needs to make sure it replicates that change as well. There are various techniques that allow you to identify and expose these changes so that they may be replicated. This method of replicating data by replaying the changes made to the source is known as “Change Data Capture”, or CDC in short.
Change Data Capture (CDC) can be done in three ways:
- Query-based CDC: the replication method that we described earlier in this post falls into this category, where we ran scheduled queries to copy data in batches. This method requires an incremental key to maintain markers over tables while polling the data.
- Trigger-based CDC: this kind of system uses shadow tables and a set of triggers on the monitored tables to keep track of changes. It is the least used of the three techniques because of its very low performance and high maintenance overhead.
- Log-based CDC: this kind of system uses database changelogs, e.g. the Write-Ahead Log (WAL) in Postgres, the binlog in MySQL, or the oplog in MongoDB, to replay changes on the replicated data store. These systems read the logs through supported plugins and libraries, such as wal2json and pgoutput for Postgres and open-replicator for MySQL. Log-based CDC has several benefits over the other approaches: it captures all data changes, including DELETEs; it preserves the complete order in which transactions were applied; and it makes it possible to build real-time streams. Therefore, we used this technique to create the present-day replication pipeline at Grofers.
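The idea of replaying a changelog can be sketched in a few lines. This is a simplified illustration under stated assumptions, not Debezium’s actual event format: the flat event dictionaries and the `position`, `key`, and `after` fields are hypothetical, while the operation codes `c`, `u`, `d`, and `r` (create, update, delete, snapshot read) follow the convention Debezium uses in its change events.

```python
def apply_change(replica, event):
    """Apply one change event to an in-memory replica keyed by primary key."""
    op, key = event["op"], event["key"]
    if op in ("c", "u", "r"):  # create, update, or snapshot read
        replica[key] = event["after"]
    elif op == "d":  # delete: visible here, unlike in query-based CDC
        replica.pop(key, None)
    else:
        raise ValueError(f"unknown operation: {op!r}")


def replay(replica, events):
    """Replay events in log order, using the (hypothetical) position field."""
    for event in sorted(events, key=lambda e: e["position"]):
        apply_change(replica, event)
    return replica


# Events may arrive out of order; the log position restores the true order.
events = [
    {"position": 3, "op": "d", "key": 2, "after": None},
    {"position": 1, "op": "c", "key": 1, "after": {"qty": 5}},
    {"position": 2, "op": "c", "key": 2, "after": {"qty": 3}},
    {"position": 4, "op": "u", "key": 1, "after": {"qty": 9}},
]
replica = replay({}, events)
print(replica)
```

Because every change, including the delete of key 2, is an explicit event with a position in the log, the replica ends up matching the source without any tracking column.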
Reasons for adopting a data lake with incremental processing
Building a data lake is costly and time consuming. Before deciding to make such sweeping infrastructural changes, you need to assess and evaluate the move thoroughly. We went through this process, and this is what we learned we stood to gain:
1. Separation of concerns helps scalability and query efficiency
Because data lakes are different storage engines from data warehouses, they inherently allow us to separate applications’ tables and marts both logically and physically. This addresses the issues around the characteristics and lineage of tables, and makes analysts aware of them before they run any queries. It also gives the team a checkpoint to fall back on in case there are any inconsistencies in the marts.
Another major benefit that a data lake delivers is the separation of storage and compute. A data lake is built on file or object storage such as AWS S3 or traditional HDFS, which costs a lot less than storage in a data warehouse like Amazon Redshift. The cost benefit grows when the time range that is typically queried is shorter than the overall history of data stored. For example, most of the reports at our organization only query data that goes back one year. However, we have more than 5 years of data stored in our data lake. Had we stored all of this data in our warehouse, we would have had to pay a much greater sum in infrastructure costs.
2. No more batch polling over the source databases
There is no need for batched SQL-based data pulls anymore, as we use database logs that give us complete information about all the transactions that take place on the tables. There is also no more risk of losing data because of change-marking columns (e.g. created_at, updated_at), since this method does not rely on any column to identify changes. We can also replicate DELETEs to the warehouse, which the query-based approach used to miss.
3. Support varied application database engines and business conventions
Every database engine has its own way of handling data types such as time, decimal, JSON, and enum columns. We also have many diverse backend teams, each of which designs its database architecture with the needs of its end users in mind. As each team follows different conventions depending on its users, technologies, and so on, we often see a large amount of variation in the way similar columns are named. A data lake allows you to homogenise differences such as precision and naming before you present the data to your analysts.
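As a rough illustration of this kind of homogenisation, the sketch below renames columns to one convention and rounds decimal columns to a fixed precision. The snake_case target, the two-decimal default, and the helper names `to_snake_case` and `normalize_row` are assumptions made for the example, not conventions taken from our pipeline.

```python
import re
from decimal import Decimal, ROUND_HALF_UP


def to_snake_case(name):
    """Normalize names such as 'createdAt' or 'Order ID' to snake_case."""
    name = re.sub(r"[\s\-]+", "_", name.strip())  # spaces and hyphens to "_"
    name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)  # split camelCase
    return name.lower()


def normalize_row(row, decimal_columns, places=2):
    """Rename columns and round the listed decimal columns to a fixed precision."""
    quantum = Decimal(10) ** -places  # e.g. Decimal("0.01") for places=2
    out = {}
    for name, value in row.items():
        column = to_snake_case(name)
        if column in decimal_columns and value is not None:
            # Go through str() so float inputs keep their printed value.
            value = Decimal(str(value)).quantize(quantum, rounding=ROUND_HALF_UP)
        out[column] = value
    return out


raw = {"orderTotal": 12.345, "Created At": "2020-01-01"}
normalized = normalize_row(raw, decimal_columns={"order_total"})
print(normalized)
```

Applying such rules once, at the lake layer, means analysts see the same names and precision regardless of which backend team or database engine produced the data.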
4. Data screw-ups and failovers
Any organization with a large set of databases and systems needs to keep incident resolution time to a minimum so that operations can run smoothly. A data lake helps us do that, as we no longer have to rely on the source database replicas when a failure occurs. We have become more resilient to failures because we can do reloads without affecting the production systems.
Furthermore, data now goes through multiple stages of processing, which enables us to apply integrity and quality checks at the output of each stage and raise alerts to catch issues sooner rather than later.
5. Real-time analytics and use-cases
The data capture mechanism that we have employed generates real-time data streams. These streams can in turn serve many use cases, such as monitoring day-to-day operations, anomaly detection, real-time suggestions, and other machine learning use cases.
We have around 800 tables (Postgres and MySQL) across different backend systems that are constantly updated. Maintaining this many tables only in Redshift (our warehouse) drove up our infrastructure cost. Our daily ratio of inserts to updates is nearly 60:40, and such an enormous volume of updates requires a lot of data to be reprocessed and updated, with higher odds of duplicate data. In a democratized environment, controlling the quality of queries is hard, so poorly written ad-hoc queries over the entire timeline of data started affecting our SLAs because we could not scale compute independently. Since a data lake allows us to work on partitions and merge rows based on keys, it resolves the issue of duplicate rows before the data is loaded into the warehouse. Further, CDC streams allow us to capture every single change in the tables, which resolves the problem of missing updates. Therefore, using log-based CDC along with a data lake as the architecture for our replication pipelines seemed like the right next step in the evolution of our data platform.
CDC tools that work with data lake
As discussed earlier, the problems in our pipeline were not just with the transformation and storage layers. We faced numerous issues around capturing changes, such as row conflicts, locks, missing updates, and schema changes. Thus, we had to change the way we get data from the application databases, and the solution we found was log-based CDC. There are many cloud-based and paid tools for log-based data capture, but since this is the most critical part of the architecture, we decided to give more weight to having control over the system. We tried out AWS Database Migration Service (DMS), which we had used in the past, but since it does not fit the long-term vision of a kappa architecture (being a one-to-one pipeline that cannot be reused further), we had to look for other solutions.
We wanted fine-grained control over the tool, which would allow us to easily make modifications around monitoring and alerting. Therefore using black-box services was out of the picture.
We tried out several open-source CDC projects, such as Debezium, Maxwell, SpinalTap, and Brooklin. Among these, Debezium stood out in terms of support for database engines (both MySQL and Postgres), snapshotting, column masking, filtering, transformations, and, lastly, documentation. Debezium also has an active Red Hat development team that can provide prompt support if we get stuck on an issue. We also tried out Confluent source connectors, but since those rely on query-based CDC, we decided not to pursue that route further.
Creating a useful data lake
Because data lakes are huge repositories of raw and semi-processed data, their use is often limited to being a middle layer for processing data, with no direct value for the business. There is also a lot of criticism of inefficient data lakes that have turned into graveyards of files without any active use. Keeping that in mind, we decided not to simply dump the data as plain Parquet/ORC files, and instead to add direct business value, such as letting consumers actively query the lake.
Data lakes are a relatively new architectural pattern, and there are a lot of open-source projects in the space that add metadata to the lake files. This makes the lake behave much like a warehouse, and the files can be queried using Hive, Presto, and many other SQL query engines. Most of these projects are based on the principle that the data lake should be updated through incremental upserts, with the help of metadata such as bloom filters, partitioning, and indexing.
Some of the prominent ones are as follows:
- Delta Lake: This project is the best known in the community and is developed by Databricks. It has two implementations, one on the Databricks platform and one open source. Both share the same base architecture, but the open-source implementation currently lags far behind the Databricks Runtime in terms of compaction, indexing, pruning, and many other features. Still, among ACID-based data stores, it has the most momentum and adoption in end-user tools.
- Apache Iceberg: Originally developed by Netflix for storing slow-moving tabular data, it has the most elegant design of them all, with schema management (modular OLAP) using manifests. It is less well known than the other two and lacks tight integration with a processing engine like Apache Spark or Flink, or with a cloud vendor, which makes it a little difficult to adopt.
- Apache Hudi: This open-source project was originally developed by Uber for ingesting and managing the storage of large files over DFS (HDFS or cloud storage). It puts a lot of emphasis on performance (such as latency and throughput), with deeply optimized implementations like Copy on Write and Merge on Read datasets. In general terms, it can be described as incremental processing of batch data. It is currently supported in the AWS ecosystem (via Redshift Spectrum). We are adopting it after an extensive proof of concept (POC).
We chose Apache Hudi as the storage engine for our data lake, primarily because of its performance-driven approach. Most of our tables are created using the Copy on Write paradigm, as we do not want to serve real-time updates through Redshift. After capturing the CDC records from the source databases using Debezium and Kafka, we put them in S3 so that they can be incrementally consumed by Spark, which is the processing engine for Hudi. And since CDC records cannot be written into Hudi tables directly, we have developed an internal tool for this, Nessie (named after the lake monster), which does all the processing over our lake data and makes the result available for warehouse consumption as the final step.
Nessie: the lake monster
Nessie is a tool we are developing to provide an abstraction over the processing engine while coupling it with the data lake, since CDC logs need to be processed into an appropriate format before they can be stored as a Hudi table.
An overview of the features to be developed on Nessie is as follows:
- Handle the intricacies of integrating Apache Hudi with Debezium CDC, such as schema evolution and generating the incremental key from the transaction information in CDC records.
- Support different types of raw file dumps (Avro/JSON/Parquet) at different dump intervals (minutes/hours/days), covering the different data types available in MySQL and Postgres.
- Support consuming records from Kafka topics using DeltaStreamer, Spark Streaming, or its own consumers that poll Kafka topics.
- Support the generation of standard silver tables for further processing.
- Support monitoring SLAs using markers, compaction, and other table metrics.
Things to take care of
If you are planning to create a data lake using Debezium or Hudi, you need to consider certain issues and limitations of these projects. To describe them, let us first go through the basic parameters that Hudi works with. A Hudi table is defined around three important parameters:
- Record Key (unique id for each row)
- Incremental Key (a value that grows with every new version of a row, so the highest value marks the row’s latest state)
- Partition Key (constant partition value for row)
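As a hedged sketch of how these three parameters map onto Hudi’s Spark datasource write options: the option keys below are Hudi’s documented configuration names, and what this post calls the incremental key appears to correspond to Hudi’s precombine field. The table and column names (`orders`, `id`, `_change_seq`, `created_date`) and the helper `hudi_write_options` are hypothetical.

```python
def hudi_write_options(table_name, record_key, incremental_key, partition_key):
    """Build Hudi Spark datasource options for a Copy on Write upsert."""
    return {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
        "hoodie.datasource.write.operation": "upsert",
        # Record key: unique id of each row.
        "hoodie.datasource.write.recordkey.field": record_key,
        # Incremental key: on a key collision, the row with the larger value wins.
        "hoodie.datasource.write.precombine.field": incremental_key,
        # Partition key: decides which partition path the row is stored under.
        "hoodie.datasource.write.partitionpath.field": partition_key,
    }


# Hypothetical table and column names, for illustration only.
opts = hudi_write_options("orders", "id", "_change_seq", "created_date")

# With a Spark DataFrame `df` and a storage path `base_path` (not defined here),
# a write would typically look like:
#   df.write.format("hudi").options(**opts).mode("append").save(base_path)
for key, value in sorted(opts.items()):
    print(f"{key}={value}")
```

Keeping the options in one small function makes it easier to apply the same key conventions to every replicated table.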
While developing the lake on CDC data, we faced most of our challenges around the incremental key, where we had to generate an incrementing change value for each change/transaction in the Postgres and MySQL engines based on properties such as the LSN and the binlog position. We did consider using the modified time of each row for this; however, it did not work out because of the limited precision of time values and because of parallel transactions.
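The sketch below shows one way such an ordering value could be derived; it is an assumption-laden illustration, not Nessie’s actual logic. A textual Postgres LSN is two hexadecimal numbers separated by a slash and can be read as a single 64-bit integer; for MySQL, the binlog file sequence, byte offset, and row index are combined into a tuple. The function names and the sample rows are hypothetical.

```python
def parse_pg_lsn(lsn_text):
    """Convert a textual Postgres LSN such as '16/B374D848' to an integer."""
    high, low = lsn_text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def mysql_change_key(binlog_file, position, row=0):
    """Build an orderable tuple from binlog file sequence, offset, and row index.

    Assumes file names end in a numeric sequence, e.g. 'mysql-bin.000003'.
    A real pipeline would still need to encode this as a single sortable column.
    """
    file_seq = int(binlog_file.rsplit(".", 1)[1])
    return (file_seq, position, row)


# Two versions of the same row written within the same second: the timestamp
# cannot tell them apart, but the log position can.
versions = [
    {"id": 7, "status": "packed", "updated_at": "2020-07-01 10:00:00", "lsn": "0/16B3748"},
    {"id": 7, "status": "shipped", "updated_at": "2020-07-01 10:00:00", "lsn": "0/16B3790"},
]
latest = max(versions, key=lambda v: parse_pg_lsn(v["lsn"]))
print(latest["status"])
```

Since the log position is assigned by the database itself, it stays strictly ordered even when timestamps collide or transactions commit in parallel.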
We also had to develop customizations around timestamp handling in Hudi, as its Hive integration currently doesn’t support timestamp as a data type. Finally, a tighter integration between the warehouse (Redshift) and the data lake had to be developed to handle schema evolution.
With Debezium, we faced consistency issues, as there were cases of missing data changes. Some of the critical ones are as follows:
- DBZ-2288: Postgres connector may skip events during snapshot-streaming transition
- DBZ-2338: LSNs in replication slots are not monotonically increasing
In the meantime, to work around these issues on our side, we extended our validation script into a patching script that tracks and fixes any missed change event. Developing global validation scripts that can handle different database engines with varying versions was also difficult, as they had to run over both the data lake and the data warehouse.
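The general shape of a validate-then-patch step can be sketched as follows. This is not the actual script: it assumes both sides can be summarized as a mapping from primary key to some version marker (for example a checksum or a modified time), and the names `find_discrepancies` and `patch_lake` are hypothetical.

```python
def find_discrepancies(source_rows, lake_rows):
    """Compare {primary_key: version} maps taken from the source and the lake."""
    missing = sorted(k for k in source_rows if k not in lake_rows)
    stale = sorted(
        k for k in source_rows if k in lake_rows and lake_rows[k] != source_rows[k]
    )
    extra = sorted(k for k in lake_rows if k not in source_rows)
    return {"missing": missing, "stale": stale, "extra": extra}


def patch_lake(lake_rows, source_rows, report):
    """Bring the lake copy back in line with the source, based on the report."""
    for key in report["missing"] + report["stale"]:
        lake_rows[key] = source_rows[key]  # re-copy missed inserts and updates
    for key in report["extra"]:
        del lake_rows[key]  # apply missed deletes
    return lake_rows


source_rows = {1: "v3", 2: "v1", 3: "v2"}
lake_rows = {1: "v2", 3: "v2", 4: "v1"}  # missed an update, an insert, a delete

report = find_discrepancies(source_rows, lake_rows)
print(report)
patch_lake(lake_rows, source_rows, report)
```

Separating detection from patching keeps the report usable on its own for alerting, even when automatic fixes are switched off.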
On the performance side of the overall pipeline, we faced some issues with Kafka because each topic was limited to a single partition, since the strict ordering of messages that Debezium follows is only guaranteed within a partition. We worked around this by increasing the producer request size, which raised the overall transfer rate of records.
The Paradigm Shift
If you are wondering about the adoption of this pipeline in our organization, we have currently moved nearly all of our critical tables to it, along with their consumers. In the future, we plan to integrate overall data lineage and health monitoring tools. We are also slowly moving towards a stream-based kappa architecture, on top of which we are going to build real-time systems. In Hudi, we plan to try out Merge On Read (MOR) for log-based tables in the warehouse. That said, many of our plans for adopting Hudi further depend on Redshift Spectrum’s support for Hudi.
Overall, moving to this architecture has removed a lot of volatility from our pipelines and massively reduced our costs.
I want to acknowledge the herculean efforts and zeal of the entire Data team at Grofers especially Apoorva Aggarwal, Ashish Gambhir, Deepu T Philip, Ishank Yadav, Pragun Bhutani, Sangarshanan, Shubham Gupta, Satyam Upadhyay, Satyam Krishna, Sourav Sikka, and the entire team of data analysts for helping this transition. Also, I want to thank open source maintainers of Debezium and Apache Hudi for the development of these awesome projects.
Akshay Agarwal is a software engineer on the Data Engineering team at Grofers. Follow him on LinkedIn and Twitter.