TLDR: Row stores are fast to write but slow to read. Column stores are fast to read but slow to write. Load data from Mongo into Parquet files for fast querying using AWS Athena.
MongoDB is a schema-less NoSQL document store that uses a JSON-like format for each document. Each document in Mongo WiredTiger is stored as a contiguous binary blob, which makes our MongoDB instance a row store. AWS Athena is a SaaS offering by Amazon that queries files on S3 using Presto, a distributed SQL query engine that can query high-performance columnar formats like Parquet and ORC.
Here at Scale we use MongoDB backed by WiredTiger as our primary data store. This blog post describes why and how we load data from MongoDB into AWS Athena using Parquet files in order to get fast ad-hoc analytics queries on our data.
Row stores and Column stores
Suppose you want to get the sum of a single column(field) over all the rows(documents) in a table(collection).
With a Row Store, without an index, to get a field you need to fetch the document, and to get the same field for all documents you would have to essentially load the entire table from disk. All the columns in your table (even columns that do not appear in your query) get fetched as well, which is slow:
You can speed up this query by building a covering index. However, each index you add increases disk usage and slows down inserts. Also, when doing data exploration it can be difficult to predict what indices you need ahead of time.
With a Column Store, the query is much more efficient. A column store puts data from the same column together. This would allow us to just retrieve the columns we are filtering / aggregating over when doing queries. This allows us to get good performance on ad-hoc queries, without indices:
Column stores, however, make it very difficult to update a single record - due to the way data is arranged you have to move all of the data to change one record. Also, column stores require us to commit to a schema, to define the columns ahead of time.
Trading off read and write performance
The slowness of column-store writes is related to the size of the table, because you need to rewrite the whole table to change a single record. By representing each row-store table as multiple column-store tables, we can get the column store to have faster writes at the expense of slowing down its reads.
For our implementation, this means we map each Mongo collection to multiple Parquet files (each file actually contains multiple row groups, with each row group being column-major, but we use files as a unit for convenience), and refresh all the documents in a Parquet file whenever any one of them changes. We use smaller file sizes than what would be optimal for reads, in order to make writes faster and reduce replication lag.
When updating the column store from reading the row store, we need to
- Figure out which records are stale
- Update those records efficiently
To determine staleness, we need to know the identity of the document and the time of modification. This data is obtained by reading the Mongo oplog, a data structure that represents all inserts/updates/deletes to the database chronologically, available on Mongo replica sets for internal use in replication from primary to secondary nodes within the cluster.
For efficiency, we want to ensure that documents to be modified belong in as few files as possible. We achieve this by grouping records by their Mongo ObjectIDs. We are making use of the fact that, like in Generational Garbage Collection, young documents tend to get modified more than old ones. We maintain an index recording the ObjectID ranges of each file, starting a new file whenever the newest bucket is filled with more than N documents.
1class Buckets(object): 2 def __init__(self, size_limit): 3 self.size_limit = size_limit 4 self.buckets = [ObjectId("0" * 24)] 5 self.max_bucket = set() 6 7 def find_le(self, x): 8 'Find rightmost value less than x' 9 i = bisect.bisect_left(self.buckets, x) 10 if i: 11 return i - 1 12 raise ValueError 13 14 def get_bucket(self, oid): 15 i = self.find_le(oid) 16 17 # oid is in one of lower buckets 18 if i < len(self.buckets) - 1: 19 return self.buckets[i], self.buckets[i + 1] 20 21 # oid is in maximum bucket 22 self.max_bucket.add(oid) 23 24 if len(self.max_bucket) < self.size_limit: 25 return self.buckets[i], max(self.max_bucket) 26 27 # maximum bucket is oversized, start a new bucket 28 self.buckets.append(max(self.max_bucket)) 29 self.max_bucket = set() 30 return self.buckets[i], self.buckets[i + 1]
- The scripts that read from mongo and create parquet files are written in Python and use the pyarrow library to write Parquet files. The first version implemented a filter-and-append strategy for updating Parquet files, which works faster than overwriting the entire file. This proved to be a mistake as pyarrow (8.9.0) support for reading is much less mature than that for writing. We would occasionally lose data when reading rows for rewriting into the new parquet file, and eventually gave up and went to an implementation that never read from the Parquet files.
- Each Parquet file is made of row groups, and there is a size limit for each row group. We batch documents by size to avoid this problem.
1def islice_docs(docs, size): 2 tot = 0 3 for doc in docs: 4 tot += len(doc.raw) 5 yield doc 6 if tot > size * 1024**2: 7 return 8 9def batch_docs(docs, size): 10 sourceiter = iter(docs) 11 while True: 12 batchiter = islice_docs(sourceiter, size) 13 try: 14 yield itertools.chain([next(batchiter)], batchiter) 15 except StopIteration: 16 return
- When using pymongo, use RawBSONDocument to defer parsing. In the case of multiple processes writing different Parquet files, this means you avoid having parsing happen in the serial part of your program. You can selectively parse sub-documents using bson.BSON
1 def realize_raw_bson(d): 2 if isinstance(d, RawBSONDocument): 3 return bson.BSON(d.raw).decode() 4 if isinstance(d, list): 5 return [realize_raw_bson(v) for v in d] 6 return d
- Use atomicwrites to make writing a new Parquet file all-or-nothing
1@contextmanager 2 def atomic_parquet_writer(output, *args): 3 os.makedirs(os.path.dirname(output), exist_ok=True) 4 with atomic_write(output, mode="wb", overwrite=True) as f: 5 with pq.ParquetWriter(f, *args) as writer: 6 yield writer
- On top of extracting explicitly enumerated fields, put the whole json in a column by itself for querying fields not in the schema
- For reading the Parquet files locally using SQL, Apache Drill is very convenient - just untar and run drill-embedded
- After updating a parquet file, it has to be copied to S3 (using sync, not cp). Using EC2 is essential here because we are constantly copying files from the server to S3, and the bandwidth costs would be prohibitive otherwise - traffic between EC2 and S3 in the same region is free. After copying the file to S3, use AWS Glue to discover the schema from the files - call Glue only when the schema changes, AWS charges you each time you call it.
- The oplog contains all the information you need to update the Parquet files. We run an instance of mongo on the same machine, and maintain a one-node slave as it were by using the applyOps command on oplog entries read from the main cluster. Beware, however - Mongo developers might remove the docs for applyOps completely in the future.