Storage Developer Conference - #190: Kinetic Campaign: Speeding Up Scientific Data Analytics with Computational Storage

Episode Date: May 3, 2023

...

Transcript
Starting point is 00:00:00 Hello, everybody. Mark Carlson here, SNIA Technical Council Co-Chair. Welcome to the SDC Podcast. Every week, the SDC Podcast presents important technical topics to the storage developer community. Each episode is hand-selected by the SNIA Technical Council from the presentations at our annual Storage Developer Conference. The link to the slides is available in the show notes at snia.org/podcasts. You are listening to SDC Podcast, episode number 190. So good afternoon, everyone. So in the next 40 or so minutes,
Starting point is 00:00:47 I'm going to talk about Kinetic Campaign, which is archival storage that we are building at Los Alamos for speeding up scientific data analytics with computational storage drives and multi-level erasure coding technologies. So Kinetic Campaign not only is able to speed
Starting point is 00:01:08 up some of the largest simulations in the world, it is also perhaps the world's first storage system that is able to push down analytics all the way to the individual drives while continuing to allow data to be erasure
Starting point is 00:01:23 coded. So we are not limited to three-way replication, which tends to be pretty expensive to run when we are talking about storing petabytes of data. So really quick about myself: I am an HPC storage scientist at Los Alamos National Laboratory. Before that, I was a PhD student at Carnegie Mellon, and I graduated in 2021. I work on distributed file system metadata management, key-value stores, and scientific data analytics. And here is a link to my homepage.
Starting point is 00:01:58 So today we will go over four topics. First, we are going to talk about why computational storage is important to HPC in terms of large-scale scientific data analytics. Then we will quickly introduce MarFS, which is LANL's current archival storage using erasure coding technologies. And then the Kinetic platform, which is Seagate's novel active disk research platform, providing in-drive computational capability.
Starting point is 00:02:31 And then we will introduce C2, which is LANL's next-generation archival storage that is able to combine the in-drive computing capability provided by Seagate Kinetic disks with the erasure coding technologies provided by MarFS to form a single system, achieving both cost-effective data storage and
Starting point is 00:02:53 protection while being able to accelerate scientific discovery. So this is collaboration work between Los Alamos and Seagate, and as we mentioned before, C2 is maybe the world's first system able to do both in-drive computing and erasure coding. So let's get started.
Starting point is 00:03:13 So before we dive into the details, just to bring everyone onto the same page, we will talk about why computational storage is important in terms of HPC. At Los Alamos, we run some of the world's largest simulations. Those scientific simulations run on compute nodes, and when they run, they will periodically stop and write their data to an underlying parallel file system. Scientific applications do that because they can later, after the simulation, read back that data for in-memory analytics, such as making a movie. Scientific data usually doesn't compress well,
Starting point is 00:04:01 and performance usually depends on provisioning the right amount of bandwidth and fully utilizing that bandwidth. So there are two trends happening right now. The first trend is that storage is becoming increasingly multi-tiered. This is because as we build larger and larger machines, we increase our data sizes, and those data sizes require us to provide higher bandwidth for the compute nodes. And we are living in a marketplace where there's no single media type that can simultaneously provide the speed
Starting point is 00:04:35 and the capacity that the computer needs at the same time, given a certain cost budget. So the result is that the overall compute platform is becoming more diverse, with multiple tiers of storage using different storage media for different purposes. Here is a more recent supercomputer at Los Alamos, which has three layers of storage. There is a burst buffer layer made of flash devices providing a performance tier in front of the parallel file system. There is also a capacity tier behind the parallel file system using high-density disks to provide the capacity needed by the compute platform. So the result is storage becoming increasingly tiered. And one consequence of it is that we are seeing asymmetric read and write performance.
Starting point is 00:05:28 So with this tiered storage, writing from the compute nodes to the storage can continue to be fast because the compute nodes are writing to a burst buffer which is made of flash. But reading from the storage may not be fast, because the data may have been staged all the way down to the longer-term storage made of high-density disks. In that case, when data is read from there, the read bandwidth will be much lower than what the user would see if the data were read from the burst buffer. So this has caused a problem for scientific data analytics, because when the scientists run analytics, the data may already be
Starting point is 00:06:09 staged all the way out to the longer-term data storage, which has much lower read bandwidth. So that's challenge number one. At the same time, what we are seeing is that modern scientific data analytics is becoming increasingly selective. In the past, scientific data analytics tended to require reading back all of the simulation output to the compute nodes. And then the performance
Starting point is 00:06:35 is basically about being able to read that data as quickly as possible. But today, the scientific data queries tend to only target a small subset of the data. One example is the movie shown on the slide.
Starting point is 00:06:50 This is a movie made 10 years ago. In that movie, all particles in the simulation were rendered in the movie. But this is no longer the case today. Today, the simulations are much larger, and scientists are often interested in only a tiny amount of data inside the simulation. So, for example, this slide is showing a query where the scientists are only asking for particles with kinetic energies that are bigger than a certain level.
Starting point is 00:07:17 So in that case, we are not going to read all the data, and oftentimes only a tiny fraction of the data needs to be read from the storage. So the problem is no longer reading all the data as quickly as possible. It is about: can I read only the data that the query is targeting? I don't want to read any more data, because reading more data, when the data is in the slower storage tier, is going to be very costly and will increase the query processing time. So,
Starting point is 00:07:58 one might ask: if the query is only interested in some high-energy particles, why can't we have the simulation, at the end of the run, just write those high-energy particles to a different place, so it's easier for the scientists to do investigations on those particles? The thing is that usually a scientific simulation runs for weeks or months.
Starting point is 00:08:28 So if we have the simulation dump the high-energy particles, the simulation can only do it for the very last state of the particles. But the scientists are often interested in the state of the particles during the entire course of the simulation. So while the simulation can do that for the very last time step, it cannot do it for all the time steps before it. This is because as the simulation runs,
Starting point is 00:08:51 the simulation itself does not know which particles will become high energy when the simulation ends in, say, three weeks. So if we do that, all of the particles have to be dumped during the entire course of the simulation. And if we want to create indices, we have to index all the particles, because any of those particles might become the high-energy particles when the simulation ends. Then the problem becomes that if we go ahead and create those indices,
Starting point is 00:09:22 we cannot do it on the compute nodes, because the compute node memory is basically reserved for running the simulation. So there won't be much left to do the indexing work. We may be able to enforce an order on the compute nodes by sorting particles.
Starting point is 00:09:41 That is doable. It's just that sorting can only help one query. If the particles have multiple columns, then we can sort particles according to one column, and only queries against that column can be sped up. So multidimensional queries with predicates on different columns still require reading all the data back, because we don't have the indices. So that's the challenging part of reading
Starting point is 00:10:08 only interesting particles. So there are many existing solutions in the research community trying to speed up scientific data analytics. There are basically three of them: post-processing mechanisms, in-transit computing, and in-situ processing. But they all kind of share the same thing, which is that they all kind of suck.
Starting point is 00:10:28 So, for example, if we go with the post-processing mechanism, what it does is that we run the simulation and have the simulation write data to storage. Then, after the simulation, we launch a post-processing job. What the job does is read that particle data from storage, build indices in compute node memory, and then write indexed versions of the data back to storage. And then queries can be sped up because we have those indices. The problem is that the post-processing job requires reading all the data into memory, building indices, and writing all the data back. So that causes excessive data movement.
Starting point is 00:11:05 And we know that with tiered storage, it's very costly to move data. And for in-transit computing, the idea is that we kind of want to avoid moving lots of data. So what we do is that while the simulation is running, consuming a portion of compute nodes, we use another portion of compute nodes
Starting point is 00:11:23 to run in parallel with the simulation. So when the simulation writes data, it writes data to the compute nodes that are reserved for indexing. And then those compute nodes can just index the data before writing it to storage. So in that case, we only write data to storage once, but the problem with that mechanism is that
Starting point is 00:11:44 we have to reserve compute nodes to build indices. So for a very large simulation that needs to consume an entire supercomputer, we cannot do it, because we don't have any compute nodes left to build indices. So this in-transit computing mechanism kind of only works for smaller jobs. It cannot work for jobs that consume the entire computer. And finally, there is in-situ processing. The idea is that this mechanism tries to reuse some of the idle CPU cycles available on the compute nodes when the simulation is writing
Starting point is 00:12:16 data to storage. That kind of works. The problem is that because the compute node resources are so limited, this mechanism can usually only produce indices on a few columns. It usually cannot build indices for all the columns, so multidimensional queries still do not really benefit from this mechanism.
Starting point is 00:12:44 So I think computational storage is providing new opportunities to speed up scientific data analytics. The problem is that today, all the computation has to happen on the compute nodes. Whether we are talking about building indices for scientific data or reading data back for in-memory scanning, it has to happen on the compute nodes. That's why we have to move the data
Starting point is 00:13:09 all the way from archival storage back to the compute nodes. So to reduce excessive data movement, we either use in-situ processing, which kind of leaves us with very reduced index quality and cannot serve multidimensional queries, or we do in-transit processing, which does not work for large jobs. To break these trade-offs, computational storage allows us to overcome those
mechanisms' limitations by offloading compute to the storage nodes and reducing data movement. So that's kind of the motivation why computational storage is going to be an important component of HPC. Now let's do a quick introduction to MarFS, which is LANL's current archival storage, serving as the cool storage tier before the tape and underneath the parallel file system. And we will highlight its multi-level
Starting point is 00:14:05 erasure coding mechanism to achieve cost-effective data storage and protection. So this is a recap that currently at Los Alamos, we have four tiers of storage. We have the burst buffer, which is flash. We have the parallel file system, which is the primary storage. And we have campaign storage, which is where MarFS is at. It provides six terabytes of storage as a capacity tier in front of the tape. And currently the bandwidth is 50 to 100 gigabytes per second, which is five to ten times faster than the tape. So it can be seen as a disk-based, tape-like storage tier in the lab.
Starting point is 00:14:48 So what I want to highlight is that MarFS is currently using a multilevel erasure coding mechanism to achieve cost-effective data protection. Different from many other erasure coding storage systems, MarFS uses two levels of data protection domains. The first domain is the global protection domain, which is achieved by the MarFS gateway node distributing data across all the storage nodes. So that's the first level of data protection.
Starting point is 00:15:21 The second level of data protection is achieved at each storage node. This is done by the storage node distributing data to all the disk drives that it manages. And currently it is using ZFS RAID technology to do that. So both levels know how to load balance across all the storage nodes and the drives, so not a single drive is used only for parity.
Starting point is 00:15:49 Basically, every drive is serving some subset of the data, so the bandwidth of the drives and the nodes can all be utilized. So why is multilevel erasure coding more beneficial? In this figure, we're trying to compare one level of erasure coding with two levels of erasure coding. So we call this localization. At the right part of the slide, this is where we have only one level of erasure coding. Let's say we have a client node and a set of OSD nodes for data storage. Usually in this kind of storage system, the client would have a client library
Starting point is 00:16:34 that the client process can link to to be able to communicate with the OSDs. The user-level library will try to divide the user data into fixed-size chunks. And then for each of those chunks, the library will find a master OSD node responsible for that chunk, and it will send that chunk to that master OSD. The master OSD receives the chunk, and it will do erasure coding inside the OSD memory. And after that, it will distribute all the erasure-coded components to all the other OSDs.
Starting point is 00:17:07 So what it means is that doing erasure coding requires OSDs to communicate with other OSDs. So the performance kind of depends on the inter-OSD network communication, the inter-OSD network bandwidth. On the left-hand side, which is what MarFS is doing, MarFS has two levels of erasure coding,
Starting point is 00:17:29 so what it does is that the client will send data to the gateway node, and the gateway node will maintain a large buffer, do the first level of erasure coding there, and distribute data to all the OSD nodes. And then each OSD node receives its portion and does its level of erasure coding locally, within that OSD, without requiring inter-OSD communication. So in that sense,
Starting point is 00:17:57 we can provide higher data bandwidth without requiring a very high level of inter-OSD communication. What we mostly need to do is provision enough network bandwidth for the gateway node. But for the OSD nodes, we don't need that high network bandwidth. So that is the benefit of doing multilevel erasure coding: it enables us to further reduce the cost of protecting and storing a large amount of data. The cost of doing multilevel erasure coding is that we are doing two levels of erasure,
Starting point is 00:18:37 so we generate more parity. So the space overhead is larger than if we only did one level. But because capacity is currently cheaper than network bandwidth, that's why it's a big win. So now we have kind of solved the problem of being able to cost-effectively store and protect data.
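To make the capacity-versus-network trade-off concrete, here is a minimal hedged sketch (in C++) of how the space overheads of two nested erasure coding levels multiply. The 10+2 gateway-level parameters are an illustrative assumption, not MarFS's actual configuration; the 8+3 node-level code matches the ZFS example described later in the talk.

```cpp
// Hypothetical illustration: combined space overhead of nested erasure coding.
// The parameters below are assumptions for illustration, not MarFS's real config.
#include <cstdio>

// Space expansion factor of a single k+p erasure code: (k + p) / k.
double ec_overhead(int data_shards, int parity_shards) {
    return static_cast<double>(data_shards + parity_shards) / data_shards;
}

int main() {
    // Level 1: gateway-level erasure coding across storage nodes (assumed 10+2).
    double gateway = ec_overhead(10, 2);
    // Level 2: per-node ZFS RAID across drives (8+3, as described in the talk).
    double raidz = ec_overhead(8, 3);

    // The two levels multiply: every gateway-encoded byte is encoded again by ZFS.
    double combined = gateway * raidz;

    std::printf("gateway level : %.3fx raw capacity\n", gateway);   // 1.200x
    std::printf("ZFS RAID      : %.3fx raw capacity\n", raidz);     // 1.375x
    std::printf("combined      : %.3fx raw capacity\n", combined);  // 1.650x
    return 0;
}
```

Under these assumed parameters, two-level erasure costs roughly 1.65x raw capacity versus 1.375x for the single 8+3 level, and in exchange the OSD nodes never have to exchange erasure-coded fragments with one another.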
Starting point is 00:19:12 The next step is to add a computational piece into the current MarFS so that we can not only store data cheaply, but also accelerate scientific data analytics. So here is our master plan. What we will do is imagine that the upper layer writes columnar data to our campaign storage. The reason we focus on columnar data is that we want all the columns to be indexed so that we can serve the multidimensional queries issued by scientists. And then, when the data lands on the campaign storage drives, we ask the campaign storage to build indices
Starting point is 00:19:40 on those drives asynchronously in the background. This step is actually optional, because while the data is transferred from the parallel file system to campaign storage, we are kind of already building some lightweight indices, such as the minimum and the maximum value of each column for every megabyte of data.
Starting point is 00:20:03 Those statistics are already able to help us reduce the amount of data we need to scan for a query, but that does not work for all workloads. So for the scientific applications that require more detailed indices, per-row indices, the application can ask the drives to build them. Then, when step two is done, the campaign storage is ready to serve queries. And then the scientists basically just directly send their multi-dimensional queries to the campaign storage,
Starting point is 00:20:40 and the storage will return only the data that matches the queries. So there's no need to read all the data to the compute nodes to do massive scan operations. That is why it can accelerate scientific data analytics. We will use Seagate's Kinetic platform to implement our in-drive computation capability.
Starting point is 00:21:06 So Kinetic is Seagate's research active disk platform and provides in-drive computation. So next we will do a quick introduction on Seagate's Kinetic platform. So the Kinetic disk consists of a high-density storage disk plus an envoy card, so that the envoy card is attached to the disk and provides computational capability. Inside the envoy card, there are currently two ARM CPU cores, one gigabyte of memory, and it runs an ARM version of the Ubuntu OS. And each drive will have two ethernet ports. Each port is 2.5 gigabits per second.
Starting point is 00:21:54 So Seagate is able to reuse the SAS connector of the disk and use that connector to transfer Ethernet signals. So while the disk can speak Ethernet, it doesn't have the usual Ethernet port. The idea is that the shape of the disk doesn't need to change; it can plug into the chassis in a compact way. So that's pretty nice.
Starting point is 00:22:21 The current kinetic chassis is 96 drives per chassis. In the current prototype, each drive is 16 terabytes, so 96 of them give us 1.5 petabytes of storage per chassis. And in this chassis, each drive has an IP address, so the host can communicate with the drive like it communicates with a server. The chassis itself has a switch, and each drive is connected to the chassis switch.
Starting point is 00:22:56 There are 96 of them, each providing 2.5 gigabits of Ethernet, for a total of 240 gigabits of drive bandwidth. The switch is fully subscribed: while the 96 drives at 2.5 gigabits per second each go into the switch, on the other side there are six 40-gigabit links to the outside, so both sides add up to 240 gigabits and it's a fully subscribed environment. The chassis is able to provide high availability by providing two switches and having each drive have two Ethernet ports.
Starting point is 00:23:36 So if one side of the switch is broken, then the other side can be used. Can we use both sides of the switch to do data communication simultaneously? We can do that, but it's not that useful, because the drive media is not able to saturate two Ethernet connections, so using one connection is kind of enough. The kinetic drive itself works like a key-value store: it does not use the block device interface, it uses a key-value interface. The right-hand side of the slide shows the high-level
Starting point is 00:24:26 architecture of the drive. So the drive has an envoy card, which has two CPU cores and one gigabyte of memory. On top of that, it's running an Ubuntu OS, and inside the OS, it's running a kinetic server process that is listening on a port for host communication. On the host side, we can run a client application process, and the client process will be linked with the user-level kinetic library, and the library knows how to communicate with the kinetic server on the disk using RPC over Ethernet.
Starting point is 00:25:00 So that's the high-level usage scenario for the disk. The Kinetic API has a bunch of useful primitives. First, we can ping the drive to see if it exists and if it's performing well. We can also do a device erase, which works like a mkfs: it gives us a brand-new device. It's just that mkfs gives us a new file system, while the device erase
gives us a new key-value store. The key-value store supports point operations like put, get, and delete, and range operations like get-previous, get-next, and getting a range of keys matching a certain search criterion. It is also able to execute a user-defined function: we send a function to the drive for it to run. The most interesting form of those functions is written as eBPF, and there is an active industry effort moving forward with that.
Starting point is 00:26:08 But the function can also just be written as a native C++ program, which gives us the most flexibility. For more information, we can go to this website and check the Kinetic API. Currently, Kinetic is more of a research platform, so the final product may differ a little bit.
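To make the shape of that interface concrete, below is a minimal, hedged sketch of a host-side client modeled entirely in memory. The class and method names (KineticClient, Put, GetKeyRange, SubmitFunction) are hypothetical illustrations of the primitives described above, not Seagate's actual Kinetic API; the real interface is documented at the website mentioned in the talk.

```cpp
// Minimal sketch of a Kinetic-style key-value interface, modeled in memory.
// All names here (KineticClient, Put, GetKeyRange, SubmitFunction) are hypothetical
// illustrations of the concepts described in the talk, not Seagate's actual API.
#include <cstdio>
#include <functional>
#include <map>
#include <string>
#include <vector>

class KineticClient {
public:
    // Point operations on the drive's key-value store.
    void Put(const std::string& key, const std::string& value) { kv_[key] = value; }
    bool Get(const std::string& key, std::string* value) const {
        auto it = kv_.find(key);
        if (it == kv_.end()) return false;
        *value = it->second;
        return true;
    }
    void Delete(const std::string& key) { kv_.erase(key); }

    // Range operation: return all keys in [start, end).
    std::vector<std::string> GetKeyRange(const std::string& start, const std::string& end) const {
        std::vector<std::string> keys;
        for (auto it = kv_.lower_bound(start); it != kv_.end() && it->first < end; ++it)
            keys.push_back(it->first);
        return keys;
    }

    // Pushdown: run a user-supplied function over each value in a key range,
    // standing in for shipping an eBPF or native C++ program to the drive.
    void SubmitFunction(const std::string& start, const std::string& end,
                        const std::function<void(const std::string&, const std::string&)>& fn) const {
        for (auto it = kv_.lower_bound(start); it != kv_.end() && it->first < end; ++it)
            fn(it->first, it->second);
    }

private:
    std::map<std::string, std::string> kv_;  // stand-in for the on-disk key-value store
};

int main() {
    KineticClient drive;  // in a real deployment this would speak RPC over Ethernet
    drive.Put("rowgroup.0001", "particle bytes ...");
    drive.Put("rowgroup.0002", "particle bytes ...");

    // Only matching results would travel back to the host in a real pushdown.
    drive.SubmitFunction("rowgroup.0001", "rowgroup.9999",
                         [](const std::string& key, const std::string&) {
                             std::printf("scanned %s on the drive\n", key.c_str());
                         });
    return 0;
}
```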
Starting point is 00:26:34 Our goal is to replace the current regular SMR drives with those kinetic drives so we can have the kinetic drives implement the in-drive computation for us. So here is our plan. We will reuse the current MarFS data path. We will continue to do two levels of erasure coding to ensure cost-effective data protection
Starting point is 00:26:55 and storage. And besides the data path, we will add a new analytics path. The analytics path will use the kinetic HDDs to do computation for us. They basically do two jobs. One is to asynchronously build indices.
Starting point is 00:27:13 The second is to do query processing. So we send the query to the drive and the drive will do the query for us. To have the drives know where the data is stored, we will use the MarFS metadata information to translate each MarFS file name to the underlying ZFS files that are responsible for that high-level MarFS file. And we will additionally use the ZFS metadata information to translate each ZFS file name to the underlying disk
Starting point is 00:27:48 LBAs that are responsible for that ZFS file. And then we can basically map a high-level, user-level file to all the underlying disk locations that are responsible for the file. And then we imagine a thin translation layer that can convert disk LBAs to kinetic key-value pairs. This is because the kinetic is a key-value device.
Starting point is 00:28:14 It's not a regular block device. And then we will just have everything work out. And when we want to have the drive do something, we send analytics code, which is either the indexing code or the query processing code, in eBPF format or in native C++ format to the drive,
Starting point is 00:28:35 and the program will run there. The input of the program will be key-value pairs, and we can get those key-value pairs from the MarFS and ZFS metadata information. The output of the analytics code is also key-value pairs, and the user can just do a second key-value retrieval to get the result. So that's the high-level workflow.
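As a rough illustration of that workflow, here is a hedged sketch of the translation chain from a user-level file down to the keys a drive would operate on. The Extent structure, the fabricated extents, and the key naming convention are all assumptions made up for this sketch; the real mapping comes from walking MarFS and ZFS metadata (libzdb in the prototype described next).

```cpp
// Hedged sketch of the metadata translation chain described above:
// MarFS file -> backing ZFS file(s) -> disk LBA extents -> kinetic keys.
// The structures and the key naming scheme are illustrative assumptions only.
#include <cstdint>
#include <cstdio>
#include <string>
#include <vector>

struct Extent {
    int      drive_id;    // which kinetic drive holds this piece
    uint64_t lba;         // starting logical block address on that drive
    uint64_t num_blocks;  // length of the extent in blocks
};

// Stand-in for the MarFS + ZFS metadata lookup: in reality this walks MarFS and
// ZFS metadata; here we fabricate one extent per drive purely for illustration.
std::vector<Extent> MapFileToExtents(const std::string& marfs_file) {
    std::printf("resolving %s via MarFS and ZFS metadata...\n", marfs_file.c_str());
    return {{0, 2048, 2048}, {1, 4096, 2048}};  // made-up extents
}

// Translate an LBA extent into the key a key-value drive might store it under.
// The naming convention is an assumption of this sketch.
std::string ExtentToKey(const Extent& e) {
    char key[64];
    std::snprintf(key, sizeof(key), "drive%02d/lba%012llu", e.drive_id,
                  static_cast<unsigned long long>(e.lba));
    return key;
}

int main() {
    for (const Extent& e : MapFileToExtents("/campaign/run42/particles.parquet"))
        std::printf("extent on drive %d -> kinetic key %s\n", e.drive_id, ExtentToKey(e).c_str());
    return 0;
}
```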
Starting point is 00:29:08 This basically leads us to C2, our lab's next-generation campaign storage that combines both in-drive computing capability and erasure coding into a single system to accelerate scientific analytics. So here is our current prototype implementation. C2 looks similar to the existing MarFS architecture. It has a gateway node, and beneath the gateway node it has a bunch of OSD nodes, and each OSD node uses ZFS to provide the last level of erasure coding. But each drive will not be a regular
Starting point is 00:29:47 SAS drive; it will be a kinetic drive. And on the bottom right, this is our longer-term design. In that design, each drive is acting as a key-value store, and for ZFS to communicate with the drive, we imagine a kinetic kernel module to translate the key-value device into a block device and expose that block device to ZFS. So from the ZFS point of view, it's talking to block storage, but from the kinetic point of view, it's just serving a key-value client. So that's the data path.
Starting point is 00:30:27 For the analytics path, what I imagine is that first we'll have libzdb, which is our lab's stripped-down version of ZDB that is able to map any valid ZFS file name to the underlying disk LBAs responsible for the file. And then this libzdb can be embedded inside the analysis process.
Starting point is 00:30:53 The analysis process uses libzdb to get all the disk LBAs that store the particular data set and then sends those LBAs to the underlying kinetic disks, so each disk knows which key-value pairs to process. So that's the longer-term design. For the current prototype, what we are doing is not using the key-value interface of the kinetic drive, because developing that kernel module is pretty heavyweight. So what we do is have the drive
Starting point is 00:31:28 serve as an NVMe target, and we directly expose the drive to ZFS using NVMe-oF, so ZFS directly communicates with a block device. On the analytics part, we still have libzdb, we have our custom reader program, and we have a custom client
Starting point is 00:32:00 to communicate with the drive and to run this custom reader program to implement the search functions that we need for scientific data analytics. So that's kind of the current implementation. One difficulty of having in-drive computation and erasure coding coexist in a single system is the data alignment issue.
Starting point is 00:32:25 Basically, many applications store their data as an array of indivisible units. So what are indivisible units? Sometimes they are called blocks, sometimes they are called pages. The basic idea is that they are the atomic unit the application uses to do data encoding, data compression, or data checksumming. That's why such a small chunk of data has to be read entirely into memory
Starting point is 00:32:57 before we can start to decode and read the data from that page, because it's encoded, maybe compressed, and checksummed as an atomic unit. The problem is that when the storage writes that application data to the disks, it does not necessarily recognize those indivisible unit boundaries.
Starting point is 00:33:23 So it may happen that some indivisible unit is split over different drives. Then an individual drive cannot decode that data and cannot do analytics on it. In this example, we have a user file. The file has two layers of data structures, chunks and pages.
Starting point is 00:33:44 In this case, the page is the indivisible unit. The page is where the application will do encoding, compression, and checksumming. And in this example, we have page 0 and some portion of page 1 stored on one drive, and another portion of page 1 and page 2 on another drive. So the drives are able to do computation on page 0 and page 2, but they cannot do anything on page 1, because each drive only gets partial information, and with that information, it cannot read the data out of that page.
Starting point is 00:34:18 To overcome this problem, we realized that we have to co-design the data with the underlying erasure coding schemes. In our case, we are using ZFS, so we have to co-design the data with the ZFS RAIDZ schemes. In general, when we write data as files into ZFS, ZFS internally will divide the files into what ZFS calls records. When ZFS divides the data into records, each record is individually striped across all the drives. So in this case, we have a RAIDZ pool that is eight plus three, so eight data disks,
Starting point is 00:35:00 three parity disks. The key to co-designing data with erasure coding is to configure the ZFS record size to match the underlying RAID configuration. We have an 8 plus 3 RAID, and if we configure the ZFS record size to be 8 megabytes, ZFS will divide the data into 8-megabyte records, and for each 8-megabyte record, it will do a RAID stripe. And because the underlying RAID is 8 plus 3, the 8-megabyte record will be written evenly across all the data disks.
Starting point is 00:35:41 Each disk gets 1 megabyte of data. So disk one gets the first megabyte, disk two gets the second megabyte, something like that. So that's step one. And step two is that we ensure that the application's indivisible units line up with the ZFS record size.
Starting point is 00:36:03 So in this case, if we fix the application's indivisible unit to be one megabyte, then when the application writes its data to ZFS, it writes one-megabyte basic storage units, one after another. Every eight megabytes becomes a ZFS record, ZFS stripes it across the disks, and each data disk gets exactly one megabyte, which happens to be one indivisible unit of the application. And then the drive is guaranteed to be able to read the data out of it and do analytics operations. So that is the key.
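Here is a small hedged sketch of that alignment arithmetic, following the simplified layout described in the talk, where each data disk receives one contiguous megabyte of every 8-megabyte record. Real RAIDZ on-disk layout has more detail than this; the point is only to show why the sizes line up.

```cpp
// Simplified model of the alignment described in the talk: 1 MiB row groups,
// 8 MiB ZFS records, and an 8+3 RAID pool where each data disk receives one
// contiguous megabyte of every record. Real RAIDZ layout details may differ;
// this only shows why the sizes line up.
#include <cstdio>

int main() {
    const long long kRowGroupBytes = 1LL << 20;                    // 1 MiB indivisible unit
    const int       kDataDisks     = 8;                            // 8+3: 8 data disks
    const long long kRecordBytes   = kRowGroupBytes * kDataDisks;  // 8 MiB ZFS record

    // For a few row groups, compute which ZFS record and which data disk
    // they land on under this model.
    for (int rg = 0; rg < 10; ++rg) {
        long long offset = rg * kRowGroupBytes;                        // offset in the data container
        long long record = offset / kRecordBytes;                      // which 8 MiB ZFS record
        int       disk   = (offset % kRecordBytes) / kRowGroupBytes;   // data disk column
        std::printf("row group %2d -> record %lld, data disk %d\n", rg, record, disk);
    }
    return 0;
}
```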
Starting point is 00:36:41 In C2, we focus on columnar data, and currently we are using Apache Parquet as our format. In general, the Apache Parquet format looks something like the figure shows. We have a header, we have a bunch of row groups, and we have a footer. The row groups can be viewed as horizontal partitions of the data set. If we use the standard Parquet writer to write Parquet files,
Starting point is 00:37:16 then we will have row groups that are of different sizes. So what we do is build our own custom Parquet writer on top of the standard one to make sure that each row group is exactly one megabyte. This is done basically by keeping a one-megabyte buffer in memory and paying special attention to the alignment so that each row group always comes out to exactly one megabyte. In addition to doing that,
Starting point is 00:37:47 our custom Parquet writer will also put the per-row-group metadata, which describes the minimum and maximum value of each column, the schema of each column, and so on, at the end of that megabyte, while continuing to make sure the row group is exactly one megabyte. Usually, in the standard Parquet format,
Starting point is 00:38:11 that information would be put in the footer. But in our case, those row groups will be distributed to drives, and if we put the metadata only in the footer, it's too far away, so the drive won't be able to see it. So what we do is replicate that per-row-group metadata. We store one version in the footer, so the entire file is still a valid Parquet file, but we store another copy at the end of each row group, so the disk drive is able to see it when the row groups are distributed across different drives. And we make sure each row group is exactly one megabyte.
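The sketch below illustrates the row-group alignment idea: serialize a row group, append its min/max metadata, and pad the whole thing to exactly one megabyte. It is a simplified illustration under assumed layouts (the ColumnStats struct and the zero padding are inventions of this sketch), not our actual custom Parquet writer.

```cpp
// Hedged sketch of the row-group alignment idea: take already-encoded column
// data, append per-row-group min/max metadata, then pad to exactly 1 MiB.
// This illustrates the technique only; it is not the real custom Parquet writer.
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <vector>

constexpr size_t kRowGroupSize = 1 << 20;  // every row group is exactly 1 MiB

struct ColumnStats { float min_ke; float max_ke; };  // assumed metadata layout

// Build one fixed-size row group from already-encoded Parquet column chunks.
std::vector<uint8_t> BuildAlignedRowGroup(const std::vector<uint8_t>& encoded_columns,
                                          const ColumnStats& stats) {
    std::vector<uint8_t> block;
    block.reserve(kRowGroupSize);
    block.insert(block.end(), encoded_columns.begin(), encoded_columns.end());

    // Reserve the tail of the megabyte for the replicated per-row-group metadata,
    // so a drive seeing only this megabyte can still decode and filter it.
    std::vector<uint8_t> footer(sizeof(ColumnStats));
    std::memcpy(footer.data(), &stats, sizeof(ColumnStats));

    if (block.size() + footer.size() > kRowGroupSize) {
        std::fprintf(stderr, "row group too large, flush earlier\n");
        block.resize(kRowGroupSize - footer.size());
    }
    block.resize(kRowGroupSize - footer.size(), 0);            // zero padding
    block.insert(block.end(), footer.begin(), footer.end());   // stats at the very end
    return block;                                              // exactly 1 MiB
}

int main() {
    std::vector<uint8_t> fake_columns(800 * 1024, 0xAB);  // pretend encoded column data
    ColumnStats stats{0.01f, 0.93f};
    auto rg = BuildAlignedRowGroup(fake_columns, stats);
    std::printf("row group size: %zu bytes\n", rg.size());  // prints 1048576
    return 0;
}
```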
Starting point is 00:38:58 When the logical Parquet file is written to ZFS, we do a second transformation, as follows. We create a directory for each logical Parquet file, and inside that directory, we store two container files. One container file is used to store only the header and the footer. This is because we want to ensure alignment, and the header at the beginning of the file prevents us from achieving that. So we create a metadata container
Starting point is 00:39:18 to store only the footer and the header, and create another data container that is used to store only those row groups. In that case, because each row group is one megabyte, we just write the data container to ZFS with the record size set to eight megabytes, so ZFS will divide its data into eight-megabyte records,
Starting point is 00:39:43 and that size matches the underlying RAID configuration, which is eight plus three. And then each data drive will get exactly one megabyte of that record, and that one megabyte is guaranteed to be some row group. And the row group has the metadata information at the end.
Starting point is 00:40:02 So the drive is guaranteed to be able to do analytics on it. Then the last step is just that we use our libzdb to map the file name to the underlying disk LBAs. With the LBAs, each disk will basically know all the LBAs of that file, and it can then just do operations
Starting point is 00:40:25 on those particular locations of the disk. One cost of doing this is that we are adding padding, and we replicate the metadata information for each row group. But our experiments show that doing this only increases the overall storage overhead by 0.4%, roughly a few kilobytes of padding and metadata per one-megabyte row group, so it's actually not very big.
Starting point is 00:40:54 And this is mostly because the row group is one megabyte, which is pretty large. So here we want to do an experiment to showcase how this C2 design is able to allow in-drive computation and erasure coding to coexist while accelerating science. The data set we are using is a particle data set dumped from a real-world scientific simulation. We have 500 million particles, each particle is 24 bytes, and the total data size is 12 gigabytes.
Starting point is 00:41:34 At the right, we are showing the schema for each particle. It has five columns, and the most interesting column is the last column, KE, which stands for kinetic energy. That's a floating point value, four bytes each. If we use our own Parquet writer to generate the data, we get about 11K Parquet row groups, and those 11K row groups will be distributed across the five kinetic drives. Each kinetic drive will get about 2,000 row groups.
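As a quick back-of-the-envelope check of those counts, the sketch below redoes the arithmetic, ignoring Parquet encoding and per-row-group metadata overhead (a simplifying assumption).

```cpp
// Back-of-the-envelope check of the dataset numbers above, ignoring Parquet
// encoding and per-row-group metadata overhead (a simplifying assumption).
#include <cstdio>

int main() {
    const double particles  = 500e6;     // 500 million particles
    const double bytes_each = 24.0;      // 24 bytes per particle
    const double row_group  = 1 << 20;   // ~1 MiB per row group
    const int    drives     = 5;         // kinetic drives in the prototype

    double total_bytes   = particles * bytes_each;     // ~12 GB
    double per_row_group = row_group / bytes_each;      // ~43,690 particles per row group
    double row_groups    = particles / per_row_group;   // ~11.4K row groups
    double per_drive     = row_groups / drives;          // ~2.3K per drive

    std::printf("total data : %.1f GB\n", total_bytes / 1e9);
    std::printf("row groups : %.0f (about 11K)\n", row_groups);
    std::printf("per drive  : %.0f (about 2K)\n", per_drive);
    return 0;
}
```

The numbers come out to roughly 11K row groups and a bit over 2K per drive, consistent with the figures above.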
Starting point is 00:42:09 And we do queries on this data set. What we basically do is query all the columns, but we specify a condition that we want only the particles with energies bigger than a certain value, and we have five values for it. From left to right, the leftmost is kinetic energy bigger than 0.7, so this is the most selective query,
Starting point is 00:42:36 to the point where no particle will match. So this is the extreme case. And at the far right, we have kinetic energy bigger than 0.3. This is the least selective, and about one million particles will match the query. And we have two runs. In the baseline run, we just ignore the in-drive computational capability of the kinetic disks, and we just run the Parquet reader on top of ZFS and read it as a regular file system. And in the kinetic acceleration run,
Starting point is 00:43:11 what we run on the host is a query coordinator. That coordinator acts like the master, so it uses libzdb to figure out all the disk locations that are storing the Parquet row groups. It then sends those disk locations to the drives, and each drive runs a custom reader program directly against the disk to do the query operation and then returns data to the query coordinator. So that's the acceleration path.
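Here is a hedged sketch of that split between the host-side coordinator and the drive-side reader. Every name and structure in it (RowGroupLocation, DriveSideScan, the fabricated locations and particle) is an illustrative assumption, not the actual prototype code; it only shows the shape of the pushdown.

```cpp
// Hedged sketch of the kinetic acceleration path: a host-side coordinator maps
// the dataset to per-drive disk locations (via something like libzdb), ships the
// predicate to each drive, and each drive scans its own row groups locally.
// All names and structures here are illustrative assumptions.
#include <cstdio>
#include <vector>

struct RowGroupLocation { int drive_id; long long lba; };
struct Particle { float x, y, z, id, ke; };  // assumed 5-column schema, KE last

// Stand-in for the per-drive custom reader: decode row groups at the given LBAs
// and return only particles whose kinetic energy exceeds the threshold.
std::vector<Particle> DriveSideScan(const std::vector<RowGroupLocation>& locs, float ke_min) {
    std::vector<Particle> hits;
    for (const auto& loc : locs) {
        (void)loc;                        // a real drive would read and decode loc.lba
        Particle p{0, 0, 0, 1, 0.8f};     // fabricated row for illustration
        if (p.ke > ke_min) hits.push_back(p);
    }
    return hits;
}

int main() {
    const float ke_min = 0.7f;  // the most selective query from the experiment

    // Coordinator: resolve row-group locations (normally via libzdb), grouped by drive.
    std::vector<std::vector<RowGroupLocation>> per_drive(5);
    per_drive[0] = {{0, 2048}, {0, 4096}};
    per_drive[1] = {{1, 2048}};

    // Fan the query out; only matching particles come back over the network.
    size_t total_hits = 0;
    for (const auto& locs : per_drive)
        total_hits += DriveSideScan(locs, ke_min).size();

    std::printf("particles matching KE > %.1f: %zu\n", ke_min, total_hits);
    return 0;
}
```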
Starting point is 00:43:40 So here we are showing the result: comparing the baseline and the kinetic path, the kinetic path is able to deliver up to five times faster query performance, so the query time is five times lower. The biggest speedup shows up when the query exhibits very high selectivity. In the extreme case, we have the query with energy bigger than 0.7, where no particle matches. What the original Parquet reader does is that it also tries to be smart: before reading a row group
Starting point is 00:44:23 to the host, it will first do a small read of the metadata of that row group to determine the min and max values of that row group. If the row group has data that matches the query, it will then go ahead and read the entire row group. If it does not match, it will just skip that row group
Starting point is 00:44:46 entirely and go to the next row group. So even though the baseline is not reading all the data from the disk, we can see that it's still about five times slower than the kinetic path. And this is because even reading the metadata requires a remote seek operation sent to the disk, and with 11K row groups, it's sending 11K seek requests to the drives.
Starting point is 00:45:11 That's why it sees a five times slower query time. As the query selectivity lowers, more particles are hit, and here we are seeing that the kinetic speedup is reducing. This is because the kinetic has higher CPU overhead, which slows down the kinetic drive runtime. Basically, as more data is hit, the drive needs to do more work decoding the file from the Parquet format and returning data to the client, so it does more processing.
Starting point is 00:45:48 And because the kinetic is using ARM CPU cores, it's pretty slow. That's why it has higher CPU overhead and it's kind of slowing down. So as the query selectivity becomes even lower and more particles are hit, we can see that the kinetic is performing
almost as fast as the host. And the reason is that the host is mainly bottlenecked by data movement, while the kinetic is basically bottlenecked by CPU, so they are bottlenecked by different things, although the results look similar. Currently we only have one host and five kinetic drives. But as we increase the drive count, we are talking about one host and 80 drives.
Starting point is 00:46:37 In that case, we are expecting that even for the rightmost cases, the kinetic is still able to provide much higher query acceleration. But we will show that result next time. So here is our future work. The first is that we'll increase our scale: we will add more drives to the experiment.
Starting point is 00:46:59 Then we will add the asynchronous index construction idea to the drive so it can build more detailed indices, per-row indices, and not just the min and max for every row group, things like that. And we will add more levels of erasure coding. Currently it's one; we will add more, potentially two and three levels of erasure coding. So here is the conclusion.
Starting point is 00:47:29 Massive data movement has become a key bottleneck for large-scale scientific data analytics, and near-data computation provides an opportunity to address that bottleneck. On top of that, having the flexibility to move compute to where it performs best will become increasingly important as the market evolves rapidly. So with C2, we have kind of just demonstrated that in-drive computation can coexist with
Starting point is 00:47:55 erasure coding. So we have the flexibility to move compute to the drive even though we are using erasure coding, and erasure coding is not limiting us or reducing our possibilities. So that's it. Those are my slides. Thanks.
Starting point is 00:48:18 Thanks for listening. If you have questions about the material presented in this podcast, be sure and join our developers mailing list by sending an email to developers-subscribe at snia.org. Here you can ask questions and discuss this topic further with your peers in the storage developer community. For additional information about the Storage Developer Conference, visit www.storagedeveloper.org.
