Storing a fully-ordered, versioned, complex data structure in DynamoDB and S3
We’ve been busy here at Simply Stated. We’re still building Omniscient XState Observability but it’s now part of a much more ambitious project. We’ll talk more about our expanded vision soon but we thought it would be fun to share some details about an interesting architecture we’ve been working on in the meantime.
We are building a collaboration product for serious work. We call the core document that users will be collaboratively working on a project. Like other collaboration apps, we want to make sure that multiple users can edit a project in real time, everyone can see each other's edits quickly, and edits don't conflict with each other in a user-visible way.
We settled on this set of requirements:
We stand on the shoulders of giants
A major thank you to Replicache, Figma, and, as usual, Rich Hickey, this time with Datomic, for sharing their thinking about similar problem spaces.
The high-level scheme
We will take a Replicache-like approach of encoding mutations as data (similar to a redux action). On the client, we’ll compute our current state by applying all local mutations to the canonical data we last received from the server. We’ll send a batch of mutations to the server, where the server will decide how to apply them to the now-current canonical data, which may differ from what the client believed to be the canonical data when it applied those mutations. The server-side application of a particular mutation might not result in the same state as the optimistic, client-side application of that mutation and that’s ok! The mutation logic will handle our conflict resolution. After the server applies mutations, it will send the client back the new state and the ID of the latest mutation it applied. The client can then update its canonical state and apply all local mutations after the latest mutation included in the canonical state.
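To make that concrete, here's a minimal sketch of mutations-as-data and the client-side state computation (the types, mutation names, and fields here are illustrative, not our actual schema):

```typescript
// A minimal sketch of mutations-as-data; names and fields are illustrative.
type Mutation = {
  id: string;       // client-assigned; must be monotonically increasing per client
  clientId: string;
  name: string;     // e.g. "renameProject", "updateItem"
  args: unknown;
};

type ProjectState = {
  name: string;
  collections: Record<string, Record<string, unknown>>;
};

// How a single mutation transforms project state; the same pure function runs
// optimistically on the client and authoritatively on the server.
function applyMutation(state: ProjectState, mutation: Mutation): ProjectState {
  switch (mutation.name) {
    case "renameProject":
      return { ...state, name: String(mutation.args) };
    default:
      return state; // unknown mutations are ignored in this sketch
  }
}

// The client's current view: the last canonical state from the server plus
// any local mutations the server hasn't applied yet.
function currentClientState(
  canonical: ProjectState,
  lastAppliedMutationId: string,
  pendingMutations: Mutation[]
): ProjectState {
  return pendingMutations
    .filter((m) => m.id > lastAppliedMutationId) // assumes lexicographically ordered IDs
    .reduce(applyMutation, canonical);
}
```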
First, our datastore selection
Typically, we start with consistency requirements to determine our datastore. Do we need multi-key transactions, compare-and-swap semantics, etc.? Due to other architectural choices we’ll talk about later, the only consistency guarantee that we actually require of our datastore is read-after-write consistency. That is, we need at least the option of performing a read that will return the data inserted by the most recent write. And because we’re maintaining a version history, we’re actually able to run entirely append-only, so update semantics don’t matter to us.
Within the AWS ecosystem, both DynamoDB and S3 support read-your-own-writes consistency. Dynamo supports a ConsistentRead option on queries and S3 (miraculously!) supports strong read-after-write consistency for all operations.
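For DynamoDB, that's a single flag on each query. A minimal sketch with the AWS SDK v3 document client (the table name and keys are placeholders):

```typescript
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, QueryCommand } from "@aws-sdk/lib-dynamodb";

const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({}));

// A strongly consistent read: the result reflects the most recent successful write.
const latest = await ddb.send(
  new QueryCommand({
    TableName: "projects", // placeholder table name
    KeyConditionExpression: "pk = :pk AND begins_with(sk, :sk)",
    ExpressionAttributeValues: { ":pk": "Project#p_4567", ":sk": "ProjectVersion#" },
    ConsistentRead: true,
  })
);
```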
Data modeling
First, we’d like to share just a bit about the data we’re working with.
Projects–our primary versioned entity–are composed of a few scalars (things like name, description, etc. that we can restrict to ~1.5kb max, typically <250 bytes) and a few (we have 2 right now) collections where the cardinality of each collection is fairly small (hundreds would be very rare) but the items in the collections are rich, potentially medium-sized (tens to hundreds of kbs) structures. We’ll assume we have some long-lasting identifier for a particular client (an entity collaborating on a project), which we’ll call a client ID.
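In TypeScript terms, the shape we're describing looks roughly like this (field and collection names are illustrative):

```typescript
// Illustrative only; the real project schema differs in its details.
type CollectionItem = Record<string, unknown>; // rich structures, tens to hundreds of kb

type Project = {
  id: string;
  name: string;        // small scalar, typically < 250 bytes
  description: string; // scalars are capped at ~1.5 kb
  // A couple of collections with modest cardinality but large items, keyed by item ID.
  collection1: Record<string, CollectionItem>;
  collection2: Record<string, CollectionItem>;
};
```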
Before we look at our write path, let’s examine the queries that we’ll need to make.
First, let’s look at some simple approaches.
In a relational model, we could transactionally copy every record related to a project, re-write all of them for the new version, and store an ordered list of client IDs and their mutations, as they were applied.
Somewhat similarly, we could store all of the data for a project in one record, either entirely in DynamoDB, entirely in S3, or in DynamoDB but spilling out to S3 if we exceed the 400kb record size limit. Then, whenever we process an update, we can take out a lock, read everything for the project, apply the mutations, and write everything for the new project version along with a list of client IDs and mutations.
These approaches share a common issue: significant write amplification. That’s an effect where a small change to one part of one item in a collection would require writing the entire structure again. As we said, some of these structures may be large-ish and DynamoDB charges per 1kb chunk on writes. Even with compression, we would be duplicating quite a bit of repetitive data, driving up costs.
However, we would be able to fulfill all of our query needs with these approaches in any of our potential datastores in a fairly reasonable manner (S3 would appear trickier here but S3 Select would allow us to retrieve only the portion of the content that we needed). Unfortunately, while DynamoDB and S3 allow us to project only a portion of our data for queries, they charge based on the full object size.
We were concerned about the cost of DynamoDB storage and writes with so much write amplification, and about the cost of storing a duplicate of everything for every project version in S3.
So we decided to take a multi-tier approach to our data storage, inspired partly by git and partly by Datomic. We’ll divide our data into versioned entities and blob entities.
First, we'll define a level of granularity at which we'll treat data as content-addressed blobs. Above that level of granularity, we'll maintain versioned records that point into those blobs. For us, we defined items within collections as our blob layer and everything above that as our versioned layer. So, for example, projects and collections are monotonically versioned while collection items are content-addressed. We'll store our blobs in S3 and our versioned entities in DynamoDB. This gives us consistently fast access to versioned entities and their metadata without suffering from the full write amplification that came along with the simple approaches we discussed above.
We store our blobs (each collection item) in S3, content-addressed, under the project namespace using a key of the form: …/{sha256(item contents)}.json.gz.
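Here's a sketch of the blob write (the bucket name is a placeholder, and the key prefix stands in for the elided project namespace above):

```typescript
import { createHash } from "node:crypto";
import { gzipSync } from "node:zlib";
import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";

const s3 = new S3Client({});

// Store a collection item under its content hash. Writing the same contents
// twice is a harmless overwrite of identical bytes.
// e.g. putItemBlob("o_123/projects/p_4567/Collection1", itemContents)
async function putItemBlob(prefix: string, contents: unknown): Promise<string> {
  const json = JSON.stringify(contents);
  const hash = createHash("sha256").update(json).digest("hex");
  await s3.send(
    new PutObjectCommand({
      Bucket: "project-blobs", // placeholder bucket name
      Key: `${prefix}/${hash}.json.gz`,
      Body: gzipSync(json),
      ContentType: "application/json",
      ContentEncoding: "gzip",
    })
  );
  return hash; // the item's address, stored in the collection version record
}
```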
We store the project structure, with pointers to our content-addressed items, in DynamoDB, using a single-table schema to satisfy all of the queries we mentioned above. As we’ll see, in our usage, a single DynamoDB table is used more for its scaling and cost benefits than for its query benefits.
Let’s take a look at our DynamoDB data. We have four types of entities that we’ll be storing: projects, project versions, collection versions, and mutations.
First, we store our project IDs by the organization that each belongs to. This data is essentially write-once, at project creation, and supports listing the projects for an organization by querying by the partition key or retrieving the organization for a project by querying the inverted index by project ID.
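A sketch of both queries (the key shapes and the index name are illustrative):

```typescript
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, QueryCommand } from "@aws-sdk/lib-dynamodb";

const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({}));

// List all projects for an organization by partition key.
const projects = await ddb.send(
  new QueryCommand({
    TableName: "projects", // placeholder
    KeyConditionExpression: "pk = :pk AND begins_with(sk, :sk)",
    ExpressionAttributeValues: { ":pk": "Organization#o_123", ":sk": "Project#" },
  })
);

// Find the organization for a project via the inverted index (pk and sk swapped).
const org = await ddb.send(
  new QueryCommand({
    TableName: "projects",
    IndexName: "inverted", // assumed GSI name
    KeyConditionExpression: "sk = :sk",
    ExpressionAttributeValues: { ":sk": "Project#p_4567" },
  })
);
// The returned item's pk identifies the organization, e.g. "Organization#o_123".
```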
Next, we have our project versions. Project versions are keyed by the project ID they belong to and have a lexicographically increasing version number. This allows us to easily query for the most recent version. Each record contains the scalars we mentioned for the project and points to the version number of each collection for the project. We also store an ordered list of mutations that were applied to produce that project version.
We already introduced our collection version numbers in our project version schema. Each new version of a collection within our project (e.g. adding or modifying an item) has a record, keyed off of the project ID and the collection name and version number (plus a sequence number in case we need to spill over into multiple records due to Dynamo's 400kb item limit, a very unlikely scenario). Each record contains a map from item IDs to the hash of the item's contents, which is its address in S3. So, if we want to look up the items for Collection1 for project p_4567, we can first query for the latest project version (pk = "Project#p_4567" and begins_with(sk, "ProjectVersion#"), sort descending by sort key, limit 1), get the collection1Version from it, then query for the collection version (pk = "Project#p_4567" and begins_with(sk, "Collection1#c1v_000002#")), and look up the collection items in S3 using the provided hashes. With some clever query logic, we can actually optimize out the project version lookup by taking advantage of our monotonic collection versions.
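Here's what that two-step lookup looks like with the document client (the table name is a placeholder):

```typescript
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, QueryCommand } from "@aws-sdk/lib-dynamodb";

const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({}));

// 1. Latest project version: the highest ProjectVersion# sort key wins.
const latest = await ddb.send(
  new QueryCommand({
    TableName: "projects", // placeholder
    KeyConditionExpression: "pk = :pk AND begins_with(sk, :sk)",
    ExpressionAttributeValues: { ":pk": "Project#p_4567", ":sk": "ProjectVersion#" },
    ScanIndexForward: false, // sort descending
    Limit: 1,
    ConsistentRead: true,
  })
);
const collection1Version = latest.Items?.[0]?.collection1Version; // e.g. "c1v_000002"

// 2. The collection version record(s): a map from item ID to content hash in S3.
const collection = await ddb.send(
  new QueryCommand({
    TableName: "projects",
    KeyConditionExpression: "pk = :pk AND begins_with(sk, :sk)",
    ExpressionAttributeValues: {
      ":pk": "Project#p_4567",
      ":sk": `Collection1#${collection1Version}#`,
    },
    ConsistentRead: true,
  })
);
```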
Finally, we have our mutations. We add a record for every mutation to our project, with a partition key that includes the project ID and a sort key that includes the client ID and the mutation ID, where it is each client's responsibility to ensure monotonically increasing mutation IDs.
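Putting the four record types together, the key layout looks roughly like this (simplified; the organization keys and the spill-over sequence formatting shown here are illustrative):

```typescript
// An at-a-glance sketch of the single-table key layout; shapes are illustrative.
const keys = {
  project: (orgId: string, projectId: string) => ({
    pk: `Organization#${orgId}`,
    sk: `Project#${projectId}`,
  }),
  projectVersion: (projectId: string, version: string) => ({
    pk: `Project#${projectId}`,
    sk: `ProjectVersion#${version}`, // zero-padded so newer versions sort last
  }),
  collectionVersion: (projectId: string, collection: string, version: string, seq: string) => ({
    pk: `Project#${projectId}`,
    sk: `${collection}#${version}#${seq}`, // e.g. "Collection1#c1v_000002#000"
  }),
  mutation: (projectId: string, clientId: string, mutationId: string) => ({
    pk: `Project#${projectId}`,
    sk: `Mutation#${clientId}#${mutationId}`,
  }),
};
```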
Example
So, let’s look at how a simple processor would deal with a batch of mutations for a particular project.
Imagine we start with data like this, representing a project whose name was just changed from "My project" to "My project (edited)" in the same "commit" in which the contents of the "id1" item in collection 1 changed, taking its hash from "abc" to "def":
Then, one client with client ID c_client1 submits a mutation with ID “m_mut1” to modify the item with ID “id1” in collection 1 of project “p_4567”.
The processor performs the following:
1. Check if our mutation has already been applied
1a. Query for pk = "Mutation#c_client1#m_mut1" and begins_with(sk, "Project#") (using our inverted index)
1b. We see that the mutation has not yet been applied so we continue
2. Find the current project version (we can optimize out this query depending on our query patterns)
2a. Query for pk = “Project#p_4567” and begins_with(sk, “ProjectVersion#”), sorting descending.
2b. We find this item:
3. Query the collection version to find the pointer to the data for the id1 item in S3
3a. Query for pk = “Project#p_4567” and begins_with(sk, “Collection1#c1v_000002#”)
3b. We find this item:
4. Load the id1 item from S3 with key: /o_123/projects/p_4567/Collection1/def.json.gz
5. Execute our mutation against the id1 data
6. Hash the new contents of id1 to determine the new version identifier, let’s call it newHash
7. Write id1 to S3 at key: …/{newHash}.json.gz
8. Increment the project version number (pv_000010) to find our new project version: pv_000011
9. Increment the Collection1 version (c1v_000002) to find our new Collection1 version: c1v_000003
10. Transactionally write our updates (sketched in the code below):
11. Now we've stored our new id1 item in S3, written our new collection version (pointing to the id1 item in S3), appended our new project version (pointing to our new collection version), and written a record of our mutation.
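To make step 10 concrete, here's a sketch of that transactional write (the table name, attribute names, and exact record shapes are illustrative):

```typescript
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, TransactWriteCommand } from "@aws-sdk/lib-dynamodb";

const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({}));
const newHash = "..."; // the content hash computed in step 7

// Append the new project version, the new collection version, and the mutation
// record in one transaction. The condition expressions reject the whole
// transaction if any of these keys has already been written.
await ddb.send(
  new TransactWriteCommand({
    TransactItems: [
      {
        Put: {
          TableName: "projects", // placeholder
          Item: {
            pk: "Project#p_4567",
            sk: "ProjectVersion#pv_000011",
            name: "My project (edited)", // ...plus the other project scalars
            collection1Version: "c1v_000003",
            mutations: [{ clientId: "c_client1", mutationId: "m_mut1" }],
          },
          ConditionExpression: "attribute_not_exists(pk)",
        },
      },
      {
        Put: {
          TableName: "projects",
          Item: {
            pk: "Project#p_4567",
            sk: "Collection1#c1v_000003#000", // sequence suffix shown for illustration
            items: { id1: newHash /* plus the hashes of any unchanged items */ },
          },
          ConditionExpression: "attribute_not_exists(pk)",
        },
      },
      {
        Put: {
          TableName: "projects",
          Item: { pk: "Project#p_4567", sk: "Mutation#c_client1#m_mut1" },
          ConditionExpression: "attribute_not_exists(pk)",
        },
      },
    ],
  })
);
```

With this guard, a concurrent write to the same version numbers would fail the whole transaction rather than silently overwrite it.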
Guarantees
Now let’s examine what could go wrong.
Architecture
We still have some outstanding issues that our datastore structure left unaddressed. Specifically, we need to ensure that mutations are retried, that retries stay ordered relative to one another and to new mutations coming from the same client for the same project, and that only one processor is applying mutations to any given project at any given time.
To satisfy those requirements, we will use an SQS FIFO queue, with each message specifying the project ID as its MessageGroupId. We will configure a Lambda processor for our queue, fulfilling our serverless goal.
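Enqueueing a mutation batch then looks something like this (the queue URL and message shape are placeholders):

```typescript
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";

const sqs = new SQSClient({});

// All messages for a project share a MessageGroupId, so SQS FIFO delivers them
// in order and only one batch per project is processed at a time.
await sqs.send(
  new SendMessageCommand({
    QueueUrl: "https://sqs.us-east-1.amazonaws.com/123456789012/mutations.fifo", // placeholder
    MessageGroupId: "p_4567", // the project ID
    MessageDeduplicationId: "c_client1#m_mut1", // client ID + last mutation ID
    MessageBody: JSON.stringify({
      projectId: "p_4567",
      clientId: "c_client1",
      mutations: [{ id: "m_mut1", name: "updateItem", args: {} }],
    }),
  })
);
```

The MessageDeduplicationId gives us best-effort deduplication within SQS's five-minute window; the mutation records in DynamoDB remain the real idempotency check.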
Let's ensure that we've addressed our outstanding issues: messages that fail processing become visible again and are retried; because the queue is FIFO, messages within a project's message group are delivered in order, and a failed message blocks later messages in its group rather than being skipped past; and because SQS releases only one in-flight batch per message group at a time, only one processor applies mutations to a given project at any moment.
The client side
We’ll cover the client side of our sync solution in another post. Stay tuned!