Storage Developer Conference - #152: SkyhookDM: Storage and Management of Tabular Data in Ceph

Episode Date: August 24, 2021

...

Transcript
Starting point is 00:00:00 Hello, everybody. Mark Carlson here, SNIA Technical Council Co-Chair. Welcome to the SDC Podcast. Every week, the SDC Podcast presents important technical topics to the storage developer community. Each episode is hand-selected by the SNIA Technical Council from the presentations at our annual Storage Developer Conference. The link to the slides is available in the show notes at snia.org/podcasts. You are listening to SDC Podcast, Episode 152. Okay, hello there and welcome. Today I'm going to talk to you about the Skyhook Data Management project. This is storage and management of tabular data in Ceph. I'm Jeff LeFevre.
Starting point is 00:00:55 This is work also with Professor Carlos Maltzahn at the Center for Research in Open Source Software at UC Santa Cruz. So the Skyhook Data Management project, it's open source. It's LGPL 2.1 licensed. And crucially, it's built on top of Ceph. So Skyhook is computational storage for tabular data, effectively. And we like to say extensible, scalable, and generic. These are properties that are desirable for us. It's extensible because we're going to build on top of Ceph using Ceph's built-in extension mechanism.
Starting point is 00:01:22 We'll talk a bit about that, about classes and methods that users can define. And it's scalable because these methods and classes are defined on objects, right? And objects are part of the core entity of Ceph, and Ceph is a very scalable system. So we provide this classes and methods and objects, and as they scale out, so does our
Starting point is 00:01:40 functionality. And lastly, we like to keep it generic, right? We like to not have a one-off sort of computational storage solution, but a more generic approach, right? And in this case, we're going to consider formatting our data in Google Flatbuffers or Apache Arrow, and these are efficient serialization libraries for memory access and allow some computation as well. And in this respect, they're generic in that we can utilize these third-party
Starting point is 00:02:04 libraries that are also open source. So the design goals, as I mentioned, is store process and manage tabular data. We'd like to dynamically offload computation to storage servers
Starting point is 00:02:16 with our methodology. We also want to dynamically reorganize physical data configuration. What we mean by this is on disk or on cross objects, how do we lay out the data? And with the overall goal of reducing CPU resources
Starting point is 00:02:29 of the client, so offload some work to the storage, and reduce network IO back to the client. So reduce the amount of network that's injected into the system. We take a programmable storage approach. For more information on this, you can see a page from our lab, programmability.us. That's from our research lab. And essentially, programmable storage means to combine, expose,
Starting point is 00:02:53 or extend existing software services toward new functionality that's desired. In our case, we're going to develop data management methods on objects, as I mentioned, and this will allow us to do in-storage execution of tasks. And to do this, we're going to embed external libraries in storage. For instance, like I mentioned, Apache Arrow or Google Flatbusters. And importantly, we're going to interact directly
Starting point is 00:03:15 with Ceph's Liberados library. This is the core object library, and all of the other interfaces in Ceph, say CephFS or Redos block devices, those are actually built on top of the RADOS. So since this is a storage developer conferences, I want to get into a little bit more of the details of exactly how this is done and some of the code
Starting point is 00:03:35 snippets and examples for you. Most importantly, we're going to look at the user-defined extensions and stuff. In our case, we'll utilize this object class mechanism, also dubbed CLS. And this is an extensible framework for objects. So you can think of this as user-defined read-write methods that you can inject into the system. This is the source tree at source CLS. You can look at them there.
Starting point is 00:04:04 These classes are defined and the methods are executed directly on objects, directly by objects. And these methods sit inside shared libraries that are distributed to all OSDs, so they're available to any OSD server to execute on an object that's stored locally. And again, the CLS classes,
Starting point is 00:04:22 these are utilized by Ceph internals, right? This is an existing interface inside Ceph. For instance, CephFS and RadoskAway and others actually use CLS quite a bit. So it's a robust interface. For example, here's a quick picture of the tree. It's pretty active. This was just from a week or two ago. And you can see, as I mentioned, there's CephFS, there's journaling, there's Lua.
Starting point is 00:04:42 There's a Lua interface also. There's RBD. And's journaling, there's Lua, there's a Lua interface also. There's RBD. And each of these represents a class, and inside that folder is the class and its associated methods. We'll get into this a little bit more. There's also an SDK for users to develop their own object classes. Then the question is, are programmers actually using this, right? Are they finding things that they want to do that don't exist, or are they combining and extending services toward their usage?
Starting point is 00:05:06 And the answer is yes. So this is a graph from a paper from a group in our lab, and it shows the growth in object classes and methods over time. So on the x-axis is 2010 to 2016, and on the y-axis on the left-hand side is the interface count. This includes the classes and methods, and roughly there's lines of code on the right-hand side in C++. You can see that over time,
Starting point is 00:05:29 our users are actually extending and writing their own classes. For more information about this and more examples of programmable storage, you can reference this Eurassist publication from our research group. Next, what does read-write interface look like? So there's two interfaces within CLS that you can read and write data to and from, right? So there's first is the chunk store, right?
Starting point is 00:05:54 And this is the raw device access. You can think of writing extents to disk, right? And then there's the KV store, right? And the KV store is actually a local instance of ROXDB on every object storage device in Ceph. And Ceph calls this the OMAP interface. And this is actually used for a lot of internal accounting in Ceph. It handles transactions. It handles writes and locks.
Starting point is 00:06:18 And it also handles, like, information metadata about the objects themselves. But in our case, right, we want to map tabular data to a device and offset. So in the first case, in the chunk store, we'd like to be able to take tables and put them on disk, you know, wherever we like, for instance. And then for the second case, for the KV store, we like to consider storing our tabular data directly in the KV store. For instance, you may want to store part of a table or some rows. And or you may want to store metadata in the KV store.
Starting point is 00:06:44 And metadata may be physical descriptions of where they are on disks or content-based description of what this table contains and other information like its schema, maybe recent access times, something like that. So there's a lot of metadata we can consider storing in this KV store. And the interesting thing is it's queryable. So at runtime, when you execute a method on an object, it can quickly consult its local rocks DB and look up metadata that might be interesting to help with query processing. So here's a kind of a rough hand wavy example of some of the functions available. This is read and write, and this is pseudocode.
Starting point is 00:07:18 So you can see for those methods, there's a CLS CXX read that basically takes a IO context and stuff, an offset and a length to read from. And then similarly for write. Then there's one for replace. This turns out to be very useful in our case if we want to truncate tables or overwrite tables and things like that. Other metadata interface is the extended headers. So there's X headers, and you can see there's a set method there and a get method as well. You can also stat an object, for instance.
Starting point is 00:07:50 So you can really get a lot of runtime information by querying the extended attributes. And then lastly, to utilize the local ROC CV instance, we can use the OMAP interface. So this example shows the CXX map. There's a set in getVals, takes the context as well, and maybe a map of keys and values. And then similarly, getVal, right, you can define a key and a buffer and it'll return that value back to you. And then there's an interesting getVals interface,
Starting point is 00:08:22 and this takes as input the start key. So you can define a – remember, this is RocksDB, so these are sorted string tables. So you can define a string key to start from in the sorted order of string keys, a number of maximum number of keys to get, and there's your map where to store them. And then, interestingly, it will also tell you, true or false, if there's more keys remaining after you've already consumed the number of keys.
Starting point is 00:08:44 So you can kind of iterate over the key value map true or false if there's more keys remaining after you've already consumed the number of keys. So you can kind of iterate over the key value map and continue to search through things that you're looking for based on sorted order. All those things are actually going to be interesting and useful for us. So critically here, I want to mention the use of offset length. This allows, in Ceph, it allows you to read and write partial objects. So we can actually, kind of like I mentioned, we can split tables and put them or move them around on objects where we like for different reasons.
Starting point is 00:09:13 It will become clearer as we go along. And why would we want to do that? So here's an example. So assume we can have flexibility of the physical data layout within each object. So given this light blue object here, maybe we put a table inside. Maybe this is an entire table, for instance. We might want to do this. We might want to split it into two subtables.
Starting point is 00:09:35 We can consider these as different table subpartitions or maybe materialized views, maybe the subset of the first half of rows and the second half of rows. Anything like that you might want to split. And the reason you might want to do this is control the amount of the first half of rows and the second half of rows, or anything like that you might want to split. And the reason you might want to do this is control the amount of I.O. rather than reading the entire object and all the table of two. So you might want to read parts of the table. How might we define those parts at runtime and be able to look them up
Starting point is 00:09:59 is that we might want to put them in a ROXDB, right? So from this example, we could take the ROXDB. We can insert a key value pairs that define the physical location, the offsets of each of these subtables. And so this allows us to look up the local position and just read the minimum amount that we need to off disk in some cases. And another thing that we might want to do once we have this partial rewrite capability is we might want to index the content of these subtables. For instance, we want to keep track of every row that contains the word hat,
Starting point is 00:10:31 and we keep a pointer of the row number and also to which subtable, and then we know the physical offset length, so we can go look up that subtable 1 in this case, right, and go right to the row number that contains the word hat, right? We don't have to do a whole read of the entire table or scan every row looking for hat we might have stored that in a rocks db so using these two things together gives us a great amount of flexibility for query processing at runtime if this is at the object level as well and note that individual objects are all different right they can have different
Starting point is 00:11:02 offsets and lengths and you can lay out data however you like, which we'll talk about. So here's another example use case, right? More concretely, imagine you might want to create a custom write method in CLS to create thumbnails. And this is a partial example taken from Noah Watkins' fluid talk. Here we take a thumbnail of a picture there. I mean, a picture we write it to disk in the object, and maybe when we write that, we also want to write these thumbnails.
Starting point is 00:11:31 So we write three thumbnails. Then we want to build an index on this information. So now we would use our RocksDB and develop a custom read-write method so we would read each picture, and maybe we apply some kind of image filter software that tells us whether or not the image contains mountains, and then we store the information in ROXDB. And later, we might want to query the data, so we have a custom CLS read method, and our read method is like a query that would, say, return a thumbnail if it contains mountains.
Starting point is 00:12:06 And so we can quickly look up the rocks DB and see if that's true or false and go find the image that we're looking for and return that thumbnail. Otherwise, right, we would have had to read the entire object, the original image, full size, apply the object filtering and look for, say, mountains or whatever. And if it's true, then create a thumbnail on the fly, right? Also possible with the read-write method, right? You could generate a thumbnail on the fly if you didn't want to store it.
Starting point is 00:12:33 But given that we might have stored it, right, we can just go look it up, right? So we can save a lot of runtime cost there to read by doing this extra work ahead of time to match our workload. Here's another example. So a little bit more details there. If we look at on the left-hand side, there's a C++ interface. On the right-hand side, there's a Lua interface. And this is for computing an MP5. This is Digest.
Starting point is 00:13:01 And the signature looks like I mentioned before. There's a context handle. There's an input and output buffer list. And here you can think of the input buffer list as really the parameters to this MD5 function, for instance, if you wanted to pass parameters. You can see there first this method uses the stat function. It then creates a read. So it reads the entire object in this case. And then it applies this digest function.
Starting point is 00:13:32 And then at the end of that, it appends it to the output. So that buffer list out is what's actually going to be stored as output from the object and come back to the user. So that's kind of a quick summary of a couple examples and some code examples as well. And on the right-hand side is similar as a Lua interface for that same function. So in our case with the CLS usage model for Skyhook, it's going to be methods that object you directly on the local data. These are custom read-write methods that, in our case, perform these data transformations, specifically like for query processing.
Starting point is 00:14:09 So you can think of transformations, local data, select and project. You can, of course, do other transformations like compression and other things like that. We're also going to use those transformations for physical design or data layout. So you can think of converting data from format A to format B
Starting point is 00:14:24 or maybe laying it out differently on disk. We provide a logical table abstraction and then associated physical layout of that table abstraction is mapped onto an individual object or each of the objects. And then, as mentioned, we're going to store the table metadata in the local RASDV instance. And this allows us to query that metadata at runtime. And then, as mentioned, we're going to store the table metadata in the local rocks.db instance, and this allows us to query that metadata at runtime. And this is together how we sort of exploit the capabilities that are available in CLS.
Starting point is 00:14:58 So now that I mentioned what Skyhook wants to do with it, let's look at an actual Skyhook pseudocode or more or less pseudocode example here, a C++ snippet. So to register your class and method, right, you can define a class name, in this case tabular. You define a class handle and a handle for each method. For instance, we have an execute query op method. We have a build index method.
Starting point is 00:15:20 There can be many others. And then in your init function, right, you register your tabular class with its handle. Then you register your method. So notice that exec query op is registered as a CLS method read. So this is a read method, which basically means we're only going to read data off of disk. And then in addition to that, if you look at the build index method, it's a read and write method. So build index isn't really going to return any data to the user, right? It's going to read the local data on a disk, compute some metadata, extract some metadata from it, and it's going to store that in the local rocks DB. So that's why that one's registered as a read and write method. And then on the right-hand
Starting point is 00:15:57 side, example of the query op, for instance, again, there's the context, right? So there's a handle. There's an input and output buffer list. And again, the input contains the serialized user request. So imagine we have to somehow express how will we tell the object function to execute a query op, say what columns you're interested in or what predicates could be applied, right? So what is the query schema? What is the data schema? What are the filters you'd like to apply? So in some way, you have to encode that into the buffer list. So in our case, we have this query op struct. There's many other possibilities. And this struct encodes those things that I said, the query schema,
Starting point is 00:16:35 the projections and filters, other information like use an index or not, things like that. And then we decode it, basically. So now we have to deserialize it out of the input buffer list, and that gives us our query op instructions. And then from there, we have this dot, dot, dot, just to indicate we can make decisions what to do after we examine the instructions. For instance, we may look up metadata, right, so use the OMAP interface to get keys or values. Then we may decide to read local data, right, at some offset or length. As we said,
Starting point is 00:17:04 we might have looked that up and decided, oh, we know exactly where that row sits on disk, and we can look it up at that particular position. And then maybe we process data to apply those filters and such. And then we can append that output result to the output buffer list and giving it size. And this is, again, how you set up the data to be serialized and sent back to the user.
Starting point is 00:17:26 Okay. We're putting it all together, right? In summary, right, this is data partitioning and layout. That's a big part of Skyhook. So we're going to physical map data, table data on objects. Remote processing, right? This is an offloading done by these custom CLS methods, and I just gave you an example, for instance, to execute query ops or to do data transformations on disk.
Starting point is 00:17:51 And lastly, remote indexing and queryable metadata. So other custom CLS methods, right, that utilizes local ROCKCB instance for all these features that I mentioned before. And put that all together, we can get a pretty interesting data management system. And next, let's examine the architecture. So at the bottom, let's start at the bottom, there's a Ceph cluster. For instance, it has three OSDs, or object storage devices. These OSDs store these little green rectangles here that represent objects. As I mentioned, each OSD has its own local instance of RocksDB,
Starting point is 00:18:33 so you can see that is there. And then we define this here as a self-cluster with our Skyhook DM CLS extensions installed. So as I mentioned earlier, these extensions are compiled into shared libraries and they're distributed to each OSD. So these are available at runtime for all the OSDs to use if needed.
Starting point is 00:18:54 And so that's the Ceph layer, right? And on the right-hand side, we just mentioned these objects are the entity that provides this local data processing. So no need to read every object, maybe just a few of them, whatever the application is interested in. And then RockDB gives us the variable metadata. Okay, and then on the upper layer, there's some data management application, right? So this can,
Starting point is 00:19:15 there's many forms of this, which we'll discuss later. But the point of that is the application wants to execute RADOS methods on the object, right? So it's going to call these skyhook methods that are available, right? And define them by class method and input and output buffers, right? So again, these are the parameters. So that's kind of what you'd like to do is you'd like to have parallelism there in the data management application
Starting point is 00:19:37 by issuing a lot of reads or writes, right, to the sleep objects directly. You can interact directly with objects, right? So that's literally a direct interaction with an object via libredos and stuff. What are the benefits of Ceph? Of course, as we already mentioned, it's extensible. We showed how that's useful to us
Starting point is 00:19:57 or how it's going to be useful for us. Of course, it's also reliable, adaptive, and distributed. It's a reliable system. It utilizes primary copy replication. It's very scalable as well, right? You can add or remove nodes, and objects are automatically redistributed across the OSDs. Then lastly, because we're interacting directly
Starting point is 00:20:20 with the LibreDOS object library itself, right, so we can talk and execute methods directly on an object, we get this inherent parallelism through the number of objects in SAF spread across the number of OSDs, right? So you can kind of think of all this system resources that's available via the application layer, right, calling into the Redos object and executing that.
Starting point is 00:20:43 So any application there can take advantage of those functionalities by offloading it to using the Redis function call that we described. What data management application is there? What would this look like? You can think of a client-side interface to the LibreDOS classes that
Starting point is 00:21:01 we just showed. There's a number of approaches if possible, of course. You can consider distributed processing application frameworks, Spark or Dask or others. For instance, Spark RDD partitions or Dask tasks. Each task can interact with one or more objects directly, right? So you can get a high degree of parallelism by adding a lot of tasks. Databases have an external table interface. This is widely available in databases. For instance,
Starting point is 00:21:31 in Postgres, this is called the foreign data wrapper. And these allow you to interact with data that's stored remotely, i.e. not stored and managed by the database itself, but some external entity. And then also you can consider file APIs that map the API for the file back onto themselves. You can think of it as a pass-through, really, right? One recent example of this is HDF5's virtual object layer revolve. So we've done a little bit of work with that as well. It's quite interesting.
Starting point is 00:22:00 So this looks similar in some ways to an external table interface. We're bringing back the architecture picture. What does this look like? In the end, well, at the top level there, through the data management application, you can get IO parallelism by assigning tasks to do these reads themselves. And at the Ceph side, you can kind of get CPU parallelism. So your data management application can offload some of the processing tasks onto the storage system, so each object can do a little bit of that task
Starting point is 00:22:33 on its own data, and you effectively offload the CPU from, say, one task across many, or one application across many objects in the storage servers, and you can utilize their local memory and CPU. Okay. Data format, partitioning, and access messages. This is crucial when I talk about the generic nature of how we want to scale up to evolve.
Starting point is 00:23:02 As I mentioned, we're going to utilize fast memory serialization formats. For instance, Google Flat Buffers or Flex Buffers. We've gone through several iterations here, and we're still moving forward. But, for instance, in Google Flat Buffers, right, fields can be accessed without parsing or copying or deserializing in object allocations. That's kind of a benefit of Flat Buffers over, say, protocol buffers
Starting point is 00:23:24 or other things like that. And flex buffers is the same as flat buffers. It's the schema-free version of that. It allows you to be a little bit more dynamic if you don't know exactly the schema when you originally store your data. Another format that we've been converging to is Apache Arrow.
Starting point is 00:23:42 So we added this also last year. And effectively, this makes Skyhook an example of an Arrow-native storage and processing system. And we're still evolving toward this to use more of the Arrow APIs, but right now we can actually store Arrow data and process it natively inside Ceph objects. Arrow, of course, is very popular for data exchange and in big data world.
Starting point is 00:24:07 Very recently, it's been a stable release version, which is quite nice. It's the first version 1.0. There's also now a compute API for Arrow tables. So the compute API will allow you to do things like select and project and other user-defined functions. And more recently, there's a dataset API, or very recently,
Starting point is 00:24:30 and this provides a table abstraction over a collection of remote files, or you can think of this as file fragments or fragments of tables that are stored in different formats. So this is something that we're working towards ourselves, and it gives you an abstraction of a table at a high level over many, many pieces, partitions of a table, effectively. These are called fragments in this terminology. That's a bit about format.
Starting point is 00:24:53 Now we talk about partitioning briefly. So imagine that the input table on the left-hand side, right? Maybe we want to partition by column. So vertical or horizontal partitioning is pretty common in database systems. So in this case, we have columns, the first column maybe blue, second light blue, and the third one gray. And these become formatted partitions. So the blue column creates the blue object, middle light blue, and so on for the gray object.
Starting point is 00:25:21 In this case, column-based partitioning, we usually use arrow, but that's certainly flexible. Flat buffer is more suited toward a row-based access, but either are possible here. That's the interesting thing about the flexibility by embedding these generic third-party libraries. Or you might want to do horizontal partitioning, typically called row partitioning. So again, the input table, maybe the first three rows go together into one partition, and this is formatted into arrow or flat buffers for the object.
Starting point is 00:25:59 Same thing with the middle partition, the middle two rows and the last two rows there into the gray partition. In this case, when we do row partitioning, we actually use a jump consistent hash, which is a nice hatching algorithm. But other partitions are certainly possible. You could take a brain, your month, or something like that.
Starting point is 00:26:17 But the idea is putting a partition, formatting it with this given generic format, right, and then storing that data directly into an object, and that's the key feature here. The key properties of a partition, and now that we've sort of partitioned the table and mapped the object, right, the important thing is that the format must retain the data semantics, right, and this is really just the data schema. Right, so in Flatbuck we can define this itself. In Arrow, it's also defined internally to the table. There's columns in schema fields.
Starting point is 00:26:49 So that's important because the schema is needed at the object level so that objects can do that local processing based on that local method that we've defined. We don't want them to have to look up schema somewhere else or shift the schema in all the time. These schemas are self-defined and it's available locally preferably.
Starting point is 00:27:06 Importantly, object names are generated, right? So we don't want to deal with hundreds and thousands and millions of objects and store big lists of names, right? We'd like to be able to generate names in some regularized naming pattern. And this turns out to be important at scale. And of course, a simple naming pattern would be, you know, table T dot partition one, table T dot partition two, things like that, right? We can think course a simple naming pattern would be, you know, table t.partition1, table t.partition2, things like that.
Starting point is 00:27:27 We can think of very simple naming patterns that allow objects to retain prefix of name and even subdivide further given a prefix. It's pretty flexible with name generation, or the feature of name generation gives a lot of flexibility. And then based on name, objects are distributed by Seth based on the name. So Seth placed them and then we'll put that object onto a server based on its name. And this means that the object location is not needed to be maintained by Skyhawk at all. In fact, so we don't have to maintain a giant list of names. We can just retain a name generator function. So this constrains greatly the amount of metadata that is needed to be stored
Starting point is 00:28:11 in order to generate names and look up objects because they're just name-based retrieval. So when you want to read it, you just give the name of the object and Cephal will write it directly to the right place. We don't need to know where it is physically. So what does this look like in our architecture? Just to briefly cover it, placement is done using the crush algorithm, hashing algorithm. Objects are mapped to placement groups based on their object name, as I mentioned, and placement groups are handled in primary copy
Starting point is 00:28:40 replication, so primary replica, secondary, and third replica, and so forth. They provide isolation for fault tolerance, so the idea of placement groups is to sort of isolate isolations that create isolation zones. And it's pseudo-random as well for load balancing, right? So this attempts to give a uniform distribution of
Starting point is 00:28:59 objects across all OSDs, right? So this hopefully gives us a nice distribution of similar number of objects across servers. In this example, not a very good example of uniform distribution, but you can see a couple objects on the first OSD, maybe one on the second and maybe three on the third. So that's how they would be distributed and what we would like.
Starting point is 00:29:19 Data processing, right? We consider methods and have implemented things like select, project, aggregate, group by, and sorting. So select is row-based, project is column-based, aggregation is maybe some computed value. There's grouping, and you can think of a sorting also could be very useful at the object level. And interestingly, right, as we move to the Arrow Compute API, once we're in progress of that, we'll be able to support even more data processing and leverage all of the work done by that community the Arrow Compute API, once we're in progress of that, we'll be able to support even more data processing and leverage all of the work done by that community
Starting point is 00:29:47 on the Compute API so we don't have to have those functions defined. And also notably, the Compute API supports user-defined functions, which is a very nice feature. So given the processing methods, what metadata might we want to store in Rastabee? In addition to things we've already said, like physical offsets, logical content, think of it as creating an index on various columns.
Starting point is 00:30:18 You can also consider creating text indexing. That can be very useful in some cases for log data storage. Another important thing to consider is column statistics. So we recently implemented this, and you might want to implement statistics or even access patterns, right? So if object or a particular range or column of an object is accessed frequently, you may want to keep track of that in RockDB as metadata. And at some later time, query it and decide, make decisions about what to do. We want to update it differently and so forth. With statistics, right, the value distribution of columns is really important for query optimization. So the idea would be typically you'd estimate the selectivity, right,
Starting point is 00:30:57 so if you're looking for rows with the word had, for instance, right, there's only one or two of them, right, it makes sense maybe to look it up in an index. Otherwise, right, if there's going to be many of them, you might as well just scan all of the rows, right? So you might choose a scan plan over an index plan. And in our case, this means, you know, read from physical length on disk versus look up in the ROCKCB
Starting point is 00:31:16 and then just go get the particular row of interest. And importantly, as we showed with the data semantics are embedded and such, and this object local metadata, this means that each object can sort of optimize itself. So dynamically at runtime, it can make decisions about where to read and write data from or what to do with looking at data and index and so forth. This becomes critical. And because this object is the entity, any functionality that is defined on the object,
Starting point is 00:31:42 it moves with the object as well, right? So there's no problem when you scale up and down the number of nodes. So then the question becomes when to push down or offload processing to the storage or somewhere, in this case, directly to the objects, right? So currently we just decide, yes or no. However, it could be a runtime decision, right, by the query optimizer, right? You certainly don't have to apply your special read method. You could execute this traditional set read method based on cluster knowledge.
Starting point is 00:32:11 If you know there's a lot of busyness users in the system, there's not a lot of extra capacity and storage to handle any processing methods, you might not want to push down anything. However, what if you do push it down and the storage server happens to be overloaded and the query optimizer didn't know that? That's more of a dynamic state, it might not be aware. So in this case, we want the object to be able to reject processing, which we'll call pushback, right, so we're actually working on this mechanism, and it's a really key consideration when offloading in our framework, right, because
Starting point is 00:32:37 if we want to force the object to do a read and then process some data as it's reading, right, that object is busy that entire time. So if we are unable to do it or don't have the capacity to do that, we can just do the standard read and ship it back to the client and let the client apply the operation there. Or it's some intermediate layer you can think of. Briefly, I'll talk about some physical design optimizations just to give a bit of an idea. This is a long-studied problem in databases, and of course good physical design is really crucial for workload performance.
Starting point is 00:33:12 And typical features of a physical design are tuning process, right? It includes partitioning, so let's say columns, rows, data formats, right, depending on what your usage model is, data layouts, indexing and materialized views. These are typical components of physical design databases that we can apply these here. So, for instance, a local data layout might look something like this. So if we consider an arrow table format, right, an arrow table is a schema and a sequence of record patches. Bringing back this object picture, right,
Starting point is 00:33:45 the object with subtable one and subtable two, and then the ROXDB pointed to the offsets, for instance. Well, given the arrow schema, we might do something like this. And this looks exactly like that. We might lay them out on disk, the schema and the record batches, and the ROXDB could point to individual record batches, right? And the idea here would be, you know, we can read these one at a time off disk
Starting point is 00:34:05 or just read the ones that we want, and we can conserve the amount of memory and disk IO for any given read. Another example, right, except those record batches, right, we might want to lay out columns slightly differently. For instance, maybe column one and two are frequently accessed together, and we keep those together,
Starting point is 00:34:23 but then we put column three, you know, further down offset on disk. And we just keep track of all that stuff in RSTP. Okay, local versus distributed transformation. So when we think of physical design, I kind of showed you a local example. Go back real quick. This is within an object, right? Just rearranging the data inside the object.
Starting point is 00:34:50 Another way to think of this is to dynamically adapt between row or column layouts, right? So we may change from row layout in an object to a column layout in an object. And that would be to improve performance for a given workload. We might be reformatting the data from, say, flat buffers to arrow format or physically reorganizing data, as we showed. And a distributed transformation may redistribute or gather columns across objects. So you may have row-based objects, and you'd prefer to have done, say, column-based partitioning or row-based partitioning in objects. So each object has a set of rows, and you may rather have done columns so that each object has a column, for instance. We like to do distributed transformation, which is effectively a gather from
Starting point is 00:35:29 many objects into one object, or one object to many objects. This allows us to peel out rows or columns from individual objects and move them to others. We've done this in a prototype version in a paper there below, and the current version is in progress.
Starting point is 00:35:48 We call this RADOS Remote Reads. Then we talked a little bit more in detail in our paper about physical design management and storage based on cost models. There's details that can be found there. But you can think of, as I said, that RocksDB might keep track of access patterns or interesting pieces of data, interesting patterns of queries. And then you may want to do cost modeling to decide if you want to transform the row, the column, or do a local transform,
Starting point is 00:36:18 or maybe distributed transform and distribute those columns across multiple objects. Just a quick picture on the left is LOFL. So you take an object that's a row-based transform to column-based. On the right-hand side is a distributed transform. You take that row-based object, and now you ship its columns off to independent objects
Starting point is 00:36:37 or maybe group them together. There's a lot of flexibility with this. Okay, so let's talk about some experimental results briefly here. So for setup, we typically use a TPC-H line item table. We are looking at 750 million rows. This is their generator. We put those in 10,000 objects, so each object has an equivalent number of rows. These objects are then spread across all the set of OSDs by Ceph, not by us. This is just stored in Ceph and distributed by
Starting point is 00:37:09 the question algorithm that we mentioned. The data formats we consider currently are flat buffer and flex buffer for our row-based formats and arrow for our column-based formats. As I mentioned, we're moving more toward arrow in the dataset API exclusively.
Starting point is 00:37:25 If it's fine, queries here in this case, we're moving more toward Arrow and the dataset API exclusively. If it's fine. Queries here, in this case, we look at select projects and simple queries, and we rarely show activities from 1 to 10 to 100%. And the query would look something like that, select star from line item, where extended price is greater than some value. And that would return, say, 1% of the data.
Starting point is 00:37:44 For hardware, we use NSF's CloudLab, which is a bare-metal as-a-service infrastructure for academia. We use, actually, hard to describe in this case. And our Ceph deployment is installed with Skyhook DM extension. That's those shared object libraries that have our methods. And then we can use a simple client-side driver so that we can focus on the storage processing. And either we process it in the client or with the same code,
Starting point is 00:38:08 or we push it down to the storage using our extensions, right? And we call that rados execute on our mass events for each of the 10,000 options. Scalability. Let's look at this briefly. On the x-axis is the number of storage servers. So this is as we scale out the number of Ceph servers from 1 to 2 up to 16. We can look at a slight 1% from that table that we mentioned, a query.
Starting point is 00:38:34 And the left-hand side there on the y-axis, there's the execution time. So we can see that, you know, as it scales out, the execution time goes down. And the black bar represents no processing, right? So this is if we didn't push down anything, right? And in the gray bar is if we did the push down, calling our object storage methods. So let's focus in on this briefly here and see what's going on here. I think it's interesting.
Starting point is 00:39:03 So in this case, the query execution time, previous case, I'm sorry, is very similar, right? It's about the same. Which means maybe there's some other bottleneck in the system. But let's look at did we achieve any of our goals, which is offloading some CPU from the client side onto the service.
Starting point is 00:39:21 So we look at these graphs. On the left-hand side, this column is the client sideside CPU usage over time, like the first 60 seconds of a query. And on the right-hand side is the server-side CPU usage for each of the eight servers, and it's stacked there. So at the first row, we call this a baseline. So on the top row right, there's read cost only.
Starting point is 00:39:40 There's no query. This is just the cost. If you didn't push down anything, you didn't run any query, you just did a regular read of those 10,000 objects. And you can see the CPU percentage of the client is at about 5% there. And on the servers, it's similar, about 5%
Starting point is 00:39:52 stack. The next row down is selecting 1%. So this is no offload into storage. We're just going to use the regular read and then apply the selection filters at the client side. And you can see that the client CPU actually goes up, right? So the storage side looks about the same or nearly identical
Starting point is 00:40:12 because it's not doing any work. It's just delivering the regular bytes, doing a regular read and sending it back, and the client's actually applying the selection query. And then on the bottom row, right, we can see what if we offload it to storage and use our methods. So in the first column, you can see the client machine now. The CPU has gone down very, very low, near zero, in fact.
Starting point is 00:40:36 And on the right-hand side there, the storage machines, if we look at the stack bars, right, it went up a little bit, not a whole lot, but it did go up a little bit, right? And so this shows we have effectively offloaded work from the storage, from the client to the storage servers. But interestingly, what's going on with the client machine and why it goes down to zero, right? Consider that the top graph for the client is also not doing any query,
Starting point is 00:41:02 but it's doing a lot of work because it has to receive all of those packets and data from the storage servers, the eight storage servers. That's the top row. In the bottom row, the client isn't receiving many packets because on the right-hand side, the storage servers have already filtered the data out. So only 1% of the data is coming back,
Starting point is 00:41:19 which means effectively 1% of the packets. So there's not much work for the client to do to just receive those few packets. And you can see that it's not trivial, right? The packet building and receiving is not trivial at all, right? So it's interesting to consider that if we're going to be building packets and wasting CPU cycles, perhaps we should use them to do some data processing and filter out data before we send it back, because we might want to do useful work rather than just building packets that where half the data is going to be thrown away eventually anyways, or in that case, 99% of the data.
Starting point is 00:41:51 So that's just one example of CPU offloading. So what about network offloading? So similarly here, the top row, we have column one as a client and column two as one server, not stacked in this case, it's just one server to look at the data coming back. And you can see with no offload to storage across the top, the client is receiving a lot of data. There's a high amount of data coming back and the client is receiving it. And on the server side on the right on the top, the server is sending back a bit of data, this particular server. We just look at one here instead of stacking them, but you can see that one server is sending back a bit of data, this particular server. We just look at one here instead of stacking them,
Starting point is 00:42:25 but you can see that one server is sending back a bit of data. Now, in the bottom row, we can see exactly what happens when we offload the storage, right? The client now is not receiving much at all, very, very little. And on the right-hand side, the server is not sending much data. So that's back down to this 1% query, so only 1% of the data is coming back. So we've not only offloaded IO, data processing to servers,
Starting point is 00:42:50 reduced IO, but we reduced the work to send and receive the IO as well. So we've kind of achieved the goals of offloading to the storage system here in this example you can see. Next, let's go back to this eight servers again and examine this a little bit more. Something interesting going on here that I want to talk about for selectivity. So if we consider on the left-hand side, we have storage side processing, right, on the x-axis. And on the right is the client-side processing.
Starting point is 00:43:19 And all of these costs are similar, right? The execution time is on the y-axis. You can see the cost is about 100 seconds for all these cases, right, more or less. But there's something interesting going on right there at that bar. It's a little bit higher. So what's happening there is this is the extra work for 100% selectivity. So the storage has to read and scan and apply the filter to every row. Every row passes, it has to basically do all that work and build all the data and send it all back again, right?
Starting point is 00:43:45 So you're doing a lot more work by filtering all those rows that don't get filtered out. They all match. So this argues for the need for statistics. Maybe we don't want to read them all. Maybe we want to do an index. Okay, next we'll look at briefly local versus distributed transform. So we call local, distributed, and client-side transform. And this is the execution time to do that transform.
Starting point is 00:44:09 You can see the local and distributed, these happen inside storage. The local doesn't use any network. Distributed does use network because objects send data back and forth to each other. But it all stays within a storage layer. And on the client-side transform, as if we would do this row to column by reading it into the client, transforming it and running it back down. You can see the transform cost is reduced by keeping it within storage. What is the benefit of that? So assuming we wanted to transform for a good reason based on a cost model, right now let's look at a query that does select extended price. So we're
Starting point is 00:44:40 looking at one column from this table. And then on the y-axis, we have execution time. So we start out with row format. That's the black bar. This is before the transform. That's the cost of the query. And then after the transform, the middle gray bar, this is reading that column. And on the right-hand side, you can see the picture. We have these rows organized back into columns.
Starting point is 00:45:05 And then what if we did something different, which we mentioned earlier, if we took those rows and organized them into, say, one column or independent columns on disk, right? So now the execution time, if we just store the extended price column by itself, we can peel that column right off of disk, put it in memory, and return it. So this shows quite a bit of benefit with transformations. For ongoing work,
Starting point is 00:45:24 we have a lot of things happening right now with Skyhook. We're adapting to the Apache Arrow dataset API, which I mentioned. This will bring us even more toward end-to-end Arrow native processing. We interact with the Arrow committee for feedback, and we're trying to create a Libratos fragment. It's a Libratos fragment class. And eventually, we'd like to maybe push this upstream into the dataset API.
Starting point is 00:45:48 So that's our goal for that. It's a big community, and it's exciting work for Skyhook. We're also working on Rados reads from remote objects. So this is this collector distributed sort of transform that we mentioned. We now have deployment via Kubernetes and Rook
Starting point is 00:46:02 on CloudLab as well as other systems. And we're expanding our work on data indexing for Arrow, right? So we're moving more toward these Arrow native operations now. And I'll do a real quick plug for CROSS. This is the Center for Research in Open Source Software that this project is supported by. It's an incubator stage, so this goal of CROSS is to bridge the gap between student research and open source project. It's been funded by an endowment from Sage Weil, who's the CEF founder, and other corporate memberships.
Starting point is 00:46:38 Right now we have Fujitsu, Seagate, and Keopsia. So if you're interested, please come talk to us in any of the work that we're doing. Cross is leveraging this open source culture in university research, and also supports graduate research for graduate students. And in this example, it incubates work beyond graduation, right, toward viable commercial products, right? There's a couple examples here in the Skyhook DM project that I've been talking about.
Starting point is 00:47:03 Another one is the Popper project, right. This is a container native workflow execution engine that focuses on reproducibility in science. Cross is directed by Carlos Maltzan. And you can look at the Cross website for more information or contact us directly. We really appreciate your interest. Okay, move on. And finally, my acknowledgments are for the Cross Center NSF grants, the Iris Hepp Software Institute. This is the Institute for Research and Innovation in Software for High Energy Physics. They also support our work. Some of that work is done through CERN. Our Cross corporate memberships, as I mentioned, Fujitsu, Seagate, and Kyoxia, and everyone who's contributed to the Skyhook project, especially Noah, Michael, Ivo, and Ken.
Starting point is 00:47:51 We really appreciate all the hard work. We've had a lot of internal and external students, as well as Google Summer of Code fellows for two years now, Iris Hepp fellows as well, and master's projects and thesis based on work done in Skyhook. So thank you very much. You can visit the skyhookdm.com website for more information, and my email is there below.
Starting point is 00:48:12 Thanks for listening. If you have questions about the material presented in this podcast, be sure and join our developers mailing list by sending an email to developers-subscribe at sneha.org. Here you can ask questions and discuss this topic further with your peers in the storage developer community. For additional information about the Storage Developer Conference, visit www.storagedeveloper.org.
