Storage Developer Conference - #159: Amazon Aurora Storage – Purpose Built Storage for Databases

Episode Date: January 4, 2022

...

Transcript
Starting point is 00:00:00 Hello, everybody. Mark Carlson here, SNIA Technical Council Co-Chair. Welcome to the SDC Podcast. Every week, the SDC Podcast presents important technical topics to the storage developer community. Each episode is hand-selected by the SNIA Technical Council from the presentations at our annual Storage Developer Conference. The link to the slides is available in the show notes at snia.org/podcasts. You are listening to SDC Podcast, Episode 159. Hello, everyone. I'm Murali Brahmadesam, Director of Engineering, Amazon Aurora. Today I'm going to talk about the purpose-built storage for databases that powers Amazon Aurora. First I'll talk about what Amazon Aurora is.
Starting point is 00:00:58 To understand the distributed purpose-built storage that we built for Aurora, it would be good to understand a little bit of the internals of the database and the motivation for building Aurora. And I'm going to talk briefly about how we made it cloud native, how we took advantage of building for the cloud rather than following the traditional database architecture, how we achieve scalability and durability at the large scale we operate at, and what the performance results are.
Starting point is 00:01:34 So let's jump in. What is Amazon Aurora? So Amazon Aurora is an enterprise-class cloud-native database. Our customers wanted compatibility with MySQL and PostgreSQL, but then they realized that those don't have the enterprise-quality features, like scalability up to 128 terabytes, the performance, the throughput, all of that, that they enjoyed with commercial databases. So essentially they asked us to build an open-source-compatible database for MySQL and PostgreSQL with enterprise-quality features. And that's what we did. And we made it with simple pay-as-you-go pricing and delivered it as a managed service. What I mean by managed service is that customers do not have to manage the
Starting point is 00:02:17 database versions, do not have to manage the database instances, do not have to manage the distributed storage, and so forth. So you come to AWS, and we give you a database as a service that you can quickly onboard and use. So as I said, let's do a quick recap of database internals that will allow us to understand why we built a distributed storage system purpose-built for databases. This is just a simple explanation of databases. Databases primarily use B+ trees for storing customer data. So let's imagine that a customer has created a table with a few columns, say four columns. That data has to be organized and stored in memory somewhere.
Starting point is 00:03:08 And the data structure that is used is the B+ tree. And in this case, you can see there is a root page, a couple of intermediate nodes, and the data pages that are the leaf pages. Data is organized in fixed-size pages. Typically, in MySQL, we use a 16-kilobyte page size. And the data that is resident in memory, all these pages put together, is referred to as the buffer pool. So when you create a table, these columns, the customer data, have to be stored, and those are serialized into the leaf pages and accessed through this tree, and this could be sorted by the primary key, for example. So all of these are resident in memory.
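To make that concrete, here is a minimal sketch in Python of a table stored as a B+ tree of fixed-size pages; the class and function names are illustrative assumptions, not MySQL or Aurora code. A lookup walks from the root page through intermediate pages down to the leaf page that holds the row for a given primary key.

```python
# Illustrative sketch only -- not MySQL/Aurora code.
PAGE_SIZE = 16 * 1024  # fixed-size 16 KB pages, as described above

class Page:
    def __init__(self, is_leaf):
        self.is_leaf = is_leaf
        self.keys = []       # separator keys (internal pages) or primary keys (leaf pages)
        self.children = []   # child pages, only on internal pages
        self.rows = []       # serialized rows, only on leaf pages

def lookup(root, key):
    """Walk the tree from the root page to the leaf page holding `key`."""
    page = root
    while not page.is_leaf:
        i = 0
        while i < len(page.keys) and key >= page.keys[i]:
            i += 1           # pick the child whose key range covers `key`
        page = page.children[i]
    for k, row in zip(page.keys, page.rows):
        if k == key:         # a real engine binary-searches within the 16 KB page
            return row
    return None
```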
Starting point is 00:03:51 So then you would ask: what happens if you insert a new row? If you are inserting a new row, the database walks this tree, figures out which page has to be modified, and inserts the row. If you're modifying an existing row, similarly, the appropriate page that contains that row is identified and it is modified. And we all know that the main memory may not be enough for storing the whole database, in which case data has to be read from the storage. So you have main memory, which stores data in the buffer pool, where it is accessed. And there is cold data in the storage that is brought back to memory as needed. And that means that we also need to evict some pages from main memory to the storage as needed. And to achieve those things,
Starting point is 00:04:46 these database pages have to be written into storage. And that is done periodically by the database engine. And that is typically referred to as checkpointing. So in this example, I just show that these database pages are stored in some storage in some order. And this is done periodically. So if the database has to bring back an evicted page, then it has to read it back in.
Starting point is 00:05:11 It knows where to go read it from. So this is how the storage is done for databases. And we need to talk about an interesting protocol that the database uses for updating the in-memory buffers. So let's take an example. So you have an old state. This is a page. Let's call it a green page. The green page has a row, and the customer is updating a particular column. The database comes to the page and identifies that a column needs to be modified as intended by the customer, and that is done by an operation called "do". Essentially we are doing an operation to this page to achieve a new state. So a column value is changed, a new value is put
Starting point is 00:06:00 in place, and this is scribbled in memory. So the buffer pool is updated. And to handle any failures, we need to remember the before and after images of the data. So the before value of the column and the after value of the column are stored durably in a place called a log stream. It's called a write-ahead log. Essentially, every update by the customer needs to be durably persisted in a log record so that we can retrieve it when needed.
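As a rough sketch of that do/redo/undo protocol, here is some illustrative Python; the record layout and function names are my assumptions, not the engine's actual structures. Each change first appends a log record carrying the before and after images to the write-ahead log, and only then scribbles the in-memory page; redo and undo replay or reverse the change from that record.

```python
# Illustrative write-ahead logging with do/redo/undo -- names and layout are assumptions.
from dataclasses import dataclass

@dataclass
class LogRecord:
    lsn: int        # log sequence number
    page_id: int
    column: str
    before: object  # old value, needed for undo
    after: object   # new value, needed for redo

def do(page, column, new_value, lsn, log_stream):
    """Durably log the change first (write-ahead), then update the buffer-pool page."""
    record = LogRecord(lsn, page["id"], column, page[column], new_value)
    log_stream.append(record)   # in a real engine this append is made durable
    page[column] = new_value    # scribble the in-memory page
    return record

def redo(page, record):
    """Re-apply a committed change to an older page image read back from storage."""
    page[record.column] = record.after

def undo(page, record):
    """Reverse an uncommitted change so the page returns to its old state."""
    page[record.column] = record.before
```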
Starting point is 00:06:33 Okay. And we'll talk about why we write the log ahead and do not always write the pages: because the pages are big in size, for efficiency's sake we cannot always write them. So we always write the log record, because that is the minimum amount of data that we need to durably store. But for efficiency's sake, we'll checkpoint periodically, right? So in case we have to restart the database engine, the checkpoint has an old state, the committed transaction's log record is there in the log sequence, and we need to reapply that log record to get the new state. Because it's committed data, we cannot forget the change made by the customer. Sometimes we have to roll back a particular state
Starting point is 00:07:25 because a transaction is rolled back, for example. In which case we have a new state, we have the log record, and we need to undo that operation to get the old state. So this is just a mechanism that the database engine uses to manage the data that is stored in the storage. And the main memory and the storage are always at a different point in time,
Starting point is 00:07:51 containing different values, and we need to synchronize them using this protocol. So let's take an example. So there is a transaction TX1, represented in red. A customer might be inserting rows, updating some columns in existing rows, and so forth. For all of those changes, log records are produced as I explained before, and they are durably stored.
Starting point is 00:08:18 At time TC, just before transaction TX2 started, while the logs were being written to the durable storage, a checkpoint was initiated, which means that the modified pages in memory need to be flushed to the durable storage. So in this particular case, the red transaction was committed and it is being flushed. So the durable storage contains the page
Starting point is 00:08:52 that has the modification done by transaction TX1. It still doesn't have the changes done by TX2. And there is another transaction, TX3, that was committed. So now we have the log records for both the blue and green transactions in the durable storage. Transaction TX4 was ongoing. At that time, a system failure happened at TF. So the log records are partially written to the durable storage. The system failure could be a database engine restart, a machine failure, a hardware failure; it could have been any one of those. So now, when the database is restarted, we need to bring back the database to the state that
Starting point is 00:09:39 was there before the system failure. Essentially, the database should know about transactions one, two, and three, and transaction four should be rolled back. So how do we achieve it, right? The durable storage has the correct version of the red transaction, TX1, but it doesn't have the data from TX2 and TX3, right? So what we do is use the protocol that I just explained before: the do, redo, undo protocol.
Starting point is 00:10:08 So we read the checkpoint. We look at the log sequence. We know that the checkpoint was done at this particular time. And we see that a blue page has been modified. So the previous version of the blue page from the previous checkpoint is read back, the log record is applied, and we get the new version of the blue page. And we do the same thing for the green page. Then we see that transaction TX4 is not yet committed. So there are some log records for it that have to be rolled back.
Starting point is 00:10:41 So that log record is undone from the database and the pages are not modified. So this is how the transaction processing with respect to storage is done. So now let's look at the cost of this IO. For every change that the customer is doing, log records are created. These are typically small in size. So if a column is updated, the before and after image with some
Starting point is 00:11:12 metadata has to be stored durably in the storage. So you can imagine it as tens of bytes or hundreds of bytes. It's not usually big. It all depends on the schema, but usually the log records are much smaller. Whenever we write the checkpoint, we need to worry about what we call the torn page issue. Essentially, when a 16-kilobyte page is serialized into the storage, the underlying device may not support atomic writes of this size, in which case, if there is a system failure during the write operation, the 16 kilobytes of data may not actually be durable; it might be only partially durable. So that's why it's called a torn page.
Starting point is 00:11:57 So to avoid that problem, typically databases do double writes so that we can always get one complete version of a page and we can move forward, right? So what we arrive at is this: the user modified a small amount of data in the database, and that resulted in write inflation. The double writes are 16K at a time, so that's 32 kilobytes at a time. And then what we absolutely need to write is the log records. And this checkpointing is causing a lot of IO to be done. Of course, all of the database implementations know that it is very costly.
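As a back-of-the-envelope illustration of that write inflation (the byte counts here are illustrative assumptions, not measured values), a logical change of around a hundred bytes can turn into two full 16 KB page writes plus the log record:

```python
# Rough write-inflation arithmetic for one small update; sizes are illustrative assumptions.
PAGE_SIZE = 16 * 1024            # one 16 KB data page
log_record_bytes = 100           # before/after images plus metadata, tens to hundreds of bytes

double_write_bytes = 2 * PAGE_SIZE               # torn-page protection writes the page twice
total_written = double_write_bytes + log_record_bytes

print(f"{total_written} bytes hit storage for ~{log_record_bytes} bytes of logical change "
      f"(~{total_written / log_record_bytes:.0f}x inflation)")   # ~329x with these assumed sizes
```

Amortizing the checkpoint across many changes lowers the steady-state number, which is exactly the amortization described next; the log record is the only part that is inherently small.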
Starting point is 00:12:38 So they amortize that cost. They don't checkpoint that often; they accumulate the logs and checkpoint periodically. But you can see that the performance of the database is entirely dependent on how efficiently you do IO. So let's talk about what we can do in the cloud. Before that, we need to understand what the traditional architecture looks like. So in this diagram, I show that there is a compute instance. This is the database instance, which does the SQL data processing. It does transaction support. It does caching, which is the buffer pooling and result caching. Anything that is related to main memory is done by this layer. And then there is the logging, which is essentially durably persisting the data to
Starting point is 00:13:25 the storage. So in this architecture, you can see that to achieve scalability, the typical idea is to increase the IO bandwidth. So you put in better hardware, a lot more devices, use storage that can scale, it's all direct attached, and so forth. So you make this pipe wider and you also make it efficient by reducing the number of IOs. So this is the typical technique that database development has used to
Starting point is 00:13:57 achieve performance. What we did in Aurora is we took a tenet that the log is the database. I'll explain what we mean by that. So let's imagine there is a log stream from the beginning of the database's time, from time zero. So there are changes to the red pages, the blue page, and so forth. So the logs accumulate. With this log stream, at any given point in time, we should be able to recreate a particular database page version, right? So let's take an example. If you want to know the current version of the database
Starting point is 00:14:34 page at time T5 for the red page, you have to take the log records from T1 through T5, apply them together, and get that version of the page. So we consider the log as the database. That means that you can always get the current version of every page by processing the log stream. So the most important part to keep durable is the log. There are, of course, efficiency problems with this statement, and I'll explain how we overcome them. The first one is that relying only on the log stream for pages is not practical. So say you run the database for n number of hours, days, and so forth.
Starting point is 00:15:17 We cannot always process all the logs to get a particular version or the current version of a database page. So you need to do slightly better. And the solution is checkpointing. So now you might ask: this is what traditional databases always did, so what's the difference, right? Well, the database instance was burdened by the checkpointing task. It needs CPU. It needs to do a lot of IO and so forth.
Starting point is 00:15:41 That was the bottleneck, right? So what we did was we said, okay, let's take that burden off the database instance and push it to a distributed storage fleet that understands this log stream and does continuous checkpointing. So essentially, instead of the database instance doing the checkpointing, a distributed storage fleet can do the checkpointing using the log stream. So now it's a completely parallelizable problem. The database sends only the log records to the distributed storage, and the distributed storage can continuously checkpoint as frequently as it needs to.
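Here is a minimal sketch of that "log is the database" idea in Python, reusing the LogRecord shape from the earlier sketch; the function names are assumptions, not Aurora's implementation. Any version of a page can be materialized by applying that page's log records, in LSN order, on top of the last checkpointed image, and continuous checkpointing (the coalescing described later) simply folds accumulated records into a new base image.

```python
# Illustrative only: materialize a page version from a base image plus its log records.
def materialize_page(base_page, log_records, target_lsn):
    """Apply this page's records in LSN order, up to and including target_lsn."""
    page = dict(base_page)                      # start from the last checkpointed image
    for rec in sorted(log_records, key=lambda r: r.lsn):
        if rec.lsn > target_lsn:
            break
        page[rec.column] = rec.after            # redo the change the record describes
    return page

def coalesce(base_page, pending_records, up_to_lsn):
    """Continuous checkpointing: fold records into a new base so reads replay less log."""
    new_base = materialize_page(base_page, pending_records, up_to_lsn)
    remaining = [r for r in pending_records if r.lsn > up_to_lsn]
    return new_base, remaining
```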
Starting point is 00:16:26 Of course, durably persisting the log is the key part, because the log is the database in our approach. So with this approach, what we are able to do is separate out compute and storage. And what we can see is that compute and storage naturally behave differently. Compute instances are ephemeral. They can be replaced. They can fail. The customer can shut them down to save cost. If you're not using the database, you can shut it down. You can also make it cost efficient by scaling up and down based only on the needs of the load. But the storage operates in a different dimension. It has to be long lived. It has to be durably present.
Starting point is 00:17:08 Even if the compute is not present, the data has to be there. So we need to achieve scalability, availability, and durability. But this can be achieved independently. So compute can have its own dimension of scalability, and the storage can have its own dimension of scalability. So we have two different components that can be worked on differently. So in Aurora, what we concluded on and built was a log-structured distributed storage system. This storage system is purpose-built for databases. Essentially, this storage system understands log records,
Starting point is 00:17:47 database pages, formats, versions, and so forth. So the architecture became like this. So you have SQL data processing, you have transactions, you have caching, and the lower part, which was the logging layer, is changed. You have a small client library that understands this distributed system, but most of the activity happens on the distributed storage system. And you can imagine that to build this lower half,
Starting point is 00:18:14 we would have had to build a lot of systems to make this happen. Instead, we took advantage of the services that are already built in a scalable fashion in the cloud, like Amazon DynamoDB, which we use for our metadata: to figure out which databases contain what type of data and how it is spread on the distributed storage, all that metadata is stored in DynamoDB. We use Route 53, which is a DNS layer. We use EC2; we use the same instances that are available to any AWS customer. We are not using any special hardware. And we use Amazon S3, which is the blob storage, for long-term retention of our backups. The data itself is stored on NVMe SSDs on a distributed fleet that we built.
Starting point is 00:19:12 So let's dive into the depths of what happens within a storage node. So in this diagram, you can see there is a database instance, and it is going to send some data to the storage node. And the storage node itself has peers that can be used for redundancy, for replication, and so forth. So let's see how the database uses the distributed storage. When a customer modifies a database by inserting, deleting, or updating data, the database instance generates log records. It sends them to a storage node. The storage node sorts them, keeps them in an in-memory queue, and then immediately persists them on the NVMe storage, in what we call the hot log. And once they are durably persisted, we acknowledge to the database instance.
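A condensed sketch of that receive path on a storage node, in Python; the class, the integer LSNs, and the gap tracking are simplified assumptions for illustration rather than Aurora's actual code. Incoming records are sorted, appended durably to the hot log, acknowledged, and any missing LSNs are remembered so they can be fetched from peer nodes, as described next.

```python
# Illustrative storage-node write path: sort, persist to the hot log, acknowledge, track gaps.
class StorageNode:
    def __init__(self):
        self.hot_log = []             # append-only log records (on NVMe in reality)
        self.highest_contiguous = 0   # highest LSN below which nothing is missing

    def receive(self, records):
        """Handle a batch of log records sent by the database instance."""
        for rec in sorted(records, key=lambda r: r.lsn):
            self.hot_log.append(rec)  # durably persist (fsync elided in this sketch)
        self._advance_contiguous_point()
        return "ACK"                  # the database treats the data as durable from here on

    def _advance_contiguous_point(self):
        seen = {r.lsn for r in self.hot_log}
        lsn = self.highest_contiguous + 1
        while lsn in seen:            # LSNs modeled as consecutive integers for simplicity
            lsn += 1
        self.highest_contiguous = lsn - 1

    def missing_lsns(self, up_to):
        """Gaps this node should fill by gossiping with its peer replicas."""
        seen = {r.lsn for r in self.hot_log}
        return [n for n in range(self.highest_contiguous + 1, up_to + 1) if n not in seen]
```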
Starting point is 00:19:57 From that point onwards, the database knows that the data is durably persisted. It can proceed with its other activities. Within the storage, the log is now sorted. And there could be a gap within the log for whatever reason: there could be a network issue, the storage node might have been restarting, and so forth. There are a number of reasons why there could be gaps within the logs, in which case the storage node knows to talk to its peers to pull the data in and fill in the gaps. Once the gaps are filled,
Starting point is 00:20:35 the storage node continuously applies these log records to the previous versions of the data pages and persists them. And that processing is called coalescing. This is the continuous checkpointing that I talked about. So it periodically takes the data from the hot log and applies those log records to the previous versions of the data pages. And this is done independently from the database instance. And we continuously back up this data. So all the data from the log records is continuously sent to S3 for long-term retention, and the customer can restore to any point in time
Starting point is 00:21:14 at a log record granularity. And for efficiency's sake, we are also copying the database pages periodically. And because we are continuously checkpointing, there are multiple versions of the database pages. So we garbage collect them for storage efficiency. And for correctness purposes, we also scrub all the pieces of data
Starting point is 00:21:37 to make sure that there is no corruption. We use checksums. So from this diagram, you can see that the hot path, or the data path that the database instance is involved in, is a very short path. It sends the log records; they are stored in memory and immediately persisted
Starting point is 00:21:56 and then acknowledged. All the other operations, which can be long running, are done asynchronously, and they don't affect the database write performance. Now that we know what happens within a storage node, let's talk about the distributed system. When you have a distributed system with hundreds or thousands of storage nodes taking part in providing storage for a database,
Starting point is 00:22:23 we know that there could be independent failures that you cannot control. There could be failing nodes; devices can fail, disks can fail, switches can fail, and so forth. So how does a typical system handle these types of failures? The solution is replication. Instead of storing one copy of the data, store it multiple times. The common approach is to replicate three ways and do a write and read
Starting point is 00:22:52 quorum of two out of three. Okay. So in this example, what I refer to as an AZ is an availability zone. An availability zone is an AWS concept where, within a geographical region, there are fault domains called availability zones. And within each availability zone, imagine that there are thousands of storage nodes present. So for a better system, instead of putting three copies within the same availability zone,
Starting point is 00:23:20 we can spread that across three availability zones, make one copy per AZ, and then do the write and read quorum of two out of three. So this is a good system, but it has its own issues. Let's talk about AZ failure. So the reason we have multiple fault domains called availability zones is that an AZ can fail
Starting point is 00:23:44 for whatever reason: a power failure, a network failure, and whatnot. So in the system which does a two-out-of-three quorum, how does it handle AZ failure, right? If one AZ goes down, we know that it had only one copy of the data. That means there are two availability zones left, each with one copy of the data. So we can still reach two out of three copies. We can establish quorum. There is no data loss. It works, right?
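Here is a small sketch of that quorum arithmetic in Python (the helper names and the AZ layout dictionaries are illustrative assumptions): given how many copies each AZ holds and what has failed, it checks whether a write quorum can still be assembled. It reproduces the case just described, and also the AZ+1 and six-copy cases discussed next.

```python
# Illustrative quorum arithmetic: can we still assemble a write quorum?
def surviving_copies(copies_per_az, failed_azs=(), extra_failed_nodes=0):
    alive = sum(n for az, n in copies_per_az.items() if az not in failed_azs)
    return alive - extra_failed_nodes

def write_available(copies_per_az, write_quorum, failed_azs=(), extra_failed_nodes=0):
    return surviving_copies(copies_per_az, failed_azs, extra_failed_nodes) >= write_quorum

three_way = {"AZ1": 1, "AZ2": 1, "AZ3": 1}   # one copy per AZ, 2-of-3 quorum
six_way   = {"AZ1": 2, "AZ2": 2, "AZ3": 2}   # two copies per AZ, 4-of-6 quorum

print(write_available(three_way, 2, failed_azs={"AZ1"}))                        # True:  AZ failure is fine
print(write_available(three_way, 2, failed_azs={"AZ1"}, extra_failed_nodes=1))  # False: AZ+1 breaks the quorum, 1 copy left
print(write_available(six_way, 4, failed_azs={"AZ1"}))                          # True:  4 of 6 copies still reachable
print(write_available(six_way, 4, failed_azs={"AZ1"}, extra_failed_nodes=1))    # False: writes pause until repair, but 3 copies remain, so no data loss
```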
Starting point is 00:24:13 What happens if there is an AZ-plus-one failure? What we mean by a plus-one failure is that within an availability zone, there are thousands of nodes that take part in providing storage, and there is a non-zero probability that a particular storage node among those thousands is down for whatever reason. When that happens, the volume is not operational anymore, because we had already lost one copy with the AZ, and now another copy is lost.
Starting point is 00:24:42 So we are left with only one copy, right? So if you do a two-out-of-three quorum, you lose the quorum, and there's a chance that you have also lost data, right? So how do we overcome that in Aurora? In Aurora, we do six-way replication with two copies per availability zone, and instead of a two-out-of-three quorum, we use a four-out-of-six quorum technique. So let's go through the failure cases. What happens if an availability zone fails? If an availability zone fails, two copies of the data are down, but we still have four copies left, two per remaining availability zone.
Starting point is 00:25:28 wherein one of the storage nodes in one of the availability zone fails while an AZ is already down? We still have three copies of data. There is no data loss. Of course, there is right availability loss in which case we quickly rebuild the failed copy in one of the availability zones from the remaining copies and recover the write availability.
Starting point is 00:25:49 So there is no data loss. There is a brief period of write loss. So this approach is much more durable than the two out of three logic. And this is why we did the six way replication. So now we can ask, OK, you have a distributed storage. You have thousands of nodes. How do you spread the data? So we take this partition, the volume into fixed size segments, and each segment is replicated six ways.
Starting point is 00:26:20 That's the first level of split. So now the question is, how many segments should we create? If we create smaller segments, failures are more likely; if we create too big of a segment, the repairs are going to take too long. So we had to balance between the number of segments and the size of each segment, and what we found was that a 10 GB size is a good number. We were able to repair in less than a minute, and this was acceptable to us. So whenever there is a failure, if you can repair within a minute, then the system continues to operate. Remember that the data,
Starting point is 00:27:01 the storage operates at a four-out-of-six quorum. So if there is one failure, the customer does not see it. The database continues operating with the five available copies. So it does four out of five until the repair finishes. So I'll briefly talk about how we deal with the replication metadata. Let's take an example. Let's imagine that one segment is replicated six ways on the six machines that are shown here, A to F.
Starting point is 00:27:34 We call this the quorum set, and we version it. So there is an epoch one with all the nodes healthy. At some point, the monitoring system finds that machine F is failing, so it marks it suspect, and it finds a replacement node to take the data and marks it G. And what we do is use a concept of a dual quorum, wherein there is the current quorum and the future quorum. Both are used for quorums by the database instance. That means that in this particular case,
Starting point is 00:28:16 we don't immediately promote the future quorum to be the current quorum. We still keep both, and we start writing to both quorums, and we achieve four out of six. In this particular case, because five nodes overlap between these two quorums, we will achieve the quorum on both; in one case it might be four out of five, and in the other case four out of six. So G is operational immediately while it is replicating the data from the remaining peers. And once the monitoring system has completely
Starting point is 00:28:51 found out that this suspect state is actually a failed state, we mark it failed, and then we remove that quorum set, wherein a garbage collection system would go and delete the segment from F, if that machine is still around, or just clean up the metadata. And we promote the future quorum to be the current quorum. So the key part here is that this whole logic allows us to move forward and backward without losing any data. For example, if there was no physical failure of the node, but we had to do heat management,
Starting point is 00:29:30 so F was just hot and we had to move some data off to G, we can do this by creating the dual quorum, and the database will start writing to G while G is catching up from its peers. But we can also change our mind. Say during this replication G fails, in which case we might say, okay, a hot node is okay for now, and we'll replicate to a new node instead, call it H,
Starting point is 00:29:54 in which case we just abandon this whole quorum set, revert back to the original quorum, and then create a new quorum. So it allows us to move forward and backward without losing any data. The database continues to work seamlessly. So this is what I meant by compute and storage separation. We can independently deal with the storage system's problems, challenges, and scalability without impacting the database.
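A minimal sketch of that epoch-versioned, dual-quorum membership change, in Python; the class and the quorum sizes shown are illustrative assumptions rather than Aurora's actual structures. While a suspect node is being replaced, a write counts as durable only if it satisfies the quorum in both the current and the proposed membership, and the proposed epoch can be committed or abandoned without losing acknowledged writes.

```python
# Illustrative dual-quorum membership change with versioned (epoch) quorum sets.
class QuorumSet:
    def __init__(self, epoch, members, write_quorum=4):
        self.epoch = epoch
        self.members = set(members)
        self.write_quorum = write_quorum

    def satisfied(self, acked_nodes):
        return len(self.members & set(acked_nodes)) >= self.write_quorum

current  = QuorumSet(epoch=1, members="ABCDEF")   # F is suspect
proposed = QuorumSet(epoch=2, members="ABCDEG")   # G is catching up from its peers

def write_durable(acked_nodes):
    """During the change, a write must reach the quorum in BOTH memberships."""
    return current.satisfied(acked_nodes) and proposed.satisfied(acked_nodes)

print(write_durable({"A", "B", "C", "D"}))   # True:  four acks common to both quorum sets
print(write_durable({"A", "B", "G", "F"}))   # False: only three of each set have acknowledged

# Commit: once monitoring confirms F has failed, epoch 2 becomes the current quorum.
# Abandon: if G fails first, drop epoch 2 and propose a different replacement instead.
```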
Starting point is 00:30:18 So let's look at the performance results. In this diagram, I show a typical traditional MySQL implementation with better availability across two availability zones. In this particular case, there is a primary instance that is installed in availability zone one, and all the data is stored in, in this particular case, a block device storage system
Starting point is 00:30:49 called Elastic Block Store. The data is written to this EBS volume, and the EBS volume itself mirrors it for durability. So the database writes are done sequentially in this system. And then the data is read back and sent to a different replica in a different availability zone, and the same thing happens on its replicated block storage. And in this particular type of setup, when we run a popular MySQL benchmark called sysbench for 30 minutes, we are able to achieve 780,000 transactions overall.
Starting point is 00:31:29 And the average IO we saw was 7.4 IOs per transaction. Let's look at what happens in Aurora. In the case of Aurora, the architecture is much different. We have a primary instance that does not have direct-attached storage. It is connected to the distributed storage, as I explained before, across three availability zones, and it does a four-out-of-six quorum.
Starting point is 00:31:53 So when the data is modified by the customer, it is sent to this distributed storage in parallel. And then there are multiple read replicas attached to the writer instance, which can also read from this common storage. That's why we call it multi-attached storage. So the database instance is sending data to the storage and also to the read replicas in parallel. And the read replicas are capable of serving read traffic. So now you can see that
Starting point is 00:32:26 we are sending only log records. We are not doing checkpointing. So obviously we are doing much less IO. And in the same benchmark, which is run for 30 minutes, we are able to achieve 27 million transactions. That is 35 times more. And we are doing only 0.95 IOs per transaction, whereas in the traditional system, we were doing 7.4 IOs per transaction.
Starting point is 00:32:50 So it's less IO, better performance, and we are able to push down a bunch of work to the distributed storage. In terms of the throughput, Aurora is up to five times faster than standard MySQL databases. So in this case, in the left diagram, I show the right throughput, which is essentially the number of write statements on a sysbench workload. The first one is Aurora MySQL 5.7 and Aurora MySQL 5.6 is the next graph. And the comparison of the traditional databases set up on MySQL 5.6 and 5.7 is much lower.
Starting point is 00:33:32 A similar difference shows up in the read throughput. We are more than 3x faster on both 5.7 and 5.6 in the number of read statements that we are able to do. In this case, we ran with two tables of 200,000 rows on a particular database instance type called r4.16xlarge in EC2. So this was a short presentation, but we have described all these details
Starting point is 00:34:03 and much more that you might find interesting in two papers that we presented in SIGMOD in 2017 and 2018. One talks about the architecture and the design considerations that I briefly talked about today: why we had to build a purpose-built storage and how it works. And the second paper talks about the need for doing the membership changes the way we did, and how we do not do consensus for IOs or commits. I think these papers are very interesting, so I highly recommend them. Thanks for listening. If you have questions about the material presented in this podcast, be sure to join our developers mailing list by sending an email to developers-subscribe at snia.org. Here you can ask questions and discuss this topic further with your peers in the storage developer community. For additional information about the Storage Developer Conference, visit www.storagedeveloper.org.
