How We Scaled Our MongoDB Instance Gracefully

August 18, 2022

As veterans of many fast-growing companies will tell you, the infrastructure that supports your minimum viable product (MVP) doesn’t always grow with you to handle tens or even thousands of customers. At Scale, we intentionally chose MongoDB as our NoSQL database, primarily due to its flexibility in supporting arbitrary data without a mandatory schema. As we addressed the needs of increasingly sophisticated customers, flexibility was essential to close new business and adapt our product to suit new customers. That said, flexibility doesn’t always equate to “easy to support” or “easy to modify to support more demanding customers.” With MongoDB, the burden fell on our Infrastructure team to determine how best to distribute our database nodes across our cloud infrastructure. Read on to learn how we split our MongoDB instance into several shards without downtime.

Why MongoDB

MongoDB is the main metadata store for Scale. We host our MongoDB clusters on MongoDB Atlas, a MongoDB-provided and managed instance of their database that runs on our cloud provider. MongoDB enables rapid development and iteration, which proved crucial in the early days, when we wanted to develop features as fast as possible.

However, after our usage grew significantly over the intervening years, our MongoDB instance became more and more challenging to maintain. Toward the end of 2021, our cluster’s resource utilization reached the limit of what a single shard on Atlas can handle, specifically 5TB of storage and 768GB of memory per node. There were three nodes in this replica set, and we sent the majority of heavy reads to the secondaries. We had over 150 collections in this cluster and tens of thousands of call sites spread across our codebase. At this stage, the cluster became almost unusable because of two significant problems:

  1. Low Availability: the following chart shows how many failovers we saw in this cluster over a two-month period. Each orange line indicates a failover. The primary node became overloaded and was the reason for the majority of the failovers.

  2. Capacity limits: the following chart shows the storage usage of our cluster. Starting in April of 2021, we kicked off multiple efforts to bring storage usage down, but it climbed up again quickly.

Through sharding and continuous data archival, we extended storage capacity and ensured uptime and performance going forward. In this article, we’d like to share our experiences scaling MongoDB clusters to support speed and user growth.

Disk space crunch

Our weekly task processing volume at the end of 2021 outstripped the beginning of 2021 by a factor of ten, which meant that in 2021, we stored more metadata than in all prior years combined. Throughout the second half of the year, the infrastructure team fought valiantly to overcome MongoDB’s space limit—every 1-2 months, we took measures to prevent it from running out of space. We deleted a large amount of unused data, moved large fields into blob stores, and moved large collections not in transactions into other clusters. The closest we came to filling our entire capacity occurred in early 2022 when we estimated that we had at most two weeks of storage space headroom remaining.

Over time, we’ve learned that we only need to keep the working set in the live MongoDB cluster while moving colder data out. We adopted the online archive feature provided by MongoDB Atlas: a managed `cron` job that periodically moves data from MongoDB to a cloud storage service like AWS S3.

For our specific use case, we set the archive date on relevant MongoDB document objects when a labeling task is completed. Then, when we need to redo some tasks, we unarchive the tasks, mark the unarchive date on the task, and finally extend the archive date on relevant documents.
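The bookkeeping described above can be sketched in a few lines of Python. This is only an illustration: the field names `archiveDate` and `unarchivedAt` and the 90-day window are hypothetical, and the sketch models application-side date handling rather than the Atlas Online Archive configuration itself.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical window; the article does not state the real value.
ARCHIVE_AFTER = timedelta(days=90)


def mark_completed(doc: dict, now: datetime) -> dict:
    """Set the archive date when a labeling task completes."""
    return {**doc, "archiveDate": now + ARCHIVE_AFTER}


def mark_unarchived(doc: dict, now: datetime) -> dict:
    """Record the unarchive time and push the archive date forward."""
    return {**doc, "unarchivedAt": now, "archiveDate": now + ARCHIVE_AFTER}


def is_archivable(doc: dict, now: datetime) -> bool:
    """A document becomes eligible once its archive date has passed."""
    archive_date = doc.get("archiveDate")
    return archive_date is not None and archive_date <= now
```

Extending the date on unarchive keeps a task that is being redone from being moved back to cold storage while it is still in use.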

We also needed to backfill more than 1 billion documents for the archive dates. We queried all the IDs, chunked them into files of small batches, and backfilled per batch. This way, we can resume in case unrecoverable failures happen.
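A minimal sketch of this kind of resumable, file-based batching is shown below. The file naming scheme, the `.done` marker files, and the `apply_batch` callback are assumptions made for illustration; the article does not describe the actual script.

```python
import json
from pathlib import Path
from typing import Callable, Iterable, List


def _flush(batch: List[str], out_dir: Path, index: int) -> Path:
    """Write one batch of IDs to a numbered JSON file."""
    path = out_dir / f"batch-{index:08d}.json"
    path.write_text(json.dumps(batch))
    return path


def write_batches(ids: Iterable[str], out_dir: Path, batch_size: int) -> List[Path]:
    """Split IDs into numbered batch files so each batch can be retried alone."""
    out_dir.mkdir(parents=True, exist_ok=True)
    paths: List[Path] = []
    batch: List[str] = []
    for doc_id in ids:
        batch.append(doc_id)
        if len(batch) == batch_size:
            paths.append(_flush(batch, out_dir, len(paths)))
            batch = []
    if batch:
        paths.append(_flush(batch, out_dir, len(paths)))
    return paths


def run_backfill(out_dir: Path, apply_batch: Callable[[List[str]], None]) -> int:
    """Process pending batches in order; a .done marker records each success."""
    processed = 0
    for path in sorted(out_dir.glob("batch-*.json")):
        marker = path.with_suffix(".done")
        if marker.exists():
            continue  # finished in an earlier run
        apply_batch(json.loads(path.read_text()))
        marker.touch()  # written only after the batch succeeded
        processed += 1
    return processed
```

If `apply_batch` raises partway through, rerunning `run_backfill` skips the completed batches and retries from the failed one, so the per-batch update should be idempotent.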

Generally speaking, data archival works smoothly with this approach, but querying for data from Atlas online archives is much slower. Sometimes, users need to unarchive millions of documents within a short period. After some research, we eventually moved those unarchive queries over to our data warehouses, such as Snowflake. 

To date, more than 35% of data in the cluster has been online-archived.

Though online archival allowed us to save disk space, it also confused both internal and external customers. In line with industry standards, we eventually settled on a fixed retention period for most data, while also considering our customers’ retention period requests.


Drawing on the experience we gained from serving our customers, we decided that sharding was inevitable if we wanted to further scale out our MongoDB cluster’s compute and storage capacity. MongoDB natively supports sharding with its mongos routing layer. However, when we started to shard, we found it difficult to implement: there were many factors to consider when choosing shard keys, feature incompatibilities between non-sharded and sharded collections, and a lack of tooling.

Shard key and shard type

The most difficult technical decision was to choose the shard key. A perfect shard key should have high cardinality and low frequency, result in even traffic distribution across shards, and avoid scatter/gather queries for single find requests.

Best practices for value distribution and traffic distribution tell us that we should avoid both hotspots and jumbo chunks. During our benchmark test, we observed one-third the throughput and three times higher latency when we switched from a 256MB to a 500GB chunk size.

Across our collections, we couldn’t find good shard keys for ranged sharding, so we chose hashed sharding for all the collections. We also realized that hashed sharding would result in more memory usage due to loss of locality, but we successfully verified that additional memory usage is acceptable.

How we chose shard keys

Since we were sharding existing collections, we first turned to Application Performance Monitoring (APM) to identify the queries that took the longest time in aggregate. A few candidate keys emerged from the query patterns. We then validated their cardinality, frequency, and traffic distribution. Lastly, we estimated the migration cost of converting scatter/gather reads to single-shard reads and assessed the performance of any special queries (e.g. cross-shard transactions). This step was usually the most time-consuming: we went through all call sites to count the number of queries to change as the migration cost, and ran various performance benchmarks. For many of our collections, we found that _id:hashed was a natural shard key that worked well.
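The cardinality and frequency checks can be approximated on a sample of documents. The following is a rough sketch, not the tooling used in the migration; `shard_key_stats` is a hypothetical helper, and it assumes the candidate field holds hashable values.

```python
from collections import Counter
from typing import Any, Dict, Iterable


def shard_key_stats(docs: Iterable[Dict[str, Any]], field: str) -> Dict[str, float]:
    """Summarize how a candidate key is distributed over a sample of documents."""
    counts = Counter(doc.get(field) for doc in docs)
    total = sum(counts.values())
    if total == 0:
        return {"cardinality": 0, "max_frequency": 0.0}
    return {
        "cardinality": len(counts),                     # number of distinct values
        "max_frequency": max(counts.values()) / total,  # share held by the hottest value
    }
```

A good candidate shows a cardinality close to the sample size and a small `max_frequency`; a field where one value covers most documents would concentrate data and traffic on one shard.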

Incompatible queries

As of MongoDB 4.4, the version we migrated on, the “from” collection in the $lookup stage of an aggregate query can’t be a sharded collection. Also, for single-document writes such as findOneAndX, deleteOne, updateOne, and update with {multi:false}, either _id or the shard key needs to be present in the query. Thus, we built lint rules to catch these cases as early as possible.
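The core check behind such a lint rule might look like the sketch below. It is written in Python purely for illustration (the article does not show the real rules), the function names are hypothetical, and it only tests whether the fields appear at the top level of the filter, not whether they are matched by equality.

```python
from typing import Any, Dict, Iterable, List

# Single-document operations affected by the restriction described above.
SINGLE_DOC_OPS = {
    "findOneAndUpdate", "findOneAndDelete", "findOneAndReplace",
    "deleteOne", "updateOne",
}


def targets_single_shard(query_filter: Dict[str, Any], shard_key: Iterable[str]) -> bool:
    """True if the filter names _id or every shard key field at the top level."""
    if "_id" in query_filter:
        return True
    return all(field in query_filter for field in shard_key)


def lint_call(op: str, query_filter: Dict[str, Any], shard_key: Iterable[str]) -> List[str]:
    """Return lint messages for one call site; an empty list means it passes."""
    if op in SINGLE_DOC_OPS and not targets_single_shard(query_filter, shard_key):
        return [f"{op} on a sharded collection must filter on _id or the shard key"]
    return []
```

For example, `lint_call("updateOne", {"status": "open"}, ["project"])` returns one message, while adding `"project"` to the filter makes it pass.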

It is not always possible to include shard keys in single update queries. We encountered a case for which we had to replace a heavily used, highly raced findOneAndUpdate query. We ended up replacing it with a function, which first executes a cursor find, then repeatedly loops through the cursor and tries updateOne until it succeeds. Although this solution resulted in running more queries, it worked well in benchmarks and production usage.
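One way to express that replacement is sketched here. It assumes a collection object with pymongo-style `find` and `update_one` methods (where `update_one` returns a result exposing `matched_count`); `claim_one` is a hypothetical name, and the real function may differ in how it guards against races.

```python
from typing import Any, Dict, Optional


def claim_one(collection, query: Dict[str, Any], update: Dict[str, Any]) -> Optional[Any]:
    """Emulate findOneAndUpdate with a cursor find plus targeted update_one calls.

    Returns the _id of the document that was updated, or None if every
    candidate was taken by a concurrent writer first.
    """
    for candidate in collection.find(query):
        # Re-apply the original conditions together with _id, so the update
        # matches only if the document still satisfies the query.
        guarded = {**query, "_id": candidate["_id"]}
        result = collection.update_one(guarded, update)
        if result.matched_count == 1:
            return candidate["_id"]
    return None
```

Because each `update_one` filter includes `_id`, the router can target a single shard, at the price of extra round trips when several workers race for the same documents.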

Cross-shard transactions

We relied heavily on transactions to guarantee consistency across collections. These transactions would become cross-shard once we sharded our collections.

We were concerned about potential performance degradations since 2-phase commit was used for distributed transactions. Thus, we conducted benchmarks for cross-shard transactions (results in Performance Benchmarks).

Alternatively, we considered possibly eliminating cross-shard transactions by co-locating data in transactions to the same shard. These collections need to share the same shard keys. More importantly, we can’t rely on MongoDB’s balancer to balance the chunks across shards since it moves chunks randomly and locality can’t be maintained. Instead, we found we needed to manage chunk movement ourselves. We did not move forward due to the operational complexity of chunk management and still acceptable cross-shard transaction performance.

Unique index

Unique indexes in MongoDB make sure combinations of keys are unique within a collection. Since indexes are local within each shard, MongoDB requires unique indexes to contain shard keys to guarantee uniqueness. This requirement posed some challenges in choosing shard keys for some collections.

There are some official suggestions on enforcing uniqueness for arbitrary keys on sharded collections. A unique index is built on a secondary collection to guarantee uniqueness for the main collection during insertion. Although it may be easy to implement, we find this solution doesn’t handle failure scenarios consistently. Inconsistency happens when the insertion to the secondary collection succeeds, but insertion to the main collection fails.

We weren’t satisfied with this result, so we proposed a solution that used an ephemeral distributed lock to temporarily lock the unique key updates: first, we locked on the unique keys, then checked whether the given unique keys already appeared in the collection—if so, we rejected the request and unlocked the key; otherwise, we accepted the update and unlocked the key.

Implementation-wise, we utilized Mongoose pre- and post-hooks to capture unique index keys change and lock or unlock accordingly. We set expirations on locks with MongoDB TTLed collections to account for application failures. We also generated thread-specific lock IDs to avoid incorrect cross-thread unlocking.
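The lock protocol can be illustrated with an in-memory stand-in. In this sketch a Python dict plays the role of the TTL-indexed lock collection and a set plays the role of the main collection; `UniqueKeyLocks` and `insert_unique` are hypothetical names, and the Mongoose hook wiring is not shown.

```python
import time
import uuid
from typing import Callable, Dict, Optional, Tuple


class UniqueKeyLocks:
    """In-memory stand-in for a lock collection whose entries expire."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._locks: Dict[str, Tuple[str, float]] = {}  # key -> (owner, expires_at)

    def acquire(self, key: str) -> Optional[str]:
        """Return a fresh owner ID if the key is free or expired, else None."""
        now = self._clock()
        held = self._locks.get(key)
        if held is not None and held[1] > now:
            return None
        owner = uuid.uuid4().hex
        self._locks[key] = (owner, now + self._ttl)
        return owner

    def release(self, key: str, owner: str) -> bool:
        """Unlock only if the caller still owns the lock."""
        held = self._locks.get(key)
        if held is None or held[0] != owner:
            return False
        del self._locks[key]
        return True


def insert_unique(locks: UniqueKeyLocks, existing: set, key: str) -> bool:
    """Lock the key, reject duplicates, insert if absent, then unlock."""
    owner = locks.acquire(key)
    if owner is None:
        return False  # another writer holds the key; the caller may retry
    try:
        if key in existing:
            return False
        existing.add(key)
        return True
    finally:
        locks.release(key, owner)
```

The expiry covers writers that crash while holding a lock, and the per-acquisition owner ID prevents a slow writer from releasing a lock that has since expired and been taken by someone else.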

Performance benchmarks

We benchmarked an empty MongoDB 4.4 cluster with one- and two-shard configurations to establish read / write / transaction query characteristics. Our rough conclusions were:

  1. Finds that involve only one shard achieved around 30% more throughput.
  2. Finds that scatter/gather across all shards achieved around 20% less throughput.
  3. The create operation achieved more than 40% more throughput.
  4. Cross-shard transactions (two writes, one read) achieved on-par, if not higher, throughput.

We also replicated production data to a 4-shard cluster. With this configuration, we validated that throughput for single-shard finds was much higher than in the single-shard configuration. We also spent considerable time benchmarking the findOneAndUpdate query replacement mentioned above and verified 6x headroom over peak traffic.

Disaster recovery strategy

We were very cautious about sharding a collection since there’s no easy way to unshard. We considered two ways to go back to the unsharded state:

  1. Live replicate the main cluster to an unsharded shadow cluster, and shard the main cluster in place. For disaster recovery, promote the shadow cluster as the main cluster.

  Pros:
  • No data loss when promoting the shadow cluster during an outage.
  • Changes happen gradually.

  Cons:
  • Significant cost increase during the period of sharding.
  • The balancer might degrade cluster performance for 1-2 months due to high disk IOPS incurred from chunk movement, based on our experience sharding a smaller cluster.

  2. Blue-green update. Live replicate the main cluster to a sharded shadow cluster, and make the switch once sharding is done in the shadow cluster. For disaster recovery, switch back to the main cluster.

  Pros:
  • No performance impact on the existing cluster.
  • Fast sharding since no re-balancing is needed. In our experience, sharding can be done within 2 days.

  Cons:
  • There would be a small amount of data loss if we needed to switch back to the main cluster.
  • It’s a sudden, unpredictable change to all collections.

We chose the blue-green update since we favored the shorter sharding period, lower cost, and lower performance impact on the main cluster.


Since we decided to implement a blue-green update, the replication would be from an unsharded cluster to a sharded cluster. We considered some options:

  1. Mongomirror. This tool only works for replication between replica sets.
  2. Atlas live migration. This tool only works for two clusters with the same number of shards.
  3. Application side dual writes. We concluded it would be too difficult to maintain consistency since we want to make sure only successful writes were captured. Also, writes need to be applied in the same order between the two clusters.
  4. Mongopush (a third-party tool that started as an internal MongoDB project; the author later rewrote it as hummingbird). Mongopush performs replication between two clusters with different topologies, which was exactly what we needed. However, during our tests we found its oplog replication too slow, and the shadow cluster was not able to catch up with the main cluster.

After discussions with MongoDB engineers, we decided to utilize mongopush for the initial sync, and implement our own replication to the shadow cluster.

Oplog replication

We can either apply the change stream records, or tail the oplog and apply records with the `applyOps` command. We went with oplog replication, since MongoDB support suggested that it would perform better.

We did need to reverse engineer the oplog spec, because its record structure was not publicly available. Happily, mongorestore provided some excellent examples.

We replicated MongoDB’s hasher implementation to compute the hashed shard value from the shard key. We fixed the shard key boundaries between shards by sharding each empty collection with 4 initial chunks and stopping the balancer, so we could easily calculate which shard each document maps to.
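With fixed chunk boundaries, finding the owning shard reduces to a binary search over the split points. The sketch below makes two loud assumptions: `stand_in_hash` is a placeholder that merely produces a signed 64-bit value and is not MongoDB's hashed-index function, and `even_split_points` assumes the hashed range is divided evenly, whereas in practice the boundaries should be read from the cluster's chunk metadata.

```python
import bisect
import hashlib
import struct
from typing import List

INT64_MIN, INT64_MAX = -(2 ** 63), 2 ** 63 - 1


def stand_in_hash(value: str) -> int:
    """Map a key to a signed 64-bit integer.

    NOT MongoDB's hashed-index function; a placeholder with the same output range.
    """
    digest = hashlib.md5(value.encode("utf-8")).digest()
    return struct.unpack("<q", digest[:8])[0]


def even_split_points(num_chunks: int) -> List[int]:
    """Lower bounds of chunks 1..n-1 when the int64 range is split evenly."""
    width = 2 ** 64 // num_chunks
    return [INT64_MIN + i * width for i in range(1, num_chunks)]


def shard_for(hashed: int, split_points: List[int]) -> int:
    """Index of the chunk that owns the hash (lower bounds are inclusive)."""
    return bisect.bisect_right(split_points, hashed)
```

With one chunk per shard and the balancer stopped, the chunk index doubles as the shard index, which is what allows a replication script to route each oplog entry without asking the cluster.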

We read oplog with read concern “majority” to avoid getting inconsistent data due to failover and rollbacks.

For transactions, the MongoDB open source repo has great details on how they are applied from oplogs. Luckily for us, single-shard transactions appear in the oplog as all-or-nothing units, so we didn’t need to pay special attention to them.

Performance optimization

Our initial Python implementation was quite slow (around 3 ms per oplog application), so we profiled our code and implemented some optimizations.

We batched and parallelized requests across collections, following how MongoDB secondaries apply oplog (doc), making sure applyOps within each collection runs sequentially. We filtered out “n” operations from oplogs, which appear to be used for retryable writes and not needed in our use case. Furthermore, we noticed applyOps commands can be applied in batches (source); thus we started to aggregate and apply oplog commands by shard.
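The filtering and batching steps can be sketched as follows. This is a simplified illustration, not the production script: it groups by namespace rather than by shard, uses the standard oplog fields `op`, `ns`, `o`, and `o2`, and only builds `applyOps` command documents without sending them.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List


def group_by_namespace(entries: Iterable[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
    """Drop no-op entries and keep the original order within each collection."""
    grouped: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for entry in entries:
        if entry["op"] == "n":  # no-op record; nothing to replay
            continue
        grouped[entry["ns"]].append(entry)
    return dict(grouped)


def to_apply_ops(entries: List[Dict[str, Any]], batch_size: int) -> List[Dict[str, Any]]:
    """Pack ordered entries into applyOps command documents of bounded size."""
    return [
        {"applyOps": entries[i:i + batch_size]}
        for i in range(0, len(entries), batch_size)
    ]
```

Each namespace's command list can then be handed to its own worker: commands for one collection run in sequence, while different collections proceed in parallel.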

With all the optimizations, we reduced average latency to ~0.25 ms per oplog entry, which is 4-5 times faster than the rate at which the main cluster produces oplog entries.


We set up various validation helper functions to ensure the migration can happen without application errors or data loss. We migrated unit tests to run on sharded clusters and manually verified scenarios with production data.

We built validations in the replication script to ensure data integrity between oplog and the shadow cluster. For every oplog applied, we validate the after-effect through both `mongos` and the replica set of the corresponding shard.

We made a script to compare main and shadow clusters during live replication to monitor consistency. It compares document count and document content. We loop through _id ranges by date (Unix epoch is a part of _id) to compare document count by date. We sample documents and compare them with stable jsonified forms.
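Two building blocks of such a comparison are sketched below under stated assumptions: `_id` values are ObjectIds (whose first 4 bytes are a big-endian Unix timestamp in seconds), datetimes are timezone-aware, and non-JSON values are stringified via `default=str`. The helper names are hypothetical.

```python
import hashlib
import json
from datetime import datetime, timezone
from typing import Any, Dict, List


def object_id_floor(moment: datetime) -> str:
    """Smallest ObjectId hex string whose embedded timestamp equals `moment`."""
    seconds = int(moment.timestamp())
    return f"{seconds:08x}" + "0" * 16


def stable_digest(doc: Dict[str, Any]) -> str:
    """Hash a document via JSON with sorted keys, so key order does not matter."""
    canonical = json.dumps(doc, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def mismatched_ids(main: Dict[str, dict], shadow: Dict[str, dict]) -> List[str]:
    """IDs missing on either side, or present on both with different content."""
    return sorted(
        doc_id
        for doc_id in main.keys() | shadow.keys()
        if doc_id not in main
        or doc_id not in shadow
        or stable_digest(main[doc_id]) != stable_digest(shadow[doc_id])
    )
```

Two consecutive `object_id_floor` values bound one day's worth of `_id`s, so document counts can be compared range by range on both clusters, and `mismatched_ids` can then be applied to sampled documents within a range that disagrees.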

Through comparisons, we found that mongopush failed to replicate certain documents, indexes, and collections, which we had to patch separately.

The big day

Finally, we decided on a date and time for the migration. We briefed relevant internal and external stakeholders to explain that a major database migration was going to happen and would likely result in some downtime.

At the time of the switch, we redirected all traffic to a maintenance page, stopped all services that connect to MongoDB, revoked developer permissions to MongoDB, double-checked there were no connections to the main cluster, switched out the MongoDB connection string, restarted all services and reenabled permissions. Things went smoothly.

Smoother sailing

By splitting the cluster into 4 shards, we extended its storage capacity from 5TB to 20TB while decreasing cost by using smaller machine types. Surprisingly, we also observed an immediate reduction in 500 errors, and P99 latency was cut in half.

Two weeks later, we noticed random occurrences of secondary nodes failing due to significant replication lags from primary. MongoDB support traced the issue back to bug SERVER-62685 and related WT-8924, which were related to cross-shard transaction replications. It was fixable with a restart, so we were initially not too concerned while waiting for the fix to backport. Unfortunately, this bug did take down our site three times before the fix was deployed. In each case, when both secondaries of our 3-node replica set went offline due to the bug, flow control kicked in and completely halted our write capacity. Since the fix, our cluster has experienced substantially improved stability. The number and duration of site outages were cut by more than 60%. The overall failover count fell significantly from around 17 times per month to less than 2 per month.


Sharding an existing cluster is a very complex process. For MongoDB, sharding a collection requires only a few commands, but there’s much more preparation required thanks to our large quantity of previously stored data objects and many existing queries. We could have simply sharded blindly, but the result might have been disastrous. Only through all the disaster recovery planning, careful shard key choice, benchmarks, and validations could we gain confidence in the technology, familiarize ourselves with all the caveats, and make the sharding operation happen smoothly.

We also concluded that official tooling for sharding a MongoDB cluster is lacking. No off-the-shelf tools ultimately met our needs, so we devoted significant efforts to the oplog replication script. We hope that MongoDB will soon offer an official tool to improve the experience of sharding a large single-shard cluster. We thank the MongoDB support team for their guidance throughout this process and the internal support across Scale’s various functions. We hope this explanation of our journey helps you and your infrastructure achieve more and more “nines” as you scale up your business!
