Manas Realtime — Enabling changes to be searchable in a blink of an eye
Michael Mi | Tech Lead, Core Product Serving Infra
Manas, Pinterest’s in-house search engine, is a generic information retrieval platform. As we discussed in our previous post, Manas was designed as a search framework with high performance, availability, and scalability. Today, Manas powers search for the majority of Pinterest products, including Ads, Search, Homefeed, Related Pins, Visual, and Shopping.
One of the key metrics for a search system is the indexing latency, which is the time taken to update the search index to reflect changes. As we keep growing the system capabilities and onboarding new use cases, the ability to instantly index new documents has become more important. Manas already supports incremental indexing, which is able to provide indexing latency within the order of tens of minutes. Unfortunately, this can’t meet our growing business requirements from Ads and following feeds. We decided to build a new module within Manas to further reduce indexing latency to a fraction of a second.
In this blog post we describe the architecture of the system and its key challenges, and we provide details about the tradeoffs we made.
New requirements come with new challenges. Here are several of the major challenges we faced.
The tiny batch approach, aka near-realtime, is the most popular choice for open source projects like Lucene, Vespa, etc. With this approach, the newly written document is not searchable until index commit is called. As a result, you need to make a tradeoff between indexing latency and throughput. Unfortunately, we can’t leverage this approach to reduce indexing latency to the order of seconds.
Index Refresh Ability
One of the drawbacks of realtime serving is the lack of index refresh agility. For a batch pipeline, it is trivial to rerun the indexing job to pick up all schema changes at once. However, when it comes to the realtime serving pipeline, an efficient index refresh support becomes complicated.
Scale-up for Constantly Changing Data
To avoid over-provisioning, auto-scaling was employed to adjust replicas based on the actual query load. If the index is immutable, it is relatively easy to bring up new replicas: you just need to copy the index to new nodes, and you are done. All of the difficulty lies in handling the constantly changing index: how to ensure that all replicas end up with the same index?
Manas is a data-intensive service where each host may serve an index of up to several hundred GBs. Manas is also a stateful system; a bad binary could introduce data issues that rollbacks would not be able to fix. We needed to build a system that supports both fault tolerance and error recovery so that it is possible to recover from both binary bugs and data corruption.
Moving from Static to Realtime
Let’s take a brief look at the differences between conventional static serving and realtime serving. As shown in the above diagram, the major work for realtime serving is moving the indexing pipeline from offline to online.
For static serving, the indexes are generated offline with a batch workflow, and then they are copied to leaf for online serving. With batch workflows, due to the high framework overhead, it is barely possible to build a servable index within a fraction of a second. For realtime serving, instead of using an offline workflow, all writes are handled on the fly within the service. In addition, the realtime indexing pipeline handles writes in a way that generates the same index format as the static indexing pipeline, allowing us to reuse the entire index read logic. With this in mind, let’s continue the journey of understanding how realtime serving works.
Instead of directly using RPC, Kafka was employed as our high write throughput stream. Leaf servers continuously pull mutations to build incremental indexes. Turns out this decision dramatically simplified our system in multiple ways:
- Data replications and write failures are taken care of by Kafka.
- With the seek back ability, the Kafka queue also serves as WAL.
- With a strict ordering guarantee in each partition, the system can blindly apply deletions without needing to worry about correctness.
Since the serving logic can be reused with a shared index format, we will focus on the indexing data flow.
Essentially, realtime Manas leaf is a LSM engine, which converts random IOs writes into sequential IOs and enables efficient serving for both read amplification and write amplification applications. As shown below, the whole indexing process consists of three critical steps. Let’s discuss them one by one.
Realtime Segment Build
We introduced realtime segments except for the existing static segments. As shown above, there are two types of realtime segments in the system: active realtime segments, and sealed realtime segments.
- Active realtime segment, the only mutable component, is used to accumulate mutations (adds/deletes) pulled from Kafka. It’s worth pointing out that after a document is added into a realtime segment, it becomes searchable immediately after the document level commit.
- Once the active realtime segment reaches a configurable threshold, it is sealed, becomes immutable, and is put into a flush queue. Meanwhile, a new active realtime segment is created to continue accumulating mutations.
In the case of a service restart, the realtime segments can be reconstructed by replaying messages from Kafka.
Index flush is the process of persisting in-memory data from a realtime segment into a compact index file. A flush is automatically triggered when a realtime segment gets sealed, and a flush can also be manually triggered using a debug command.
The index flush is a beneficial operator that guarantees data persistency so that we don’t need to reconstruct in-memory segments from scratch during restart. In addition, flushing reduces a segment’s memory footprint and improves serving efficiency with a compact immutable index.
Over time, multiple generated small segments hurt serving performance. To overcome this, we introduced a background compaction thread to merge small segments into bigger ones. Since deletion operators just mark documents as deleted instead of physically deleting them, the compaction thread also persists these deleted/expired documents.
After each flush and compaction operator, a new index manifest consisting of all the static segments would be generated. Kafka offsets, used as checkpoints, are also added into each manifest. Based on the checkpoints, the service knows where to consume messages after a restart.
In this section, we will cover several key areas in more detail. Let’s start with the most interesting part, the concurrency model.
The realtime segment, as aforementioned, is the only mutable component where we need to handle both read and write simultaneously. Unfortunately, the near-realtime approach employed by open source projects can’t meet our business requirement. Instead, we chose a different approach that enables us to commit a document immediately after adding into the index without waiting for an index flush. For the sake of performance, we employed a lock-free technique for data structures tailored to our usage. Now let’s open the box!
Each realtime segment consists of an inverted index and a forward index. The inverted index is logically a mapping from term to posting list, a list of document ids, used for retrieval. Meanwhile, the forward index stores an arbitrary binary blob used for full scoring and data fetching. Let’s only focus on realtime inverted index part, which is more interesting and challenging as compared to the forward index.
At a high level, the major difference between a realtime segment and a static segment is mutability. For the realtime inverted index, the map from term to posting list needs to be a concurrent one. This is well supported by open sources like folly’s concurrent hashmap. What we care about more is the internal representation for the posting list, which can support our concurrency model in an efficient way.
Usually, it is more efficient and easier to reason about a single-writer, multiple-readers model. We chose a similar data model as HDFS with an append-only lock-free data structure. Let’s take a closer look at how the reader and the writer interact with each other.
- Writer appends doc id into the vector, then commits size to make it accessible to readers
- Reader takes a snapshot up till committed size before accessing data
In order to avoid memory copying overhead as the posting list grows, internally we manage data as a list of buckets. We just need to add a new bucket without touching old ones when we run out of capacity. In addition, usually search engines use skip lists to speed up the skip operator. Thanks to this format, it is convenient to support a single-level skip list, which is good enough for realtime inverted index since the size of it is usually small.
Now with an append-only vector, we are able to achieve atomicity for a single posting list. However, a document can contain a list of terms, and we may end up returning unexpected documents with a partially updated index. To address this potential issue, we introduced a document level commit to guarantee document atomicity. In the serving pipeline, an additional filter is used to make sure only committed documents are returned.
Speaking of document atomicity, document updating is another scenario worth mentioning here. For each document update, we deliberately convert it to two operators: adding the new document, then deleting the old one from the index. Although each operator is atomic, together we can’t guarantee atomicity. We think it is ok to either return the old version or the new version in a very short time window, but nevertheless, we added dedupe logic into the serving pipeline to filter out the old one when both are returned.
One question that naturally comes up is that if your data structures only support the single-write and multiple-reads concurrent model, what if a single thread can’t handle all the writes in time? It does not seem like a good idea to blindly add more shards just to scale write throughput. While this is a valid concern, it has already been taken care of in our design.
The single-write and multiple-reads concurrent model used for data structures doesn’t mean we are not able to use multiple threads for writes. We planned to use the term-shard strategy to support writes with multiple threads. As shown in the above diagram, for a given document with a list of terms, each term would be always mapped to the fixed thread so that all data structures tailored for single-write and multiple-reads can be reused directly without any limitations.
Index refresh ability is a critical feature for our products, enabling quick turnaround and improving dev velocity. Generally, two approaches can be used to refresh the index in an efficient way, backfilling on the fly and reinstating from the offline built index, respectively.
We provide the ability to backfill documents at a reasonable throughput. To avoid impacting production freshness, we need a separate stream for backfill traffic with a lower priority. As a result, it is possible that two versions of a document are present in both streams and the old version overrides the new one. To overcome this, we need to introduce a versioning mechanism and a conflict resolver in the realtime indexing pipeline to decide which one is more fresh.
Reinstating from Offline Built Index
Sometimes, backfilling at a given speed for a full dataset would be too time-consuming. Another quicker index refresh approach we support is building an index offline, and then reinstating from it, with a synchronization mechanism between the offline built index and Kafka stream.
Failover and Auto-scaling
From time to time, we need to bring up new instances for various reasons, like failover and auto-scaling, etc. For static serving, it is easy to start a new instance with an immutable index downloaded from the index store. However, it becomes complicated for realtime serving with a constantly changing index. How do we ensure that new instances have the same copy of the index as others eventually?
We decided to use leader-based replication, as shown in the above diagram. Our process would look like this:
- The leader periodically takes a new snapshot and uploads it to the durable index store
- New instances download the latest snapshot, by default, from the index store
- New instances resume consuming messages from Kafka based on the checkpoint from the snapshot index
- New instances start serving traffic once they have caught up
There are some key points in the design worth pointing out:
The only responsibility of the leader is to take snapshots and upload the index periodically. This means we can afford to have no leader or have multiple leaders for a short period of time, up to hours. Therefore, we have some flexibility in choosing a leader election algorithm. For simplicity, we chose to use our cluster maintenance job to select a leader statically, where we periodically check if we have a good leader.
Usually, the new instance just connects to the leader to download the latest snapshot. In this approach, the snapshot downloading from new instances would potentially overload the leader, leading to cascading failures. Instead, we chose to upload snapshots periodically to the index store, trading space and freshness for stability. In addition, the uploaded snapshots are useful for error recovery, which will be covered shortly.
As aforementioned, error recovery is another challenge for realtime serving system. There are some specific scenarios involving data corruption we need to handle.
Input Data Corruption
We use Kafka as our input write stream; unfortunately, those messages are immutable due to the fact that a producer can just append messages to it but can’t change content for existing ones. This means once the data corruption is introduced into Kafka messages, it is permanent. Thanks to the uploaded snapshots, we have the ability to rewind our index to the point without corruption, skip corrupted messages, and then consume new messages with the fix.
Binary Bugs Caused Data Corruption
Although we have a mature indexing validation pipeline for static clusters to guarantee no issues with the new index and the new binary before swapping in the new version, it is still possible that some bugs sneak into production. Fortunately, we can fix the issue by rolling back the binary or index. It becomes much harder for realtime serving where rolling back the binary can’t roll back the errors in the index. Using our snapshot uploading mechanism, we are able to rollback the binary together with a rewinded index and then replay messages from Kafka to fix errors in the index.
As more scenarios are onboarded to Manas, we need to keep improving the system’s efficiency, scalability, and capability. Some interesting projects in our roadmap are as follows:
- Cohosting static and realtime clusters to simplify our serving stack
- Optimizing the system to support a large dataset
- Building a generic embedding based retrieval to power advanced scenarios
Acknowledgments: This post summarizes several quarter’s work that involved multiple teams. Thanks to Tim Koh, Haibin Xie, George Wu, Sheng Chen, Jiacheng Hong and Zheng Liu for their countless contributions. Thanks to Mukund Narasimhan, Angela Sheu, Ang Xu, Chengcheng Hu and Dumitru Daniliuc for many meaningful discussions and feedbacks. Thanks to Roger Wang and Randall Keller for the great leadership.
Manas Realtime — Enabling changes to be searchable in a blink of an eye was originally published in Pinterest Engineering Blog on Medium, where people are continuing the conversation by highlighting and responding to this story.