
TLDR: Row stores are fast to write but slow to read. Column stores are fast to
read but slow to write. Load data from Mongo into Parquet files for fast
querying using AWS Athena.
MongoDB is a schema-less NoSQL document store that uses a JSON-like format for
each document. Each document in Mongo WiredTiger is stored as a contiguous
binary blob, which makes our MongoDB instance a row store. AWS Athena is a
SaaS offering by Amazon that queries files on S3 using Presto, a distributed
SQL query engine that can query high-performance columnar formats like Parquet
and ORC.
Here at Scale we use MongoDB backed by WiredTiger as our primary data store.
This blog post describes why and how we load data from MongoDB into AWS Athena
using Parquet files in order to get fast ad-hoc analytics queries on our data.
Suppose you want to get the sum of a single column (field) over all the
rows (documents) in a table (collection).
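As a rough sketch of the kind of ad-hoc query we want to run cheaply, here is what submitting it to Athena with boto3 might look like (the database, table, column, and results bucket below are hypothetical):

import boto3

athena = boto3.client("athena")
# Sum one column across every row of a (hypothetical) tasks table.
athena.start_query_execution(
    QueryString="SELECT SUM(duration_ms) FROM tasks",
    QueryExecutionContext={"Database": "analytics"},
    ResultConfiguration={"OutputLocation": "s3://example-athena-results/"},
)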
With a row store, without an index, getting a field means fetching the whole
document, so getting the same field for all documents essentially requires
loading the entire table from disk. All the columns in your table (even
columns that do not appear in your query) get fetched as well, which is slow:
You can speed up this query by building a covering index. However, each index
you add increases disk usage and slows down inserts. Also, when doing data
exploration it can be difficult to predict what indices you need ahead of
time.
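For comparison, a covering index in Mongo might look like the sketch below (the collection and field names are hypothetical); once the index exists, the projection can be served from the index alone:

import pymongo

client = pymongo.MongoClient()
tasks = client.analytics.tasks  # hypothetical database/collection

# Index the field we want to aggregate over.
tasks.create_index([("duration_ms", pymongo.ASCENDING)])

# Excluding _id and projecting only indexed fields lets Mongo answer the
# query from the index without fetching whole documents.
total = sum(d["duration_ms"] for d in tasks.find({}, {"duration_ms": 1, "_id": 0}))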
With a column store, the same query is much more efficient. A column store
keeps data from the same column together, so a query only has to read the
columns it filters or aggregates over. This gives us good performance on
ad-hoc queries without any indices:
Column stores, however, make it very difficult to update a single record:
because of the way the data is arranged, you have to move all of the data to
change one record. Column stores also require us to commit to a schema and
define the columns ahead of time.
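With Parquet, that commitment looks roughly like the pyarrow sketch below (the fields are hypothetical); the columns are declared up front, before any data is written:

import pyarrow as pa

schema = pa.schema([
    ("_id", pa.string()),
    ("created_at", pa.timestamp("ms")),
    ("duration_ms", pa.int64()),
])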
The slowness of column-store writes is related to the size of the table,
because you need to rewrite the whole table to change a single record. By
representing each row-store table as multiple column-store tables, we can get
the column store to have faster writes at the expense of slowing down its
reads.
For our implementation, this means we map each Mongo collection to multiple
Parquet files (each file actually contains multiple row groups, with each row
group being column-major, but we use files as a unit for convenience), and
refresh all the documents in a Parquet file whenever any one of them changes.
We use smaller file sizes than what would be optimal for reads, in order to
make writes faster and reduce replication lag.
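As a rough illustration with pyarrow (the table contents, path, and sizes here are made up, not our production settings):

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({"_id": ["a1", "a2"], "duration_ms": [120, 340]})

# Smaller files and row groups mean each refresh rewrites less data,
# at the cost of Athena opening more files per query.
pq.write_table(table, "tasks-0000.parquet", row_group_size=64 * 1024)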
When updating the column store from the row store, we need to know which
documents have become stale (changed since their Parquet file was last
written) so that we rewrite only the affected files.
To determine staleness, we need to know the identity of each modified document
and the time of modification. We get this by reading the Mongo oplog, a data
structure that records every insert/update/delete to the database in
chronological order. The oplog is available on Mongo replica sets, where it is
used internally to replicate changes from the primary to the secondary nodes.
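A minimal sketch of tailing the oplog with pymongo, assuming we are connected to a replica-set member (timestamp bookkeeping and error handling are omitted, and handle_change is a hypothetical stand-in):

import pymongo
from pymongo import CursorType

client = pymongo.MongoClient()
oplog = client.local.oplog.rs

# Resume from the newest entry and tail everything written after it.
last_ts = next(oplog.find().sort("$natural", pymongo.DESCENDING).limit(1))["ts"]
cursor = oplog.find({"ts": {"$gt": last_ts}},
                    cursor_type=CursorType.TAILABLE_AWAIT)
for op in cursor:
    # op["op"] is "i"/"u"/"d", op["ns"] is the namespace, and op["o"]/op["o2"]
    # identify the document that changed.
    handle_change(op)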
For efficiency, we want to ensure that documents to be modified belong in as
few files as possible. We achieve this by grouping records by their Mongo
ObjectIDs. This exploits the fact that, as in Generational Garbage Collection,
young documents tend to get modified more often than old ones. We maintain an
index recording the ObjectID range of each file, starting a new file whenever
the newest bucket fills up with more than N documents, as the Buckets class
below does.
import bisect

from bson import ObjectId


# Maps an ObjectId to the ObjectId range (bucket) of the Parquet file it belongs to.
class Buckets(object):
    def __init__(self, size_limit):
        self.size_limit = size_limit
        # Bucket boundaries, starting from the smallest possible ObjectId.
        self.buckets = [ObjectId("0" * 24)]
        # ObjectIds seen so far in the newest (still growing) bucket.
        self.max_bucket = set()

    def find_le(self, x):
        'Find the index of the rightmost boundary less than x'
        i = bisect.bisect_left(self.buckets, x)
        if i:
            return i - 1
        raise ValueError

    def get_bucket(self, oid):
        'Return the (lower, upper) ObjectId range of the bucket containing oid'
        i = self.find_le(oid)
        # oid is in one of the lower, already-frozen buckets
        if i < len(self.buckets) - 1:
            return self.buckets[i], self.buckets[i + 1]
        # oid is in the maximum (still growing) bucket
        self.max_bucket.add(oid)
        if len(self.max_bucket) < self.size_limit:
            return self.buckets[i], max(self.max_bucket)
        # maximum bucket is oversized, freeze it and start a new bucket
        self.buckets.append(max(self.max_bucket))
        self.max_bucket = set()
        return self.buckets[i], self.buckets[i + 1]
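A usage sketch (the size limit here is arbitrary): each ObjectID reported by the oplog maps to the ObjectID range of the Parquet file containing that document, which is the file we mark stale.

buckets = Buckets(size_limit=100000)
lo, hi = buckets.get_bucket(changed_doc_id)  # changed_doc_id: an ObjectId from an oplog entry
# (lo, hi) identifies the Parquet file that now has to be rewritten.

When loading or refreshing documents from Mongo, the stream of raw BSON documents is cut into batches of bounded size by the helpers below: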
import itertools


def islice_docs(docs, size):
    'Yield documents until roughly `size` MB of raw BSON has been consumed'
    tot = 0
    for doc in docs:
        tot += len(doc.raw)  # docs are RawBSONDocuments
        yield doc
        if tot > size * 1024**2:
            return


def batch_docs(docs, size):
    'Split a document stream into batches of roughly `size` MB each'
    sourceiter = iter(docs)
    while True:
        batchiter = islice_docs(sourceiter, size)
        try:
            # Pull the first document eagerly so we stop cleanly once the
            # source is exhausted, then chain it back onto the batch.
            yield itertools.chain([next(batchiter)], batchiter)
        except StopIteration:
            return
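For example (the collection name, sort order, and batch size are illustrative, and write_parquet_file is a hypothetical writer sketched at the end of this section):

import pymongo
from bson.raw_bson import RawBSONDocument

client = pymongo.MongoClient(document_class=RawBSONDocument)
docs = client.analytics.tasks.find().sort("_id", pymongo.ASCENDING)

for i, batch in enumerate(batch_docs(docs, size=128)):  # ~128 MB of BSON per file
    write_parquet_file("tasks-{:04d}.parquet".format(i), batch)

Before building the Arrow table for each batch, the raw BSON documents are decoded into plain Python objects: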
import bson
from bson.raw_bson import RawBSONDocument


def realize_raw_bson(d):
    'Decode RawBSONDocuments (and lists of them) into plain Python objects'
    if isinstance(d, RawBSONDocument):
        return bson.BSON(d.raw).decode()
    if isinstance(d, list):
        return [realize_raw_bson(v) for v in d]
    return d
import os
from contextlib import contextmanager
import pyarrow.parquet as pq
from atomicwrites import atomic_write


@contextmanager
def atomic_parquet_writer(output, *args):
    # Write to a temp file and rename on success, so readers never see a partial file.
    os.makedirs(os.path.dirname(output), exist_ok=True)
    with atomic_write(output, mode="wb", overwrite=True) as f:
        with pq.ParquetWriter(f, *args) as writer:
            yield writer
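Putting the pieces together, writing one batch to a file might look like the sketch below; this is the hypothetical write_parquet_file used in the batching example above, with a made-up schema and field names, and with error handling omitted:

import pyarrow as pa

schema = pa.schema([("_id", pa.string()), ("duration_ms", pa.int64())])

def write_parquet_file(path, batch):
    # Decode the raw BSON batch, build an Arrow table, and write it atomically.
    rows = [realize_raw_bson(doc) for doc in batch]
    table = pa.Table.from_pydict(
        {
            "_id": [str(r["_id"]) for r in rows],
            "duration_ms": [r.get("duration_ms") for r in rows],
        },
        schema=schema,
    )
    with atomic_parquet_writer(path, schema) as writer:
        writer.write_table(table)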