Disseminate: The Computer Science Research Podcast - George Theodorakis | Scabbard: Single-Node Fault-Tolerant Stream Processing | #12
Episode Date: November 21, 2022

Summary (VLDB abstract): Single-node multi-core stream processing engines (SPEs) can process hundreds of millions of tuples per second. Yet making them fault-tolerant with exactly-once semantics while retaining this performance is an open challenge: due to the limited I/O bandwidth of a single node, it becomes infeasible to persist all stream data and operator state during execution. Instead, single-node SPEs rely on upstream distributed systems, such as Apache Kafka, to recover stream data after failure, necessitating complex cluster-based deployments. This lack of built-in fault-tolerance features has hindered the adoption of single-node SPEs. We describe Scabbard, the first single-node SPE that supports exactly-once fault-tolerance semantics despite limited local I/O bandwidth. Scabbard achieves this by integrating persistence operations with the query workload. Within the operator graph, Scabbard determines when to persist streams based on the selectivity of operators: by persisting streams after operators that discard data, it can substantially reduce the required I/O bandwidth. As part of the operator graph, Scabbard supports parallel persistence operations and uses markers to decide when to discard persisted data. The persisted data volume is further reduced using workload-specific compression: Scabbard monitors stream statistics and dynamically generates computationally efficient compression operators. Our experiments show that Scabbard can execute stream queries that process over 200 million tuples per second while recovering from failures with sub-second latencies.

Questions:

- Can you start off by explaining what stream processing is and its common use cases? How did you end up researching in this area?
- What is Scabbard?
- Can you explain the differences between single-node and distributed SPEs? What are the advantages of single-node SPEs? What are the pitfalls that have limited single-node SPEs' adoption?
- What were your design goals when developing Scabbard?
- What is the key idea underpinning Scabbard?
- In the paper you state there are three main contributions in Scabbard; can you talk us through each one?
- How did you implement Scabbard? Can you give an overview of the architecture?
- What was your approach to evaluating Scabbard? What were the questions you were trying to answer? What did you compare Scabbard against? What was the experimental setup? What were the key results?
- Are there any situations when Scabbard's performance is sub-optimal? What are the limitations?
- Is Scabbard publicly available? As a software developer, how do I interact with Scabbard?
- What are the most interesting and perhaps unexpected lessons that you have learned while working on Scabbard?
- Progress in research is non-linear; from the conception of the idea for Scabbard to the publication, were there things you tried that failed?
- What do you have planned for future research with Scabbard?
- Can you tell the listeners about your other research?
- How do you approach idea generation and selecting projects?
- What do you think is the biggest challenge in your research area now?
- What's the one key thing you want listeners to take away from your research?

Links:

- Paper
- GitHub
- George's homepage

Hosted on Acast. See acast.com/privacy for more information.
Transcript
Hello and welcome to Disseminate, the computer science research podcast. I'm your host, Jack Wardby.
I'm delighted to say I'm joined today by George Theodorakis, who'll be talking about his VLDB '22 paper, Scabbard: Single-Node Fault-Tolerant Stream Processing.
George was a PhD student in the large-scale distributed systems group at Imperial College London, and he recently submitted his thesis.
And he also recently joined Neo4j in their research lab.
George's research interests lie in the areas of distributed and parallel computing,
database management systems, and stream processing engines.
George, thanks for joining us on the show.
Yeah, thanks for having me.
It's interesting to discuss our latest work on stream processing.
Brilliant, let's dive straight in then. Can you start off by explaining to the listeners what stream processing is and what its common use cases are?
All right, cool. So in recent years, we've seen an explosive growth in data volumes and
velocity, and at the same time, we've seen the emergence of a new type of applications that are long-running.
These applications run for hours, for days, for months, even for years, if you can make
them work that way, and execute continuously.
So they execute as new data arrives.
And this data can be possibly in the form of infinite streams, right?
So you have to deal with the problem of data that are infinite.
You don't know their size in advance.
So pretty much this is stream processing.
It's the processing paradigm that allows you to process real-time data
that are infinite.
According to estimates, we expect like 30% of the data in the future
to be produced and analyzed in real time.
And this makes stream processing a paradigm that is established
between transactional processing, online analytics, and reporting.
Some common applications for stream processing are high-frequency trading,
card fraud detection, and sensor monitoring.
And the common characteristic of these applications is that apart from having to deal with very high throughput of data,
you also have to report in sub-second latency.
You have this requirement where you have to respond very fast
and whenever you delay your responses, you have adverse effects. So,
for example, in the case of health monitoring, you have to respond within seconds in
order to matter. Or, in the case of high-frequency trading, the significance
of your results decays over time, so you have to respond as fast as possible to make more profit. Awesome.
So how did you end up researching in this area?
So yeah, like six years ago, I was trying to find a topic for my undergrad thesis.
And it happened that I worked with some people from Imperial College London at the time. So they just had published their stream processing engine
and we wanted to collaborate with them.
So I worked with a system they had back at the time
trying to do stream query optimization
at the logical and physical level.
And they used Apache Calcite,
which is an extensible data management framework.
So, yeah, we worked on how we parse streaming queries, how we optimize them.
And at that time, there was also this hype with Apache Flink, Spark, and Kafka.
So I decided it would be cool to work in the field.
Also, one of the reasons that I chose the field was that back at the time it wasn't that mature. So there was a lot of space to experiment and try things
that people haven't tried.
Cool. So let's dive into the paper then. Can you tell us what is scabbard? And am I
pronouncing it right?
Yeah. So scabbard is like a holster for your sword, right? Something to protect your weapons.
So, yeah, we have this thing that we name our projects after weapons.
Okay.
Yeah, the initial project was Saber, then its extension was LightSaber,
and then Scabbard is the one that protects the Lightsaber.
Yeah, anyway, so we had this single node engine,
this single node system that ran efficiently on multicore systems,
and we wanted to make it fault tolerant.
So this means... by default... okay, let's step back.
For context, fault tolerance means that we
have to be able to handle failures. Because we are running our software on
either local clusters or on the cloud, there are cases where things can fail,
either because of bugs or because a node crashed, right? So you have to be
able to handle failures and hide their effects completely.
And we try to design a system that can achieve this in a single node, which is kind of counterintuitive, given how people are doing this.
You mentioned earlier on systems like Apache Flink and whatnot.
So they're distributed stream processing engines and this is a single node stream processing engine.
Can you explain what the differences are between these two systems? Yeah, a system like Apache Spark or
Flink adopts a shared-nothing execution model, and their whole design allows you to parallelize
your computations to many nodes. You get, not for free, but easier features like fault tolerance.
They have very nice APIs.
They are mature.
You can do many stuff with them.
But because of their design, they face many challenges.
The first challenge they face is that because we are now doing computations over the network,
we have to pay the price, right?
So we have cross-process and network communication overheads.
We have to serialize our data.
We have to design our data structures and our algorithms
in a different way.
We don't have shared memory.
So we increase our resource and maintenance footprint, definitely.
So this is one challenge.
The second challenge is that because we are over the network,
we cannot really have predictable performance guarantees.
So earlier I mentioned that many stream processing applications
require very low latency,
and you cannot guarantee this
when you do processing over the network,
where you try to scale very high-throughput streams.
And a third issue is that many parallel computations,
many operators, are very hard to parallelize efficiently
in a distributed environment.
For example, aggregations with windows.
So windows are a kind of structure
that allows you to process this infinite data. So in the case,
for example, of aggregations over windows, when you
do them in parallel in a distributed environment, you have to either perform
redundant computations or you have to approximate your window results because
you don't have a global clock in a distributed environment.
I think the most important bit is that you have to pay the tax of distributed execution.
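To make the windowing idea concrete, here is a minimal sketch of a tumbling-window sum over an unbounded, in-order stream. The types and names (Tuple, TumblingWindowSum) are hypothetical illustrations, not taken from any of the systems discussed.

```cpp
// Minimal sketch of a tumbling-window sum over an unbounded stream.
// Illustrative only; these names are hypothetical, not a real SPE's API.
#include <cstdint>
#include <initializer_list>
#include <iostream>

struct Tuple {
    int64_t timestamp;  // event time in milliseconds
    int64_t value;
};

class TumblingWindowSum {
public:
    explicit TumblingWindowSum(int64_t window_size_ms)
        : window_size_(window_size_ms) {}

    // Process one tuple; emit a result whenever a window closes.
    void process(const Tuple& t) {
        int64_t window_id = t.timestamp / window_size_;
        if (window_id != current_window_ && has_data_) {
            std::cout << "window " << current_window_
                      << " sum=" << sum_ << "\n";
            sum_ = 0;
        }
        current_window_ = window_id;
        sum_ += t.value;
        has_data_ = true;
    }

private:
    int64_t window_size_;
    int64_t current_window_ = 0;
    int64_t sum_ = 0;
    bool has_data_ = false;
};

int main() {
    TumblingWindowSum agg(/*window_size_ms=*/1000);
    // A few in-order tuples standing in for an infinite stream.
    for (const Tuple& t : {Tuple{100, 5}, Tuple{700, 3}, Tuple{1200, 7}}) {
        agg.process(t);
    }
}
```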
Yeah, it reminds me of the COST paper, right?
Yeah, it inspired us a lot for our project.
So yeah, and at the same time, you see that there's this rise of shared-memory multi-core servers,
where you have many CPUs, many sockets,
you have terabytes of memory, you have accelerators like GPUs, FPGAs,
so you can do a lot of stuff with a single node, right?
You can optimize for latency, you can optimize for throughput,
you can redesign your algorithms and your data structures. And also now, with the high-speed networking enabled by RDMA, remote direct memory access, you can ingest a lot of data. And usually, for most of the use cases we've seen,
where you have to handle hundreds of gigabytes of data,
you can fit them in a single node.
So it's possible to have this as an alternative.
Okay, cool.
So you kind of summarized in your last answer there
what the advantages of the single-node stream processing engines are.
But what are the significant downsides, what are the pitfalls
that have maybe limited the adoption of single-node stream processing engines in practice?
What I forgot to mention before is that with single-node implementations
that we've seen mostly from academia,
you get usually at least an order of magnitude better performance
in terms of throughput and latency.
So this is the benefit that you get
when you don't have the tax of distributed execution.
So, I mean, these are all nice,
but nobody really uses these systems in practice.
And that's mainly, we think that happens mainly
because they lack fault tolerance mechanisms.
As I said earlier, when you run continuously and long-running applications,
there's a higher chance that your node is going to fail
and you have to restart and produce correct results.
For many applications, it really matters.
I think that one of the biggest drawbacks is that
these systems don't have built-in fault tolerance mechanisms.
And while the strawman solution would be to just take the approaches from distributed execution
and plug them into a single node, this would end up blowing up again your cluster size, right?
Because you will need many nodes to perform
persistence, many nodes to...
I mean, the way they're implemented right now,
many nodes to perform persistence or
recovery or the
bits that are part of the
fault tolerance mechanisms.
So we...
In the paper, we had this interesting
experiment where we're using a single node
system.
And so this system could ingest like tens of gigabytes of data per second, right?
And in order to make it fault tolerant, the simplest solution would require at least 20 Kafka nodes to persist the data streams that we needed.
So, yeah, it kind of comes back to the efficiency argument, right?
And if you have a single node and then you need 20 Kafka nodes, you end up in a distributed environment again.
Back to square one.
Yeah.
So yeah.
Cool.
So kind of, I guess, yeah, building off that, what were the design goals that you had in
mind when you were developing Scabbard?
So we already had a system that had a very high throughput and low latency.
Maybe you could maybe mention the previous, you mentioned earlier on the journey from sort of,
what was the first one?
The first one was called Saber.
Then LightSaber, then Scabbard, right?
That's the flow.
So yeah, maybe we can talk about those.
I didn't work with Saber.
I mean, I didn't work on the paper.
So Saber was like the first step
that was trying to utilize a single node with GPUs.
Okay.
And to do this, it was
designed to schedule tasks on both CPUs and GPUs and run them in parallel.
And LightSaber was kind of the next step, an optimized version of Saber trying to
use NUMA-aware execution, SIMD instructions, code generation. So we had
like this system, this efficient multi-core system.
There are many similar to that, right?
It wasn't just this one.
And we just wanted to...
So we already had high throughput and low latency.
We just want to ensure that we can still have
these performance guarantees during the execution
that doesn't have failures, which is the common case, right?
So whenever we didn't have failures,
we just want to ensure that we have these requirements.
And the second design goal had to do with the limitations
that we have to deal with in a single node.
So when you're in a single node, you have limited resources
in terms of network and disk bandwidth.
So you have to be careful about how you use these resources, and which data you choose to store and which data you choose to send over the network.
So, yeah, our second design goal was to deal with these limitations.
And the third goal was to guarantee fast recovery, because
we are in a single-node environment.
Of course, we don't have high availability,
but we have to,
because we have a single point of failure, we have to
provide sub-second recovery
so that we decrease the
downtime of the system
when it's running. So yeah,
have performance guarantees,
deal with
the limited resources of a single node,
and guarantee fast recovery.
Those were the key goals.
Cool.
So I guess, how did you go about addressing these design goals?
What's the key idea underpinning Scabbard?
And yeah, let's start with that,
and then we can dive into it.
I know in the paper you say there's three main contributions,
and we can talk about those in a second.
But yeah, what's the kind of key idea that you had to address these design goals?
So when we're trying to implement the system...
Okay, a step back, which is something I haven't mentioned properly.
When you're in the streaming world, in order to achieve fault tolerance,
you have to persist at least all your input streams so that you can replay them and get to the correct recovery state.
And as an optimization, you can always checkpoint your operator state so that you can cut down what you replay from your input streams. Now, in our context, in a single
node execution, we're ingesting streams in the order of gigabytes per
second and also your state would reach the same magnitude, right? So you would
have to persist per second tens of gigabytes, right? And this would have to
happen with SSDs, simple disks that have a bandwidth lower than a gigabyte per second, probably.
So like commodity disks that you can find.
And you already see that there is a bottleneck there.
You cannot store all this data with a single SSD.
And while you don't have this problem with distributed execution, because everything is partitioned and everything works fine, in a single node
that was our key issue, right?
So the idea that we tried to...
The main idea behind Scabbard is that you have to reduce what you store to disk
in order to accelerate fault tolerance.
And you have to do it in a manner that adapts to different workloads.
So it has to be adaptive to different workloads
so that you can get the best performance possible.
Cool. So let's go through the main aspects to achieving this.
So the first thing you mentioned in the paper is this idea of a persistent operator graph model.
Can you tell us more about that?
Yeah, for our first contribution, we wanted to come up with a model that allows us to have the right abstractions to perform persistence and fault tolerance in a single node. So we wanted
to integrate the persistence with the query execution so that we perform a series of optimizations.
What this really means is that we have encoded persistence as an operation in our data flow graph, execution data flow, and therefore we can reorder persistence as you would reorder,
for example, a filter or a projection, and we can apply it after we have discarded data
with highly selective operators
so we can for example
when we try to persist our input
stream we can persist it after we have
applied filters, projections
or we have applied compression for example
that I will discuss next
and we can
also identify opportunities where we can even
inject operators like projections
so that we can reduce what we store to disk.
Yeah, we have this set of optimizations and then we have the abstractions
that allow us to efficiently persist both the data streams and the operator state
that I mentioned earlier that we want to persist.
And we also have some abstractions that allow us to coordinate all
these operations. And based on all these abstractions, like the coordination abstractions
and the persistence abstractions, we built a protocol that allows us to provide exactly-once output. Now, what this means is that, in the event of a failure, we can hide its effects completely
and provide the exact same results as if nothing happened.
And yeah, I mean, this model tries to capture
only the essential information that you need to persist
so that you can recover with exactly-once output.
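As a rough illustration of the reordering idea George describes, here is a minimal sketch in which persistence is just another operator in the dataflow and is placed after a selective filter, so only surviving tuples consume disk bandwidth. The Operator, Filter, and Persist types are hypothetical, not Scabbard's classes.

```cpp
// Sketch of persistence as just another operator in the dataflow graph:
// by placing Persist *after* a selective Filter, only tuples that survive
// the filter consume disk bandwidth. All types here are hypothetical.
#include <cstdint>
#include <functional>
#include <iostream>
#include <memory>
#include <vector>

struct Tuple { int64_t timestamp; int64_t value; };

struct Operator {
    virtual ~Operator() = default;
    virtual void process(const Tuple& t) = 0;
    std::shared_ptr<Operator> next;  // downstream operator, if any
    void emit(const Tuple& t) { if (next) next->process(t); }
};

struct Filter : Operator {
    std::function<bool(const Tuple&)> predicate;
    void process(const Tuple& t) override {
        if (predicate(t)) emit(t);   // discard non-matching tuples early
    }
};

struct Persist : Operator {
    std::vector<Tuple> log;          // stand-in for an async write to disk
    void process(const Tuple& t) override {
        log.push_back(t);            // only post-filter tuples are logged
        emit(t);
    }
};

struct Print : Operator {
    void process(const Tuple& t) override {
        std::cout << t.timestamp << " " << t.value << "\n";
    }
};

int main() {
    // filter -> persist -> print: persistence reordered after the
    // selective operator so the persisted volume is reduced.
    auto filter = std::make_shared<Filter>();
    auto persist = std::make_shared<Persist>();
    auto print = std::make_shared<Print>();
    filter->predicate = [](const Tuple& t) { return t.value > 10; };
    filter->next = persist;
    persist->next = print;

    for (const Tuple& t : {Tuple{1, 5}, Tuple{2, 42}, Tuple{3, 7}}) {
        filter->process(t);
    }
    std::cout << "persisted " << persist->log.size() << " of 3 tuples\n";
}
```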
Okay, cool. So once we have this model and the next component of
Scabbard is this query-specific adaptive compression. How does this work?
So I mean, after doing the optimization from before where you
discard data and you pick what you're going to store from your streams and
your operator state, we found out that there was still... disk was still the bottleneck, right? So the most natural thing to go after then was
compression. So we decided to compress whatever was left to be stored for disk, what was essential
for us to store to disk, and we decided to go for adaptive compression. Now because as I mentioned
earlier we have to deal with long-running applications we also have
to deal with changing data characteristics so it was essential for
us to capture these changes by collecting lightweight statistics at
runtime and choosing the most suitable compression algorithm based
on these statistics.
Because our initial system was using code generation for all the other operations, we
decided to use also code generation so that we can reduce the overheads when you inject
new compression schemes into the operator graph.
And yeah, this is how the adaptive compression bit comes from.
Cool. So what sort of statistics do you gather?
I mean, it's very simple. It's about the data distribution. So, for example, we gather
minimum and maximum values at runtime. Or, for example, we try to use run-length
encoding if it makes sense, so we gather how many consecutive values are equal. Statistics like this.
So, yeah, they're very basic statistics, but still they help us to identify which algorithm to choose.
And, sorry, one last bit that we collect, that we measure, is the bit precision for integers,
and this allows us to pick the right algorithm
to compress integers even better.
Cool. I was going to ask,
how does these statistics then map
to the choice of algorithm you use
and what algorithms are supported?
And what is better for what?
I mean, we don't have the proper optimizer there.
It's just like we have probably two or three different algorithms
for different data types, for example, for integers, for floats, for...
It's not actually a data type,
but for timestamps, we consider it as a data type.
Or for strings.
But, yeah, the logic of how you choose
different algorithms is very simple. So basically we did some analysis and we figured out which
algorithm to choose depending on your bit precision and, as I said before, how many
sequences of equal values you have, and so on.
So we just have a greedy approach
to picking the right algorithm.
I guess it's maybe an area for future research
yeah
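As an illustration of the kind of lightweight statistics and greedy selection described here, the following sketch collects min/max, run lengths, and bit precision for an integer column and picks an encoding. The thresholds and scheme names are assumptions made for the example, not Scabbard's actual policy.

```cpp
// Sketch of lightweight statistics driving a greedy choice of compression
// scheme for an integer column. Thresholds and scheme names are
// illustrative assumptions, not Scabbard's actual policy.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

struct ColumnStats {
    int64_t min = INT64_MAX;
    int64_t max = INT64_MIN;
    size_t longest_run = 0;   // longest run of equal consecutive values
    size_t count = 0;
};

ColumnStats collect(const std::vector<int64_t>& column) {
    ColumnStats s;
    size_t run = 0;
    for (size_t i = 0; i < column.size(); ++i) {
        s.min = std::min(s.min, column[i]);
        s.max = std::max(s.max, column[i]);
        run = (i > 0 && column[i] == column[i - 1]) ? run + 1 : 1;
        s.longest_run = std::max(s.longest_run, run);
    }
    s.count = column.size();
    return s;
}

// Bits needed to represent values in [0, range].
int bit_precision(uint64_t range) {
    int bits = 1;
    while (range >>= 1) ++bits;
    return bits;
}

std::string choose_scheme(const ColumnStats& s) {
    if (s.count == 0) return "none";
    if (s.longest_run > s.count / 4) return "run-length";   // many repeats
    int bits = bit_precision(static_cast<uint64_t>(s.max - s.min));
    if (bits <= 16) return "frame-of-reference + bit-packing";
    return "none";                                           // store raw
}

int main() {
    std::vector<int64_t> column = {100, 100, 100, 101, 102, 102, 103, 104};
    ColumnStats s = collect(column);
    std::cout << "chosen scheme: " << choose_scheme(s) << "\n";
}
```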
and I guess we've got
the persistent operator graph model
the query specific adaptive compression
and the last thing that is a big contribution
in the paper is the efficient failure
recovery mechanism.
So can we dig into that a little bit?
I mean...
And how it fits in with everything else, I guess, right?
Yes.
So the other two are kind of more conceptual contributions, so high level.
This is kind of more an implementation one, so it's kind of the third contribution that
you have to put in your paper just to submit it, I guess, in a way.
So, yeah, I mean, as I said earlier, you have to recover fast, right?
So we were trying to optimize this path of recovery.
And so when we spin up the system, you have to generate code for different operations, for your compression, for your operators, and so on.
And all these things take time.
So we figure out that instead of going through all the optimization phases all over again, trying to generate the code,
we would just store the binaries and then load them.
So this saved us a lot of time.
And we also optimized checkpoints so that we can perform them frequently
without disrupting execution.
And by performing frequent checkpoints, you kind of optimize for recovery.
So you can recover without having to replay a lot of data.
And finally, we have a garbage collection mechanism
that deletes data that you don't want
to load when you recover. So we try to minimize the amount of data overall that you
load from disk and how much time it takes to spin up the system.
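A minimal sketch of what such a recovery path could look like, assuming a cached query binary loaded with dlopen, a checkpoint offset, and a persisted input log; all paths and symbol names here are hypothetical placeholders rather than Scabbard's actual recovery code.

```cpp
// Sketch of a recovery path: skip code generation by loading a cached
// query binary with dlopen, restore the most recent checkpoint, then
// replay only the input persisted after it. Paths and function names are
// hypothetical placeholders.
#include <dlfcn.h>
#include <cstdint>
#include <cstdio>

// Signature the generated query library is assumed to export.
using QueryFn = void (*)(const char* replay_path, int64_t from_offset);

int main() {
    // 1. Load the cached, previously generated binary (no recompilation).
    void* handle = dlopen("./cached_query.so", RTLD_NOW);
    if (!handle) {
        std::fprintf(stderr, "no cached binary: %s\n", dlerror());
        return 1;
    }
    auto run_query = reinterpret_cast<QueryFn>(dlsym(handle, "run_query"));
    if (!run_query) {
        std::fprintf(stderr, "symbol missing: %s\n", dlerror());
        return 1;
    }

    // 2. Restore operator state from the latest checkpoint (placeholder:
    //    a real engine would read the checkpoint file and the offset up to
    //    which input is already reflected in that state).
    int64_t checkpoint_offset = 0;  // would come from checkpoint metadata

    // 3. Replay only the persisted input after the checkpoint; older
    //    persisted data has already been garbage-collected.
    run_query("./persisted_input.log", checkpoint_offset);

    dlclose(handle);
}
```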
Cool. So I guess we mentioned implementation there. So let's talk about the implementation
a little bit more. So there's this path between Saber, Lightsaber, and Scabbard.
How did you go about implementing Scabbard?
Is it all in the same code base?
How does it all fit together?
How did you go about implementing it?
Okay, so we implemented Scabbard atop Lightsaber.
So Lightsaber was written in C++,
and we used its query execution engine, its compiler,
and we extended them.
For example, we extended the compiler of Lightsaber to generate additional types of tasks.
So we generate tasks for persistence, for compression, for checkpointing,
and we try to integrate everything as a task in our system.
We also implemented some features regarding networking, like how we do data ingestion
or how we send data using RDMA just to make it faster.
So anyway, everything in the system is a task, like from networking to persistence to compression
to how you execute the queries.
And all these tasks are executed asynchronously. And this helps our system design because you can hide all the
latencies from disk and network. And you can better
utilize the underlying hardware. So everything
is a task. Everything can be executed by all the cores that we have. Every
core in the system is a worker. And we also optimize
for locality. So the workers
that are closer to the data will try to execute the tasks, and only if they
don't have work will they fetch something from another
socket. So yeah, this was the way to
integrate all our logic with LightSaber:
just make everything a task and
have all the workers pushing the state of the system forward.
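A simplified sketch of the "everything is a task" design described above: per-worker task queues, with workers preferring their own queue and stealing from others only when idle. Real NUMA pinning and lock-free queues are omitted; this is not Scabbard's scheduler.

```cpp
// Simplified sketch: each worker owns a queue of tasks (query execution,
// persistence, compression, ...) and prefers tasks from its own queue,
// stealing from another worker's queue only when idle.
#include <atomic>
#include <cstdio>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

struct WorkerQueue {
    std::mutex mtx;
    std::deque<std::function<void()>> tasks;
};

int main() {
    const int num_workers = 4;
    std::vector<WorkerQueue> queues(num_workers);
    std::atomic<int> remaining{0};

    // Enqueue a mix of tasks, biased to worker 0 to force stealing.
    for (int i = 0; i < 16; ++i) {
        ++remaining;
        int target = (i < 12) ? 0 : i % num_workers;
        queues[target].tasks.push_back([i, &remaining] {
            std::printf("task %d (e.g. process/persist/compress a batch)\n", i);
            --remaining;
        });
    }

    std::vector<std::thread> workers;
    for (int w = 0; w < num_workers; ++w) {
        workers.emplace_back([w, &queues, &remaining, num_workers] {
            while (remaining.load() > 0) {
                std::function<void()> task;
                // Prefer the local queue, then try to steal from others.
                for (int k = 0; k < num_workers && !task; ++k) {
                    WorkerQueue& q = queues[(w + k) % num_workers];
                    std::lock_guard<std::mutex> lock(q.mtx);
                    if (!q.tasks.empty()) {
                        task = std::move(q.tasks.front());
                        q.tasks.pop_front();
                    }
                }
                if (task) task();
            }
        });
    }
    for (auto& t : workers) t.join();
}
```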
Okay, cool. So can I use LightSaber without Scabbard? Is it pluggable, or did it basically...
have they now forked into two independent systems,
or can I turn it off and turn it on again,
or has it been a complete, obviously, refactor of the original system,
so it's something different now?
So the only thing that you have to do is that you have to set some flags
in your operators to make them fault-tolerant,
or to your data streams.
And then,
I mean, yeah, the API
is not clean at all, but you have to set,
let's say, some flags and
you choose either to use
lightsaber or scabbard.
So did
scabbard pre...
Sorry, yeah, lightsaber pre-scabbard
as
adding that code in
if I turn off the flags
is there any overhead there
by since you've added
scabbard in or am I basically getting
lightsaber as a baseline
and then I'm just kind of losing a bit of
performance by adding scabbard on
So, yeah, definitely, I haven't engineered the code that well, but...
No, don't say that, it's brilliantly engineered, I'm sure.
So, yeah, if you don't set the flags, you just follow the LightSaber code path, let's say, and that's it. You don't have any overhead.
Okay, so you get it as it was. Okay, that's fine. Cool. So what was your approach to evaluating Scabbard, and what were the questions you were trying to answer?
Cool. Okay. So the first thing that we wanted to answer was whether, I mean, after doing all these optimizations,
all these new abstractions and all these things in a single node,
whether its performance was comparable or better to existing state-of-the-art solutions
like Apache Flink, for example, that is fault tolerant, right?
And we wanted also to see what's the overhead that we added.
I mean, you mentioned earlier the overhead.
We want to see what's the overhead compared to a system that's efficient single node system
and doesn't have fault tolerance.
So this is the first part, like our end-to-end experiment
in terms of fault tolerance
to see its overhead
and how it compares to state-of-the-art approaches.
And the second question that we want to answer
was whether our approach of persisting data,
like data streams,
was comparable or better to an approach
like the one used by Apache Kafka.
So, yeah, Apache Kafka is like this distributed messaging queue
that people use to persist their data streams.
So I want to compare with that.
What was the experimental setup?
And you mentioned there what you compared Scabr against.
But, yeah, what are the results?
So by setup, do you mean like the workloads and where do you run experiments?
Yeah, what are the experiments you run, basically?
I can reframe it as that if you want.
What are the experiments you run?
Okay, so in stream processing, you don't have a benchmark to use.
There's no TPC-C for it, for example.
Yeah, TPC-A or TPC-C or whatever.
So people just make up their own benchmarks
or they use some that are not really good.
So yeah, we just went with 10 different queries
that capture different operations
like filters, joins, aggregations, windows, and so on.
And in the paper, we have results from both our local cluster where we run experiments with RDMA
and also from AWS where we run experiments using EBS for remote storage
because we also want to verify that it works with remote storage in case that you have a failure
and you have to start up in a different node.
So yeah, we just hijacked AWS hardware there.
So yeah, I mean, for the first question, for the end-to-end experiments, we used Apache
Flink, which is state-of-the-art, but it's a distributed Java-based system.
And we also used LightSaber just to see the overhead compared to a system that doesn't have fault tolerance at all.
Now, compared to Apache Flink,
we got at least an order of magnitude better results
in terms of throughput and latency.
And compared to Lightsaber,
we saw, on average,
a 30% performance degradation in terms of throughput
and close to
eight times, I think,
if I remember correctly, worse latency
because we're adding in the path
the disk. So the throughput
was good, but yeah, you have to wait a little bit
more for acknowledgements
from the disk in order to continue with the
processing in many cases.
Did you run any experiments
to kind of measure how fast things recover?
And did you compare,
did you do like
a recovery experiment
to see how quickly
things came back online
and failures occurred?
I mean,
I didn't try.
I don't know what that
would look like even,
but did you explore that?
Yes,
I didn't try.
I mean,
we tried Flink,
but the results
were really bad,
so we didn't include,
yeah,
we didn't include
the graph there.
We kind of cheated in our recovery experiment.
So, I mean, in order to have a proper recovery solution,
you should be able to have a module that can discover that you failed,
which I guess is the most expensive part, right?
Because you are never sure whether you failed or not.
And I guess you can do it with timeouts or something like this.
I'm not sure.
There's some solution to that, right?
Yeah.
Anyway, so we skip that part and we just measure how fast we can recover from the point that we fail,
assuming that we instantly know that we failed and have to recover, right? And in that case, we had sub-second recovery latencies,
which was, yeah, really good.
As I said, we optimized for it because we just reloaded everything from disk,
like the binaries and all the generated code for compression,
and everything was there, so you knew what to do.
So yeah, that was an experiment for recovery.
I mean, it wasn't an end-to-end experiment.
It was kind of a micro-benchmark, but yes.
It gives you a flavor of how it would perform in practice, right?
Yeah.
Which is good.
So that's kind of obviously all the headline results of your experiments.
But are there any situations in which Scabbard's performance was suboptimal?
The question I'm kind of getting at here is, what are the general limitations of Scabbard?
It's an academic project, so it has many limitations, I guess.
So first, in our exactly-once protocol, we assume that the sink, so the sink is the receiver
of the results, participates in the protocol. So we are sending to the sink some metadata
that would allow it to
perform deduplication. So when you recover and you start sending your
results again, you may send some duplicate results. And in our
protocol, the receiver is responsible for dropping these results,
which is a simplifying assumption, because it may not be able to handle this logic. So if it couldn't
handle this logic, then we would have to end up with something like
distributed transactions, and then we go back to distributed execution again, and,
yeah, it's the same issues. But maybe you can...
So maybe a way you could do it is that you could offer it as a service
and you would have probably many scabber nodes for many users
and handle distributed transactions in such an environment
where you wouldn't have the overhead of a single node having to deal with transactions.
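As a rough illustration of the sink-side assumption described above, the following sketch tags each output batch with a monotonically increasing sequence number and has the receiver drop anything it has already applied; it is an illustration of the idea, not Scabbard's actual protocol.

```cpp
// Sketch of sink-side deduplication: each output batch carries a
// monotonically increasing sequence number, and the receiver discards any
// batch it has already applied, so results re-sent after recovery are not
// applied twice. Illustrative only.
#include <cstdint>
#include <iostream>
#include <string>

struct OutputBatch {
    uint64_t sequence;   // assigned by the engine, durable across restarts
    std::string payload; // the actual query results
};

class Sink {
public:
    // Returns true if the batch was applied, false if it was a duplicate.
    bool deliver(const OutputBatch& batch) {
        if (batch.sequence <= last_applied_) {
            return false;                     // duplicate from a replay
        }
        std::cout << "apply " << batch.payload << "\n";
        last_applied_ = batch.sequence;
        return true;
    }

private:
    uint64_t last_applied_ = 0;
};

int main() {
    Sink sink;
    sink.deliver({1, "window 0 results"});
    sink.deliver({2, "window 1 results"});
    // After a failure the engine replays and re-sends batch 2, then
    // continues with batch 3; the sink silently drops the duplicate.
    sink.deliver({2, "window 1 results"});
    sink.deliver({3, "window 2 results"});
}
```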
The second limitation is that we cannot deal with non-deterministic operators.
So imagine that you have an operator, for example, that does access to some external
database.
Yeah, we cannot definitely handle this.
Because in such case, you will have to track down some additional metadata
and replay
and when you do a replay, when you
restart, you have to be sure
that everything
happens again as
it happened earlier. So you have
to track your history and track more
metadata there.
So yeah, that would be an extension
and we also cannot handle out-of-order data.
So what this means... Okay, so we assume that everything comes in order. So everything is...
You would never find a tuple that has a late timestamp. Okay? So your data is ordered by timestamp and you assume that everything that
comes is in order by the timestamp that you have. So if something comes out of order,
it would break our execution engine. So Lightsaber was designed to be in order. And because we
built on top of Lightsaber, we just didn't try to extend it towards that area.
So to extend our system for out of order data you would have to introduce some additional markers,
some special tuples. And these tuples would tell you how far you've reached in your data in terms
of the timestamp. So it will tell you for example that you've seen all the data up to minute 10 or something.
So you know that it's safe for you
to do all the operations up to that point.
So yeah, these are, I think,
the three most important limitations that we have.
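To illustrate the marker (watermark) extension George sketches for out-of-order data, here is a minimal example that buffers tuples per window and only finalizes windows once a marker guarantees completeness. This is purely illustrative; Scabbard itself assumes in-order input.

```cpp
// Sketch of the marker (watermark) idea: a special tuple announces that no
// tuple with a smaller timestamp will arrive, so windows ending at or
// before that point can be finalized safely.
#include <cstdint>
#include <iostream>
#include <map>

class BufferedWindowSum {
public:
    explicit BufferedWindowSum(int64_t window_size_ms)
        : window_size_(window_size_ms) {}

    // Out-of-order tuples are buffered per window until a marker arrives.
    void on_tuple(int64_t timestamp, int64_t value) {
        pending_[timestamp / window_size_] += value;
    }

    // Marker: all data with timestamp <= watermark has been seen.
    void on_marker(int64_t watermark) {
        int64_t closed = watermark / window_size_;  // earlier windows are complete
        for (auto it = pending_.begin();
             it != pending_.end() && it->first < closed;
             it = pending_.erase(it)) {
            std::cout << "window " << it->first << " sum=" << it->second << "\n";
        }
    }

private:
    int64_t window_size_;
    std::map<int64_t, int64_t> pending_;  // window id -> partial sum
};

int main() {
    BufferedWindowSum agg(/*window_size_ms=*/1000);
    agg.on_tuple(100, 5);
    agg.on_tuple(1200, 7);
    agg.on_tuple(700, 3);        // late relative to the previous tuple
    agg.on_marker(1000);         // everything up to t=1000 has been seen
}
```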
Okay, cool.
So is Scabbard publicly available?
Can I go and get my hands on it
and play around with it if I so please?
Yeah, I mean, you can find the link for the GitHub repo in the paper.
Cool.
And we'll link it in the show notes as well,
so the listener can go and find it.
Even better, yeah, you can find it.
Cool.
So I guess building on that theme,
if I was to go and play with it and have a mess around with it,
I know you said it's only an academic project, so it won't be as usable as a commercial system, say.
But what do my interactions with Scabbard look like?
How do I go about writing applications on top of it?
As you said, it's an academic project, right? So,
I'm the only one who spends time writing code, and I'm probably
one of the few people
that have used it.
Not after this podcast, once everyone
listens to it.
It'll go crazy.
So, I'll have to fix many things.
So, yeah, I mean, it has
an imperative API.
So you kind of define your operators.
You can define filters, projections, joins, aggregations,
and you kind of chain them together
so that you have pipeline parallelism.
It's kind of similar to how you would create
your pipelines in, for example, Spark or Flink or Storm.
But it's, yeah, much more rough.
Okay, cool.
But it bears resemblance to that, obviously.
So, yeah, we also have some, I mean, all the benchmarks.
We've implemented all the benchmarks and have the code in the repo.
So you can find and use them as, let's say, your base to build something that looks like these applications.
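For a flavour of what an imperative, builder-style pipeline definition might look like, here is a hypothetical sketch that chains a filter, a projection, and a windowed aggregation, and flags the query as fault tolerant; none of these class or method names are taken from Scabbard's actual codebase.

```cpp
// Hypothetical sketch of an imperative, builder-style pipeline definition:
// chain a filter, a projection and a windowed aggregation, and flag the
// query as fault tolerant. These names are not Scabbard's real API.
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

struct Event { int64_t timestamp; int64_t key; int64_t value; };

class QueryBuilder {
public:
    QueryBuilder& filter(std::function<bool(const Event&)> pred) {
        steps_.push_back("filter");
        pred_ = std::move(pred);   // kept only to show where a predicate lives
        return *this;
    }
    QueryBuilder& project(std::vector<std::string> columns) {
        steps_.push_back("project(" + std::to_string(columns.size()) + " cols)");
        return *this;
    }
    QueryBuilder& tumblingWindowSum(int64_t window_ms) {
        steps_.push_back("window_sum(" + std::to_string(window_ms) + "ms)");
        return *this;
    }
    QueryBuilder& faultTolerant(bool enabled) {
        steps_.push_back(enabled ? "persist+checkpoint on" : "persistence off");
        return *this;
    }
    void describe() const {
        for (const auto& s : steps_) std::cout << s << " -> ";
        std::cout << "sink\n";
    }

private:
    std::vector<std::string> steps_;
    std::function<bool(const Event&)> pred_;
};

int main() {
    QueryBuilder query;
    query.filter([](const Event& e) { return e.value > 100; })
         .project({"timestamp", "key"})
         .tumblingWindowSum(/*window_ms=*/1000)
         .faultTolerant(true)   // stand-in for the flag that switches from a
                                // LightSaber-style path to a persistent one
         .describe();
}
```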
So I guess my next question is, what was the most interesting, and I ask this to everyone,
what's the most interesting and perhaps maybe unexpected lesson that you learned while working on Scabbard?
I mean, when we started the project, I was very pessimistic because I wouldn't expect to get good performance while persisting so much data to disk.
So in a way, having this, I mean, it's not minor, but this small performance degradation while you store such an amount of data on disk was kind of unexpected.
So, yeah, that was like a really nice result of how you can multiplex data storage with good execution without affecting performance.
Cool, cool.
So, yeah, I guess you're very pessimistic at the start of the project.
And progress in research is very nonlinear.
There's a lot of ups and downs. So, from the conception of the initial idea for Scabbard to the publication, I guess how long
did that take, and what were the things along that journey that you tried that failed,
that maybe other people in this area could benefit from knowing?
Okay, so I didn't have any experience in fault tolerance before, so I had to learn everything the hard way.
And I was very naive at the beginning on how you can perform,
for example, checkpoints.
So in our first submission that got rejected, of course,
I was trying to have a more general model
where you can checkpoint different data streams
and different operators without synchronization,
which at the time I thought was very cool.
But it was very hard to reason about because when you were recovering,
you would end up in scenarios where your state was not consistent.
So you would have to replay the whole thing from scratch.
So that was an issue that I found.
And also I had many issues
with the implementation
so I thought that I was very smart
and I could do very fast,
like, lock-free programming
and use very smart data structures
for doing checkpoints
of in-memory data structures
while doing processing at the same time,
and, yeah, I failed miserably.
So yeah, my code was full of concurrency bugs
and I had to go back to using locks
and doing the simple things.
And in the end of the day,
this was just premature optimization.
I tend to do this,
so I try to optimize for things that are not important.
And yeah, I shouldn't do this.
Okay, cool. I know you've since submitted your thesis and have left academia, but what's the plan for future research with Scabbard? Is your group planning on taking the project forward, or what are the future goals for it?
So, I mean, we
didn't go far with these ideas,
but we wanted to test two different things.
So the first was multi-tenancy in stream processing.
So because you have these long-running applications,
you may want to have multiple users in a single server,
and we want to see how we can manage all these limited resources
that you have when you have multiple users.
So now it becomes much more challenging to deal with the CPUs, the disk, the network bandwidth.
So yeah, that was one of the directions.
And the second one was, okay, so now we have a system that can persist data streams and state fast.
But there's no way that you can do queries
on both historical and streaming data.
So you cannot really do rank queries.
You cannot combine these two together, right?
So we would like to have an approach
for out-of-core processing
where you can spill things to disk,
you can have indexing,
but designed in a different way
compared to classical relational systems, right?
So that you could still have
these low latency requirements fulfilled in a way.
Yeah, so obviously over the course of your PhD,
you worked on numerous
topics. Can you maybe tell the
listeners about your other research and things
you worked on during your time?
Yeah, so
I mean, we mainly focused
on single node, and
we didn't really
try to go distributed in my time.
So yeah, we started from optimizing specific operators
with SIMD instructions,
how we can run very efficiently from a single core.
Then we moved to multi-core execution with multiple sockets.
And we tried to bring ideas from relational database,
like code generation to stream processing, which
was really interesting. And finally, yeah, we tried to make this single-node system at the
same time fault tolerant, with the assumptions from earlier. So yeah, we were trying to fully
utilize all the resources you can find in a single node, trying to make the most out
of it.
Cool, cool. And I don't
know if that's in any of these questions, so apologies if it catches you off guard, but I was walking
down the street the other day and I thought it's an interesting question: how did you go about it during
your PhD, and in general, I guess, even in your day job today, which is very research-orientated? How do
you approach idea generation, and then how do you choose what ideas to work on and select
projects that
are worthwhile pursuing? What's your process for that, I guess, is what I'm asking.
Yeah, I think I
don't have a very good process. So I read a lot of papers, and, as I said before, stream
processing is not that mature, so usually I try to, kind of, not steal, but borrow ideas, let's say, from relational databases
or other processing paradigms.
And I try to adapt these ideas from the papers that I read to a real-time analytics world,
like a streaming world.
And I was also... So, I mean, I had a lot of
ideas, most of them not very smart
but I was also
kind of
lucky because I had two supervisors
and they were
coming from two different worlds
so when I was pitching ideas to them they also
helped me steer
and pick what I was going to do.
So yeah, as I mentioned earlier, I don't think we did anything.
I mean, there was novelty, but I don't think that we did something that was unexpected.
So yes, as I said, we tried to implement code generation in stream processing.
So I mean, it was interesting, but yeah, this was, for example, a project that came
from relational databases.
What do you think is the biggest challenge in stream processing today?
Yeah, I think that the project that I mentioned earlier about unifying streaming with historical data
in a way that you can provide low-latency results.
I think this is one of the biggest challenges, because you have to deal with novel data structures for indexing,
novel ways, even protocols, of how you buffer your data,
because you don't have, let's say,
the relational database pattern of how you do the accesses,
right? So you have to think of what data could be your hot data, what could be your cold data, and how you evict
data, for example, from your buffer manager. So yeah, I think that this unification is a very
interesting problem.
Cool. So, last question now: what's the one key thing you want the listeners to take away from this episode and your research?
I found this quote in Frank McSherry's slides. It goes like this.
It says you can only have a second computer once you've shown you know how to use your first one. So, yeah, I think that this is what we're trying to show with
our projects that you have to
somehow think out of the box and try to
redesign systems
so that you achieve resource efficiency
and you exploit the hardware
trends that
you can find
right now. So, yeah, that's it.
Yes, brilliant. And that's a wrap, let's end
it there. Thanks so much, George,
for coming on the podcast. If the listener is interested to know more about George's work, then
I'll put all the links to all the relevant materials in the show notes,
and we will see you next time for some more awesome computer science research. Thank you.