59 million of them, to be exact. And they were out of order.
The New York Times has been publishing since 1851. At first, we printed dozens of articles a day in the paper, then we gradually added photographs, recipes and reviews. When we moved to the internet, we added slideshows, videos, interactives and podcasts. Collectively, we call these items “publishing assets.” After nearly 170 years of publishing, we have accumulated over 59 million assets. And that number is constantly growing.
To process the number of assets that The Times has generated, we created a publishing pipeline in 2017 using Apache Kafka open-source software. That year, we began storing every revision of every asset on a single-partition Kafka topic that we call the Monolog.
Think of the Monolog as a long, ever-growing list. When a new asset is published, it is added to the end of the Monolog. Numerous Times applications — such as the website, mobile apps and search index — constantly monitor the Monolog for new assets so they can refresh when new content is available.
In order to display an article online, our engineering teams must combine multiple assets, such as the article itself, an image and a reporter’s bio (we call this a “person asset”).
In database terms, data on the Monolog is normalized, but when the data is combined for display purposes it becomes denormalized. In order to render an article, the image and person assets need to appear on the Monolog prior to the article asset.
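In code, denormalization is essentially a join performed at read time. Here is a minimal Go sketch of the idea; the asset types and field names are invented for illustration, and the real Monolog schema is far richer:

```go
package main

import "fmt"

// Hypothetical, simplified asset types.
type Image struct{ URL string }
type Person struct{ Name string }

type Article struct {
	Headline string
	ImageID  string // reference to an Image asset
	AuthorID string // reference to a Person asset
}

// DenormalizedArticle is the combined view needed to render a page.
type DenormalizedArticle struct {
	Headline string
	Image    Image
	Author   Person
}

func denormalize(a Article, images map[string]Image, people map[string]Person) DenormalizedArticle {
	// If a referenced asset has not yet appeared on the Monolog,
	// these lookups come back empty: the incomplete-view problem.
	return DenormalizedArticle{
		Headline: a.Headline,
		Image:    images[a.ImageID],
		Author:   people[a.AuthorID],
	}
}

func main() {
	images := map[string]Image{"img-1": {URL: "photo.jpg"}}
	people := map[string]Person{"per-1": {Name: "A. Reporter"}}
	a := Article{Headline: "Example", ImageID: "img-1", AuthorID: "per-1"}
	fmt.Println(denormalize(a, images, people))
}
```

If the image or person asset appears on the Monolog after the article that references it, the maps above would not yet contain the referenced entries when the article is processed.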
It would make sense for the data on the Monolog to start with the first articles from 1851 and continue chronologically until the present day. As is the case with most things in software engineering, the reality is a bit different.
When we launched the Monolog in 2017, we immediately began adding new assets as they were published online. Shortly thereafter, we added a handful of historical articles from 2000 to 2016, including a 2001 article that commemorated The Times’s fifth anniversary on the web. We then added selected articles from 1950 through 2000, followed by three major bulk archive republishes that added articles from 1851 through 2000. To correct data bugs and introduce other enhancements, we did occasional bulk republishes that were much smaller.
This scattered asset loading process caused several problems:
Chronological disorder. If someone wanted to find all of the assets for any given year, they would have to process all 59 million assets, which could take over 24 hours. This was especially problematic for machine learning or personalization projects that only needed data from the last couple of years.
Topological disorder. Articles often reference other assets, such as images, recipes and people. Because we added assets published before 2017 to the Monolog in batches, assets that referenced each other were often at different places along the Monolog. These missing references led to an incomplete view when denormalizing an asset, such as an article with a missing image or a recipe review with a missing recipe.
Duplicate disorder. Because of things like republishes and — ahem — bugs, there were often multiple copies of the same asset on the Monolog. In one case, there were over 900,000 duplicates. These duplicates took up space and forced unnecessary processing.
Ordering the Monolog
It was clear that the way the Monolog was ordered was unsustainable. My team decided that the best way to solve this problem was to create a new Monolog, sorted both chronologically and topologically. Ordering the Monolog chronologically meant we could use a Kafka feature to jump to a specific offset using a timestamp, so we could precisely fetch assets from a particular time frame without processing the whole thing. Ordering the Monolog topologically meant we could eliminate incomplete denormalized assets. And since we would be looking at every asset anyway, we could remove all of the duplicates.
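Conceptually, Kafka’s timestamp-to-offset lookup is a search over a log whose timestamps increase with offset, which is exactly what a chronologically sorted Monolog guarantees. A stdlib Go sketch of the idea (the record type and field names are ours, not Kafka’s):

```go
package main

import (
	"fmt"
	"sort"
)

// record pairs a log offset with its publication timestamp in Unix
// nanoseconds. On the sorted Monolog, ts increases with offset.
type record struct {
	offset int64
	ts     int64
}

// offsetForTime mimics Kafka's offsets-for-times lookup: it returns the
// first offset whose timestamp is at or after the target time. This only
// works because timestamps are monotonically non-decreasing.
func offsetForTime(log []record, target int64) int64 {
	i := sort.Search(len(log), func(i int) bool { return log[i].ts >= target })
	if i == len(log) {
		return -1 // no records at or after target
	}
	return log[i].offset
}

func main() {
	log := []record{{0, 100}, {1, 200}, {2, 200}, {3, 500}}
	fmt.Println(offsetForTime(log, 150)) // first record at or after t=150
}
```

On the old, scrambled Monolog this binary-search trick was useless, because a timestamp told you nothing about where an asset sat along the topic.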
The question was: how would we accomplish this? Was this a big data problem to be solved with tools like Hadoop or Spark? Would a more traditional database fit the bill? Or, would the solution be something else?
The team started by inserting metadata for each asset into a SQL database. However, the need to correct data issues, especially missing references, meant a simple sort to order the data wouldn’t suffice. This approach led to a series of increasingly complex queries that took woefully long to run (we’re talking weeks in some cases). But more importantly, these queries did not produce a correctly ordered Monolog.
We needed a solution that ran quickly and efficiently. I was new to The Times, and my team asked me to take a fresh look at the ordering problem.
As I reviewed the previous work and struggled to understand the queries, I felt like SQL wasn’t the right tool for the job — it was getting in the way of progress. So I paused, took a step back and looked for an alternative approach.
While 59 million assets taking up 330 GiB of disk space is big, it is not big data big, which often falls in the range of billions of records or petabytes of data.
My new laptop had 16 GiB of memory, and I wondered if it could fit all 59 million assets in memory. I did the math: 16 GiB divided by 59 million was roughly 300 bytes. With some clever encoding, it seemed possible.
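Sanity-checking that budget is a one-liner in Go:

```go
package main

import "fmt"

func main() {
	const memBytes = 16 << 30 // 16 GiB of laptop memory, in bytes
	const assets = 59_000_000 // assets on the Monolog

	// Integer division: the per-asset memory budget in bytes.
	fmt.Println(memBytes / assets)
}
```

Integer division gives 291 bytes per asset, which the roughly 300-byte figure rounds up.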
I would still need to persist the data, and I began thinking of alternatives to SQL. Then I had an aha moment: an algorithm was beginning to materialize in my head.
Key-value databases are extremely efficient for certain patterns of data access. I envisioned a multi-step algorithm which leveraged this efficiency.
Step one would consume the entire Monolog and convert each asset into a small encoded format that preserves only essential information such as a unique id, the publication date, references to other assets and a data fingerprint used to detect duplicates. This encoded asset would then be inserted into the key-value database where the key is a unique publication date and the value is the encoded asset.
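A sketch of what such a compact encoding might look like in Go. The field layout here is our guess, since the article doesn’t publish the real format: a fixed-size id, timestamp and SHA-256 fingerprint, followed by a count-prefixed list of references.

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

// encodedAsset keeps only what the ordering algorithm needs.
type encodedAsset struct {
	ID          [16]byte   // unique asset id
	PublishedNs int64      // publication date, Unix nanoseconds
	Fingerprint [32]byte   // hash of the full asset, for duplicate detection
	Refs        [][16]byte // ids of referenced assets
}

// encode packs an asset into a small byte slice: fixed header, then a
// uint16 reference count, then the reference ids.
func encode(a encodedAsset) []byte {
	var buf bytes.Buffer
	buf.Write(a.ID[:])
	binary.Write(&buf, binary.BigEndian, a.PublishedNs)
	buf.Write(a.Fingerprint[:])
	binary.Write(&buf, binary.BigEndian, uint16(len(a.Refs)))
	for _, r := range a.Refs {
		buf.Write(r[:])
	}
	return buf.Bytes()
}

func main() {
	a := encodedAsset{
		PublishedNs: 1500000000000000000,
		Fingerprint: sha256.Sum256([]byte("full asset body")),
		Refs:        [][16]byte{{1}, {2}, {3}}, // e.g. an image and two people
	}
	fmt.Println(len(encode(a)), "bytes") // well under the ~300-byte budget
}
```

With this layout an asset with three references costs 106 bytes; the reference list is what makes real-world assets average out higher.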
Multiple assets might share the same publication date because they were published on the same day — this was primarily an issue with assets that had been published before The Times had a website. To ensure that each asset has a unique date, we added nanoseconds to the date.
Step two would look for and resolve missing references. The algorithm would inspect each asset in chronological order and flag references to other assets that hadn’t yet been seen up to that point in time.
Later on, when a flagged reference was seen, we would resolve the problem by making a copy of the referrer and setting its publication date to just after the found asset (again by adding nanoseconds). The reference to the missing asset in the original referrer could then be deleted.
Though this solution would fabricate revisions of assets that never occurred, it would do so in a way that did not alter history in a material way. It simply corrected the data to reflect what should have happened when the assets were first published. This solution is similar to a real-life scenario when an editor adds an image to an already published article, then republishes both the image and the article.
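A simplified Go sketch of step two. The real algorithm handles more cases; for instance, a fabricated copy would presumably carry all of the referrer’s references, not just the newly resolved one:

```go
package main

import "fmt"

type asset struct {
	ID   string
	TS   int64    // unique publication timestamp (nanoseconds)
	Refs []string // ids this asset references
}

// resolveMissingRefs walks assets in chronological order. References to
// not-yet-seen assets are dropped from the original and, once the missing
// asset finally appears, restored on a fabricated copy of the referrer
// published one nanosecond after it.
func resolveMissingRefs(log []asset) []asset {
	seen := map[string]bool{}
	waiting := map[string][]asset{} // missing id -> referrers waiting on it
	var out []asset
	for _, a := range log {
		kept := a.Refs[:0:0]
		for _, r := range a.Refs {
			if seen[r] {
				kept = append(kept, r)
			} else {
				waiting[r] = append(waiting[r], a) // flag the dangling reference
			}
		}
		a.Refs = kept
		out = append(out, a)
		seen[a.ID] = true
		// The asset just seen may satisfy earlier dangling references:
		// republish each waiting referrer just after it.
		for _, w := range waiting[a.ID] {
			out = append(out, asset{ID: w.ID, TS: a.TS + 1, Refs: []string{a.ID}})
		}
		delete(waiting, a.ID)
	}
	return out
}

func main() {
	log := []asset{
		{ID: "article-1", TS: 100, Refs: []string{"image-1"}}, // published before its image
		{ID: "image-1", TS: 200},
	}
	for _, a := range resolveMissingRefs(log) {
		fmt.Println(a.ID, a.TS, a.Refs)
	}
}
```

In this toy run, the original article keeps its slot at t=100 minus the dangling reference, and a fabricated revision at t=201 restores the reference once the image exists.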
Step three would look for duplicate assets once all of the missing references had been fixed. Once again, each asset would be inspected in chronological order, and a record would be kept of the fingerprint of the previous revision of each asset. If the fingerprints were identical (accounting for deleted references), the later revision would be deleted.
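Step three can be sketched as a single chronological pass. Here SHA-256 stands in for the fingerprint, and the “accounting for deleted references” subtlety is omitted:

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

type revision struct {
	AssetID string
	Body    string // stands in for the asset's full content
}

// dedupe walks revisions in chronological order and drops any revision
// whose fingerprint matches the previous revision of the same asset.
func dedupe(log []revision) []revision {
	last := map[string][32]byte{}
	var out []revision
	for _, r := range log {
		fp := sha256.Sum256([]byte(r.Body))
		if prev, ok := last[r.AssetID]; ok && prev == fp {
			continue // identical to the previous revision: a duplicate
		}
		last[r.AssetID] = fp
		out = append(out, r)
	}
	return out
}

func main() {
	log := []revision{
		{"article-1", "v1"},
		{"article-1", "v1"}, // republish with no changes: dropped
		{"article-1", "v2"},
	}
	fmt.Println(len(dedupe(log))) // 2 revisions survive
}
```

Because only consecutive revisions of the same asset are compared, an asset that legitimately changes and then changes back is preserved.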
Step four would iterate through the assets one last time to fetch the original asset and adjust for altered references or publication dates. Everything would then be published to a new sorted Monolog Kafka topic.
The algorithm imposed two critical requirements on the key-value database: it had to iterate over keys efficiently in chronological order, and it had to work within the confines of available memory.
Not all key-value databases fulfill these requirements, but two types do: LSM-Trees and B-Trees. Because many teams at The Times use Go, I began searching for Go key-value databases and chose BadgerDB, an open-source LSM-Tree database that is actively maintained.
For step one of the algorithm, I was able to encode each asset into 233 bytes on average. This meant the majority of the data could be kept in memory at the same time. Using BadgerDB, it took about four hours to extract all 59 million assets, but most of that time was due to network latency talking to Kafka.
The remaining steps required that we iterate through all 59 million assets. These proved to be very fast as well, taking seven to 15 minutes depending on the task (up to 140,000 assets per second). This was an enormous win.
With the advantage of speed on our side, the cost of failure was low and we could experiment with getting the complex details of the implementation just right. In the end, it took about eight weeks to get to a final solution (which was still faster than the five months spent on the SQL approach).
Order is Restored
After hundreds of iterations and tweaks, the algorithm was refined and finalized. The production version runs in the cloud on a virtual machine not that different from my laptop. After data is extracted from Kafka, it only takes about 90 minutes to do the entire ordering process. It takes another day and a half to republish the ordered assets to a new Kafka topic.
By removing duplicates, we reduced the number of assets to just over 35 million. The offset and publication date now increase in tandem and we can easily pull all assets from a particular year.
The results are beautiful:
When many of today’s technology problems entail enormous amounts of data, it may not seem cutting edge or cool to work on a problem that can be solved on a single laptop. But finding a solution to a complex problem is satisfying, no matter the size.
Doug Donohoe is a Senior Software Engineer for the New York Times, living in Pittsburgh, Pennsylvania. He’s been to all seven continents and looks forward to exploring the world again soon. Check out his Twitter, where you might catch a glimpse of his dog, Dexter.
Source: New York Times