Storage Developer Conference - #152: SkyhookDM: Storage and Management of Tabular Data in Ceph
Episode Date: August 24, 2021
Transcript
Hello, everybody. Mark Carlson here, SNIA Technical Council Co-Chair. Welcome to the SDC Podcast. Every week, the SDC Podcast presents important technical topics to the storage developer community. Each episode is hand-selected by the SNIA Technical Council from the presentations at our annual Storage Developer Conference. The link to the slides is available in the show notes at snia.org/podcasts. You are listening to SDC Podcast, Episode 152.
Okay, hello there and welcome. Today I'm going to talk to you about the Skyhook Data Management project. This is storage and management of tabular data in Ceph.
I'm Jeff LeFevre. This is work also with Professor Carlos Maltzahn at the Center for Research in Open Source Software at UC Santa Cruz.
So the Skyhook Data Management project: it's open source, it's LGPL 2.1 licensed, and crucially, it's built on top of Ceph. So Skyhook is, effectively, computational storage for tabular data. And we like to say extensible, scalable, and generic — these are properties that are desirable for us.
It's extensible because we're going to build on top of Ceph using Ceph's built-in extension mechanism; we'll talk a bit about that — about classes and methods that users can define. And it's scalable because these methods and classes are defined on objects, and objects are the core entity of Ceph, which is a very scalable system. So we provide these classes and methods on objects, and as they scale out, so does our functionality.
And lastly, we like to keep it generic,
right? We like to not have a one-off sort of computational storage solution,
but a more generic approach, right?
And in this case, we're going to consider formatting our data in Google
Flatbuffers or Apache Arrow, and these are efficient serialization libraries
for memory access and allow some computation as well.
And in this respect, they're generic in that we can utilize these third-party
libraries that are also open source.
So the design goals, as I mentioned, are to store, process, and manage tabular data. We'd like to dynamically offload computation to storage servers with our methodology. We also want to dynamically reorganize the physical data layout — what we mean by this is how we lay out the data on disk or across objects. And the overall goal is to reduce the CPU resources of the client, by offloading some work to the storage, and to reduce the network I/O back to the client — that is, reduce the amount of traffic that's injected into the network.
We take a programmable storage approach.
For more information on this, you can see a page from our lab, programmability.us.
That's from our research lab.
And essentially, programmable storage means to combine, expose,
or extend existing software services toward new functionality that's desired.
In our case, we're going to develop data management methods on objects, as I mentioned,
and this will allow us to do in-storage execution of tasks.
And to do this, we're going to embed external libraries in storage — for instance, like I mentioned, Apache Arrow or Google Flatbuffers. And importantly, we're going to interact directly with Ceph's librados library. This is the core object library, and all of the other interfaces in Ceph — say CephFS or RADOS block devices — are actually built on top of RADOS.
So since this is a storage developer conference, I want to get into a bit more detail about exactly how this is done, with some code snippets and examples for you. Most importantly, we're going to look at the user-defined extensions in Ceph. In our case, we'll utilize the object class mechanism, also dubbed CLS. This is an extensible framework for objects — you can think of it as user-defined read-write methods that you can inject into the system. These live in the Ceph source tree at src/cls; you can look at them there.
These classes are defined on objects, and the methods are executed directly by objects. The methods sit inside shared libraries that are distributed to all OSDs, so they're available to any OSD server to execute on an object that's stored locally. And again, the CLS classes are utilized by Ceph internals — this is an existing interface inside Ceph. For instance, CephFS and the RADOS Gateway and others actually use CLS quite a bit, so it's a robust interface.
For example, here's a quick picture of the tree.
It's pretty active.
This was just from a week or two ago.
And you can see, as I mentioned, there's CephFS, there's journaling, there's Lua — there's a Lua interface also — and there's RBD. Each of these represents a class, and inside that folder is the class and its associated methods.
We'll get into this a little bit more.
There's also an SDK for users to develop their own object classes.
Then the question is, are programmers actually using this, right?
Are they finding things that they want to do that don't exist,
or are they combining and extending services toward their usage?
And the answer is yes.
So this is a graph from a paper from a group in our lab, and it shows the growth in object classes and methods over time. The x-axis runs from 2010 to 2016, the y-axis on the left-hand side is the interface count — that is, the classes and methods — and the right-hand side is, roughly, the lines of C++ code. You can see that over time, users are actually extending Ceph and writing their own classes. For more information about this, and for more examples of programmable storage, you can reference the EuroSys publication from our research group.
Next, what does the read-write interface look like? There are two interfaces within CLS that you can read and write data to and from. The first is the chunk store, and this is the raw device access — you can think of writing extents to disk. And then there's the KV store, which is actually a local instance of RocksDB on every object storage device in Ceph. Ceph calls this the OMAP interface, and it's used for a lot of internal accounting in Ceph: it handles transactions, it handles writes and locks, and it also holds metadata about the objects themselves.
But in our case, we want to map tabular data to a device and offset. So in the first case, the chunk store, we'd like to be able to take tables and put them on disk wherever we like. And in the second case, the KV store, we can consider storing our tabular data directly in the KV store — for instance, part of a table or some rows. And/or we may want to store metadata in the KV store. That metadata may be physical descriptions of where things are on disk, or content-based descriptions of what the table contains, or other information like its schema, maybe recent access times, things like that. So there's a lot of metadata we can consider storing in this KV store. And the interesting thing is it's queryable: at runtime, when you execute a method on an object, it can quickly consult its local RocksDB and look up metadata that might help with query processing.
So here's a rough, hand-wavy example of some of the functions available. This is read and write, and this is pseudocode. You can see there's a cls_cxx_read that basically takes the method's I/O context, and an offset and a length to read from, and similarly for write. Then there's one for replace; this turns out to be very useful in our case if we want to truncate or overwrite tables and things like that.
The other metadata interface is the extended attributes. There are xattrs, and you can see there's a set method there and a get method as well. You can also stat an object, for instance. So you can really get a lot of runtime information by querying the extended attributes.
And then lastly, to utilize the local RocksDB instance, we can use the OMAP interface. This example shows the cls_cxx_map functions. There's a set_vals, which takes the context as well and a map of keys and values. And then similarly, get_val: you give a key and a buffer, and it returns that value back to you. And then there's an interesting get_vals interface, which takes as input a start key. Remember, this is RocksDB, so these are sorted string tables: you can give a string key to start from in the sorted order of keys, a maximum number of keys to get, and a map where to store them. And then, interestingly, it will also tell you, true or false, whether there are more keys remaining after you've already consumed that number of keys. So you can iterate over the key-value map and continue to search through things you're looking for based on sorted order.
All of those things are actually going to be interesting and useful for us.
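To make those interfaces concrete, here are approximate C++ declarations, roughly as they appear in Ceph's object class header; exact signatures vary a bit between Ceph releases, so treat this as a sketch rather than the authoritative API:

    // Chunk store: read/write/replace an extent of the object's byte data.
    int cls_cxx_read(cls_method_context_t hctx, int ofs, int len, ceph::bufferlist *out);
    int cls_cxx_write(cls_method_context_t hctx, int ofs, int len, ceph::bufferlist *in);
    int cls_cxx_replace(cls_method_context_t hctx, int ofs, int len, ceph::bufferlist *in);

    // Object metadata: extended attributes and stat.
    int cls_cxx_setxattr(cls_method_context_t hctx, const char *name, ceph::bufferlist *in);
    int cls_cxx_getxattr(cls_method_context_t hctx, const char *name, ceph::bufferlist *out);
    int cls_cxx_stat(cls_method_context_t hctx, uint64_t *size, time_t *mtime);

    // KV store (OMAP): the object's entries in the OSD-local RocksDB.
    int cls_cxx_map_set_vals(cls_method_context_t hctx,
                             const std::map<std::string, ceph::bufferlist> *keyvals);
    int cls_cxx_map_get_val(cls_method_context_t hctx, const std::string &key,
                            ceph::bufferlist *out);
    int cls_cxx_map_get_vals(cls_method_context_t hctx, const std::string &start_after,
                             const std::string &key_prefix, uint64_t max_to_get,
                             std::map<std::string, ceph::bufferlist> *out, bool *more);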
So critically here, I want to mention the use of offset and length. In Ceph, this allows you to read and write partial objects. So, like I mentioned, we can split tables and put them or move them around within objects however we like, for different reasons. It will become clearer as we go along.
And why would we want to do that?
So here's an example. Assume we have flexibility over the physical data layout within each object. Given this light blue object here, maybe we put a table inside — maybe an entire table, for instance. Or we might want to split it into two subtables. We can consider these as different table subpartitions or maybe materialized views — say, the first half of rows and the second half of rows, or anything else you might want to split on.
And the reason you might want to do this is to control the amount of I/O: rather than reading the entire object and both subtables, you might want to read just part of the table. How might we define those parts, and be able to look them up at runtime? We might want to put them in RocksDB. So from this example, we could take the RocksDB and insert key-value pairs that define the physical location — the offsets — of each of these subtables. This allows us to look up the local position and read only the minimum amount we need off disk in some cases.
Another thing we might want to do, once we have this partial read-write capability, is index the content of these subtables. For instance, we want to keep track of every row that contains the word 'hat', and we keep a pointer to the row number and also to which subtable it's in. Then we know the physical offset and length, so we can go look up that subtable — subtable 1 in this case — and go right to the row number that contains the word 'hat'. We don't have to read the entire table or scan every row looking for 'hat'; we might have stored that in RocksDB. So using these two things together gives us a great amount of flexibility for query processing at runtime, and this is at the object level as well. Note that individual objects are all different: they can have different offsets and lengths, and you can lay out data however you like, which we'll talk about.
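As a concrete illustration, here is a minimal sketch of a CLS read method in that spirit: it looks up a subtable's stored offset and length in the object's local RocksDB via OMAP, then reads only that extent. The method name and the "offset,length" key format are made up for the example; this is not the actual Skyhook code.

    // Hypothetical CLS read method: the client passes a subtable name (e.g. "subtable.1"),
    // we look up its "offset,length" entry in OMAP, and read only that extent off disk.
    static int read_subtable(cls_method_context_t hctx,
                             ceph::bufferlist *in, ceph::bufferlist *out)
    {
      std::string key = in->to_str();              // subtable name from the caller

      ceph::bufferlist loc;
      int ret = cls_cxx_map_get_val(hctx, key, &loc);
      if (ret < 0)
        return ret;                                // e.g. -ENOENT: subtable never indexed

      unsigned long long off = 0, len = 0;
      sscanf(loc.to_str().c_str(), "%llu,%llu", &off, &len);

      // Partial object read: only the bytes belonging to this subtable.
      return cls_cxx_read(hctx, off, len, out);
    }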
So here's another example use case, a bit more concrete. Imagine you might want to create a custom write method in CLS to create thumbnails — this is a partial example taken from a talk by Noah Watkins. Here we take a picture and write it to disk in the object, and maybe when we write it, we also want to write thumbnails of it — so we write three thumbnails. Then we want to build an index on this information. So now we use our RocksDB and develop a custom read-write method: we read each picture, maybe apply some kind of image-filtering software that tells us whether or not the image contains mountains, and then we store that information in RocksDB. Later, we might want to query the data, so we have a custom CLS read method, and our read method is like a query that would, say, return a thumbnail if the image contains mountains. We can quickly check the RocksDB to see whether that's true or false, go find the image we're looking for, and return that thumbnail. Otherwise, we would have had to read the entire object — the original, full-size image — apply the image filtering, look for, say, mountains, and if it's true, create a thumbnail on the fly. That's also possible with a read-write method: you could generate a thumbnail on the fly if you didn't want to store it. But given that we might have stored it, we can just go look it up. So we can save a lot of runtime cost at read time by doing this extra work ahead of time to match our workload.
Here's another example, with a little more detail. On the left-hand side there's a C++ interface; on the right-hand side there's a Lua interface. And this is for computing an MD5 digest. The signature looks like I mentioned before: there's a context handle, and there's an input and an output bufferlist. Here you can think of the input bufferlist as really the parameters to this MD5 function, for instance, if you wanted to pass parameters. You can see that first this method uses the stat function, then it does a read — so it reads the entire object in this case — and then it applies the digest function. At the end, it appends the result to the output. So that output bufferlist is what's actually going to be returned as output from the object and come back to the user. So that's a quick summary of a couple of examples, with some code as well. And on the right-hand side is the Lua interface for that same function.
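For readers without the slide, the C++ side of that example looks roughly like the following — a reconstruction from the description above (using OpenSSL's MD5 here), not the exact code from the talk:

    #include <openssl/md5.h>

    // CLS read method: stat the object, read all of its bytes, compute an MD5
    // digest, and append the digest to the output bufferlist for the caller.
    static int compute_md5(cls_method_context_t hctx,
                           ceph::bufferlist *in, ceph::bufferlist *out)
    {
      uint64_t size = 0;
      time_t mtime = 0;
      int ret = cls_cxx_stat(hctx, &size, &mtime);
      if (ret < 0)
        return ret;

      ceph::bufferlist data;
      ret = cls_cxx_read(hctx, 0, size, &data);    // read the entire object
      if (ret < 0)
        return ret;

      unsigned char digest[MD5_DIGEST_LENGTH];
      MD5(reinterpret_cast<const unsigned char*>(data.c_str()), data.length(), digest);

      out->append(reinterpret_cast<const char*>(digest), sizeof(digest));
      return 0;
    }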
So in our case, the CLS usage model for Skyhook is methods that objects execute directly on their local data. These are custom read-write methods that, in our case, perform data transformations, specifically for query processing. So you can think of transformations on local data like select and project; you can, of course, do other transformations like compression and things like that. We're also going to use those transformations for physical design, or data layout — converting data from format A to format B, or laying it out differently on disk. We provide a logical table abstraction, and the associated physical layout of that table is mapped onto an individual object or across a set of objects. And then, as mentioned, we're going to store the table metadata in the local RocksDB instance, and this allows us to query that metadata at runtime. This, together, is how we exploit the capabilities that are available in CLS.
So now that I've mentioned what Skyhook wants to do with it, let's look at an actual Skyhook example — more or less pseudocode, a C++ snippet. To register your class and methods, you define a class name, in this case 'tabular'. You define a class handle and a handle for each method. For instance, we have an exec_query_op method and a build_index method, and there can be many others. Then in your init function, you register your tabular class with its handle, and then you register each method. Notice that exec_query_op is registered as a CLS read method, which basically means we're only going to read data off of disk. And if you look at the build_index method, it's a read and write method. Build_index isn't really going to return any data to the user: it's going to read the local data on disk, compute or extract some metadata from it, and store that in the local RocksDB. That's why that one's registered as a read and write method.
And then on the right-hand side is an example of the query op method. Again, there's the context handle, and there's an input and an output bufferlist. The input contains the serialized user request. So imagine we have to somehow express how we tell the object function to execute a query op — say, what columns you're interested in or what predicates should be applied. What is the query schema? What is the data schema? What are the filters you'd like to apply? In some way, you have to encode that into the input bufferlist.
So in our case, we have this query op struct — there are many other possibilities — and this struct encodes those things I mentioned: the query schema, the projections and filters, and other information like whether to use an index or not. And then we decode it: we deserialize it out of the input bufferlist, and that gives us our query op instructions. From there, we have this dot, dot, dot, just to indicate that we can make decisions about what to do after we examine the instructions. For instance, we may look up metadata — use the OMAP interface to get keys or values. Then we may decide to read local data at some offset and length; as we said, we might have looked that up and decided, oh, we know exactly where that row sits on disk, and we can read it at that particular position. And then maybe we process the data to apply those filters and such. Finally, we append the output result to the output bufferlist, giving it its size. And this, again, is how you set up the data to be serialized and sent back to the user.
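Reconstructed from that description, the registration boilerplate looks roughly like this — the method and handler names follow the talk, but the exact Skyhook source may differ:

    CLS_VER(1, 0)
    CLS_NAME(tabular)

    cls_handle_t h_class;
    cls_method_handle_t h_exec_query_op;
    cls_method_handle_t h_build_index;

    void __cls_init()
    {
      cls_register("tabular", &h_class);

      // Read-only: scans/filters local data and returns results to the caller.
      cls_register_cxx_method(h_class, "exec_query_op", CLS_METHOD_RD,
                              exec_query_op, &h_exec_query_op);

      // Read + write: reads local data, writes index entries into local RocksDB (OMAP).
      cls_register_cxx_method(h_class, "build_index", CLS_METHOD_RD | CLS_METHOD_WR,
                              build_index, &h_build_index);
    }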
Okay, putting it all together. In summary: there's data partitioning and layout — that's a big part of Skyhook — where we physically map table data onto objects. There's remote processing, which is the offloading done by these custom CLS methods; I just gave you an example, for instance, to execute query ops or to do data transformations on disk. And lastly, there's remote indexing and queryable metadata — other custom CLS methods that utilize the local RocksDB instance for all the features I mentioned before. Put that all together, and we get a pretty interesting data management system. And next, let's examine the architecture.
Let's start at the bottom: there's a Ceph cluster. For instance, it has three OSDs, or object storage devices. These OSDs store the little green rectangles here that represent objects. As I mentioned, each OSD has its own local instance of RocksDB, so you can see that there. And we define this as a Ceph cluster with our SkyhookDM CLS extensions installed. As I mentioned earlier, these extensions are compiled into shared libraries and distributed to each OSD, so they're available at runtime for all the OSDs to use if needed. So that's the Ceph layer. And on the right-hand side, as we just mentioned, these objects are the entity that provides the local data processing — so there's no need to read every object, maybe just a few of them, whatever the application is interested in. And RocksDB gives us the queryable metadata.
Okay, and then on the upper layer, there's some data management application. There are many forms of this, which we'll discuss later, but the point is that the application wants to execute RADOS methods on the objects. So it's going to call these Skyhook methods that are available, identified by class and method name, with input and output buffers — again, these are the parameters. And what you'd like to do is have parallelism there in the data management application by issuing a lot of reads or writes to the Ceph objects directly. You can interact directly with objects — that's literally a direct interaction with an object via librados.
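On the client side, invoking one of these methods on a single object through librados looks roughly like this; the pool name, object name, and the encode_query_op() helper are made up for the example:

    #include <rados/librados.hpp>

    librados::Rados cluster;
    cluster.init("admin");                              // connect as client.admin
    cluster.conf_read_file("/etc/ceph/ceph.conf");
    cluster.connect();

    librados::IoCtx ioctx;
    cluster.ioctx_create("tabular_pool", ioctx);        // hypothetical pool name

    librados::bufferlist inbl, outbl;
    encode_query_op(query, inbl);                       // hypothetical: serialize the query op

    // Execute the Skyhook CLS method directly on one object; the filtered
    // result comes back in outbl. Issue many of these in parallel, one per object.
    int ret = ioctx.exec("lineitem.obj.1234", "tabular", "exec_query_op", inbl, outbl);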
What are the benefits of Ceph? Of course, as we already mentioned, it's extensible; we showed how that's useful for us, or how it's going to be useful for us. It's also reliable, adaptive, and distributed. It's a reliable system that utilizes primary-copy replication. It's very scalable as well: you can add or remove nodes, and objects are automatically redistributed across the OSDs. And lastly, because we're interacting directly with the librados object library itself — we can talk to and execute methods directly on an object — we get this inherent parallelism through the number of objects in Ceph spread across the number of OSDs. So you can think of all these system resources being available to the application layer by calling into a RADOS object and executing there. Any application can take advantage of that functionality by offloading work using the RADOS exec call that we described.
So what data management application is there? What would this look like? You can think of a client-side interface to the librados classes that we just showed. There are a number of possible approaches, of course. You can consider distributed processing frameworks — Spark or Dask or others. For instance, with Spark RDD partitions or Dask tasks, each task can interact with one or more objects directly, so you can get a high degree of parallelism by adding a lot of tasks. Databases have an external table interface; this is widely available — for instance, in Postgres it's called the foreign data wrapper — and it allows you to interact with data that's stored remotely, i.e., not stored and managed by the database itself but by some external entity. And then you can also consider file APIs that map the file API back onto themselves — you can think of it as a pass-through, really. One recent example of this is HDF5's virtual object layer, VOL. We've done a little bit of work with that as well; it's quite interesting, and it looks similar in some ways to an external table interface.
Bringing back the architecture picture, what does this look like in the end? At the top level, through the data management application, you can get I/O parallelism by assigning tasks to do these reads themselves. And on the Ceph side, you can get CPU parallelism: your data management application can offload some of its processing tasks onto the storage system, so each object can do a little bit of that task on its own data. You effectively spread the CPU work of one task, or one application, across many objects in the storage servers, and you can utilize their local memory and CPU.
Okay.
Data format, partitioning, and access methods. This is crucial to the generic nature of how we want Skyhook to evolve. As I mentioned, we're going to utilize fast in-memory serialization formats, for instance Google Flatbuffers or Flexbuffers. We've gone through several iterations here, and we're still moving forward. But, for instance, in Google Flatbuffers, fields can be accessed without parsing, copying, or deserializing into object allocations. That's a benefit of Flatbuffers over, say, Protocol Buffers or other things like that. And Flexbuffers is the schema-free version of Flatbuffers; it allows you to be a little bit more dynamic if you don't know the exact schema when you originally store your data.
Another format that we've been converging on is Apache Arrow. We added this last year, and effectively this makes Skyhook an example of an Arrow-native storage and processing system. We're still evolving toward using more of the Arrow APIs, but right now we can actually store Arrow data and process it natively inside Ceph objects. Arrow, of course, is very popular for data exchange in the big data world. Very recently it had its first stable release, version 1.0, which is quite nice. There's also now a compute API for Arrow tables; the compute API allows you to do things like select and project and other user-defined functions. And very recently there's a dataset API, which provides a table abstraction over a collection of remote files — you can think of these as file fragments, or fragments of tables, stored in different formats. This is something we're working toward ourselves, and it gives you an abstraction of a table at a high level over many pieces — partitions of a table, effectively. These are called fragments in this terminology.
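Just to make the format concrete, here's a small, illustrative C++ sketch — not Skyhook code — of building an Arrow table where the schema travels with the data; the column names are made up:

    #include <arrow/api.h>
    #include <memory>

    // Build a tiny in-memory Arrow table: two columns plus an embedded schema.
    // Because the schema travels with the table, an object holding this data can
    // process it locally without fetching schema information from anywhere else.
    std::shared_ptr<arrow::Table> make_example_table() {
      arrow::Int64Builder key_builder;
      arrow::DoubleBuilder price_builder;

      (void)key_builder.AppendValues({1, 2, 3});
      (void)price_builder.AppendValues({10.5, 20.0, 99.9});

      std::shared_ptr<arrow::Array> keys, prices;
      (void)key_builder.Finish(&keys);
      (void)price_builder.Finish(&prices);

      auto schema = arrow::schema({arrow::field("orderkey", arrow::int64()),
                                   arrow::field("extendedprice", arrow::float64())});

      return arrow::Table::Make(schema, {keys, prices});
    }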
That's a bit about format.
Now let's talk about partitioning briefly. Imagine the input table on the left-hand side. Maybe we want to partition by column — vertical or horizontal partitioning is pretty common in database systems. In this case we have columns: the first column maybe blue, the second light blue, and the third gray. These become formatted partitions: the blue column creates the blue object, the light blue column the middle object, and so on for the gray object. For column-based partitioning we usually use Arrow, but that's certainly flexible; Flatbuffers is more suited toward row-based access, but either is possible here. That's the interesting thing about the flexibility of embedding these generic third-party libraries.
Or you might want to do horizontal partitioning, typically called row partitioning. So again, with the input table, maybe the first three rows go together into one partition, and this is formatted into Arrow or Flatbuffers for the object. Same thing with the middle partition — the middle two rows — and the last two rows into the gray partition. When we do row partitioning, we actually use a jump consistent hash, which is a nice hashing algorithm; but other partitionings are certainly possible — you could partition by range, or by month, or something like that. The idea is taking a partition, formatting it with a given generic format, and then storing that data directly into an object. That's the key feature here.
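For reference, the published jump consistent hash (Lamping and Veach, 2014) is only a few lines. In a row-partitioning scheme like the one described, the key would be a hash of the row's partitioning key and the result picks which object partition the row lands in — the usage shown is illustrative, not the exact Skyhook code:

    #include <cstdint>

    // Jump consistent hash: maps a 64-bit key to one of num_buckets buckets,
    // with minimal remapping when the bucket count changes.
    int32_t jump_consistent_hash(uint64_t key, int32_t num_buckets) {
      int64_t b = -1, j = 0;
      while (j < num_buckets) {
        b = j;
        key = key * 2862933555777941757ULL + 1;
        j = static_cast<int64_t>((b + 1) *
            (static_cast<double>(1LL << 31) / static_cast<double>((key >> 33) + 1)));
      }
      return static_cast<int32_t>(b);
    }

    // e.g. partition = jump_consistent_hash(std::hash<std::string>{}(row_key), 10000);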
The key properties of a partition — now that we've partitioned the table and mapped it onto objects — are that the format must retain the data semantics, and this is really just the data schema. In Flatbuffers, we can define this in the format itself; in Arrow, the schema is also defined internally to the table, with columns and schema fields. That's important because the schema is needed at the object level so that objects can do that local processing with the local methods we've defined. We don't want them to have to look up the schema somewhere else or have it shipped in every time; preferably, these schemas are self-described and available locally.
Importantly, object names are generated. We don't want to deal with hundreds of thousands or millions of objects and store big lists of names; we'd like to be able to generate names with some regularized naming pattern. This turns out to be important at scale. Of course, a simple naming pattern would be something like table-T.partition-1, table-T.partition-2, things like that. We can think of very simple naming patterns that allow objects to retain a prefix of the name and even subdivide further given a prefix. So name generation is pretty flexible, and that flexibility helps a lot.
Objects are then distributed by Ceph based on the name: Ceph places each object onto a server based on its name. This means that object locations don't need to be maintained by Skyhook at all. We don't have to maintain a giant list of names; we can just retain a name generator function. This greatly constrains the amount of metadata that needs to be stored in order to generate names and look up objects, because it's purely name-based retrieval. When you want to read an object, you just give its name, and Ceph will route the request directly to the right place. We don't need to know where it is physically.
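A name generator in that spirit is trivial — something like the following sketch, where the exact naming pattern is hypothetical:

    #include <cstdint>
    #include <string>

    // Generate an object name from a table name and partition number.
    // Skyhook only needs to remember the pattern (and the partition count);
    // Ceph's CRUSH then maps each generated name to an OSD, so no per-object
    // location metadata has to be kept anywhere.
    std::string object_name(const std::string& table, uint32_t partition) {
      return table + ".partition." + std::to_string(partition);
    }

    // e.g. object_name("lineitem", 42) -> "lineitem.partition.42"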
So what does this look like in our architecture? Just to cover it briefly: placement is done using the CRUSH algorithm, a hashing algorithm. Objects are mapped to placement groups based on their object name, as I mentioned, and placement groups are handled with primary-copy replication — so a primary replica, a secondary, a third replica, and so forth. Placement groups provide isolation for fault tolerance; the idea is to create isolation zones. And placement is pseudo-random as well, for load balancing: it attempts to give a uniform distribution of objects across all OSDs, which hopefully gives us a similar number of objects across servers. In this example — not a very good example of uniform distribution — you can see a couple of objects on the first OSD, maybe one on the second, and maybe three on the third. So that's how they would be distributed, and what we would like.
Data processing: we have considered and implemented methods like select, project, aggregate, group-by, and sort. Select is row-based, project is column-based, aggregation is maybe some computed value, there's grouping, and you can think of sorting as also being very useful at the object level. And interestingly, as we move to the Arrow compute API — that work is in progress — we'll be able to support even more data processing and leverage all of the work done by that community on the compute API, so we don't have to define those functions ourselves. Also notably, the compute API supports user-defined functions, which is a very nice feature.
So given the processing methods, what metadata might we want to store in RocksDB? In addition to things we've already said, like physical offsets and logical content — think of it as creating an index on various columns — you can also consider text indexing; that can be very useful in some cases, such as log data storage. Another important thing to consider is column statistics. We recently implemented this, and you might also want to track access patterns: if an object, or a particular range or column of an object, is accessed frequently, you may want to keep track of that in RocksDB as metadata, and at some later time query it and make decisions about what to do — whether we want to lay the data out differently, and so forth.
With statistics, the value distribution of columns is really important for query optimization. The idea is that you typically estimate the selectivity: if you're looking for rows with the word 'hat', for instance, and there are only one or two of them, it makes sense to look them up in an index. Otherwise, if there are going to be many of them, you might as well just scan all of the rows — so you might choose a scan plan over an index plan. In our case, this means reading a physical offset and length from disk versus looking it up in RocksDB and then just going to get the particular row of interest.
And importantly, because the data semantics are embedded, along with this object-local metadata, each object can sort of optimize itself. Dynamically, at runtime, it can make decisions about where to read and write data from, or whether to use an index, and so forth. This becomes critical. And because the object is the entity, any functionality defined on the object moves with the object as well, so there's no problem when you scale the number of nodes up and down.
So then the question becomes when to push down, or offload, processing to the storage — in this case, directly to the objects. Currently we just decide yes or no. However, it could be a runtime decision by the query optimizer: you certainly don't have to apply your special read method; you could execute the traditional Ceph read method, based on cluster knowledge. If you know there are a lot of busy users in the system and there's not a lot of extra capacity in storage to handle any processing methods, you might not want to push down anything. However, what if you do push it down and the storage server happens to be overloaded, and the query optimizer didn't know that? That's more of a dynamic state it might not be aware of. So in this case, we want the object to be able to reject processing, which we call pushback — we're actually working on this mechanism — and it's a really key consideration when offloading in our framework, because if we force the object to do a read and then process the data as it's reading, that object is busy that entire time.
So if the object is unable to do that, or doesn't have the capacity, it can just do the standard read and ship the data back to the client, and let the client — or some intermediate layer you can think of — apply the operation there.
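A client-side fallback in that spirit might look like the following sketch — purely hypothetical, since the pushback mechanism is still being worked on; the -EBUSY convention and the helper functions are made up for the example, and it reuses the ioctx from the earlier librados sketch:

    // Try the offloaded query first; if the object (OSD) declines because it is
    // overloaded, fall back to a plain read and apply the filters at the client.
    librados::bufferlist inbl, outbl;
    encode_query_op(query, inbl);                       // hypothetical helper

    int ret = ioctx.exec(oid, "tabular", "exec_query_op", inbl, outbl);
    if (ret == -EBUSY) {                                // hypothetical "pushback" signal
      uint64_t size = 0;
      time_t mtime = 0;
      ioctx.stat(oid, &size, &mtime);

      librados::bufferlist raw;
      ioctx.read(oid, raw, size, 0);                    // standard read of the whole object
      outbl = apply_filters_locally(raw, query);        // hypothetical client-side filter
    }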
Briefly, I'll talk about some physical design optimizations, just to give a bit of an idea.
This is a long-studied problem in databases, and of course good physical design is really crucial for workload performance. Typical features of a physical design tuning process include partitioning — say, columns or rows — data formats, depending on what your usage model is, data layouts, indexing, and materialized views. These are typical components of physical design in databases, and we can apply them here.
So, for instance, a local data layout might look something like this. If we consider the Arrow table format, an Arrow table is a schema and a sequence of record batches. Bringing back that object picture — the object with subtable one and subtable two, and the RocksDB pointing to the offsets — given the Arrow format, we might do something very similar: lay out the schema and the record batches on disk, with the RocksDB pointing to individual record batches. The idea here is that we can read these one at a time off disk, or just read the ones we want, and conserve the amount of memory and disk I/O for any given read.
Another example: instead of those record batches, we might want to lay out columns slightly differently. For instance, maybe columns one and two are frequently accessed together, so we keep those together, but we put column three further down, at another offset on disk. And we just keep track of all of that in RocksDB.
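A helper in that spirit, callable from inside a CLS write method, might look like this sketch — the key format and names are illustrative, not the actual Skyhook layout code:

    // Append serialized record batches one after another inside the object and
    // record each batch's "offset,length" in the local RocksDB via OMAP, so a
    // later read method can fetch just the batches (or columns) it needs.
    static int write_batches(cls_method_context_t hctx,
                             std::vector<ceph::bufferlist>& batches)
    {
      uint64_t off = 0;
      std::map<std::string, ceph::bufferlist> index;

      for (size_t i = 0; i < batches.size(); ++i) {
        int ret = cls_cxx_write(hctx, off, batches[i].length(), &batches[i]);
        if (ret < 0)
          return ret;

        ceph::bufferlist loc;
        loc.append(std::to_string(off) + "," + std::to_string(batches[i].length()));
        index["batch." + std::to_string(i)] = loc;

        off += batches[i].length();
      }
      return cls_cxx_map_set_vals(hctx, &index);
    }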
Okay, local versus distributed transformations. When we think of physical design, I just showed you a local example — going back real quick, that was within an object, just rearranging the data inside the object. Another way to think of this is to dynamically adapt between row and column layouts: we may change from a row layout in an object to a column layout in an object, to improve performance for a given workload. We might be reformatting the data from, say, Flatbuffers to Arrow format, or physically reorganizing data, as we showed.
A distributed transformation may redistribute or gather columns across objects. So you may have row-based objects — say you did row-based partitioning, so each object has a set of rows — and you'd rather have done columns, so that each object holds a column, for instance. For that we'd like to do a distributed transformation, which is effectively a gather from many objects into one object, or from one object out to many objects. This allows us to peel rows or columns out of individual objects and move them to others. We've done this in a prototype version, in the paper there below, and the current version is in progress; we call this RADOS remote reads.
We talk in a little more detail in our paper about physical design management in storage based on cost models; the details can be found there. But you can think of it as I said: RocksDB might keep track of access patterns, or interesting pieces of data, or interesting patterns of queries, and then you may want to do cost modeling to decide whether to transform rows to columns with a local transform, or maybe do a distributed transform and distribute those columns across multiple objects. Just a quick picture: on the left is a local transform — you take an object that's row-based and transform it to column-based. On the right-hand side is a distributed transform — you take that row-based object and ship its columns off to independent objects, or maybe group them together. There's a lot of flexibility with this.
Okay, so let's talk briefly about some experimental results. For the setup, we typically use the TPC-H lineitem table. We are looking at 750 million rows, generated with their data generator. We put those in 10,000 objects, so each object has an equivalent number of rows. These objects are then spread across all of the OSDs by Ceph, not by us — the data is just stored in Ceph and distributed by the CRUSH algorithm that we mentioned. The data formats we currently consider are Flatbuffers and Flexbuffers for our row-based formats and Arrow for our column-based format. As I mentioned, we're moving more toward Arrow and the dataset API exclusively.
The queries here are select-project, simple queries, and we vary the selectivity from 1% to 10% to 100%. A query would look something like: select * from lineitem where extendedprice is greater than some value, and that would return, say, 1% of the data. For hardware, we use NSF's CloudLab, which is a bare-metal-as-a-service infrastructure for academia; we use hard disk drives in this case. Our Ceph deployment is installed with the SkyhookDM extensions — those shared object libraries that contain our methods. And then we use a simple client-side driver so that we can focus on the storage processing: either we process the data in the client with the same code, or we push it down to the storage using our extensions, calling rados exec on our methods for each of the 10,000 objects.
Scalability — let's look at this briefly. On the x-axis is the number of storage servers, so this is as we scale out the number of Ceph servers from 1 to 2 up to 16. We run a select query with 1% selectivity on the table we mentioned. On the y-axis on the left-hand side is the execution time, and we can see that as we scale out, the execution time goes down. The black bar represents no processing — that is, if we didn't push down anything — and the gray bar is if we did push down, calling our object storage methods. So let's focus in on this briefly and see what's going on here. I think it's interesting.
So in this case — the previous case, I'm sorry — the query execution time is very similar; it's about the same. Which means maybe there's some other bottleneck in the system. But let's look at whether we achieved any of our goals, which is offloading some CPU from the client side onto the servers. So we look at these graphs. On the left-hand side, this column is the client-side CPU usage over time, like the first 60 seconds of a query. And on the right-hand side is the server-side CPU usage for each of the eight servers, stacked.
The first row we call the baseline. On the top row there's the read cost only — there's no query; this is just the cost if you didn't push down anything and didn't run any query, you just did a regular read of those 10,000 objects. And you can see the CPU percentage of the client is at about 5% there, and on the servers it's similar, about 5%, stacked. The next row down is selecting 1% with no offload to storage: we just use the regular read and then apply the selection filters at the client side. And you can see that the client CPU actually goes up. The storage side looks about the same, nearly identical, because it's not doing any extra work — it's just doing a regular read and sending the bytes back, and the client is actually applying the selection query.
And then on the bottom row, we can see what happens if we offload to storage and use our methods. In the first column, you can see the client machine: the CPU has gone down very, very low — near zero, in fact. And on the right-hand side, the storage machines: if we look at the stacked bars, the CPU went up a little bit, not a whole lot, but it did go up a little. So this shows we have effectively offloaded work from the client to the storage servers.
But interestingly, what's going on with the client machine, and why does it go down to zero? Consider that in the top graph the client is also not doing any query, but it's doing a lot of work, because it has to receive all of those packets and data from the eight storage servers — that's the top row. In the bottom row, the client isn't receiving many packets, because on the right-hand side the storage servers have already filtered the data out. So only 1% of the data is coming back, which means effectively 1% of the packets, and there's not much work for the client to do to receive those few packets.
And you can see that packet building and receiving is not trivial at all. So it's interesting to consider that if we're going to be building packets and spending CPU cycles anyway, perhaps we should use those cycles to do some data processing and filter out data before we send it back — we might as well do useful work rather than just building packets where half the data, or in this case 99% of the data, is going to be thrown away eventually anyway.
So that's just one example of CPU offloading. What about network offloading? Similarly here, in the top row we have column one as the client and column two as one server — not stacked in this case; it's just one server, to look at the data coming back. And you can see, with no offload to storage across the top, the client is receiving a lot of data: there's a high amount of data coming back, and the client is receiving it. And on the server side, on the top right, this particular server is sending back a fair amount of data; we just look at one here instead of stacking them. Now, in the bottom row, we can see exactly what happens when we offload to storage: the client is not receiving much at all, very, very little, and on the right-hand side the server is not sending much data. That's back down to the 1% query, so only 1% of the data is coming back. So we've not only offloaded data processing to the servers and reduced I/O, but we've reduced the work to send and receive that I/O as well. So we've pretty much achieved the goals of offloading to the storage system, as you can see in this example.
Next, let's go back to the eight-server case and examine it a little more — there's something interesting going on here that I want to talk about, for selectivity. On the left-hand side we have storage-side processing on the x-axis, and on the right is client-side processing. All of these costs are similar; the execution time is on the y-axis, and you can see the cost is about 100 seconds for all these cases, more or less. But there's something interesting going on right there at that one bar — it's a little bit higher. What's happening there is the extra work for 100% selectivity. The storage has to read and scan and apply the filter to every row; every row passes, so it has to do all that work, build all the data, and send it all back again. You're doing a lot more work by filtering all those rows that don't get filtered out — they all match. This argues for the need for statistics: maybe we don't want to read them all, or maybe we want to use an index.
Okay, next we'll look briefly at local versus distributed transforms. We compare local, distributed, and client-side transforms, and this is the execution time to do the transform. You can see the local and distributed transforms happen inside storage: the local one doesn't use any network, and the distributed one does use the network, because objects send data back and forth to each other, but it all stays within the storage layer. The client-side transform is as if we did this row-to-column conversion by reading the data into the client, transforming it, and writing it back down. You can see the transform cost is reduced by keeping it within storage.
What is the benefit of that? Assuming we wanted to transform for a good reason, based on a cost model, now let's look at a query that selects extendedprice — so we're reading one column from this table — and on the y-axis we have execution time. We start out with the row format; that's the black bar, before the transform, and that's the cost of the query. After the transform, the middle gray bar is reading that column — on the right-hand side you can see the picture, with those rows reorganized into columns. And then what if we did something different, which we mentioned earlier, and organized those rows into, say, one column or independent columns on disk? Now, if we just store the extendedprice column by itself, we can peel that column right off of disk, put it in memory, and return it. So this shows quite a bit of benefit from transformations.
For ongoing work, we have a lot of things happening right now with Skyhook. We're adapting to the Apache Arrow dataset API, which I mentioned; this will bring us even more toward end-to-end Arrow-native processing. We interact with the Arrow community for feedback, and we're trying to create a librados fragment class. Eventually, we'd like to push this upstream into the dataset API — that's our goal for that. It's a big community, and it's exciting work for Skyhook. We're also working on RADOS reads from remote objects; this is the gather-style distributed transform that we mentioned. We now have deployment via Kubernetes and Rook on CloudLab as well as other systems. And we're expanding our work on data indexing for Arrow, so we're moving more toward these Arrow-native operations now.
And I'll do a real quick plug for CROSS. This is the Center for Research in Open Source Software, which supports this project. Skyhook is an incubator-stage project there; the goal of CROSS is to bridge the gap between student research and open source projects. It's funded by an endowment from Sage Weil, who's the Ceph founder, and by corporate memberships — right now we have Fujitsu, Seagate, and Kioxia. So if you're interested, please come talk to us about any of the work that we're doing. CROSS leverages the open source culture in university research, supports graduate student research, and incubates work beyond graduation toward viable commercial products. There are a couple of examples of that: one is the SkyhookDM project that I've been talking about; another is the Popper project, which is a container-native workflow execution engine that focuses on reproducibility in science. CROSS is directed by Carlos Maltzahn, and you can look at the CROSS website for more information or contact us directly.
We really appreciate your interest. Okay, moving on. Finally, my acknowledgments go to the CROSS center, our NSF grants, and the IRIS-HEP Software Institute — the Institute for Research and Innovation in Software for High Energy Physics — which also supports our work; some of that work is done through CERN. Also our CROSS corporate members, as I mentioned — Fujitsu, Seagate, and Kioxia — and everyone who's contributed to the Skyhook project, especially Noah, Michael, Ivo, and Ken; we really appreciate all the hard work. We've had a lot of internal and external students, as well as Google Summer of Code fellows for two years now, IRIS-HEP fellows as well, and master's projects and theses based on work done in Skyhook.
So thank you very much.
You can visit the skyhookdm.com website for more information,
and my email is there below.
Thanks for listening.
If you have questions about the material presented in this podcast, be sure to join our developers mailing list by sending an email to developers-subscribe@snia.org.
Here you can ask questions and discuss this topic further with your peers in the storage developer community.
For additional information about the Storage Developer Conference, visit www.storagedeveloper.org.