Storage Developer Conference - #159: Amazon Aurora Storage – Purpose Built Storage for Databases
Episode Date: January 4, 2022...
Transcript
Hello, everybody. Mark Carlson here, SNIA Technical Council Co-Chair. Welcome to the
SDC Podcast. Every week, the SDC Podcast presents important technical topics to the storage
developer community. Each episode is hand-selected by the SNIA Technical Council from the presentations at our annual Storage
Developer Conference. The link to the slides is available in the show notes at snia.org
slash podcasts. You are listening to SDC Podcast, Episode 159.
Hello, everyone. I'm Murali Brahmadesam, Director of Engineering, Amazon Aurora.
Today I'm going to talk about a purpose-built storage for databases that powers Amazon Aurora.
First I'll talk about what Amazon Aurora is.
To understand the distributed purpose-built storage that we built for Aurora,
it would be good to understand a little bit of the internals of the database
and the motivation for building Aurora.
And I'm going to talk briefly about how we made it cloud native,
how we took advantage of building for the cloud
rather than following the traditional database architecture,
how we achieve scalability and durability at the large scale that we operate at,
and what the performance results look like.
So let's jump in. What is Amazon Aurora? Amazon Aurora is an enterprise-class, cloud-native database. Our customers wanted compatibility with MySQL and PostgreSQL,
but they realized that those engines don't have the enterprise-quality features, like scalability
up to a 128-terabyte size, the performance, the throughput, all of that, that they enjoyed
with commercial databases. So essentially they asked us to build an open-source-compatible
database for MySQL and PostgreSQL with enterprise-quality features. And that's what we did.
And we made it with a simple pay-as-you-go pricing and delivered it as a
managed service.
What I mean by managed service is that customers do not have to manage the
database versions, do not have to manage the database instances,
do not have to manage the distributed storage and so forth.
So you come to AWS, and we give you a database as a service that you can quickly onboard and use.
So as I said, let's do a quick recap of database internals that would allow us to understand
why we built a distributed storage system purpose built for databases.
So this is just a simple explanation of databases. Databases primarily use B+ trees for storing
customers' data. So let's imagine that a customer has created a table with a few columns,
say four columns. That data has to be organized and stored in memory,
and the data structure that is used is the B+ tree.
And in this case, you can see there is a root page,
a couple of intermediate nodes, and the data pages that are the leaf pages.
Data is organized in fixed-sized pages.
Typically, in MySQL, we use 16 kilobytes size.
And the data that is resident in memory, all these pages put together, is referred to as the buffer pool.
So when you create a table, these columns have to be stored, the customer data has to be stored, and those are serialized into the leaf pages and accessed through this tree.
And this could be sorted by the primary key, for example. So all of these are resident in memory.
So then you might ask: what happens if you insert a new row? If you are inserting a
new row, the database walks this tree, figures out which page has to be modified, and inserts the row. If you're modifying
an existing row, similarly, the appropriate page that contains that row is identified and it is
modified. And we all know that the main memory may not be enough for storing the whole database,
in which case it has to be read from the storage. So you have main memory,
which stores data in the buffer pool, where it is accessed. And there is cold data in
the storage that is brought back to memory as needed. And that means that we also need to
evict some pages from main memory to the storage as needed. And to achieve those things,
these database pages are to be written into storage.
And that is done periodically by database engine.
And that is typically referred to as checkpointing.
So in this example,
I just show that these database pages are stored
in some storage in some order.
And this is done periodically.
So if the database has to bring back an evicted page, then it has to bring it back in.
It knows where to go read it from.
So this is how the storage is done for databases.
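To make the buffer pool and checkpoint mechanics concrete, here is a minimal sketch in Python. It is purely illustrative (not MySQL or Aurora code); the 16 KB page size is the one mentioned above, while the dict-backed "storage", the class names, and the capacity are assumptions for the example.

```python
PAGE_SIZE = 16 * 1024  # 16 KB fixed-size pages, as in MySQL

class BufferPool:
    """Toy buffer pool: caches fixed-size pages, evicts and checkpoints to storage."""

    def __init__(self, storage, capacity):
        self.storage = storage          # page_id -> page bytes (stand-in for disk)
        self.capacity = capacity        # max pages held in memory
        self.pages = {}                 # page_id -> bytearray (in-memory copies)
        self.dirty = set()              # page_ids modified since the last flush

    def get_page(self, page_id):
        if page_id not in self.pages:   # cold page: read it back from storage
            if len(self.pages) >= self.capacity:
                self._evict_one()
            self.pages[page_id] = bytearray(
                self.storage.get(page_id, bytes(PAGE_SIZE)))
        return self.pages[page_id]

    def modify(self, page_id, offset, value: bytes):
        page = self.get_page(page_id)   # scribble the new value in memory
        page[offset:offset + len(value)] = value
        self.dirty.add(page_id)

    def checkpoint(self):
        for page_id in list(self.dirty):            # flush modified pages to storage
            self.storage[page_id] = bytes(self.pages[page_id])
            self.dirty.discard(page_id)

    def _evict_one(self):
        # Prefer evicting a clean page; flush a dirty one first if none is clean.
        victim = next((p for p in self.pages if p not in self.dirty), None)
        if victim is None:
            victim = next(iter(self.pages))
            self.storage[victim] = bytes(self.pages[victim])
            self.dirty.discard(victim)
        del self.pages[victim]

storage = {}
pool = BufferPool(storage, capacity=2)
pool.modify(page_id=7, offset=0, value=b"hello")
pool.checkpoint()        # page 7 is now durable in "storage"
```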
And we need to talk about an interesting protocol that the database uses for updating the in-memory buffers.
So let's take an example. You have an old state. This is a page; let's call it the green page.
The green page has a row and the customer is updating a particular column.
The database comes to the page and identifies that a column needs to be modified
as intended by the customer, and that is done by an operation called "do". Essentially we are doing
an operation to this page to achieve a new state: a column value is changed, a new value is put
in place, and this is scribbled in memory. So the buffer pool is updated.
And to handle any failures,
we need to remember the before and after images of the data.
So the before value of the column and the after value of the column
is stored durably in a place called the log stream.
This is called the write-ahead log.
Essentially every update by the
customer needs to be durably persisted in the log record so that we can retrieve it when needed.
Okay. And we'll talk about why we write the log ahead and do not always write the pages: because the
pages are big in size, we cannot always write them, for efficiency's sake. So we
always write the log record, because that is the minimum amount of data that we need to durably
store. But for efficiency's sake, we'll checkpoint periodically, right? So in case we have to restart
the database engine, the checkpoint has an old state, and the committed transaction's log record is there in the log
sequence. We take that log record and reapply it to get the new state; this is the "redo" operation.
Because it's committed data, we cannot forget the change made by the customer. Sometimes we have to roll
back a particular state because a transaction is rolled back, for example.
In which case we have a new state,
we have the log record,
we need to undo that operation to get the old state.
So this is just a mechanism
that the database engine uses
to manage the data that is stored in the storage.
And the main memory and the storage are always at different points in time,
containing different values, and we need to synchronize them using this protocol.
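A minimal sketch of the do/redo/undo protocol, assuming a log record that carries before and after images as just described; the names and the byte-slice representation are illustrative, not an engine's actual format.

```python
from dataclasses import dataclass

@dataclass
class LogRecord:
    page_id: int
    offset: int
    before: bytes   # before image of the modified bytes
    after: bytes    # after image of the modified bytes

def do(page: bytearray, page_id: int, offset: int, new_value: bytes, wal: list):
    """Apply a change in memory and record its before/after images in the WAL."""
    record = LogRecord(page_id, offset,
                       before=bytes(page[offset:offset + len(new_value)]),
                       after=new_value)
    wal.append(record)                                   # write-ahead: log first ...
    page[offset:offset + len(new_value)] = new_value     # ... then scribble the page
    return record

def redo(page: bytearray, record: LogRecord):
    """Re-apply a committed change on top of an older page image (e.g. a checkpoint)."""
    page[record.offset:record.offset + len(record.after)] = record.after

def undo(page: bytearray, record: LogRecord):
    """Reverse an uncommitted change using the before image."""
    page[record.offset:record.offset + len(record.before)] = record.before

wal = []
page = bytearray(b"old-value       ")
rec = do(page, page_id=1, offset=0, new_value=b"new-value", wal=wal)
undo(page, rec)   # rolls the page back to "old-value"
```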
So let's take an example.
So there is a transaction TX1 represented in red color.
A customer might be updating, inserting rows,
updating some columns in existing rows and so forth.
For all of those changes,
the log records are produced as I explained before
and they are durably stored.
They're durably stored in the storage.
At time TC (just before that, transaction TX2 had started,
and its logs are being written to the durable storage),
the checkpoint was initiated,
which means that the modified pages in memory
need to be flushed to the durable storage.
So in this particular case,
the red transaction was committed and it is being flushed. So the durable storage contains the page
that has the modification done by the transaction TX1. It still doesn't have the changes done by
the TX2. And there is another transaction TX3 that was done, committed.
So now we have the log records for both the blue and green in the durable storage.
Transaction TX4 was ongoing.
At that time, a system failure happened at TF.
So the log records are partially written to the durable storage. And the system failure
could be a database engine restart, a machine failure, a hardware failure; it could have been any one of
those. So now, when the database is restarted, we need to bring the database back to the state that
was before the system failure. Essentially, the database should know about transactions one, two, and three,
and transaction four should be rolled back.
So how do we achieve it, right?
The durable storage has the correct version
of the red transaction, the TX1,
but it doesn't have the data from TX2, TX3, right?
So what we do is we use the protocol
that I just explained before, the do, redo, undo protocol.
So we read the checkpoint.
We look at the log sequence.
We know that the checkpoint was done at this particular time.
And we see that a blue page has been modified.
So the previous version of the blue page from the previous checkpoint is read back. And this log record is applied back in and we get the new change of the blue page.
And we do the same thing for the green page.
Then we see that the transaction TX4 is not yet committed.
So there are some log records, and they have to be rolled back.
So that log record is undone from the database
and the pages are not modified.
So this is how the transaction processing
with respect to storage is done.
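Roughly, that recovery pass could be sketched like this, reusing the LogRecord and PAGE_SIZE from the earlier sketches. It is an illustrative outline (redo the committed transactions, undo the in-flight one), not the real recovery algorithm.

```python
def recover(checkpoint_pages, log, committed_txns):
    """
    Toy crash recovery over do/redo/undo records.
    checkpoint_pages: {page_id: bytearray} as of the last checkpoint
    log: ordered list of (txn_id, LogRecord) durably written before the failure
    committed_txns: set of transaction ids known to have committed
    """
    pages = {pid: bytearray(p) for pid, p in checkpoint_pages.items()}

    # Redo pass: re-apply changes from committed transactions (TX1, TX2, TX3).
    for txn_id, record in log:
        if txn_id in committed_txns:
            page = pages.setdefault(record.page_id, bytearray(PAGE_SIZE))
            page[record.offset:record.offset + len(record.after)] = record.after

    # Undo pass: roll back in-flight transactions (TX4), newest change first.
    for txn_id, record in reversed(log):
        if txn_id not in committed_txns:
            page = pages.setdefault(record.page_id, bytearray(PAGE_SIZE))
            page[record.offset:record.offset + len(record.before)] = record.before

    return pages
```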
So now let's look at the cost of this IO.
So for every change that the customer is doing, log records are created.
These are typically small in size. So if a column is updated, the before and after image with some
metadata has to be stored durably in the storage. So you can imagine it as tens of bytes or hundreds
of bytes. It's not usually big. It all depends on the schema, but usually the logs are much smaller. Whenever we write the checkpoint, we need to worry about what we call the torn page issue.
Essentially, when a 16 kilobytes of page is serialized into the storage,
the underlying device may not support atomic writes for this size,
in which case, if there is a system failure during the write operation,
the 16 kilobytes of data may not be actually durable,
only partially it might be durable.
So that's why it's called a torn page.
So to avoid that problem, typically databases do double writes,
so that we can always get one complete version of a page and we can move forward.
So what we arrive at is this: the user modified a small amount of data in the
database, and that resulted in write inflation. For the double writes, it's 16K at a time, so there's 32
kilobytes at a time.
And then there is what we absolutely need to write, which is the log records.
And this checkpoint is causing a lot of IO to be done.
Of course, all of the database implementations know that this is very costly.
So they amortize it. So they don't checkpoint that often.
They accumulate the logs and checkpoint periodically. But you can see that the performance of the database is all dependent on how efficiently you do IO.
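As a back-of-the-envelope illustration of that write inflation, assuming a 16 KB page with doublewrite protection and a roughly 100-byte log record (both numbers are assumptions for the example):

```python
PAGE_SIZE_BYTES = 16 * 1024        # one data page
LOG_RECORD_BYTES = 100             # assumed size of a small log record

# Checkpointing a page with doublewrite protection writes the page twice.
page_write = 2 * PAGE_SIZE_BYTES   # 32 KB per checkpointed page
log_write = LOG_RECORD_BYTES       # what we absolutely must write durably

print(f"page write per checkpoint: {page_write} bytes")
print(f"log write per change:      {log_write} bytes")
print(f"write inflation:           ~{page_write // log_write}x")
```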
So let's talk about what we can do in the cloud.
Before that, we need to understand what the traditional architecture looks like.
So in this diagram, I show that there is a compute instance. This is the
database instance, which does the SQL data processing. It does transaction support. It does
caching, which is the buffer pooling and result caching. Anything that is related to main memory
is done by this layer. And then the logging, which is essentially durably persisting the data to
the storage.
So in this architecture, you can see that to achieve scalability, the typical idea is to
increase the IO bandwidth.
So you put in better hardware, a lot more devices, use storage that can scale.
It's all direct attached and so forth.
So you make this pipe wider and you also make it efficient,
reduce the number of IOs.
So this is the typical technique that database development has used to
achieve performance. What we did in Aurora,
we took a tenet that the log is the database.
I'll explain what we mean by that.
So let's imagine there is a log stream from the beginning of the database's time, from time zero.
So there are changes to the red pages, blue page, and so forth.
So the logs are accumulated.
With this log stream at any given point in time, we should be able to recreate the particular database page
version, right? So let's take an example. If you want to know the current version of the database
page at time T5 for the red page, you have to take the log records from T1 and T5, apply them together, and get the version of the page.
So we consider log as the database.
That means that you can always get the current version
of every page by processing the log stream.
So the most important part to be durable is the log.
There are, of course, efficiency problems in this statement, and I'll explain how we overcome that.
The first one is that relying only on the log stream for pages is not practical.
So say you run the database for n number of hours, days, and so forth.
We cannot always process all the logs to get the particular version or the current version of the database page.
So you need to do slightly better.
And the solution is checkpoint.
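A hypothetical sketch of "the log is the database" plus checkpoints: materialize a page version by starting from the newest checkpointed image at or before the requested time and replaying that page's log records up to it. This reuses the LogRecord shape and PAGE_SIZE from the earlier sketches; the data layout is illustrative only.

```python
def materialize_page(page_id, at_time, checkpoints, log):
    """
    checkpoints: {page_id: [(time, image_bytes), ...]} checkpointed images, oldest first
    log: ordered list of (time, LogRecord)
    Returns the page as of `at_time`, built from the newest checkpoint image
    not newer than `at_time`, plus that page's later log records.
    """
    base_time, image = 0, bytes(PAGE_SIZE)
    for t, snap in checkpoints.get(page_id, []):
        if t <= at_time:
            base_time, image = t, snap

    page = bytearray(image)
    for t, record in log:
        if record.page_id == page_id and base_time < t <= at_time:
            page[record.offset:record.offset + len(record.after)] = record.after
    return bytes(page)
```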
So now you might ask, this is what traditional databases always did.
So what's the difference, right?
So the database instance was burdened by checkpointing task.
It needs CPU.
It needs to do a lot of IO and so forth.
That was the bottleneck, right?
So what we did was we said,
okay, let's take that burden off the database instance and push it to a distributed storage
fleet that understands this log stream and does continuous checkpointing. So essentially,
instead of the database instance doing the checkpointing, a distributed storage fleet
can do the checkpointing using the log stream.
So now it's a completely parallelizable problem.
The database sends only the log records to the distributed storage, and the distributed storage can continuously checkpoint as frequently as it needs to.
Of course, durably persisting the log is the key part because log is the database in our approach. So with this approach, what we are able to do is we can separate out compute and storage.
And what we can see is that naturally the compute and storage behave differently.
So compute instances are ephemeral.
They can be replaced. They can fail.
Customer can shut it down to save cost. If you're not using the database, you can shut it down.
You can also make it cost efficient by scaling up and down only based on the needs of the load.
But the storage operates in a different dimension. It has to be long lived.
It has to be durably present.
Even if the compute is not present, the data has to be there.
So we need to achieve scalability, availability, and durability.
But this can be independently achieved.
So compute can have its own dimension of scalability.
And the storage can have its own dimension of scalability.
So we have two different components that can be worked upon differently.
So in Aurora, what we concluded on and built was a log-structured distributed storage system.
This storage system is purpose-built for databases. Essentially, this storage system understands log records,
database pages, formats, versions, and so forth.
So the architecture became like this.
So you have SQL data processing, you have transactions,
you have caching, and the lower part,
which was the logging layer, is changed.
You have a small client library that understands this distributed system,
but most of the activity happens on the distributed storage system.
And you can imagine that, to get this lower half done,
we would have had to build a lot of systems to make this happen.
And we took advantage of the services that are already built in a scalable
fashion in the cloud, like Amazon DynamoDB. We use
that for our metadata: to figure out which databases contain what type of data, how it is
spread on the distributed storage; all that metadata is stored in DynamoDB. We use Route 53, which
is a DNS layer. We use EC2; we use all the instances that are available
for any AWS customer. We are not using any special hardware. And we use Amazon S3,
which is the blob storage, for long-term retention of our backups. The data itself is stored on NVMe SSDs on a distributed fleet that we built.
So let's dive in to the depths of what happens within a storage node.
So in this diagram, you can see there is a database instance,
and it is going to send some data to the storage node.
And the storage node itself has peers that can be used for redundancy and for replication and so forth. So let's see how the database uses the distributed storage. So when a customer modifies a database by inserting,
deleting, updating the data in the database, the database instance generates log records.
It sends it to a storage node. The storage node sorts them, keeps them in an in-memory queue,
and then immediately persists them on the NVMe storage, in what we call the hot log.
And once it is durably persisted, we acknowledge to the database instance.
From that point onwards, the database knows that the data is durably persisted.
It can proceed with its other activities. Within the storage, now the log is sorted.
And if there is a gap within the log, for whatever reason,
there could be a network issue or the database,
the storage might have been restarting and so forth.
A number of reasons why there could be gaps within the logs,
in which case the storage node knows
to talk to its peers to pull the data in and fill in the gaps. Once the gaps are filled,
the storage node continuously applies these log records to the previous versions of the data pages
and persists them. And that processing is called coalescing.
This is a continuous checkpointing that I talked about.
So it periodically takes this data from the hot log and then applies those log records to the previous versions of the data pages.
And this is done independently from the database instance.
And we continuously backup this data. So all the data from the log records
are continuously sent to S3 for long-term retention
and customer can restore at any point in time
at a log record granularity.
And for efficiency sake,
we are copying the database pages periodically also.
And because we are continuously checkpointing,
there are multiple versions of the database pages.
So we garbage collect them for storage efficiency.
And for correctness purposes,
we also scrub all the pieces of data
to make sure that there is no corruption.
We use checksums.
So from this diagram,
you can see that the hot path
or the data path that the database instance is involved in
is a very short path.
It sends the log records.
It is stored in memory and immediately persisted
and then acknowledged.
All the other operations which can be long running
are all asynchronously done,
and it doesn't affect the database write performance.
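Sketching that storage-node write path in the same illustrative style (this is a toy model, not Aurora's implementation): the synchronous hot path only appends to the hot log and acknowledges, while gap filling from peers and coalescing run in the background.

```python
class StorageNode:
    """Toy model of the storage-node write path described above."""

    def __init__(self, peers):
        self.peers = peers          # other replicas of the same segment
        self.hot_log = []           # durably persisted (time, LogRecord), kept sorted
        self.pages = {}             # page_id -> latest coalesced page image

    def receive(self, records):
        """Hot path: sort, persist to the hot log, then acknowledge the database."""
        self.hot_log.extend(records)
        self.hot_log.sort(key=lambda entry: entry[0])
        return "ACK"                # the database considers these records durable now

    def fill_gaps(self, expected_times):
        """Background: pull any missing log records from peers (gossip)."""
        have = {t for t, _ in self.hot_log}
        for t in expected_times:
            if t not in have:
                for peer in self.peers:
                    rec = peer.lookup(t)
                    if rec is not None:
                        self.hot_log.append((t, rec))
                        break
        self.hot_log.sort(key=lambda entry: entry[0])

    def coalesce(self):
        """Background: continuous checkpointing, applying log records to pages.
        (A real implementation would track which records are already applied.)"""
        for _, record in self.hot_log:
            page = self.pages.setdefault(record.page_id, bytearray(PAGE_SIZE))
            page[record.offset:record.offset + len(record.after)] = record.after

    def lookup(self, t):
        return next((rec for tt, rec in self.hot_log if tt == t), None)
```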
Now that we know what happens within a storage node,
let's talk about the distributed system.
When you have a distributed system with hundreds or thousands of storage nodes
that is taking part in giving storage for a database,
we know that there could be independent failures
that you cannot control.
There could be failing nodes, devices can fail,
disks can fail, switches can fail, and so forth.
So how does a typical system handle these type of failures?
So the solution is replication.
So instead of storing one copy of data,
store it multiple times. The common approach is to replicate three ways and do a write and read
quorum of two out of three. Okay. So in this example, what I call AZ is an availability
zone. An availability zone is an AWS concept where, within a geographical region, there are fault domains called availability zones.
And within each availability zone,
imagine that there are thousands of storage nodes
that are present.
So for a better system,
instead of putting three copies
within the same availability zone,
we can spread that across three availability zones,
make one copy per AZ,
and then do the write and read quorum
of two out of three.
So this is a good system, but it has its own issues.
Let's talk about AZ failure.
So the reason we have multiple fault domains
called availability zones is that an AZ can fail
for whatever reason:
a power failure, a network failure, and whatnot. So in the system which does a two-out-of-three quorum,
how does it handle an AZ failure? If one AZ goes down, we know that it had only one copy of
the data. That means there are two availability zones remaining, each with one copy of the data.
So we can still do two out of three copies.
We can establish quorum.
There is no data loss.
It works, right?
What happens if there is an AZ plus one failure?
What we mean by plus one failure
is that within an availability zone,
there are thousands of nodes
that take part in providing storage.
And there is a non-zero probability that a particular storage node within thousands of nodes is down for whatever reason.
When that happens, the volume is not operational anymore because the AZ lost one copy.
And now another copy is lost.
So we are left with only one copy. So if you do a two-out-of-
three quorum, you lose the quorum, and there's a chance that you have also lost data. So how do we
overcome that in Aurora? In Aurora, we do six-way replication with two copies per availability zone,
and instead of a two-out-of-three quorum, we use a four-out-of-six quorum
technique. So let's go through the failure cases. What happens if an availability zone fails?
If an availability zone fails, two copies of data are down, but we still have four copies
left, two per remaining standing availability zone. So we still maintain write availability.
What happens if AZ plus one fails
wherein one of the storage nodes
in one of the availability zone fails
while an AZ is already down?
We still have three copies of data.
There is no data loss.
Of course, there is write availability loss,
in which case we quickly rebuild
the failed copy in one of the availability zones from the remaining copies and recover the write availability.
So there is no data loss. There is a brief period of write loss.
So this approach is much more durable than the two out of three logic.
And this is why we did the six way replication.
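That durability argument can be checked with a tiny quorum model. This sketch assumes the usual rule that read and write quorums must overlap (Vr + Vw > V), which gives a read quorum of 2 for the 2-of-3 scheme and 3 for the 4-of-6 scheme; it is illustrative, not Aurora's actual quorum code.

```python
def survives(total, write_quorum, copies_lost):
    """Return (write_available, data_readable) after losing `copies_lost` copies.
    Read quorum follows Vr = total - write_quorum + 1 so reads and writes overlap."""
    read_quorum = total - write_quorum + 1
    remaining = total - copies_lost
    return remaining >= write_quorum, remaining >= read_quorum

# 3 copies, one per AZ, 2/3 write quorum
print(survives(total=3, write_quorum=2, copies_lost=1))  # AZ down     -> (True, True)
print(survives(total=3, write_quorum=2, copies_lost=2))  # AZ + 1 down -> (False, False)

# Aurora: 6 copies, two per AZ, 4/6 write quorum
print(survives(total=6, write_quorum=4, copies_lost=2))  # AZ down     -> (True, True)
print(survives(total=6, write_quorum=4, copies_lost=3))  # AZ + 1 down -> (False, True)
```

In the AZ+1 case with six copies, writes pause until the lost copy is rebuilt, but the data is still readable from the surviving three copies, which matches the "brief period of write loss, no data loss" behavior described above.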
So now we can ask, OK, you have a distributed storage.
You have thousands of nodes.
How do you spread the data?
So we partition the volume into fixed-size segments,
and each segment is replicated six ways.
That's the first level of split.
So now the question is, how many segments should we create?
If we create smaller segments, the failures are more likely. If we create too big of a
segment, the repairs are going to take too long. So we had to balance between the number of segments
and the size of the segment, and what we found was that a 10 GB size, you know, is a good number, which we picked.
We were able to repair in less than a minute and this was acceptable to us.
So whenever there is a failure, if you can repair within a minute,
then the system continues to operate. Remember that
the storage operates at a four-out-of-six quorum.
So if there is one failure, the customer does not see it.
The database continues operating with the available five segments.
So it does four out of five until the repair finishes.
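As a rough sanity check on the 10 GB choice (the replication bandwidth below is purely an assumed figure for illustration, not a quoted Aurora number):

```python
SEGMENT_GB = 10
ASSUMED_REPAIR_GBPS = 2   # assumed per-node re-replication bandwidth

repair_seconds = SEGMENT_GB * 8 / ASSUMED_REPAIR_GBPS
print(f"~{repair_seconds:.0f} s to re-replicate one segment")  # ~40 s, under a minute
```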
So I'll briefly talk about how we deal with the replication metadata.
So let's take an example.
So let's imagine that one segment is replicated six ways
on the six machines that are shown here, A to F.
We call this the quorum set and we version it.
So there is an epoch one with all the nodes healthy.
At some point, the monitoring system finds that the machine F is failing.
So it is marking it suspect.
And it found a replacement node to take the data, and we mark it G.
And what we do is we create a concept of a dual quorum wherein there is the current quorum and the future quorum.
Both are used for the quorums by the database instance.
That means that in this particular case,
we don't immediately promote the future quorum with G
to be the current quorum.
We still keep both.
And we start writing to both the quorums,
and we achieve four out of six. So in this particular case, because five nodes are overlapping
between these two quorums, we will achieve four out of six on both. In one case it might be four
out of five; in the other case it might be four out of six. So G is operational immediately while it is
replicating the data from the remaining peers. And once the monitoring system has completely
found out that this suspect state is actually a failed state, we mark it failed, and then we
remove that quorum set, wherein a garbage collection system would go and delete the segment from F
if that machine is still around, or just clean up the metadata.
And we promote the future quorum to be the current quorum.
So the key part here is that this whole logic allows us to move forward
and backward without losing any data.
For example, if there was no physical failure of the node,
but we had to do heat management,
so F was just hot and we had to move some data off to G,
so we can do this by creating the dual quorum
and the database will start writing to G
while G is catching up from its peer.
But we can also change our mind.
So say during this replication, G might fail,
in which case we might say, okay, a hot node is okay.
We'll replicate to a new node, call it H,
in which case we just abandon this whole quorum set
and then we revert back to the original quorum
and then create a new quorum.
So it allows us to move forward and backward
without losing any data.
The database continues to work seamlessly.
So this is what I meant by compute and storage separation.
We can independently deal with the systems problems
and challenges and scalability appropriately
without impacting the database.
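A rough sketch of that dual-quorum membership change, with hypothetical names and a simplified model where a write succeeds only if it reaches quorum in every active quorum set; promoting or reverting the epoch is just a metadata change, which is what makes it safe to move forward and backward without losing acknowledged writes. This is illustrative, not Aurora's metadata format.

```python
from dataclasses import dataclass

@dataclass
class QuorumSet:
    members: list             # e.g. ["A", "B", "C", "D", "E", "F"]
    write_quorum: int = 4

@dataclass
class Volume:
    epoch: int
    current: QuorumSet
    future: QuorumSet = None  # set while a membership change is in flight

    def begin_change(self, old_node, new_node):
        members = [new_node if m == old_node else m for m in self.current.members]
        self.future = QuorumSet(members, self.current.write_quorum)

    def write(self, acked_nodes):
        """A write succeeds only if it reaches quorum in every active quorum set."""
        sets = [self.current] + ([self.future] if self.future else [])
        return all(len(set(acked_nodes) & set(q.members)) >= q.write_quorum
                   for q in sets)

    def promote(self):   # replacement caught up: future becomes current
        self.current, self.future, self.epoch = self.future, None, self.epoch + 1

    def revert(self):    # change our mind: drop the future quorum set
        self.future, self.epoch = None, self.epoch + 1

vol = Volume(epoch=1, current=QuorumSet(list("ABCDEF")))
vol.begin_change(old_node="F", new_node="G")   # F looks unhealthy, introduce G
print(vol.write(acked_nodes=list("ABCDE")))    # True: 5 acks satisfy 4/6 in both sets
vol.promote()                                  # F confirmed failed: G is now current
```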
So let's look at the performance results.
In this diagram, I show a typical traditional MySQL implementation with better availability
across two availability zones.
In this particular case, there is a primary instance that is installed in availability
zone one, and all the data is stored, in this particular case,
in a block device storage system
called Elastic Block Store (EBS).
The data is written to this EBS volume
and the EBS volume itself mirrors it for durability.
So the database writes are done sequentially
in this system.
And then the data is read back and sent to a different replica in a different availability zone.
And the same thing happens on a replicated block storage.
And in this particular type of setup, when we run a popular MySQL benchmark called sysbench for 30 minutes, we are able to achieve 780,000 transactions overall.
And the average IO we saw was 7.4 IOs per transaction.
Let's look at what happens in Aurora.
In case of Aurora, the architecture is much different.
We have a primary instance that does not have direct attached storage.
It is connected to a distributed storage,
as I explained before,
across three availability zones,
and it does four out of six quorum.
So when the data is modified by the customer,
it is sent to this distributed storage in parallel.
And then there are multiple read replicas
that are attached to the writer instance, which can also read from this common storage.
That's why we call it multi-attached storage.
So the database instance is sending data to the storage and also to the read replica in parallel.
And the read replicas are capable of serving read traffic.
So now you can see that
we are sending only log records. We are not doing checkpointing.
So obviously we are doing much less IO.
And in the same benchmark, which is run for 30 minutes,
we are able to achieve 27 million transactions.
That is 35 times more.
And we are doing only 0.95 IOs per transaction.
Whereas in the traditional system,
we were doing 7.4 IOs per transaction.
So it's less IO, better performance,
and we are able to push down a bunch of work
to the distributed storage.
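The headline numbers quoted above work out as follows (a trivial check, using only the figures from the talk):

```python
baseline_txns, aurora_txns = 780_000, 27_000_000   # 30-minute sysbench runs
baseline_io, aurora_io = 7.4, 0.95                  # IOs per transaction

print(f"throughput gain: {aurora_txns / baseline_txns:.1f}x")    # ~34.6x, i.e. ~35x
print(f"IO reduction:    {baseline_io / aurora_io:.1f}x fewer IOs per transaction")
```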
In terms of the throughput,
Aurora is up to five times faster than standard MySQL databases.
So in this case, in the left diagram, I show the write throughput, which is essentially the number of write statements on a sysbench workload.
The first one is Aurora MySQL 5.7, and Aurora MySQL 5.6 is the next graph. And in comparison, the traditional databases
set up on MySQL 5.6 and 5.7 are much lower.
A similar difference happens in the read throughput.
We are more than 3x faster in the number of read statements
that we are able to do on 5.7 and 5.6.
So in this case, we ran with two tables, 200,000 rows
on a particular database instance type
called r4.16xlarge in EC2.
So this was a short presentation,
but we have described all these details
and much more that you might find interesting in two papers that we presented in 2017 and 2018.
In SIGMOD, one talks about the architecture and the design considerations that I briefly talked about today, why we had to build a purpose-built storage and how it worked. And the second paper talks about the need for doing the membership
changes the way we did, and how we do not do consensus for IOs, commits, and membership changes. I think these papers are
very interesting, so I highly recommend them. Thanks for listening. If you have questions
about the material presented in this podcast, be sure and join our developers mailing
list by sending an email to developers-subscribe at snia.org. Here you can ask questions and
discuss this topic further with your peers in the storage developer community. For additional information about the Storage Developer Conference,
visit www.storagedeveloper.org.