Disseminate: The Computer Science Research Podcast - George Theodorakis | Scabbard: Single-Node Fault-Tolerant Stream Processing | #12

Episode Date: November 21, 2022

Summary (VLDB abstract): Single-node multi-core stream processing engines (SPEs) can process hundreds of millions of tuples per second. Yet making them fault-tolerant with exactly-once semantics while retaining this performance is an open challenge: due to the limited I/O bandwidth of a single node, it becomes infeasible to persist all stream data and operator state during execution. Instead, single-node SPEs rely on upstream distributed systems, such as Apache Kafka, to recover stream data after failure, necessitating complex cluster-based deployments. This lack of built-in fault-tolerance features has hindered the adoption of single-node SPEs. We describe Scabbard, the first single-node SPE that supports exactly-once fault-tolerance semantics despite limited local I/O bandwidth. Scabbard achieves this by integrating persistence operations with the query workload. Within the operator graph, Scabbard determines when to persist streams based on the selectivity of operators: by persisting streams after operators that discard data, it can substantially reduce the required I/O bandwidth. As part of the operator graph, Scabbard supports parallel persistence operations and uses markers to decide when to discard persisted data. The persisted data volume is further reduced using workload-specific compression: Scabbard monitors stream statistics and dynamically generates computationally efficient compression operators. Our experiments show that Scabbard can execute stream queries that process over 200 million tuples per second while recovering from failures with sub-second latencies.

Questions:
- Can you start off by explaining what stream processing is and its common use cases?
- How did you end up researching in this area?
- What is Scabbard?
- Can you explain the differences between single-node and distributed SPEs? What are the advantages of single-node SPEs?
- What are the pitfalls that have limited single-node SPEs' adoption?
- What were your design goals when developing Scabbard?
- What is the key idea underpinning Scabbard?
- In the paper you state there are three main contributions in Scabbard; can you talk us through each one?
- How did you implement Scabbard? Can you give an overview of the architecture?
- What was your approach to evaluating Scabbard? What were the questions you were trying to answer?
- What did you compare Scabbard against? What was the experimental setup?
- What were the key results?
- Are there any situations when Scabbard's performance is sub-optimal? What are the limitations?
- Is Scabbard publicly available? As a software developer, how do I interact with Scabbard?
- What are the most interesting and perhaps unexpected lessons that you have learned while working on Scabbard?
- Progress in research is non-linear; from the conception of the idea for Scabbard to the publication, were there things you tried that failed?
- What do you have planned for future research with Scabbard?
- Can you tell the listeners about your other research?
- How do you approach idea generation and selecting projects?
- What do you think is the biggest challenge in your research area now?
- What's the one key thing you want listeners to take away from your research?

Links:
- Paper
- GitHub
- George's homepage

Hosted on Acast. See acast.com/privacy for more information.

Transcript
Starting point is 00:00:00 Hello and welcome to Disseminate, the computer science research podcast. I'm your host, Jack Wardby. I'm delighted to say I'm joined today by George Theodorakis, who'll be talking about his VLDB'22 paper, Scabbard: Single-Node Fault-Tolerant Stream Processing. George was a PhD student in the Large-Scale Distributed Systems group at Imperial College London, and he recently submitted his thesis. He also recently joined Neo4j in their research lab. George's research interests lie in the areas of distributed and parallel computing, database management systems, and stream processing engines. George, thanks for joining us on the show.
Starting point is 00:01:00 Yeah, thanks for having me. It's great to discuss our latest work on stream processing. Brilliant, let's dive straight in then. Can you start off by explaining to the listeners what stream processing is and what its common use cases are? All right, cool. So in the last few years, we've seen an explosive growth of data volumes and velocity, and at the same time, we've seen the emergence of a new type of application that is long-running. These applications run for hours, for days, for months, even for years, if you can make them work that way, and execute continuously. So they execute as new data arrives.
Starting point is 00:01:39 And this data can be possibly in the form of infinite streams, right? So you have to deal with the problem of data that are infinite. You don't know their size in advance. So pretty much this is stream processing. It's the processing paradigm that allows you to process real-time data that are infinite. According to estimates, we expect like 30% of the data in the future to be produced and analyzed in real time.
Starting point is 00:02:06 And this makes stream processing a paradigm that is established between transactional processing, online analytics, and reporting. Some common applications for stream processing are high-frequency trading, card-fraud detection, sensor monitoring. And the common characteristic of these applications is that, apart from having to deal with a very high throughput of data, you also have to report in sub-second latency. You have this requirement where you have to respond very fast, and whenever you delay your responses, you have adverse effects. So,
Starting point is 00:02:46 for example, in the case of health monitoring, you have to respond within seconds in order to matter. Or in the case of high-frequency trading, the significance of your results decays over time, so you have to respond as fast as possible to make more profit. Awesome. So how did you end up researching in this area? So yeah, like six years ago, I was trying to find a topic for my undergrad thesis. And it happened that I worked with some people from Imperial College London at the time. They had just published their stream processing engine and we wanted to collaborate with them. So I worked with a system they had back at the time
Starting point is 00:03:35 trying to do stream query optimization at the logical and physical level. And they used Apache Calcite, which is an extensible data management framework. So, yeah, we worked on how we parse streaming queries, how we optimize them. And at that time, there was also this hype with Apache Flink, Spark, and Kafka. So I decided it would be cool to work in the field. Also, one of the reasons that I chose the field was that back at the time it wasn't that mature. So there was a lot of space to experiment and try things
Starting point is 00:04:10 that people haven't tried. Cool. So let's dive into the paper then. Can you tell us what is Scabbard? And am I pronouncing it right? Yeah. So a scabbard is like a holster for your sword, right? Something to protect your weapons. So, yeah, we have this thing that we name our projects after weapons. Okay. Yeah, the initial project was Saber, then its extension was LightSaber, and then Scabbard is the one that protects the LightSaber.
Starting point is 00:04:43 Yeah, anyway, so we had this single-node engine, this single-node system that ran efficiently on multicore systems, and we wanted to make it fault tolerant. So this means, by default... okay, let's step back. For the context, fault tolerance means that we have to be able to handle failures, because we are running our software in either local clusters or on the cloud, and there are cases where things can fail, either because of bugs or because the node crashed, right? So you have to be
Starting point is 00:05:21 able to handle failures and hide their effects completely. And we try to design a system that can achieve this in a single node, which is kind of counterintuitive, given how people are doing this. You mentioned earlier on systems like Apache Flink and whatnot. So they're distributed stream processing engines and this is a single-node stream processing engine. Can you explain what the differences are between these two systems? Yeah, systems like Apache Spark or Flink adopt a shared-nothing execution model, and their whole design allows you to parallelize your computations to many nodes. You get, not for free, but easier, features like fault tolerance. They have very nice APIs.
Starting point is 00:06:11 They are mature. You can do a lot of stuff with them. But because of their design, they face many challenges. The first challenge they face is that because we are now doing computations over the network, we have to pay the price, right? So we have cross-process and network communication overheads. We have to serialize our data. We have to design our data structures and our algorithms
Starting point is 00:06:34 in a different way. We don't have shared memory. So we increase our resource and maintenance footprint, definitely. So this is one challenge. The second challenge is that because we are over the network, we cannot really have predictable performance guarantees. So earlier I mentioned that most stream processing applications require very low latency,
Starting point is 00:06:55 and you cannot guarantee this when you do processing over the network, where you try to scale very high-throughput streams. And a third issue is that many parallel computations, many operators, are very hard to parallelize efficiently in a distributed environment. For example, aggregations with windows. So windows are a form of structure
Starting point is 00:07:24 that allows you to process this infinite data. So in the case, for example, of aggregations over windows, when you do them in parallel in a distributed environment, you have to either perform redundant computations or you have to approximate your window results, because you don't have a global clock in a distributed environment. I think the most important bit is that you have to pay the tax of distributed execution. Yeah, it reminds me of the COST paper, right? Yeah, it inspired us a lot for our project.
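To make the windowing idea concrete, here is a minimal sketch of a tumbling-window count over an unbounded stream. It is an illustration for this discussion (the struct and its behavior are assumptions), not code from Saber, LightSaber, or Scabbard:

```cpp
#include <cstdint>
#include <iostream>

// A tumbling-window count over an unbounded stream. Window boundaries derive
// purely from event timestamps; agreeing on where each window ends and when
// it is complete is exactly what gets hard once the stream is partitioned
// across nodes without a global clock.
struct TumblingCount {
    int64_t windowSizeMs;
    int64_t windowEnd = -1;
    int64_t count = 0;

    void onTuple(int64_t tsMs) {
        if (windowEnd < 0)
            windowEnd = (tsMs / windowSizeMs + 1) * windowSizeMs;
        while (tsMs >= windowEnd) {  // window closed: emit result, advance
            std::cout << "window ending " << windowEnd << ": " << count << "\n";
            count = 0;
            windowEnd += windowSizeMs;
        }
        ++count;
    }
};

int main() {
    TumblingCount w{1000};  // 1-second tumbling windows over ms timestamps
    for (int64_t ts : {100, 400, 950, 1200, 2500}) w.onTuple(ts);
}
```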
Starting point is 00:07:57 So yeah, and at the same time, you see that there's this rise of shared-memory multi-core servers, where you have many CPUs, many sockets, you have terabytes of memory, you have accelerators like GPUs and FPGAs, so you can do a lot of stuff with a single node, right? You can optimize for latency, you can optimize for throughput, you can redesign your algorithms, your data structures. And also now, with the high-speed networking that's been enabled by RDMA, remote direct memory access, you can ingest a lot of data. And usually, for most of the use cases we've seen, where you have to handle hundreds of gigabytes of data, you can fit them in a single node.
Starting point is 00:08:51 So it's possible to have this as an alternative. Okay, cool. So you kind of summarized in your last answer there what the advantages of the single-node stream processing engines are. But what are the significant downsides, what are the pitfalls that have maybe limited the adoption of single-node stream processing engines in practice? What I forgot to mention before is that with single-node implementations that we've seen mostly from academia,
Starting point is 00:09:20 you get usually at least an order of magnitude better performance in terms of throughput and latency. So this is the benefit that you get when you don't have the tax of distributed execution. So, I mean, these are all nice, but nobody really uses these systems in practice. And that's mainly, we think that happens mainly because they lack fault tolerance mechanisms.
Starting point is 00:09:44 As I said earlier, when you run continuous, long-running applications, there's a higher chance that your node is going to fail, and you have to restart and produce correct results. For many applications, it really matters. I think that one of the biggest drawbacks is that these systems don't have built-in fault tolerance mechanisms. And while the strawman solution would be to just take the approaches from distributed execution and plug them into a single node, this would end up blowing up your cluster size again, right?
Starting point is 00:10:22 Because you will need many nodes to perform persistence, many nodes to... I mean, the way they're implemented right now, many nodes to perform persistence or recovery or the bits that are part of the fault tolerance mechanisms. So we...
Starting point is 00:10:40 In the paper, we had this interesting experiment where we were using a single-node system. So this system could ingest like tens of gigabytes of data per second, right? And in order to make it fault tolerant, the simplest solution would require at least 20 Kafka nodes to persist the data streams that we needed. So, yeah, it kind of comes back to the efficiency argument, right? And if you have a single node and then you need 20 Kafka nodes, you end up in a distributed environment again. Back to square one.
Starting point is 00:11:27 Yeah. So yeah. Cool. So, I guess, building off that, what were the design goals that you had in mind when you were developing Scabbard? So we already had a system that had a very high throughput and low latency. Maybe you could mention the previous ones; you mentioned earlier on the journey from, sort of, what was the first one?
Starting point is 00:11:47 The first one was called Saber. Then LightSaber, then Scabbard, right? That's the flow. So yeah, maybe we can talk about those. I didn't work with Saber. I mean, I didn't work on the paper. So Saber was like the first step that was trying to utilize a single node with GPUs.
Starting point is 00:12:03 Okay. And to do this, it was designed to schedule tasks on both CPUs and GPUs and run them in parallel. And LightSaber was kind of the next step, an optimized version of Saber trying to use NUMA-aware execution, SIMD instructions, code generation. So we had this system, this efficient multi-core system. There are many similar to that, right? It wasn't just this one.
Starting point is 00:12:31 And we just wanted to... So we already had high throughput and low latency. We just wanted to ensure that we can still have these performance guarantees during execution that doesn't have failures, which is the common case, right? So whenever we don't have failures, we just want to ensure that we meet these requirements. And the second design goal had to do with the limitations
Starting point is 00:12:57 that we have to deal with in a single node. So when you're in a single node, you have limited resources in terms of network and disk bandwidth. So you have to be careful of how you use these resources, and which data you choose to store, and which data you choose to ship over the network. So, yeah, our second design goal was to deal with these limitations. And the third goal was to guarantee fast recovery, because we are in a single-node environment. Of course, we don't have high availability,
Starting point is 00:13:30 but because we have a single point of failure, we have to provide sub-second recovery so that we decrease the downtime of the system when it's running. So yeah: have performance guarantees, deal with
Starting point is 00:13:44 the limited resources of a single node, and guarantee fast recovery. Those were the key goals. Cool. So I guess, how did you go about addressing these design goals? What's the key idea underpinning Scabbard? And yeah, let's start with that, and then we can dive into it.
Starting point is 00:14:02 I know in the paper you say there's three main contributions, and we can talk about those in a second. But yeah, what's the kind of key idea that you had to address these design goals? So when we're trying to implement the system... Okay, a step back, which is something I haven't mentioned properly. When you're in the streaming world, in order to achieve fault tolerance, you have to persist at least all your input streams so that you can replay them and get back to the correct state after recovery. And as an optimization, you can always checkpoint your operator state so that you can cut down on how much of your input streams you have to replay. Now, in our context, in a single-
Starting point is 00:14:45 node execution, we're ingesting streams in the order of gigabytes per second, and also your state would reach the same magnitude, right? So you would have to persist tens of gigabytes per second, right? And this would have to happen with SSD disks, simple commodity disks that have a bandwidth lower than a gigabyte per second, probably. And you already see that there is a bottleneck there. You cannot store all this data with a single SSD. And while you don't have this problem with distributed execution because everything is partitioned, everything works fine in a single node,
Starting point is 00:15:26 that was our key issue, right? So the idea that we tried to... The main idea behind Scabbard is that you have to reduce what you store to disk in order to accelerate fault tolerance. And you have to do it in a manner that adapts to different workloads, so that you can get the best performance possible. Cool. So let's go through the main aspects to achieving this.
Starting point is 00:15:56 So the first thing you mentioned in the paper is this idea of a persistent operator graph model. Can you tell us more about that? Yeah, for our first contribution, we wanted to come up with a model that allows us to have the right abstractions to perform persistence and fault tolerance in a single node. So we wanted to integrate the persistence with the query execution so that we can perform a series of optimizations. What this really means is that we have encoded persistence as an operation in our data flow graph, the execution data flow, and therefore we can reorder persistence as you would reorder, for example, a filter or a projection, and we can apply it after we have discarded data with highly selective operators. So we can, for example,
Starting point is 00:16:50 when we try to persist our input stream, we can persist it after we have applied filters, projections, or we have applied compression, for example, which I will discuss next. And we can also identify opportunities where we can even inject operators like projections
Starting point is 00:17:05 so that we can reduce what we store to disk. Yeah, we have this set of optimizations, and then we have the abstractions that allow us to efficiently persist both the data streams and the operator state that I mentioned earlier that we want to persist. And we also have some abstractions that allow us to coordinate all these operations. And based on all these abstractions, like the coordination abstractions and the persistence abstractions, we built a protocol that allows us to provide exactly-once output. Now, what this means is that, in the event of failure, we can hide its effects completely and provide the exact same results as if nothing had happened.
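As a rough sketch of the persistence-reordering idea described here: persistence is just another node in the operator graph, so it can be placed after selective operators. The types, selectivity field, and heuristic below are illustrative assumptions, not Scabbard's internals:

```cpp
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

// Persistence is an ordinary node in the operator graph, so it can be pushed
// downstream of selective operators (filters, projections) and log only the
// data that survives them.
struct Op {
    std::string name;
    double selectivity;  // expected fraction of input tuples that survive
};

// Persist after the longest prefix of data-reducing operators.
std::size_t choosePersistPosition(const std::vector<Op>& pipeline) {
    std::size_t pos = 0;
    for (std::size_t i = 0; i < pipeline.size(); ++i) {
        if (pipeline[i].selectivity < 1.0)
            pos = i + 1;  // this operator discards data: persist after it
        else
            break;        // no further reduction: stop pushing down
    }
    return pos;
}

int main() {
    std::vector<Op> pipeline = {{"filter", 0.1}, {"project", 0.5}, {"aggregate", 1.0}};
    std::cout << "persist after operator " << choosePersistPosition(pipeline) << "\n";
    // With these made-up selectivities, only 5% of the raw stream is logged.
}
```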
Starting point is 00:17:45 And yeah, I mean, this model tries to capture only the essential information that you need to persist, so that you can recover with exactly-once output. Okay, cool. So once we have this model, the next component of Scabbard is this query-specific adaptive compression. How does this work? So I mean, after doing the optimization from before, where you discard data and you pick what you're going to store from your streams and
Starting point is 00:18:21 your operator state, we found out that there was still... disk was still the bottleneck, right? So the most natural thing to go after then was compression. So we decided to compress whatever was left to be stored to disk, what was essential for us to store to disk, and we decided to go for adaptive compression. Now, because, as I mentioned earlier, we have to deal with long-running applications, we also have to deal with changing data characteristics. So it was essential for us to capture these changes by collecting lightweight statistics at runtime and choosing the most suitable compression algorithm based on these statistics.
Starting point is 00:19:07 Because our initial system was using code generation for all the other operations, we decided to also use code generation, so that we can reduce the overheads when you inject new compression schemes into the operator graph. And yeah, this is where the adaptive compression bit comes from. Cool. So what sort of statistics do you gather? I mean, it's very simple. It's about the data distribution. So we, for example, gather minimum and maximum values at runtime. Or, for example, since we try to use run-length encoding if it makes sense, we gather
Starting point is 00:19:45 how many consecutive values are equal, statistics like this. So yeah, it's very basic statistics, but still,
Starting point is 00:19:54 they help us to identify which algorithm to choose. And sorry, one last bit that we collect,
Starting point is 00:20:00 that we measure, is the bit precision for integers, and this allows us to pick the right algorithm to compress integers even better. Cool. I was going to ask, how do these statistics then map to the choice of algorithm you use,
Starting point is 00:20:15 and what algorithms are supported? And what is better for what? I mean, we don't have a proper optimizer there. It's just that we have probably two or three different algorithms for different data types: for example, for integers, for floats, for... It's not actually a data type, but for timestamps, we consider them a data type. Or for strings.
Starting point is 00:20:44 But, yeah, the logic of how you choose different algorithms is very simple. So basically we did some analysis and we figured out which algorithm to choose depending on your bit precision and, as I said before, how many sequences of equal values you have, and so on. So we just have a greedy approach to picking the right algorithm. I guess it's maybe an area for future research, yeah.
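Here is a minimal sketch of what such a greedy, statistics-driven choice could look like. The statistics mirror the ones mentioned here (min/max, run lengths, bit precision), but the codec set and thresholds are made up for illustration:

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

// Lightweight per-column statistics, collected at runtime.
struct ColumnStats {
    int64_t  min = INT64_MAX, max = INT64_MIN;
    uint64_t runs = 0;   // number of runs of equal consecutive values
    uint64_t count = 0;
};

ColumnStats collect(const std::vector<int64_t>& col) {
    ColumnStats s;
    for (std::size_t i = 0; i < col.size(); ++i) {
        s.min = std::min(s.min, col[i]);
        s.max = std::max(s.max, col[i]);
        if (i == 0 || col[i] != col[i - 1]) ++s.runs;
        ++s.count;
    }
    return s;
}

enum class Codec { RLE, BitPack, None };

// Greedy choice: long runs favor run-length encoding; otherwise a narrow
// value range (low bit precision) favors bit-packing.
Codec choose(const ColumnStats& s) {
    if (s.count == 0) return Codec::None;
    if (double(s.count) / double(s.runs) > 4.0)
        return Codec::RLE;                       // long runs compress well
    uint64_t range = uint64_t(s.max - s.min);
    int bits = 64 - __builtin_clzll(range | 1);  // GCC/Clang: bits needed for range
    if (bits <= 16) return Codec::BitPack;       // narrow values: pack tightly
    return Codec::None;
}

int main() {
    std::vector<int64_t> ts = {1, 1, 1, 1, 1, 2, 2, 2, 2, 2};  // coarse timestamps
    return choose(collect(ts)) == Codec::RLE ? 0 : 1;          // long runs -> RLE
}
```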
Starting point is 00:21:12 And I guess we've got the persistent operator graph model and the query-specific adaptive compression, and the last thing that is a big contribution in the paper is the efficient failure recovery mechanism. So can we dig into that a little bit? I mean...
Starting point is 00:21:28 And how it fits in with everything else, I guess, right? Yes. So the other two are kind of more conceptual contributions, so high level. This is kind of more an implementation one, so it's kind of the third contribution that you have to put in your paper just to submit it, I guess, in a way. So, yeah, I mean, as I said earlier, you have to recover fast, right? So we were trying to optimize this path of recovery. And so when you spin up the system, you have to generate code for different operations, for your compression, for your operators, and so on.
Starting point is 00:22:07 And all these things take time. So we figured out that instead of going through all the optimization phases all over again, trying to generate the code, we would just store the binaries and then load them. So this saved us a lot of time. And we also optimized checkpoints so that we can perform them frequently without disrupting execution. And by performing frequent checkpoints, you kind of optimize for recovery. So you can recover without having to replay a lot of data.
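A back-of-the-envelope sketch of why frequent checkpoints shorten recovery; all the rates below are made-up numbers for illustration, not Scabbard's measurements:

```cpp
#include <iostream>

// On failure you reload the last checkpoint and replay only the stream data
// persisted after it, so the checkpoint interval bounds the replay work.
int main() {
    double ingestGBps = 2.0;           // assumed stream ingest rate (GB/s)
    double replayGBps = 4.0;           // assumed re-read rate of persisted data
    double checkpointIntervalS = 0.5;  // how often operator state is checkpointed

    // Worst case: the node dies just before the next checkpoint completes.
    double replayGB = ingestGBps * checkpointIntervalS;  // 1 GB to replay
    double replayTimeS = replayGB / replayGBps;          // 0.25 s
    std::cout << "worst-case replay: " << replayGB << " GB, "
              << replayTimeS << " s\n";  // sub-second, before state-load costs
}
```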
Starting point is 00:22:40 And finally, we have a garbage collection mechanism that deletes data that you don't want to load when you recover. So we try to minimize the amount of data overall that you load from disk and how much time it takes to spin up the system. Cool. So I guess we mentioned implementation there. So let's talk about the implementation a little bit more. So there's this path between Saber, Lightsaber, and Scabbard. How did you go about implementing Scabbard? Is it all in the same code base?
Starting point is 00:23:08 How does it all fit together? How did you go about implementing it? Okay, so we implemented Scabbard atop Lightsaber. So Lightsaber was written in C++, and we used its query execution engine, its compiler, and we extended them. For example, we extended the compiler of Lightsaber to generate additional types of tasks. So we generate tasks for persistence, for compression, for checkpointing,
Starting point is 00:23:34 and we try to integrate everything as a task in our system. We also implemented some features regarding networking, like how we do data ingestion or how we send data using RDMA just to make it faster. So anyway, everything in the system is a task, like from networking to persistence to compression to how you execute the queries. And all these tasks are executed asynchronously. And this helps our system design because you can hide all the latencies from disk and network. And you can better utilize the underlying hardware. So everything
Starting point is 00:24:15 is a task. Everything can be executed by all the cores that we have. Every core in the system is a worker. And we also optimize for locality. So the workers that are closer to the data will try to execute the tasks, and only if they don't have work will they fetch something from another socket. So yeah, this was the way to integrate all our logic with LightSaber: just make everything a task, and have all the workers pushing the state of the system forward.
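Here is a minimal sketch of that task-based, locality-aware worker loop. The queue-per-socket structure below is an assumption for illustration, not LightSaber's actual scheduler:

```cpp
#include <atomic>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <optional>
#include <vector>

// "Everything is a task": networking, persistence, compression, and query
// execution all become tasks; each worker prefers the queue on its own
// socket and only steals from remote queues when idle, preserving locality.
using Task = std::function<void()>;

struct TaskQueue {
    std::mutex m;
    std::deque<Task> q;
    std::optional<Task> pop() {
        std::lock_guard<std::mutex> g(m);
        if (q.empty()) return std::nullopt;
        Task t = std::move(q.front());
        q.pop_front();
        return t;
    }
};

void workerLoop(std::size_t mySocket, std::vector<TaskQueue>& queues,
                std::atomic<bool>& stop) {
    while (!stop.load()) {
        if (auto t = queues[mySocket].pop()) { (*t)(); continue; }  // local first
        for (std::size_t i = 0; i < queues.size(); ++i) {           // then steal
            if (i == mySocket) continue;
            if (auto t = queues[i].pop()) { (*t)(); break; }
        }
    }
}

int main() {
    std::vector<TaskQueue> queues(2);
    std::atomic<bool> stop{false};
    queues[1].q.push_back([&] { stop = true; });  // remote task; worker 0 steals it
    workerLoop(0, queues, stop);
}
```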
Starting point is 00:24:47 Okay cool, so can I use LightSaber without Scabbard? Is it pluggable, or have they basically forked into two independent systems? Or can I turn it off and turn it on again, or has it been a complete refactor of the original system, so it's something different now? So the only thing that you have to do is that you have to set some flags in your operators to make them fault-tolerant,
Starting point is 00:25:22 or to your data streams. And then, I mean, yeah, the API is not clean at all, but you have to set, let's say, some flags, and you choose either to use LightSaber or Scabbard. So did
Starting point is 00:25:36 Scabbard pre... Sorry, yeah, LightSaber pre-Scabbard: by adding that code in, if I turn off the flags, is there any overhead since you've added Scabbard in? Or am I basically getting
Starting point is 00:25:52 LightSaber as a baseline, and I'm just kind of losing a bit of performance by adding Scabbard on? So yeah, definitely, I haven't engineered the code that well. But no, don't say that, it's brilliantly engineered, I'm sure. So yeah, if you don't set the flags, you just, like, follow
Starting point is 00:26:13 the LightSaber, let's say, code path, and that's it. You don't have any overhead. Okay, so you get it like as it was. Okay, that's fine. Cool. So what was your approach to evaluating Scabbard, and what were the questions you were trying to answer? Cool. Okay. So the first thing that we wanted to answer was whether, I mean, after doing all these optimizations, all these new abstractions and all these things in a single node, its performance was comparable or better to existing state-of-the-art solutions like Apache Flink, for example, that is fault tolerant, right? And we wanted also to see what's the overhead that we added. I mean, you mentioned earlier the overhead.
Starting point is 00:26:54 We wanted to see what's the overhead compared to an efficient single-node system that doesn't have fault tolerance. So this is the first part, like our end-to-end experiment in terms of fault tolerance, to see its overhead and how it compares to state-of-the-art approaches. And the second question that we wanted to answer was whether our approach of persisting data,
Starting point is 00:27:18 like data streams, was comparable or better to an approach like the one used by Apache Kafka. So, yeah, Apache Kafka is like this distributed messaging queue that people use to persist their data streams. So we wanted to compare with that. What was the experimental setup? And you mentioned there what you compared Scabbard against.
Starting point is 00:27:39 But, yeah, what are the results? So by setup, do you mean like the workloads and where we ran the experiments? Yeah, what are the experiments you ran, basically? I can reframe it as that if you want. What are the experiments you ran? Okay, so in stream processing, you don't have a standard benchmark to use. There's no TPC-C for it, for example. Yeah, TPC-H or TPC-C or whatever.
Starting point is 00:28:04 So people just make up their own benchmarks or they use some that are not really good. So yeah, we just went with 10 different queries that capture different operations like filters, joins, aggregations, windows, and so on. And in the paper, we have results from both our local cluster where we run experiments with RDMA and also from AWS where we run experiments using EBS for remote storage because we also want to verify that it works with remote storage in case that you have a failure
Starting point is 00:28:43 and you have to start up on a different node. So yeah, we just hijacked AWS hardware there. So yeah, I mean, for the first question, for the end-to-end experiments, we used Apache Flink, which is state-of-the-art, but it's a distributed Java-based system. And we also used LightSaber, just to see the overhead compared to a system that doesn't have fault tolerance at all. Now, compared to Apache Flink, we got at least an order of magnitude better results in terms of throughput and latency.
Starting point is 00:29:15 And compared to LightSaber, we saw, on average, a 30% performance degradation in terms of throughput, and close to eight times, I think, if I remember correctly, worse latency, because we're adding the disk in the path. So the throughput
Starting point is 00:29:33 was good, but yeah, you have to wait a little bit more for acknowledgements from the disk in order to continue with the processing in many cases. Did you run any experiments to kind of measure how fast things recover? And did you compare, did you do like
Starting point is 00:29:48 a recovery experiment to see how quickly things came back online when failures occurred? I mean, I don't know what that would look like even,
Starting point is 00:29:55 but did you explore that? Yes, I didn't try. I mean, we tried Flink, but the results were really bad, so we didn't include,
Starting point is 00:30:02 yeah, we didn't include the graph there. We kind of cheated in our recovery experiment. So, I mean, in order to have a proper recovery solution, you should be able to have a module that can discover that you failed, which I guess is the most expensive part, right? Because you are never sure whether you failed or not.
Starting point is 00:30:24 And I guess you can do it with timeouts or? Because you are never sure whether you failed or not. And I guess you can do it with timeouts or something like this. I'm not sure. There's some solution to that, right? Yeah. Anyway, so we skip that part and we just measure how fast we can recover from the point that we fail, assuming that we instantly know that we failed and have to recover, right? And in that case, we had sub-second recovery latencies, which was, yeah, really good.
Starting point is 00:30:51 As I said, we optimized for it, because we just reloaded everything from disk, like the binaries and all the code for compression, and the compression was there, so you knew what to do. So yeah, that was an experiment for recovery. I mean, it wasn't an end-to-end experiment. It was kind of a micro-benchmark, but yes. It gives you a flavor of how it would perform in practice, right? Yeah.
Starting point is 00:31:11 Which is good. So that's kind of obviously all the headline results of your experiments. But are there any situations in which Scabbard's performance was suboptimal? The question I'm kind of getting at here is, what are the general limitations of Scabbard? It's an academic project, so it has many limitations, I guess. So first, in our exactly-once protocol, we assume that the sink, so the sink is the receiver of the results, the sink participates in the protocol. So we are sending to the sink some metadata that would allow it to
Starting point is 00:31:52 perform deduplication. So when you recover and you start sending your results again, you may send some duplicate results. And in our protocol, the receiver is responsible for dropping these results, which is a big assumption, because it may not be able to handle this logic. So if it couldn't handle this logic, then we would have to end up with something like distributed transactions, and then we go back to distributed execution again, and yeah, it's the same issues.
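A minimal sketch of the sink-side deduplication assumption being described, with the metadata modeled as a monotonically increasing batch identifier (an illustrative simplification, not Scabbard's actual protocol):

```cpp
#include <cstdint>
#include <string>

// The sink participates in the protocol: each result batch carries
// monotonically increasing metadata, and after a recovery the engine may
// resend batches it already delivered. The sink drops anything not newer
// than what it has applied, yielding exactly-once output downstream.
struct Sink {
    int64_t lastApplied = -1;

    bool apply(int64_t batchId, const std::string& payload) {
        if (batchId <= lastApplied) return false;  // duplicate after replay
        lastApplied = batchId;
        // ... deliver payload to the downstream consumer ...
        (void)payload;
        return true;
    }
};

int main() {
    Sink s;
    bool first = s.apply(1, "result-1");   // applied
    bool replay = s.apply(1, "result-1");  // dropped as a duplicate
    return first && !replay ? 0 : 1;
}
```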
Starting point is 00:32:25 But maybe you can... So maybe a way you could do it is that you could offer it as a service, and you would have probably many Scabbard nodes for many users, and handle distributed transactions in such an environment, where you wouldn't have the overhead of a single node having to deal with transactions. The second limitation is that we cannot deal with non-deterministic operators. So imagine that you have an operator, for example, that accesses some external database. Yeah, we definitely cannot handle this.
Starting point is 00:33:01 Because in such a case, you would have to track some additional metadata, and when you do a replay, when you restart, you have to be sure that everything happens again exactly as it happened earlier. So you have to track your history and track more
Starting point is 00:33:20 metadata there. So yeah, that would be an extension and we also cannot handle out-of-order data. So what this means... Okay, so we assume that everything comes in order. So everything is... You would never find a tuple that has a late timestamp. Okay? So your data is ordered by timestamp and you assume that everything that comes is in order by the timestamp that you have. So if something comes out of order, it would break our execution engine. So Lightsaber was designed to be in order. And because we built on top of Lightsaber, we just didn't try to extend it towards that area.
Starting point is 00:34:06 So to extend our system for out-of-order data, you would have to introduce some additional markers, some special tuples. And these tuples would tell you how far you've reached in your data in terms of the timestamp. So it would tell you, for example, that you've seen all the data up to minute 10 or something. So you know that it's safe for you to do all the operations up to that point. So yeah, these are, I think, the three most important limitations that we have. Okay, cool.
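A minimal sketch of the marker-based extension described here, where a special watermark tuple promises that no earlier timestamps will arrive; the structure and names are illustrative assumptions:

```cpp
#include <cstdint>
#include <functional>
#include <iostream>
#include <map>

// Buffer late-arriving tuples and only release them, in timestamp order,
// once a marker guarantees no earlier timestamps can still arrive.
struct OutOfOrderBuffer {
    std::multimap<int64_t, int64_t> pending;  // timestamp -> value

    void onTuple(int64_t ts, int64_t value) { pending.emplace(ts, value); }

    // A marker promises all data up to 'watermark' has been seen.
    void onMarker(int64_t watermark,
                  const std::function<void(int64_t, int64_t)>& process) {
        auto end = pending.upper_bound(watermark);
        for (auto it = pending.begin(); it != end; ++it)
            process(it->first, it->second);  // now safe to process in order
        pending.erase(pending.begin(), end);
    }
};

int main() {
    OutOfOrderBuffer buf;
    buf.onTuple(120, 7);
    buf.onTuple(80, 3);  // late tuple, arrived after ts=120
    buf.onMarker(100, [](int64_t ts, int64_t v) {
        std::cout << ts << " -> " << v << "\n";  // releases only ts=80
    });
}
```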
Starting point is 00:34:39 So is Scabbard publicly available? Can I go and get my hands on it and play around with it if I so please? Yeah, I mean, you can find the link for the GitHub repo in the paper. Cool. And we'll link it in the show notes as well, so the listener can go and find it. Even better, yeah, you can find it.
Starting point is 00:34:59 Cool. So I guess building on that theme, if I was to go and play with it and have a mess around with it, I know you said it's only an academic project, so it won't be as usable as a commercial system, say. But what does my interaction with Scabbard look like? How do I go about writing applications on top of it? As you said, it's an academic project, right? So I'm the only one who spends time writing code, and I'm probably
Starting point is 00:35:29 one of the few people that have used it. Not after this podcast, when everyone listens to it. You'll go crazy. So, I have to fix many things. So, yeah, I mean, it has an imperative API.
Starting point is 00:35:44 So, you kind of, like, define your operators. You can define filters, projections, joins, aggregations, and you kind of chain them together so that you have pipeline parallelism. It's kind of similar to how you would create your pipelines in, for example, Spark or Flink or Storm. But it's, yeah, much more rough. Okay, cool.
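For flavor, here is a sketch of that imperative, chained pipeline style; every name here is hypothetical, mimicking the style George describes rather than Scabbard's actual API:

```cpp
#include <cstdint>
#include <functional>
#include <vector>

// A toy, chainable pipeline: operators are stages applied in order to each
// tuple, which is the pipeline-parallel flavor described above.
struct Tuple { int64_t timestamp; int64_t value; };

struct Pipeline {
    std::vector<std::function<bool(Tuple&)>> stages;  // true = keep the tuple

    Pipeline& filter(std::function<bool(const Tuple&)> pred) {
        stages.push_back([pred](Tuple& t) { return pred(t); });
        return *this;
    }
    Pipeline& map(std::function<void(Tuple&)> f) {
        stages.push_back([f](Tuple& t) { f(t); return true; });
        return *this;
    }
    void push(Tuple t) {
        for (auto& s : stages)
            if (!s(t)) return;  // dropped by a filter
        // ... hand the surviving tuple to a window operator or sink ...
    }
};

int main() {
    Pipeline p;
    p.filter([](const Tuple& t) { return t.value > 1000; })
     .map([](Tuple& t) { t.value /= 100; });
    p.push({42, 2500});  // survives the filter, value becomes 25
}
```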
Starting point is 00:36:12 But it bears resemblance to that, obviously. So, yeah, we also have some, I mean, all the benchmarks. We've implemented all the benchmarks and have the code in the repo. So you can find and use them as, let's say, your base to build something that looks like these applications. So I guess my next question is, what was the most interesting, and I ask this to everyone, what's the most interesting and perhaps maybe unexpected lesson that you learned while working on Scabbard? I mean, when we started the project, I was very pessimistic, because I wouldn't expect to get good performance while persisting so much data on disk. So in a way, having this, I mean, it's not minor, but this small performance degradation while you store such an amount of data on disk was kind of unexpected.
Starting point is 00:37:08 So, yeah, that was like a really nice result of how you can multiplex data storage with query execution without affecting performance. Cool, cool. So, yeah, I guess you were very pessimistic at the start of the project. And progress in research is very nonlinear. There's a lot of ups and downs. So, from the conception of the initial idea for Scabbard to the publication, I guess, how long did that take, and what were the things along that journey that you tried that failed, that maybe other people in this area could benefit from knowing? Okay, so I didn't have any experience in fault tolerance before, so I had to learn everything the hard way.
Starting point is 00:37:46 And I was very naive at the beginning on how you can perform, for example, checkpoints. So in our first submission that got rejected, of course, I was trying to have a more general model where you can checkpoint different data streams and different operators without synchronization, which at the time I thought was very cool. But it was very hard to reason about because when you were recovering,
Starting point is 00:38:12 you would end up in scenarios where your state was not consistent. So you would have to replay the whole thing from scratch. So that was an issue that I found. And also I had many issues with the implementation so I thought that I was very smart and I could do very fast like lock-free programming
Starting point is 00:38:34 and use, like, very smart data structures for doing checkpoints while doing processing at the same time on in-memory data structures, and yeah, I failed miserably. So yeah, my code was full of concurrency bugs, and I had to go back to using locks and doing the simple things.
Starting point is 00:38:53 And at the end of the day, this was just premature optimization. I tend to do this, so I try to optimize for things that are not important. And yeah, I shouldn't do this. Okay, cool. I know you've since submitted your thesis
Starting point is 00:39:09 and have left academia, but what's the plan for future research with Scabbard? I guess, is your group planning on taking the project forward, or what are the future goals for it? So, I mean, we didn't go far with these ideas,
Starting point is 00:39:27 but we wanted to test two different things. So the first was multi-tenancy in stream processing. So because you have these long-running applications, you may want to have multiple users in a single server, and we want to see how we can manage all these limited resources that you have when you have multiple users. So now it becomes much more challenging to deal with the CPUs, the disk, the network bandwidth. So yeah, that was one of the directions.
Starting point is 00:39:55 And the second one was, okay, so now we have a system that can persist data streams and state fast. But there's no way that you can do queries on both historical and streaming data. So you cannot really do rank queries. You cannot combine these two together, right? So we would like to have an approach for out-of-core processing where you can spill things to disk,
Starting point is 00:40:26 you can have indexing, but designed in a different way compared to classical relational systems, right? So that you could still have these low latency requirements fulfilled in a way. Yeah, so obviously over the course of your PhD, you worked on numerous topics. Can you maybe tell the
Starting point is 00:40:47 listeners about your other research and things you worked on during your time? Yeah, so I mean, we mainly focused on single-node execution, and before I could try to go distributed, my time was up.
Starting point is 00:41:04 So yeah, we started from optimizing specific operators with SIMD instructions, how we can run very efficiently on a single core. Then we moved to multi-core execution with multiple sockets. And we tried to bring ideas from relational databases, like code generation, to stream processing, which was really interesting. And finally, yeah, we tried to make a single-node system fault tolerant at the same time, with the assumptions from earlier. So yeah, we were trying to fully
Starting point is 00:41:39 utilize all the resources you can find in a single node, trying to make the most out of it. Cool, cool. And I don't know if that's in any of these questions, so apologies if it catches you off guard, but I was walking down the street the other day and I thought it's an interesting question: how did you go about it during your PhD, and in general, I guess, even in your day job today, which is very research-orientated? How do you approach idea generation, and then how do you choose what ideas to work on and select projects that are worthwhile pursuing? What's your process for that, I guess, is what I'm asking. Yeah, I think I
Starting point is 00:42:10 don't have a very good process. So I read a lot of papers, and, as I said before, stream processing is not that mature, so usually I try to kind of, not steal, but borrow ideas, let's say, from relational databases or other processing paradigms. And I try to adapt these ideas from the papers that I read to a real-time analytics world, like a streaming world. And I was also... So, I mean, I had a lot of ideas, most of them not very smart, but I was also
Starting point is 00:42:51 kind of lucky because I had two supervisors and they were coming from two different worlds so when I was pitching ideas to them they also helped me steer and pick what I was going to do. So yeah, as I mentioned earlier, I don't think we did anything.
Starting point is 00:43:09 I mean, there was novelty, but I don't think that we did something that was unexpected. So yes, as I said, we tried to implement code generation in stream processing. So I mean, it was interesting, but yeah, this was, for example, a project that came from relational databases. What do you think is the biggest challenge in stream processing today? Yeah, I think the project that I mentioned earlier, about unifying streaming with historical data in a way that you can provide low-latency results. I think this is one of the biggest challenges, because you have to deal with novel data structures for indexing,
Starting point is 00:43:53 novel ways of, even protocols of, how you buffer your data, because you don't have, let's say, the relational database pattern of how you do the accesses, right? So you have to think of what data could be your hot data and your cold data, and how you evict data, for example, from your buffer manager. So yeah, I think that this unification is a very interesting problem. Cool, so last question now: what's the one key thing you want the listeners to take away from this episode and your research? I found this quote in Frank McSherry's slides. It goes like this. So it says, you can only have a second computer once you've shown you know how to use your first one.
Starting point is 00:44:45 So yeah, I think that this is what we're trying to show with our projects: that you have to somehow think out of the box and try to redesign systems so that you achieve resource efficiency and you exploit the hardware trends that you can find right now. So yeah, that's it.
Starting point is 00:45:01 Yes, brilliant. And that's a wrap. Let's end it there. Thanks so much, George, for coming on the podcast. If the listener is interested to know more about George's work, I'll put links to all the relevant materials in the show notes. And we will see you next time for some more awesome computer science research. Thank you.
