TLDR: Row stores are fast to write but slow to read; column stores are fast to
read but slow to write. We load data from Mongo into Parquet files to get fast
querying with AWS Athena.
MongoDB is a schema-less NoSQL document store that uses a JSON-like format for
each document. Under the WiredTiger storage engine, each document is stored as
a contiguous binary blob, which makes our MongoDB instance a row store. AWS
Athena is a SaaS offering from Amazon that queries files on S3 using Presto, a
distributed SQL query engine that can read high-performance columnar formats
like Parquet and ORC.
Here at Scale we use MongoDB backed by WiredTiger as our primary data store.
This blog post describes why and how we load data from MongoDB into AWS Athena
using Parquet files in order to get fast ad-hoc analytics queries on our data.
Row stores and Column stores
Suppose you want to get the sum of a single column (field) over all the
rows (documents) in a table (collection).
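Concretely, in pymongo such a query might look like the following (the
collection and field names here are hypothetical):

from pymongo import MongoClient

db = MongoClient()["analytics"]

# Sum a single field across every document in a (hypothetical) collection.
result = db.tasks.aggregate([
    {"$group": {"_id": None, "total": {"$sum": "$duration_secs"}}}
])
print(list(result))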
With a Row Store, without an index, you need to fetch a whole document to read
a single field, so getting the same field for all documents means loading
essentially the entire table from disk. All the columns in your table (even
columns that do not appear in your query) get fetched along the way, which is
slow.
You can speed up this query by building a covering index. However, each index
you add increases disk usage and slows down inserts. Also, when doing data
exploration it can be difficult to predict what indices you need ahead of
time.
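Building such an index with pymongo is a one-liner (reusing the hypothetical
names from the sketch above):

# Illustrative only: an index on the summed field may let Mongo answer the
# aggregation above from the index alone, without fetching full documents.
db.tasks.create_index([("duration_secs", 1)])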
With a Column Store, the query is much more efficient. A column store puts
data from the same column together, so a query only has to retrieve the
columns it filters or aggregates over. This gives us good performance on
ad-hoc queries, without indices.
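This is easy to see with Parquet itself: pyarrow lets you read only the
columns you need from a file (the file and column names below are
hypothetical):

import pyarrow.parquet as pq
import pyarrow.compute as pc

# Only the bytes for the requested column are read from disk.
table = pq.read_table("tasks.parquet", columns=["duration_secs"])
print(pc.sum(table.column("duration_secs")))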
Column stores, however, make it very difficult to update a single record -
because of the way the data is arranged, you have to move all of the data to
change one record. Column stores also require us to commit to a schema,
defining the columns ahead of time.
Trading off read and write performance
The slowness of column-store writes is related to the size of the table,
because you need to rewrite the whole table to change a single record. By
representing each row-store table as multiple column-store tables, we can get
the column store to have faster writes at the expense of slowing down its
reads.
For our implementation, this means we map each Mongo collection to multiple
Parquet files (each file actually contains multiple row groups, with each row
group being column-major, but we use files as a unit for convenience), and
refresh all the documents in a Parquet file whenever any one of them changes.
We use smaller file sizes than what would be optimal for reads, in order to
make writes faster and reduce replication lag.
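A simplified sketch of what refreshing a single file might look like (the
schema and path are hypothetical, and the batching and atomic-write machinery
described below is omitted):

import pyarrow as pa
import pyarrow.parquet as pq

def refresh_file(collection, lo_oid, hi_oid, path):
    # Re-read every document in this file's ObjectId range, since any one of
    # them may have changed, and rewrite the whole file.
    docs = list(collection.find({"_id": {"$gte": lo_oid, "$lt": hi_oid}}))
    table = pa.Table.from_pydict({
        "_id": [str(d["_id"]) for d in docs],
        "status": [d.get("status") for d in docs],
    })
    pq.write_table(table, path)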
Cache Invalidation
When updating the column store from the row store, we need to:
- Figure out which records are stale
- Update those records efficiently
To determine staleness, we need to know the identity of the document and the
time of modification. This data is obtained by reading the Mongo oplog, a data
structure that represents all inserts/updates/deletes to the database
chronologically, available on Mongo replica sets for internal use in
replication from primary to secondary nodes within the cluster.
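Tailing the oplog from pymongo looks roughly like this (the connection details
and the handle function are hypothetical; the field names are those of the
oplog itself):

import pymongo
from bson.timestamp import Timestamp

client = pymongo.MongoClient("mongodb://a-replica-set-member:27017")
oplog = client.local["oplog.rs"]

last_ts = Timestamp(0, 0)  # in practice, the ts of the last entry we processed
cursor = oplog.find(
    {"ts": {"$gt": last_ts}},
    cursor_type=pymongo.CursorType.TAILABLE_AWAIT,
    oplog_replay=True,
)
for entry in cursor:
    # entry["ns"] is the namespace, entry["op"] is "i"/"u"/"d", and the
    # affected document lives in entry["o"] (inserts) or entry["o2"] (updates).
    handle(entry)  # hypothetical handler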
For efficiency, we want the documents being modified to be concentrated in as
few files as possible. We achieve this by grouping records by their Mongo
ObjectIDs, making use of the fact that, as in Generational Garbage Collection,
young documents tend to get modified more than old ones. We maintain an index
recording the ObjectID range of each file, and start a new file whenever the
newest bucket fills up with more than N documents.
import bisect

from bson import ObjectId


class Buckets(object):
    def __init__(self, size_limit):
        self.size_limit = size_limit
        # Bucket boundaries, starting from the smallest possible ObjectId.
        self.buckets = [ObjectId("0" * 24)]
        # ObjectIds seen so far in the newest (still-growing) bucket.
        self.max_bucket = set()

    def find_le(self, x):
        'Find rightmost value less than x'
        i = bisect.bisect_left(self.buckets, x)
        if i:
            return i - 1
        raise ValueError

    def get_bucket(self, oid):
        i = self.find_le(oid)
        # oid is in one of the lower buckets
        if i < len(self.buckets) - 1:
            return self.buckets[i], self.buckets[i + 1]
        # oid is in the maximum bucket
        self.max_bucket.add(oid)
        if len(self.max_bucket) < self.size_limit:
            return self.buckets[i], max(self.max_bucket)
        # maximum bucket is oversized, start a new bucket
        self.buckets.append(max(self.max_bucket))
        self.max_bucket = set()
        return self.buckets[i], self.buckets[i + 1]
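A hypothetical usage sketch: given document ids pulled off the oplog, the
bucket boundaries tell us which Parquet files need to be refreshed.

# `changed_ids` would come from the oplog tailer; the size limit is arbitrary.
buckets = Buckets(size_limit=100000)
# Identify each file by the lower boundary of its ObjectId range.
files_to_refresh = {buckets.get_bucket(oid)[0] for oid in changed_ids}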
Implementation Details
- The scripts that read from Mongo and create Parquet files are written in
  Python and use the pyarrow library to write Parquet files. The first version
  implemented a filter-and-append strategy for updating Parquet files, which is
  faster than overwriting the entire file. This proved to be a mistake, as
  pyarrow (8.9.0) support for reading is much less mature than that for
  writing. We would occasionally lose data when reading rows for rewriting into
  the new Parquet file, and eventually gave up and went to an implementation
  that never reads from the Parquet files.
- Each Parquet file is made of row groups, and there is a size limit on each
  row group. We batch documents by size to stay under that limit:
import itertools


def islice_docs(docs, size):
    # Yield documents until roughly `size` MB of raw BSON has been emitted.
    tot = 0
    for doc in docs:
        tot += len(doc.raw)
        yield doc
        if tot > size * 1024**2:
            return


def batch_docs(docs, size):
    # Split a stream of raw BSON documents into size-limited batches.
    sourceiter = iter(docs)
    while True:
        batchiter = islice_docs(sourceiter, size)
        try:
            yield itertools.chain([next(batchiter)], batchiter)
        except StopIteration:
            return
- When using pymongo, use RawBSONDocument to defer parsing. In the case of
  multiple processes writing different Parquet files, this means you avoid
  having parsing happen in the serial part of your program. You can
  selectively parse sub-documents using bson.BSON:
import bson
from bson.raw_bson import RawBSONDocument


def realize_raw_bson(d):
    # Recursively decode raw BSON sub-documents into plain Python objects.
    if isinstance(d, RawBSONDocument):
        return bson.BSON(d.raw).decode()
    if isinstance(d, list):
        return [realize_raw_bson(v) for v in d]
    return d
- Use atomicwrites to make writing a new Parquet file all-or-nothing:
import os
from contextlib import contextmanager

import pyarrow.parquet as pq
from atomicwrites import atomic_write


@contextmanager
def atomic_parquet_writer(output, *args):
    os.makedirs(os.path.dirname(output), exist_ok=True)
    # The file only appears at its final path once the writer exits cleanly.
    with atomic_write(output, mode="wb", overwrite=True) as f:
        with pq.ParquetWriter(f, *args) as writer:
            yield writer
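Tying the pieces above together might look something like this (the schema,
path, and the `raw_docs` cursor are hypothetical):

import bson
import pyarrow as pa

schema = pa.schema([("_id", pa.string()), ("status", pa.string())])

with atomic_parquet_writer("/data/parquet/tasks/0000.parquet", schema) as writer:
    # `raw_docs` is a cursor of RawBSONDocuments; each batch becomes a row group.
    for batch in batch_docs(raw_docs, size=64):
        rows = [bson.BSON(d.raw).decode() for d in batch]
        writer.write_table(pa.Table.from_pydict({
            "_id": [str(r["_id"]) for r in rows],
            "status": [r.get("status") for r in rows],
        }, schema=schema))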
- On top of extracting explicitly enumerated fields, put the whole JSON in a
  column by itself for querying fields not in the schema.
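A sketch of how that column might be populated (the field names are
illustrative); on the Athena side, ad-hoc fields can then be pulled out of it
with Presto's JSON functions such as json_extract_scalar:

import json

import bson


def to_row(raw_doc):
    d = bson.BSON(raw_doc.raw).decode()
    return {
        "_id": str(d["_id"]),
        "status": d.get("status"),           # explicitly enumerated field
        "json": json.dumps(d, default=str),  # whole document as a string column
    }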
- For reading the Parquet files locally using SQL, Apache Drill is very
  convenient - just untar it and run drill-embedded.
- After updating a Parquet file, it has to be copied to S3 (using sync, not
  cp). Using EC2 is essential here because we are constantly copying files
  from the server to S3, and the bandwidth costs would be prohibitive
  otherwise - traffic between EC2 and S3 in the same region is free. After
  copying the files to S3, use AWS Glue to discover the schema from the files -
  call Glue only when the schema changes, since AWS charges you each time you
  call it.
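One way to gate that call, sketched with boto3 (the crawler name and the
schema-comparison logic are hypothetical):

import boto3

def maybe_update_glue(current_schema, previous_schema, crawler_name="parquet-tables"):
    # Only pay for a Glue crawler run when the extracted schema actually changed.
    if current_schema != previous_schema:
        boto3.client("glue").start_crawler(Name=crawler_name)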
- The oplog contains all the information you need to update the Parquet files.
  We run an instance of mongo on the same machine and maintain a one-node
  slave, as it were, by using the applyOps command on oplog entries read from
  the main cluster. Beware, however - the Mongo developers might remove the
  docs for applyOps completely in the future.
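A minimal sketch of that replay loop, assuming the tailable oplog cursor from
the earlier snippet and a mongod running locally:

import pymongo

local = pymongo.MongoClient("mongodb://localhost:27017")

for entry in cursor:  # tailable oplog cursor on the main cluster
    # applyOps replays the raw oplog entry against the local instance.
    local.admin.command("applyOps", [entry])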