Hardware-Conscious Data Processing (ST 2024) - tele-TASK - Multicore Parallelism (2)

Episode Date: May 29, 2024

...

Transcript
Starting point is 00:00:00 So thanks for coming. Today we're going to continue with multi-core parallelism in Hardware-Conscious Data Processing. And before that, two announcements. One I already made last time, but I'm going to repeat it until it's done. So we're going to have this presentation by Goetz Graefe next week on Friday, 1 PM.
Starting point is 00:00:22 And this should be interesting. He's from Google. He's an old database person. Not old. He's a well-established senior database person and did a lot of fundamental work, especially the Volcano query execution model and optimizer. So you can learn a lot from this guy. So I recommend coming.
Starting point is 00:00:50 And then I'm happy we found somebody to do our invited lecture in the last week of the lecture. So on the Tuesday in the last week, Piotr Ratusniak from Intel will come and tell us about how they use FPGA in practice. Or somebody else probably because they're building them. But so practical applications for FPGAs. I don't have the exact title yet. I don't know the exact contents but he said he looked at my slides for FPGA and he thought this would be a nice thing on top
Starting point is 00:01:26 to talk about. So I'm really looking forward to that as well. And I hope you do too. So we're still in multi-core parallelism and soon to be done with, let's say, the on-chip core package. And then, after locking and NUMA, we'll basically go to the peripherals. So this will happen sometime soon.
Starting point is 00:01:54 And I think everything else is kind of clear. So let's go to where we left off. So we talked about parallelization already and basically how to use parallelism in the modern database system. So how to use multiple threads, different ways of using threads and basically pull and push model. And now we're going to talk about how to parallelize individual operators in more detail. And for this, we'll especially look into joins as one example, like a more complex operator. And we'll go through two types and then
Starting point is 00:02:40 many subtypes of these joins, so hash join and sort-merge join. And then we'll split them up, basically seeing how we can get parallelism out of these. And before we do so, maybe just a few more general hints. So basically, we want to use parallelism in an intra-operator way. We also said we can run multiple queries at the same time to get parallelism, so inter-query parallelism rather than intra-query parallelism. We can do pipeline parallelism in order to parallelize an individual query by basically having the individual operators run in parallel, but not parallelizing the operators themselves. We can also have
Starting point is 00:03:33 bushy trees, so basically have different parts of the query run in parallel, not in a pipeline fashion, if we have more complex queries. But we basically say, okay, let's break this down even further. So we need to split up the work that an individual operator has to do, and usually we do this by breaking it into separate parts. Many tasks in the database system are embarrassingly parallel. So scanning something is embarrassingly parallel. Filtering something is embarrassingly parallel, meaning we don't really have to think much about how to do this in parallel. There's no communication between different parts of the operator. But with something like a join, we need to communicate, right? So if we split it up into different portions,
Starting point is 00:04:29 we might need to do some exchange in order to figure out everything. Similar things are aggregations, like deduplication. These are the kind of operators where we need to communicate. We need to remember stuff, and different tasks will have to exchange information. In order to schedule this and make it efficient, we want to have separate tasks. Each of the tasks ideally has a similar workload size, a similar amount of work that
Starting point is 00:05:02 needs to be done. And then the OS or the task scheduler can basically assign these to separate worker threads, and then the worker threads do these in parallel. Ideally, we have more tasks in there than we actually have threads, in order to do some load balancing over time, because then we can actually schedule dynamically. If we have exactly as many tasks as we have threads, then it means we need a perfect
Starting point is 00:05:41 balancing in between the different tasks because otherwise there will be a lot of imbalance between the different workers. So that gets better if we have smaller and smaller tasks. But of course we don't want to have tasks that are too small, because then we're going to have too much overhead again. So we want more tasks than there are threads. If a single task's workload exceeds a certain threshold, then we want to further split it up if possible. And we want to have good load balancing in between the tasks. So a good example, or a bad example, actually what happens
Starting point is 00:06:29 a lot, I also mentioned this last time. If we have something like a MapReduce system, say we're using one reducer for each group, for uniqueness, for example. So we're deduplicating our values, and if we just have a single value in the database, then all of that work would automatically go to a single task. If we're just partitioning by values, for example, then we're basically single-threaded again. So these kinds of problems happen a lot. So here we basically need to further split down and have smaller tasks in order to properly
Starting point is 00:07:11 balance the load. And here's an example out of a nice paper, out of ETH. So here you can basically see a radix join. I'll explain the radix join in a bit. So this is one form of a hash join, a parallelized hash join. And so the idea is that we have multiple phases. So that we're basically first computing histograms on different partitions of the data. So just to figure out like what kind of values do we have in there, then we partition the data according to say some bits. As I said, I'm going to explain this in a bit more detail. And then we do the actual join.
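The histogram and partitioning phases mentioned here can be sketched roughly as follows. This is a minimal single-threaded sketch, assuming integer keys and partitioning on the lowest `bits` bits of the key; the name `radix_partition` and its return values are made up for illustration and are not taken from the paper.

```python
def radix_partition(keys, bits):
    """Partition keys by their lowest `bits` bits using a histogram and a prefix sum."""
    fanout = 1 << bits
    mask = fanout - 1

    # Phase 1: histogram, count how many keys fall into each partition.
    histogram = [0] * fanout
    for k in keys:
        histogram[k & mask] += 1

    # The prefix sum turns the counts into the start offset of each partition.
    offsets = [0] * fanout
    for p in range(1, fanout):
        offsets[p] = offsets[p - 1] + histogram[p - 1]

    # Phase 2: scatter every key to the next free slot of its partition.
    out = [None] * len(keys)
    cursor = list(offsets)
    for k in keys:
        p = k & mask
        out[cursor[p]] = k
        cursor[p] += 1
    return out, offsets, histogram
```

Because the histogram fixes the size of every partition up front, the scatter step can write into one preallocated output array without resizing anything.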
Starting point is 00:08:02 And what you can see here is what happens if we just assign tasks according to the number of threads that we have. So this is a not super highly multi-threaded machine. We have eight threads, eight cores probably here. Usually in a database system, we're going to use as many threads as we have hardware cores, because we can fully saturate them. Unless we have different kinds of work to be done, or many different queries, for example, that we want to run in parallel. Then it might make sense to better utilize the different functional units. But if we have one large query, then we'll just assign this.
Starting point is 00:08:46 As you can see, the initial histogram is kind of fine. The first partitioning phase is also fine because we're basically splitting this up exactly by, let's say, task size. So we're just splitting up the data in exactly the same amount. And the work that needs to be done by every core or every thread is the exact same. But then all of a sudden we have some skew in the data.
Starting point is 00:09:18 And this is as soon as we're partitioning and the data is skewed, we're partitioning by a key, then we'll get differently sized partitions. And if we don't manage this, if we don't readjust this somehow, then we will basically have some kind of, yeah, we have some imbalance here, right? So this is basically in operations in cycles
Starting point is 00:09:43 that need to be done. And you can see, and there's some synchronization after each phase and that's required in a hash join because first we're basically building the hash table and then we do the actual join. And even then, like if the partitions are very different, then this means like one thread will take much more time
Starting point is 00:10:03 than the others. And this is basically what everybody else will have to wait for. And we're blocked by this, and it's the same in the join phase. Now, if we're reducing the task size here and we basically have more tasks than we have threads, then we can have a much nicer distribution here. We can dynamically schedule the tasks, getting a much more balanced workload. And we can see here, we get an improvement of roughly 25%, just because we don't have this additional time where nobody does anything, right? Or rather, where most threads don't do anything because one thread blocks everybody else.
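The effect of task granularity on a skewed workload can be illustrated with a small simulation. This is only a sketch under simplifying assumptions: task costs are abstract time units, scheduling overhead is ignored, and `makespan_dynamic` is a hypothetical helper name, not something from the paper.

```python
import heapq

def makespan_dynamic(task_costs, num_threads):
    """Time until the last thread finishes when idle threads pull the next task."""
    finish_times = [0] * num_threads
    heapq.heapify(finish_times)
    for cost in task_costs:
        # The thread that becomes idle first takes the next task from the queue.
        earliest = heapq.heappop(finish_times)
        heapq.heappush(finish_times, earliest + cost)
    return max(finish_times)

# One task per thread, with one skewed partition: everybody waits for the big task.
coarse = makespan_dynamic([10, 2, 2, 2], num_threads=4)

# The same 16 units of work cut into 16 small tasks balance out across the threads.
fine = makespan_dynamic([1] * 16, num_threads=4)
```

With these made-up numbers the coarse assignment takes 10 time units and the fine-grained one takes 4, since the skewed partition no longer pins a single thread.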
Starting point is 00:10:52 What you also can see, I mean, you can roughly guess it, right? So overall, this does not really seem, I mean, this seems like there's more time that we actually waiting than, or more empty space. Let's say the sum of all black parts seems to be less here than here. And that's probably true because of the task overhead, right?
Starting point is 00:11:16 Like smaller tasks, more scheduling overhead. So we need to do a bit more work here, but it pays off because the other threads don't have to wait. So basically all threads always do useful work in the beginning. This is also true for the upper graph, but with the simple task queuing, we basically get a 25% reduction in performance just because threads wait. And if we're doing fine-grained task decomposition, then we get good load balancing. Okay, so regarding data partitioning, well, we want fine-grained partitioning because this increased scheduling overhead will pay off in the end,
Starting point is 00:12:07 even though we're doing more work in total, because all of the cores can properly work all the way until the end of each phase. In total, we're better off because more work can get done in a short amount of time. We get more parallelism in total. We also want to have like a dynamic assignment of the tasks and partitions because that makes the balancing easier. We could also do this statically, assuming partition or like distributions, et cetera. But again, we might get worse.
Starting point is 00:12:47 Or if we're wrong, we will get worse performance in the end. We will have more waiting in total. And so now the question is, how can we properly incorporate this? And you remember the different kinds of execution strategies, I hope. So Martin presented this to you. We have the Volcano, tuple-at-a-time way of doing it. We have query compilation, we have vectorized execution, et cetera.
Starting point is 00:13:18 And in the query compilation strategy, one way of executing this is in morsels. So morsel-driven parallelism. And the idea here is basically, if we have something like a join, for example. So here we see basically the second phase of the join. We've already built our hash tables, so two hash tables, as part of a three-way join with a relation R, relation S, and
Starting point is 00:13:56 relation T, and we're joining relation R with S and then the join of the two with T. And rather than doing this on the whole table or partitioning the table into, let's say, number-of-threads sub-partitions, we're partitioning it in a way that we have small bites or small pieces of the table that give us enough work for a single thread. So the thread doesn't have complete overhead
Starting point is 00:14:33 in the scheduling all the time. But at the same time, we have enough tasks to balance in between the work, in between the different threads. And we can rebalance because there's more work than we have number of threads. So we basically split the input data into constant size work units or morsels. A morsel is basically a bite of food.
Starting point is 00:14:57 So basically each core gets a bite of the data at a time and can chew on it until it's swallowed, and then gets the next bite. So that's the idea. And we have a dispatcher that assigns the morsels to a pipeline of operators, and then the worker threads actually pull from the pipeline. So this means every worker thread gets their morsel, chews on it, swallows it, then grabs the next one. Right? So basically we get a nice dynamic scheduling by pulling the tasks
Starting point is 00:15:41 from the queue. And the number of worker threads is the number of hardware threads. And of course the operators need to be designed for parallel execution. So we want each of the operators to be parallelizable and be able to work with these individual morsels.
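The pull-based scheduling described here can be sketched like this. It is a toy illustration, not the dispatcher from the paper: `run_morsels` and its parameters are assumed names, the "pipeline" is just a function applied to each morsel, and in CPython the threads illustrate the scheduling pattern rather than real CPU parallelism.

```python
import queue
import threading

def run_morsels(table, morsel_size, num_workers, process):
    """Split `table` into morsels and let worker threads pull them from a queue."""
    morsels = queue.Queue()
    for start in range(0, len(table), morsel_size):
        morsels.put(table[start:start + morsel_size])

    results = []
    results_lock = threading.Lock()

    def worker():
        while True:
            try:
                morsel = morsels.get_nowait()  # grab the next bite
            except queue.Empty:
                return  # nothing left to chew on
            partial = process(morsel)
            with results_lock:
                results.append(partial)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

A call such as `run_morsels(list(range(1000)), 100, 4, sum)` produces ten partial sums, one per morsel, in whatever order the workers happened to finish.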
Starting point is 00:16:02 And then at a certain point, we need some communication. We need to do some exchange. And this is basically where this pipeline then will break. And we'll have to start with a new, we basically have to finish one sub-pipeline and then start with the next operator or pipeline of operators. And as an example, so typical morsel size,
Starting point is 00:16:22 at least proposed in the paper, is a hundred thousand tuples. So this means if you actually want to use this, you'd better have many tuples in your table that you actually want to join, right? So this kind of level of parallelism does not make sense if you're joining 100 tuples with 100 tuples. You're probably not going to get any speedup there. Also, this means for an OLTP kind of workload, this kind of parallelism will probably just lead to overhead, right? So any kind of query in an OLTP kind of fashion
Starting point is 00:17:00 most likely will always be within a single morsel. So this means we'll probably do this in a single thread. There are some exceptions every now and then, but in the general cases, this is true. Okay, so let's look at this again. So we have pipelines again. So this is kind of the same join. We have T, S, and R. For T, we're doing a selection and then build the hash table. For S, we're doing a selection and then build the hash table. And then we're joining R with these,
Starting point is 00:17:41 or we're basically using these two hash tables to first filter R and then probe into these hash tables. And the way you would implement this in this morsel-driven parallelism, or in this HyPer type of execution model, is that for each of the sub-pipelines here, so S and T, we're not just doing the selection in parallel, because the output of the selection we would just materialize and then do something else again. We can actually also do the hash table build within the same pipeline, right?
Starting point is 00:18:22 So it makes sense to read this, do the selection or filtering, and build the hash table in a single pipeline. And this we can do completely in parallel, so there's nothing stopping us from doing this on the different morsels within this table, but we need to build one large hash table somehow. And we'll see about different strategies for how to do this. And this can then be done by multiple threads in parallel. And we can do this for both of these pipelines, for the S and the T hash table. We can do them in parallel, or we can do one first and then the other. And in the second step, we do the last pipeline here, which is basically scanning R, filtering R,
Starting point is 00:19:11 and then probing these two hash tables, right? And again, we can do this completely in parallel. So we have one thread that basically scans one part of the table R, like one morsel, does some filtering, and with this morsel probes the first hash table. If we have a match, we're probing the second hash table. And if we have a match again, we have an output tuple
Starting point is 00:19:40 that we can basically write into our output. And this, again, can be done in parallel. We just have concurrent access here on these hash tables, essentially. OK, clear so far? So this is basically from a general strategy how to do this. Now we have to think about how we do this in the operators themselves.
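The three pipelines just described could look roughly like this for one morsel of R. This is a simplified sketch: rows are assumed to be Python dictionaries, the hash tables are plain `dict`s rather than concurrent structures, and all function and parameter names are invented for illustration.

```python
def build_hash_table(rows, key, predicate):
    """Pipeline for S or T: scan, filter, and build the hash table in one pass."""
    table = {}
    for row in rows:
        if predicate(row):
            table.setdefault(row[key], []).append(row)
    return table

def probe_morsel(morsel, predicate, ht_s, key_s, ht_t, key_t):
    """Pipeline for R: filter, probe the S hash table, then probe the T hash table."""
    out = []
    for r in morsel:
        if not predicate(r):
            continue
        for s in ht_s.get(r[key_s], []):       # match in S?
            for t in ht_t.get(r[key_t], []):   # then also a match in T?
                out.append((r, s, t))
    return out
```

Since `probe_morsel` only reads the two hash tables, several threads could run it on different morsels of R at the same time once both build pipelines have finished.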
Starting point is 00:20:04 So if we have joins, for example, then, I mean, you got some idea already, but we'll look into more detail. There are different strategies for how we do this. So let's look into joins. So why would we care about joins? Because joins are expensive. And this is, again, from Andy Pavlo, here for Impala. So that's one cloud database, essentially, or a large-scale data processing engine. In their typical workloads, you can see 50% of the time is spent on hash joins. There's also a lot of time spent on sequential scans, unions, aggregates, etc. But the major part is actually spent on the joins.
Starting point is 00:20:57 And this is mainly the CPU time, right? And the workload here is TPC-H. This is to be taken with a grain of salt because TPC-H is a benchmark. If you look into real-world workloads, a lot of time is spent on text, character casting, and that kind of stuff, right? So if a lot of the data that's put in a database is text data, then this will take a lot of time. That's nicely parallelizable. The joins will still make up a considerable amount of work, especially in, let's say, an enterprise database where you have many tables and very
Starting point is 00:21:40 large tables. There, you will do a lot of joining all the time. That will cost you a lot of time. If you have big data, whatever stuff, then a lot of time will just be casting, et cetera. Still, joins are important, take a lot of time. And so we want to properly parallelize them. And we want to do this with multiple threads in an intra-query parallelism approach. And there's two main approaches. That's a parallel hash join or parallel sort merge join.
Starting point is 00:22:16 We can also have something like an index join or nested loop join. Even a nested loop join is important, just because unlike the hash join and the sort-merge join, the nested loop join can implement any theta join. So who remembers what a theta join is? So what do we implement with a hash join? It's an equi join, right? So an equi join means we're basically comparing for equality. So for a key, we're searching for the exact same key.
Starting point is 00:23:00 And that's good, right? And this is the most common case in the database because we're joining by keys frequently. But sometimes we want to join with something else, like greater than, text similarity, or something like that. And that's what we call a theta join, meaning the comparison operator can be anything. And this can be implemented in a nested loop join because... Because it basically checks every combination of tuples and we can...
Starting point is 00:23:41 Exactly. Exactly. So the comparison is done on, like we're basically comparing every tuple with every tuple. And so any kind of comparison operator will work, which is not true in a hash join. In a hash join, we only compare things that have the same hash. So most likely have the same key or just a collision in the hash.
Starting point is 00:24:04 And in the sort-merge join, again, we're sorting. We're just comparing keys that are in the same sort order. We might be able to do something with ranges or something like that. I would have to think about it. But typically, this is just for an equi join. So we also want a nested loop join, but in most cases, this will be inefficient because we're doing many more comparisons than we need for an equi join.
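Why the nested loop join handles any theta join can be seen from a minimal sketch. The function name `nested_loop_join` and the choice to pass the comparison as a Python callable are assumptions made for this illustration.

```python
def nested_loop_join(r, s, theta):
    """Compare every tuple of r with every tuple of s using the predicate theta."""
    return [(x, y) for x in r for y in s if theta(x, y)]

# An equi join and a less-than join use the same operator, only theta changes.
equal = nested_loop_join([1, 2], [2, 3], lambda x, y: x == y)
less = nested_loop_join([1, 5], [2, 4], lambda x, y: x < y)
```

The price for this flexibility is that the number of comparisons is always the product of the two input sizes, whatever the predicate is.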
Starting point is 00:24:34 So that's why we want this parallel or this hash join or sort merge join. Typically the sort merge join is slower because sorting is expensive. Just as a quick brain teaser again, why would we still want to have it? Yeah, that's one. So we could optimize by directly using a join that outputs a sorted result. That's one option, yes. Like a shared hash table that would be a memory problem. Okay, so we want the output to be sorted. So that would be one. The shared hash table might be a problem.
Starting point is 00:25:24 But there's one other reason. That's not so much about the output or the operator's internal state, but it's about the input. If the input is already sorted, then we're basically skipping the sort part. We're just merging. And then it basically is just a scan of the two tables
Starting point is 00:25:49 rather than doing anything else. And the scan we also have to do in the hash join, right? There we basically have to do the scan, build the hash table, and do the comparisons. If we have sorted tables already, then it breaks down to just basically comparing two tables
Starting point is 00:26:10 in a linear fashion. So typically that's then cheaper, not necessarily much cheaper, but it will be cheaper than building the hash table and doing the comparisons. So that's why, if there's already a sort order in our input, it makes sense to use it. And of course, if we need sorted output, that also might make sense if our output
Starting point is 00:26:39 is similar size to the input. If our output is super small, maybe sorting later on might make more sense. But again, there we get into optimizations, basically then we need cost analysis, seeing how large the results will be. Okay, so let's start with the hash join. I hope all of you vaguely remember, we've sort of talked about this already,
Starting point is 00:27:04 but let's go through it in a bit more detail again. So in order to compute a hash join, we first need to build a hash table on the inner join relation in the build phase, and then we scan the outer table in the probe phase, and for each tuple we're probing the hash table. All right. So it's typically two phases, and this is also reflected in this morsel-driven parallelism. So we have to completely build the hash table before we can ever probe the hash table.
Starting point is 00:27:46 So that's why it's also blocking. So we first build and then we probe. There's really a break here. Even if we parallelize this, the build needs to be finished before we can do the next thing. It's not true for all variants, but I don't want to digress too much. There are also variants where we can actually do this in a streaming fashion. But for this, you will have to go to big data systems. Okay, so let's look at this. We have the two tables. Typically, we're using the smaller relation
Starting point is 00:28:26 for the hash table. And that's because then we have a smaller hash table. It means we can easily fit it in memory. Also, building the hash table usually is a bit more expensive. So, that's basically we want a small relation for building the hash table. So we're scanning through the relation, we're hashing it, storing the results, and then we're using the second table, the S table, we're hashing and we're probing into the hash table. And if we find the matches, then we're outputting the matches, we have our results. And now the question is, how can we parallelize this? Again, there's these two phases, so we want to basically parallelize both phases.
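The two phases can be written down as a short single-threaded baseline. This is a sketch assuming dictionary-shaped rows and a Python `dict` as the hash table; the function and parameter names are illustrative only.

```python
def hash_join(build_rows, probe_rows, build_key, probe_key):
    """Equi join: build a hash table on one input, then probe it with the other."""
    # Build phase: has to finish completely before any probing starts.
    table = {}
    for b in build_rows:
        table.setdefault(b[build_key], []).append(b)

    # Probe phase: look up each probe tuple and emit all matches.
    result = []
    for p in probe_rows:
        for b in table.get(p[probe_key], []):
            result.append((b, p))
    return result
```

Passing the smaller relation as `build_rows` keeps the hash table small, which is the choice described above.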
Starting point is 00:29:14 And as you can already imagine, there are many ways of doing so. At least I hope you can imagine. So essentially, we have a couple of different things that we can parallelize. On the one hand, we can parallelize the building, basically breaking up relation R here into multiple partitions, then hashing in parallel, building the hash table. The hash table can be shared or can be partitioned again. And then we're also partitioning S,
Starting point is 00:29:54 we're hashing in parallel, and we're either probing against the shared hash table or a partitioned hash table. So either it's non-partition based, so we have one global hash table. That means during the build phase, we're just like all of the threads hammer on one large hash table,
Starting point is 00:30:17 fill in this one large hash table. And we have one global hash table, which is easier to maintain. And then we're just probing this one large hash table with the many separate partitions or many threads on the other side. That might be a problem if we have multiple threads, multiple cores, then they will try to work in their caches. If we have one global hash table and two threads basically get to the same kind of cache lines,
Starting point is 00:31:00 then the caches will be invalidated. We basically get some contention on these cache lines. Everything will get slower because the CPU needs to do some synchronization here across the different threads. If there's enough data and there's not so much contention, then this shouldn't be a problem. If we have this problem, then we might want to partition, actually.
Starting point is 00:31:23 So we might want to partition both tables with the same partitioning function. And then we can build separate hash tables for these separate partitions. So we're partitioning R into ideally equally sized partitions and put the results in these separate hash tables. We're partitioning S according to the same partitions and just probe the separate sub-hash tables. So that's partition-based. And now the question is again, here, how many partitions are there?
Starting point is 00:31:59 Is this a single-step partitioning or is this a multi-step partitioning in here? That gives us yet another degree of freedom. So if we have a partition-based hash join, then our first step is the partition phase. So we divide the tuples of both tables into partitions by using a hash on the join key. So this basically, I mean, this could
Starting point is 00:32:30 be something like a modulo. Of course, a modulo is not great because we often don't get a good partitioning. Then we're building in parallel. So we're scanning the partitions of the R table and create a hash table on the key per partition. And then we're doing the parallel probe for each of the partitions in the correct hash table. And of course, if we find a match, then we combine the tuples and output the result.
Starting point is 00:33:07 And if you're reading papers on this, you've got to be careful because this very last step is often omitted in many papers. So they're just building the partitions, building the hash tables, probing the hash tables, but they're never really materializing the final output, which actually is a lot more data movement in the end, especially if you have large tuples. Because if you're just checking keys, everything nicely fits into caches. If you have large tuples in the end, then all of a sudden you need to do a bit more work there. So you get super high throughput in research join papers, but that's mainly because you're not doing the full work in the end. Okay, so what we do is,
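The three steps of the partition-based hash join can be sketched as follows. This is a sequential sketch that joins bare keys only; it uses `hash(k) % num_partitions` purely for simplicity, and the function name is an assumption made for this example.

```python
def partitioned_hash_join(r_keys, s_keys, num_partitions):
    """Partition both inputs with the same function, then join partition by partition."""
    def partition(keys):
        parts = [[] for _ in range(num_partitions)]
        for k in keys:
            parts[hash(k) % num_partitions].append(k)
        return parts

    # Partition phase: the same function for R and S, so matches share a partition.
    r_parts, s_parts = partition(r_keys), partition(s_keys)

    result = []
    # Each (r_part, s_part) pair is independent and could be its own task.
    for r_part, s_part in zip(r_parts, s_parts):
        # Build phase: a small hash table (key -> count) for this partition only.
        table = {}
        for k in r_part:
            table[k] = table.get(k, 0) + 1
        # Probe phase: only the matching partition of S is probed.
        for k in s_part:
            result.extend([k] * table.get(k, 0))
    return result
```

Note that this sketch returns only the matching keys, which is exactly the shortcut of skipping the final tuple materialization that some papers take.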
Starting point is 00:33:55 just visually again, we're basically partitioning and then we have separate smaller hash tables, right? So we have these separate smaller hash tables that will directly match. And these can have a single key, but can also have different kind of keys. They're just going to be sub-ranges, and we're directly, through these partitionings, we basically directly know what to match where.
Starting point is 00:34:26 And ideally, or I mean, like one way of optimizing further here is we can play around with the sizes of these partitions to make sure that they fit in caches, that they fit in the translation look-aside buffer and these kinds of things, right? So that the number of keys that we get in these sub-partitions actually nicely fits to our memory hierarchy. And we get a good performance improvement just because everything nicely fits. And in reality, the order is actually up to you or up to
Starting point is 00:35:03 the engine. So of course, we need to partition this first, then build this. But if we partition the second table right away after we partition this, or if we first build and then partition and probe, that doesn't really matter. It's more...
Starting point is 00:35:21 Yeah, it doesn't really matter. The neat thing is that these things can be done completely in parallel, right? So this partitioning and the partitioning and building here can be done in parallel, while we need to be done with the building before we can start with the probing.
Starting point is 00:35:42 So that's kind of the blocker here. So now the question again, right? So yet another step, how do we partition? Do we partition globally or do we partition locally? So, because I mean, if we do global partitioning, we again need to exchange information. Sort of makes sense, right? So, this here, the idea is that we get separate
Starting point is 00:36:16 partitions that we can work on in parallel. But from the partitioning part, or here from the partitioning, but from the partitioning, if we break this up into sub parts, the question is, do we exchange the information here? Do we have like, let's say virtually one large set of partitions where we add data in, or do we do this in a smaller, like fine grained level
Starting point is 00:36:43 where each thread essentially partitions by its own and doesn't have to communicate with anybody else. I mean communicate in essence just by using the same memory regions, writing to the same kind of partitions. That again needs synchronization. So if we have a partition with multiple threads right in, then they need to know where can I actually place my tuple in this partition, because if I don't synchronize here, they will overwrite everything all the time. So this basically means it costs more time again.
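The synchronization on a shared partition can be illustrated with a write cursor protected by a lock. This is a toy sketch: the class `SharedPartition` is hypothetical, and a real engine would more likely reserve slots with an atomic fetch-and-add than with a mutex.

```python
import threading

class SharedPartition:
    """A preallocated partition buffer that several threads append to."""

    def __init__(self, capacity):
        self.slots = [None] * capacity
        self.next_free = 0               # write cursor shared by all threads
        self.lock = threading.Lock()

    def append(self, tup):
        # Reserve a slot under the lock so no two threads get the same position.
        with self.lock:
            pos = self.next_free
            self.next_free += 1
        # The write itself needs no lock because the slot is now exclusively ours.
        self.slots[pos] = tup
```

Every append pays for the lock, which is the extra cost of shared partitions mentioned above.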
Starting point is 00:37:17 So if we have private partitions, every thread works completely independently, but then we'll have to combine the partitions later on. So that needs, is an extra step of work yet again. Okay, so if we do like do this in a single pass, that's basically if we have shared partitions, that means that we have the whole table, we're partitioning by, say, the attribute B, so then we're splitting up our table into separate morsels, again, right,
Starting point is 00:37:52 so 100,000 tuples at a time. One thread starts basically just hashing those and figuring out in which partition they belong, and then we have the number of partitions that we want to have as an output. So we're splitting up the key range or the hash range into the number of partitions that we want to have. Say, I don't know, number of threads for simplicity matter. But then that means all of the threads
Starting point is 00:38:22 will basically write to the same kind or to all of the partitions. And that means we need to do some kind of synchronization here. We need to do some kind of locking such that they don't override whatever everybody else writes. So that we're basically improving or increasing the counters or the positions, the addresses, where the next tuple is lying in the partition. An alternative to this is basically, we're having these private partitions, so every thread basically has their own partitions,
Starting point is 00:38:59 no communication, whatever, no synchronization, no locking needed. Everybody works by themselves. But then we'll need some kind of a merge phase in order to combine these partitions after all. So this is basically private partitions. And we're still in a single-pass setup, right? Meaning we have one partitioning step, not multiple partitioning steps. And then we have one extra step that combines these separate partitions.
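The private-partition variant described above can be sketched roughly as follows. This is a minimal illustration, not the implementation from the lecture: the function names, the tiny morsel size, and the use of Python's built-in `hash` are all assumptions made for the example. Each worker fills its own buckets without any locking, and a separate merge step concatenates the per-thread buckets afterwards.

```python
from concurrent.futures import ThreadPoolExecutor


def partition_private(morsel, num_partitions):
    """Partition one morsel into thread-private buckets (no locking needed)."""
    local = [[] for _ in range(num_partitions)]
    for key, payload in morsel:
        local[hash(key) % num_partitions].append((key, payload))
    return local


def partition_table(table, num_partitions, morsel_size=4, workers=2):
    """Single-pass partitioning with private partitions plus a merge step.

    morsel_size is tiny here for illustration; the lecture mentions
    morsels of about 100,000 tuples.
    """
    morsels = [table[i:i + morsel_size]
               for i in range(0, len(table), morsel_size)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        private = list(pool.map(
            lambda m: partition_private(m, num_partitions), morsels))
    # Extra merge step: combine the private partitions into global ones.
    merged = [[] for _ in range(num_partitions)]
    for local in private:
        for p, bucket in enumerate(local):
            merged[p].extend(bucket)
    return merged
```

In the shared-partition variant, the `append` into a common bucket would instead need a lock or an atomically incremented write position per partition, which is the synchronization cost mentioned above.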
Starting point is 00:39:32 Okay? Good. So, now the question is, I mean, we're still in this, so we have partitioning. We can do it like we thought about single partitioning so far. And we can have one large set of partitions, like one global set of partitions.
Starting point is 00:39:57 Or we have these private partitions that we can basically have per thread and then combine them. Now the question is, do we do single-step partitioning or do we do multiple steps of partitioning? Single-pass partitioning means we're partitioning once, either in private or in global partitions, or shared partitions, or alternatively, we do multiple steps of partitioning. So, single-pass partitioning is good, because it's just a single step, right?
Starting point is 00:40:33 So, it's less work, but we need to get the required fan out. So, we basically need to figure out how many partitions do we want after all, in order then to have enough parallelism in our next step. So basically, the number of partitions will then lead to the number of individual tasks that we can do for building the hash tables. I mean, we can also parallelize the separate hash tables, but that would, again, well, then we have a communication in there again.
Starting point is 00:41:10 So anyway, we need to figure out the desired number of partitions. And the number of partitions that we want to have can be very large. So that will be costly. That means we have, because we want to divide the table into cache size partitions,
Starting point is 00:41:29 that means we get a large number of partitions. That's quite costly, because we're writing into many partitions spread over a large range of memory. So in order to avoid this, we can also do multi-pass partitioning. And that's usually called a radix join. So we're splitting in multiple steps, each time into a smaller set of partitions. Meaning, initially,
Starting point is 00:42:06 a very simple idea would be first two partitions, and then these two partitions into four partitions, and then these four partitions into eight partitions, and maybe you can already see where this is going. Typically we're looking at bits here, and this is
Starting point is 00:42:22 why it's also called Radix Join, because we're just looking at a couple of bits at a time, typically the least significant bits, and then based on these, we create separate partitions. And by this, we can nicely control the fan-out. We can basically say, okay, I want to have 16 partitions in the beginning,
Starting point is 00:42:48 so I'm going to look at four bits at a time and split my partition or my input table based on this radix or on these four bits. And then looking at the result, I can basically go further and further split up the partitions into the number of partitions that I finally want and basically get to very small, nicely cache-fitting partitions.
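The multi-pass idea can be sketched like this. It is a simplified illustration under a few assumptions: keys are non-negative integers used directly as the radix (a real system would typically hash first), partitions are plain Python lists rather than contiguous memory filled via histograms, and the names `radix_pass` and `radix_partition` are made up for the example. Each pass looks at the next group of bits, starting from the least significant ones, so the fan-out per pass stays small.

```python
def radix_pass(tuples, shift, bits):
    """Split tuples into 2**bits partitions using `bits` key bits at `shift`."""
    fanout = 1 << bits
    mask = fanout - 1
    parts = [[] for _ in range(fanout)]
    for key, payload in tuples:
        parts[(key >> shift) & mask].append((key, payload))
    return parts


def radix_partition(tuples, bits_per_pass, passes):
    """Multi-pass radix partitioning, least significant bits first."""
    parts = [tuples]
    for p in range(passes):
        shift = p * bits_per_pass
        next_parts = []
        for part in parts:
            # Every existing partition is split further by the next bits.
            next_parts.extend(radix_pass(part, shift, bits_per_pass))
        parts = next_parts
    return parts
```

With `bits_per_pass=4` and two passes this yields 16 partitions after the first pass and 256 after the second, while each individual pass only ever writes to 16 targets at a time.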
Starting point is 00:43:23 So this is basically what we do in a radix join is we split up the partitions in multiple passes and the number of partitions per pass is limited by the number of translation look-aside buffer entries because that basically means that our address translation will be very fast or will be very efficient. And that's basically, I mean, we will have multiple passes by doing so. So this means it's 64 to 512 partitions at a time. This means we have larger partitions initially, but we get much faster partitioning
Starting point is 00:44:07 just because we don't need to do additional address translation. Address translation is basically cheaper because the translation look-aside buffer helps us in the translation. So first we're gonna use a number of bits in order to get the first set of partitions, and then we do the second set of partitions.
Starting point is 00:44:30 And by doing this in multiple steps, we also have some influence in basically how do we further partition, right? So we can do this using a histogram, or we can actually, during the partitioning, we can see if one partition is already very small, we don't need to further partition. We might have some skew in the data. Then we stick with this rather than doing more fine-grained partitioning. And the translation look-aside buffer actually
Starting point is 00:44:57 compensates for the multiple read and write passes. Because this basically means we don't need to do lookups into memory for the address translation, these multiple passes are actually faster. OK, so with this, we're going to go from the hash join to the sort merge join. And before we do so, we're going to do a quick break here. Questions so far? Yes?
Starting point is 00:45:31 Yes. [Audience question, beginning inaudible] ...then the hash is always going to be the same. So the corresponding partition will be very large. So it won't fit into a cache. And also, during the probing, if we split it up, we have to remember that we have to check all of them. What do you do in such cases? So the question is if one value is basically taking up, say, 50% of the relation, right? So like we have like a foreign key relationship, I don't know,
Starting point is 00:46:11 or something like Twitter or something with followers. And we have one person who everybody follows, something like this, like a power law. So everything goes to one tuple. This is actually, I mean, in the hash table, this results in one entry, right? So it's just all of the addresses that we then need to map.
Starting point is 00:46:35 So, I mean, the radix partitioning doesn't help us much here, because from where we're probing, this is basically just one entry. What remains is all of the addresses that are somehow queued in the hash table. It's more a problem of how the hash table is designed then. It's basically, do we have the entry in the hash table? The position in the hash table is just one position that we have. Now, if we have a linked list or something behind the hash table where all of the entries that this would match are, this is basically then what we have to go through.
Starting point is 00:47:18 If these are millions of records that are just linked to the single hash table bucket, then we need some parallelization strategy for this. I mean, the result of this join is basically the Cartesian product of this whatever goes, like whatever matches, and all of the entries in the single bucket. Does it make sense? It's basically like the hash join does what the hash join does. The hash join is fine, essentially. The problem is what's behind this.
Starting point is 00:47:55 And this is also basically where what I said here, where was it? Where this part basically comes out. So the matches, that's no problem. The output of the combined tuple, the outputs, that will be the problem because all of a sudden I have these many tuples that I actually have to materialize again. And these somehow need to be stored in the hash table. So I will have the problem in building the hash table, essentially.
Starting point is 00:48:28 So, it's not so much in the materialization. In the radix partitioning, the idea is that, hopefully, we'll just split it up by more and more bits. And we hope that we get more partitions. If we run out of bits, because basically we have the same key, then it will end up in a single partition, and the partitioning won't give us any additional improvement here. Does this make sense? Like, we are joining with another table, so we have a three-way join. Three-way join.
Starting point is 00:49:07 And then the result is very large, and that's the problem. So then it's not just that, yeah, we have to give it some natural. So the problem is basically the same. So like. But then this should be further parallelized. Yes, we could further parallelize it.
Starting point is 00:49:33 I mean, that basically would lead, or what we need to do is basically say, OK, if our radix partition doesn't split it up anymore, so then, well, let's do another type of like then. I mean, for the hash table, it's not a big issue, as I said. It's basically one entry. We just have all of the set of entries that we somehow need to link to the value in the hash table. So in order to materialize this later,
Starting point is 00:50:04 we will have to go through this. I mean, we can do this completely in parallel because each of the tuples is like one part of the result tuple. We could, I mean, what in practice can be done is we can materialize this late. So rather than like building the Cartesian product, we just keep the information that this should be matched later. And then if we find we're just keeping the separate sets of tuples that match, maybe let me draw this. Okay, so the question is basically we have in the end the problem comes to
Starting point is 00:51:01 we have two sets that have the same key, right? So it's two sets of tuples that have the same key. What do we do with those? These will end up in the same partition. These will have the same hash key. Now, if we materialize the result, this will even increase.
Starting point is 00:51:28 I mean, if it's just a single key on the one side, so this is S, this is R, we're building the hash table with this one. So worst case, all tuples have the same key. So this means we have a hash table that has exactly one entry. And behind this, we have all the tuples in here. This same thing, so we're probing all of them hit the same thing. So we're not getting any parallelism here
Starting point is 00:51:58 from the radix join or from any of the parallel hash joins. So what can we do in order to parallelize this using a hash? I mean, the hash join won't help us. Like basically this is what happens. And what the hash join would do then is basically, well, just create all of the combinations. So then we get back to a nested loop, where we basically say, okay, for this tuple, let's compare all of those and output all of
Starting point is 00:52:34 those because they're all matching. This is actually what's happening. So if we're just really unlucky and everything is colliding here, then the Radix Join will help us because the Radix Join will increasingly add more bits to the key until we get the partitioning. If they have the same key, well, then we have to do the Cartesian product. We basically combine this with all of these, combine this with all of these, and so on. In order to improve this, I mean, we can arbitrarily partition this as well. We just have to make sure that we have all combinations later on again for creating the
Starting point is 00:53:21 output but the output will be also the size of this times this. So that's basically the problem with the Cartesian product. And in order to not have this problem in intermediate steps, and maybe if we have further steps, like in the pipeline, so if we have further joins, for example, what we can do is we can be lazy here. Rather than materializing this result, we just store these two sets and we know these match.
Starting point is 00:53:49 Because if we know, then all of the tuple combinations will also fit. So now if we say, for example, we want to further join with T here, then rather than doing this combination, we can first do this and then join with these. And maybe we'll get some filtering out of this join, for example. But most likely, a database system won't do this. But it's possible. It would be a possibility. Make sense? So these are the cases, like when it's very skewed, then usually you're very screwed. So basically, then all of a sudden, like any kind of parallelism won't work anymore.
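The "be lazy" idea for two sets with the same key can be sketched as follows. This is only an illustration of the principle, with made-up names (`join_same_key_lazy`, `result_size`, `materialize`) and a dictionary as a stand-in for whatever structure a system would really use. Instead of writing out the Cartesian product, we keep the two matching sets side by side and only expand them when the combined tuples are actually needed.

```python
from itertools import product


def join_same_key_lazy(key, r_rows, s_rows):
    """Record that every row in r_rows matches every row in s_rows."""
    return {"key": key, "r": r_rows, "s": s_rows}


def result_size(group):
    """Size of the join result without materializing it."""
    return len(group["r"]) * len(group["s"])


def materialize(group):
    """Expand the group into result tuples on demand (generator)."""
    for r, s in product(group["r"], group["s"]):
        yield (group["key"], r, s)
```

A later join with a third table T on the same key could first check whether the key occurs in T at all, and drop the whole group if it does not, before any combination is produced.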
Starting point is 00:54:38 So everything goes to a single partition. Well, then your partitioning won't help you. Then you need to do something else. Okay, further questions? No? Well then, we'll still do a quick break here. So remember, sort merge join, very simple. We have two phases. We have the sort and then we have the merge. In the sort phase, we're sorting by the join key both of the tables.
Starting point is 00:55:10 And in the merge phase, we're scanning the two sorted relations and just compare the tuples. And yeah, I mean, depending how we scan, I mean, we basically need to only scan like one of the relations once. I mean, in general, we also only need to scan both tables once if we're just traversing tuple by tuple. So this looks something like this. So we have the two relations that are unsorted initially. We sort them in a nice fashion so that either the smallest keys
Starting point is 00:55:48 first or the largest keys first and then we can just compare and I mean we just need one way or like one type of sort order and then we can just merge the two. As I said this is definitely useful, especially if we already have sorted data, or if we want sorted output, then this might also be useful. Sorted output, like sorting, like if the output of a join, there it also depends how large is our output, right? So if the output of a hash join is small, then sorting it separately might make more sense again than initially sorting and keeping the sort order. But anyway, that's basically, let's say, further requirements of sorting or characteristics of sorting within the query plan
Starting point is 00:56:50 will influence if we want to choose a sort merge join or not. Okay, so in general, the sorting is the more expensive part, right? The merging is a linear operation. We just go through the data once. The sorting is n log n. I mean, that's just basic. Unless, again, we know more about the tuples. So if we know exactly the tuple distribution,
Starting point is 00:57:22 we know how many keys we have, etc. We can also use a linear sort algorithm, like a counting sort. Then it becomes a linear operation again. However, in most cases, this will be the most expensive part, so we want to speed this up, especially. We want to use as many CPU cores as possible, and we want to use SIMD instructions.
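The two phases of the sort merge join described above can be sketched in a few lines. This is a plain, single-threaded illustration with assumed names (`merge_join`, `sort_merge_join`) and tuples of the form `(key, payload)`; it uses Python's built-in sort for the n log n phase and a single linear scan for the merge phase, only going back over a range when both sides contain duplicates of the same key.

```python
def merge_join(r, s):
    """Merge phase: r and s are lists of (key, payload) sorted by key."""
    out = []
    i = j = 0
    while i < len(r) and j < len(s):
        if r[i][0] < s[j][0]:
            i += 1
        elif r[i][0] > s[j][0]:
            j += 1
        else:
            key = r[i][0]
            # Find the run of equal keys on both sides.
            i_end = i
            while i_end < len(r) and r[i_end][0] == key:
                i_end += 1
            j_end = j
            while j_end < len(s) and s[j_end][0] == key:
                j_end += 1
            # All combinations within the two runs match.
            for a in range(i, i_end):
                for b in range(j, j_end):
                    out.append((key, r[a][1], s[b][1]))
            i, j = i_end, j_end
    return out


def sort_merge_join(r, s):
    """Sort phase (n log n) followed by the linear merge phase."""
    by_key = lambda t: t[0]
    return merge_join(sorted(r, key=by_key), sorted(s, key=by_key))
```

If the inputs already arrive sorted, `merge_join` can be called directly and the expensive sort phase is skipped.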
Starting point is 00:57:46 So we're going to go back to SIMD a bit here in order to implement this efficiently. And so the idea is that we have a very parallel sorting, and we're starting from small partitions, getting small sorted runs, and then getting to larger and larger runs. And for that, we're basically building sorting networks and we're using different levels of merging. Until at a certain point, I mean, initially we're trying to stay within caches, right? Doing this very fast, doing this in
Starting point is 00:58:26 SIMD instructions. Then at a certain point our runs get too large, and we have to do this basically in memory. But at that point we're basically only merging anymore. We're not really sorting anymore, we're just merging, merging, merging until we have a final, very large sorted partition or sorted table. But now I already gave away part of it, the idea that, I mean, we don't necessarily need to have one globally sorted order. We could also work with partitions in the intermediate steps, right? So then we get different kinds of variations between one very large sorted table with a very large merge, and sub-sorted lists that we somehow compare against. So we'll go into this in a bit more. So let's basically take this apart. So we're starting
Starting point is 00:59:23 with a sorting network that we can do in SIMD. Then we can do some Bitonic merge. So very efficient merging of smaller lists also in SIMD instructions or in also non-blocking instructions. And then we're gonna get to this multi-way merge. So we have many like sub-partitions that we're
Starting point is 00:59:46 merging in parallel. We're staying in cache initially. So this is really important that in the initial phases, everything is in cache because that's going to be efficient. And then in the larger merge, this needs to go out of cache just because the data sets will be too large. OK, so basically, I mean, you can think about it like a,
Starting point is 01:00:19 what do you call it, like a sorting that basically, yeah, always sort merging, right? Merge. What's the merge sort? That's what it's called, right? So in essence, what we're trying to implement is a sort of merge sort operation that later on goes into a sort merge join.
Starting point is 01:00:43 So we're basically doing a sort-merge-merge join, essentially. So we have the merge sort, and then we do the merge join. And we do this because the initial sorting, if we do this just on small runs, if we stay in cache, becomes very efficient. It becomes very fast and we can be very parallel here.
Starting point is 01:01:09 And essentially, we're building fixed paths for lists with a fixed number of elements. So if we have a large number of elements, we can basically break it down into, say, sets of eight. I'll show you this for sets of eight, or now four, basically, on the slides. And then execute this very efficiently on the CPU, because it's just like the same kind of instructions without any branches or anything. So, I mean, if you think about this kind of sorting, you always think about branching, right? So you basically think, if this is smaller than the other, then we're swapping.
Starting point is 01:01:47 If not, then we're going to this path. And these were these examples where we basically showed, well, the branching is actually like branch prediction is costly. And this is where basically time goes in sorting. We don't have to do this. We can do this branchless if we always use these kind of fixed, for small number of elements, if we use a fixed sorting network. And this can be done, for example, with the min-max implementation. So this is still, if you think about min and max, you still think about branching, but I'll show you how we can do this without
Starting point is 01:02:26 branching. So if this is larger than A or B or something like this, so this is basically one example of how we can build up a sorting network for four inputs with, let me count, 12, I think 10 operations. Four, eight, ten operations, right? Anyway, so this is basically the number of steps that need to be executed in order to sort this input A, B, C, D. So we have A, B, C, D as an input and we have W, X, Y, Z as an output. And if we perform these operations, then W, X, Y, Z will be the sorted output of any input A, B, C, D.
Starting point is 01:03:29 Which is kind of interesting, right? And I mean, in order to do this branch-free, we can use an operation that's called difference or zero. And difference or zero basically means either we output the difference, if the first number is the larger one, or we output 0 if the difference would be negative. So I mean, difference or 0 of x and y, for example.
Starting point is 01:04:06 So if x is 4 and y is 7, for example, then the output would be 0. So I mean, basically, difference or 0. Maybe I have to write this down here, because I'm going to forget half of it. So then it's (4 minus 7) times (4 greater or equal 7). So this is minus 3, right? Times 0 is 0. So that's basically how this works.
Starting point is 01:05:09 And this gives us, so if we compute the maximum of x and y, this is, we say, y plus difference or 0 of x and y. So in our example, with 4 and 7, this is 7 plus 0, so 7 is our maximum. And if we do the minimum, the minimum is x minus difference or 0 of x and y. So this is, in our case, 4 minus 0 is 4. So it's fairly easy, and you can see this is branch-free, right?
Starting point is 01:06:27 So we don't have an if branch here or anything. We're just directly computing the minimum or maximum. The thing is we always compute it, right? So it's basically also in the sorting. Even if this is sorted, we're going to go through all of the steps all of the time. We're going to do all of this min-max operations in order to sort.
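The difference-or-zero trick can be written down directly. The sketch below follows the arithmetic from the lecture; the function names are assumptions for the example. Note that Python itself is not branch-free under the hood, so this only illustrates the arithmetic: in C or with SIMD intrinsics the comparison produces a 0/1 value or a mask without a conditional jump.

```python
def doz(x, y):
    """Difference or zero: x - y if x >= y, otherwise 0 (no if statement)."""
    return (x - y) * (x >= y)  # the comparison yields True (1) or False (0)


def branchless_max(x, y):
    """max(x, y) = y + doz(x, y)."""
    return y + doz(x, y)


def branchless_min(x, y):
    """min(x, y) = x - doz(x, y)."""
    return x - doz(x, y)
```

For the example from the lecture, `doz(4, 7)` is `(4 - 7) * 0`, so the maximum is `7 + 0` and the minimum is `4 - 0`.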
Starting point is 01:06:49 But we don't need the branches. And this essentially looks then something like this. So in terms of a comparison or a set of comparisons, we basically, if we have the input 9, 5, 3, 6, then we're going to compare 5 and 9. We're not comparing, right? We're min-maxing, basically, the two and exchanging them if they're swapped in the wrong way.
Starting point is 01:07:21 And we do the same thing with 3 and 6. So we're doing, let's say, the first and second, and the third and fourth. Then we do the first and third, and the second and fourth, and then finally do the last swap, basically depending on what's larger or smaller. And with this we get a sorted output. This we can also build for larger number sets, so for eight elements or 16 elements. Eventually, this will get complicated, right?
Starting point is 01:07:54 Because we just get a very large sorting network. But this is how we can generate these initial small runs. Branch-free in cache, very efficient, right? So, I mean, in the sorting network, there's no assumption about the input data or anything. We just always compute these individual steps. So this can be implemented in SIMD quite quickly. And in SIMD, what we do, we basically do this across the registers and not within. That's also how SIMD works, right? So we're comparing across different registers.
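The four-input sorting network walked through above (first and second, third and fourth, then first and third, second and fourth, then the middle pair) can be sketched as a fixed list of compare-exchange steps. The names `NETWORK_4`, `compare_exchange`, and `sort4` are made up for this example. Five comparators with one min and one max each give the ten operations counted in the lecture, and the same steps run regardless of the input order.

```python
# Fixed comparator sequence for four inputs (indices into the list).
NETWORK_4 = [(0, 1), (2, 3), (0, 2), (1, 3), (1, 2)]


def compare_exchange(v, i, j):
    """Put the smaller value at position i and the larger at position j."""
    lo, hi = min(v[i], v[j]), max(v[i], v[j])
    v[i], v[j] = lo, hi


def sort4(values):
    """Sort exactly four values with a data-independent sequence of steps."""
    v = list(values)
    for i, j in NETWORK_4:
        compare_exchange(v, i, j)
    return v
```

For the input 9, 5, 3, 6 the intermediate states are 5, 9, 3, 6, then 3, 9, 5, 6, then 3, 6, 5, 9, and finally 3, 5, 6, 9.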
Starting point is 01:08:39 We're not comparing within the register. Because SIMD works with doing stuff in parallel on multiple items at a time or on vectors of items. So we cannot basically swap. We cannot sort. This doesn't work so well. We cannot sort within here. But we somehow have to do this across these registers. So what we do is we basically do this min-max stuff across the registers, right? So we do this with four elements in parallel,
Starting point is 01:09:14 and then we need to shuffle across, right? So this is basically, we do the sorting across the registers. And then, so each position is sorted across the registers, we need to basically shuffle them, or we transpose them, in order to get a sorted run within each register. So we're using the min-max operations, basically the same thing that we did here, across the registers. And then we're shuffling, we're basically shifting and then transposing in order to have the complete operation.
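The across-register scheme can be emulated in plain Python to see what happens to the data. In this sketch, each "register" is just a list of four values, `vmin` and `vmax` stand in for lane-wise SIMD min/max instructions, and the transpose is done with `zip` instead of real shuffle instructions; all names are assumptions for the illustration. The same five comparator steps as for four scalars are applied to four whole registers, which sorts each lane position across the registers, and the transpose then turns those into four sorted runs.

```python
def vmin(a, b):
    """Lane-wise minimum of two 'registers' (stand-in for a SIMD min)."""
    return [min(x, y) for x, y in zip(a, b)]


def vmax(a, b):
    """Lane-wise maximum of two 'registers' (stand-in for a SIMD max)."""
    return [max(x, y) for x, y in zip(a, b)]


def sort_columns(regs):
    """Apply the 4-input network across four registers (10 min/max ops)."""
    r = [list(x) for x in regs]
    for i, j in [(0, 1), (2, 3), (0, 2), (1, 3), (1, 2)]:
        r[i], r[j] = vmin(r[i], r[j]), vmax(r[i], r[j])
    return r


def transpose(regs):
    """Turn lane positions into registers (done with shuffles in real SIMD)."""
    return [list(col) for col in zip(*regs)]


def sorted_runs_of_4(values16):
    """Load 16 values into four registers and produce four sorted runs."""
    regs = [values16[i:i + 4] for i in range(0, 16, 4)]
    return transpose(sort_columns(regs))
```

The result is not one sorted list of 16 values but four sorted runs of four values each, which are the input for the merge steps that follow.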
Starting point is 01:09:54 So for these 16 numbers, we'll need 10 min-max, eight shuffles, and eight load and store operations in a SIMD-fied fashion for doing the sorting. This would then be an initial run that we can work with. And then in the first phase of the merging, rather than already doing a merge separately, we can still do this in this hardwired fashion using a bitonic merge network. And this is basically the same idea, with the difference that we have two sorted runs.
Starting point is 01:10:42 And in this case, our assumption is that they're sorted in different directions, but of course this also works if they're sorted in the same direction, right? Then the network would just look a bit different. So this is sorted like A1 is the smallest and A4 is the largest, and here B1 is the smallest and B4 is the largest. Again, doesn't really matter if it's the other way around. Then just the setup doesn't look as nice. So then we do the same kind of comparisons. Again, we can use SIMD instructions
Starting point is 01:11:18 and we can use difference or zero operations in order to do this branch-free, in order then, with such a network, to get a sorted output of twice the size of the initial runs. So we're basically creating larger runs, and we're still trying to stay in cache, doing this efficiently branch-free on the hardware within the CPU. And from that, then basically we'll have sorted runs and these then need to be further merged, right? So then we have these separate sorted runs. These will be again parallelized, where we have a parallel merge
Starting point is 01:12:06 where we always try to keep basically in cache, and this is then this multi-way merge. So we're trying to keep the runs or these partitions at cache size before we output them, such that we're always cache-friendly in our outputs. And we need to do this in multiple stages in order to have these cache-sized inputs and outputs. If all the intermediates fit into cache, then this will be fairly fast. And then we have everything sorted in the end.
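Going back one step to the bitonic merge network that produces the larger runs, a scalar sketch could look like this. It assumes both input runs are sorted ascending and have the same power-of-two length, and it reverses the second run so that the concatenation is bitonic; the function name is made up for the example. The compare-exchange pattern depends only on the length, never on the data, which is what makes it suitable for min/max instructions.

```python
def bitonic_merge(a, b):
    """Merge two ascending runs of equal power-of-two length."""
    n_run = len(a)
    if n_run != len(b) or n_run == 0 or (n_run & (n_run - 1)) != 0:
        raise ValueError("runs must have the same power-of-two length")
    # Ascending run followed by a descending run forms a bitonic sequence.
    v = list(a) + list(reversed(b))
    n = len(v)
    d = n // 2
    while d >= 1:
        # One stage: compare-exchange elements that are d positions apart.
        for start in range(0, n, 2 * d):
            for i in range(start, start + d):
                lo, hi = min(v[i], v[i + d]), max(v[i], v[i + d])
                v[i], v[i + d] = lo, hi
        d //= 2
    return v
```

Merging two runs of four takes three stages (distances 4, 2, 1) with four compare-exchange steps each, and the output is a sorted run of eight.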
Starting point is 01:12:53 This would be basically one relation that is finally sorted. So these are then cache-sized partitions, or they can also be morsel-sized partitions, that will later on work for the final merge stage between these two tables. And there's again two variants of this, so we can do like a multi-way sort merge.
Starting point is 01:13:23 So have both inputs sorted globally and then do the like local merge joins after the sorting or we can have what is called a massively parallel sort merge where we only basically sort one of the tables globally and have the other table only locally sorted. And then we basically need to partition or we need to reshuffle such that, or communicate the table such that all of the tables, basically all of the partitions that are not globally sorted see the fully sorted global table.
Starting point is 01:14:07 So one table is basically globally sorted. One table is only locally sorted. And then we need to move the globally sorted table. This actually only makes sense if one of the tables is much smaller than the other. Then it makes sense to basically replicate this or touch this somehow. Okay, so the multi-way merge join is basically what we saw so far. We have our initial sorting
Starting point is 01:14:36 and sorting networks and then larger sorts and then we do this multi-way merge into separate partitions, which then we can do like we have, or we're partitioning the final output into say morsel sized partitions. And then if these partitions are partitioned, like they are sorted, but they're also partitioned in a way that on both sides, the partitions have the same kind of keys in them. Then we can do local joins on the sorted partitions again, right? So these would contain different keys, but the same kind of key ranges. So it's basically the same key range.
Starting point is 01:15:19 So we know that we only need to compare these subpartitions and do the join there. And if we have this, then after the sort, we can basically, again, do the local merge join completely in parallel, or these separate merge joins completely in parallel. So we do this whole kind of sorting step and network stuff on this side,
Starting point is 01:15:42 and we do the same thing on this side. We end up with separate partitions. For the partitions we need to know what range the keys are in, and then we basically need to find the same ranges in the partitions on the other side, and we can join those locally. The alternative is what I just said. Basically we do this on one side completely. We have the separate partitions and then on the other side,
Starting point is 01:16:14 we're sticking with the smaller local partitions and basically either look up these sorted partitions, or we're replicating these sorted partitions. So we can basically say, OK, I'm just addressing. I mean, if this part is very small, it actually might make sense to just use a copy of each of these for each individual side of this here. This is also, like in a large scale,
Starting point is 01:16:43 we would call this a repartition join. Yeah, repartition join. So we basically, we're replicating this part or we're looking up on each of these, we're looking up the separate, the globally ordered one. And here as a final or as last part, let's look at the performance. So we have the multi-way merge sort where we have this bitonic or first sort networks,
Starting point is 01:17:22 then bitonic merge, and then multi-way merge with cache-sized partitions. Then we have this massively parallel sort merge where we have one relation globally sorted and one relation locally sorted, and then we look up. We have the no partition join, which is a hash join with a global hash table. And we have the radix join, which
Starting point is 01:17:46 has multiple steps of partitioning until everything nicely fits into caches, essentially. And the partitions always try to use the translation look aside buffer optimally. And we see the performance comparison here in cycles per output tuple. So how many cycles do we need per tuple? That's our y-axis here.
Starting point is 01:18:11 And then we have the different joins. And we also have the total amount of time that it takes. And you can see, I mean, the number of cycles is lower for the radix partitioning join. And also the total amount of time that it takes is lower. And you can see also it's broken down into partitioning, sorting, merging, and build and probe. So here, for the build and probe, so here is basically in the partition
Starting point is 01:18:48 or the radix partitioning. This would be basically probably the partitioning. And then we have the build and probe part up here, I would assume. Or I'm not 100% sure, actually. I would have to look it up in the paper what exactly this means. But here in the no partitioning join,
Starting point is 01:19:07 we can see that basically the building takes the major part of the time. Then we have the probing. The m-way join, this is actually faster than the no partitioning join. So doing the sorting efficiently on hardware actually is faster, but it's not faster than the radix join. So the radix joins, especially for smaller tables,
Starting point is 01:19:29 are still much faster. When it comes to larger tables, where we need many partitions and many partitioning passes, then the radix join gets a bit slower again. And there are many papers that look at this in various settings, like setups, larger tables, smaller tables, different sizes of tables, more skew, etc.
Starting point is 01:19:56 And then always the results might shift a little. But let's say, in order to be efficient, you can see that doing a partitioning here makes sense, right? And using a hash join, typically the hash join or a Radix partition hash join will be faster than a sort merge join, but an efficient sort merge join can have a very similar performance in the end. Okay.
Starting point is 01:20:23 So with this, well, we're done with the multi-core part. We will talk about locking next time. So basically how to make sure that, if we have one global hash table, for example, we're not overwriting each other all the time, or in the partitioning. And yes, questions? Yes. Quick one. Are we sorting only indices into the table, or like the whole table?
Starting point is 01:20:59 I mean, again, it would be probably better to have the actual table entries sorted so that we have consecutive memory access, but in the beginning, it's over-matched. So the question is, do we sort the complete table, or do we just sort basically tuple identifiers or something like this? And that depends on the implementation.
Starting point is 01:21:22 I mean, that's a cheap answer. So in many cases, I mean, in the experimental settings here, you will only sort the indexes, basically. And then, well, for the materialization, you then later on have to basically go through the tables again. In a practical setup, it makes sense to, for sure, sort the whole tuple, but of course, already somehow projected to whatever you need.
Starting point is 01:21:54 So you don't necessarily want to have very large tuples. And again, it depends on the data types. So if your tuple is super large, I don't know, you have a tuple that's an image or something like that, it makes sense to just work with the addresses rather than the whole tuple. If it's a small tuple, then it might make sense to use the whole tuple. So I would say it depends. Further questions? How does Hyrise do it?
Starting point is 01:22:32 Radix join. And do you use the full tuple? Or? No, never. Never. Yeah, so here it's basically always only on the indexes. And a Radix join. Very good.
Starting point is 01:22:46 OK. Good. Then thank you very much. See you next week.