Hardware-Conscious Data Processing (ST 2023) - tele-TASK - Multicore Parallelism (2)
Episode Date: June 7, 2023...
                                         So we'll continue to talk about multi-core parallelism today.
                                         
                                         And I've decided this is all we're going to do,
                                         
                                         because otherwise it's going to be a rush.
                                         
                                         And a few announcements.
                                         
                                         So thanks for your feedback.
                                         
                                         It was quite positive feedback. I'm quite happy.
                                         
                                         We got a few things that we'll try to incorporate into the lecture.
                                         
And some feedback for the tasks. The tasks part, Lawrence will discuss with you in the next task session.
                                         
    
                                         So there were some questions and we'll answer this.
                                         
Then the foundation, so the foundation that finances and funds all the stuff here, this building, etc., is looking for student assistants, so IT admins.
                                         
                                         If you're curious what the foundation does and want to see, maybe peek inside through being a system admin, then you might want to look at their website.
                                         
                                         So I've put the link here.
                                         
                                         The slides are not updated in Moodle,
                                         
                                         but I'll try not to forget to do this later.
                                         
                                         And if you don't find it, just feel free to reach out to me.
                                         
                                         I'll be happy to forward this to you.
                                         
    
                                         And if you're not so much into foundation,
                                         
there's also one for the Barberini.
                                         
So for the museum, they're also looking for a student assistant for IT management there.
                                         
                                         And finally, last announcement that I could think of
                                         
                                         while preparing this.
                                         
Today, we'll again have a session of our joint seminar with TU Darmstadt.
                                         
    
We'll have a presentation by Danica Porobic from Oracle.
                                         
                                         And she's going to talk about
                                         
                                         autonomous database performance insights.
                                         
So Danica, she's a specialist in hardware and, of course, in Oracle internals, like the engine parts.

So that should be quite interesting actually.
                                         
So that's why I'd say, well, have a look.
                                         
                                         I think we also have an announcement in Moodle.
                                         
    
                                         So even if this is not updated in Moodle yet, the slides,
                                         
                                         you can still go to the announcement
                                         
                                         and see the link there.
                                         
So this is gonna be at 4:15 today in Zoom.
                                         
                                         And she's gonna talk from Zurich.
                                         
                                         So I mean, Zoom is the best you get anyway,
                                         
                                         unless you're watching this remotely.
                                         
                                         Then, well, we're still in multi-core parallelism,
                                         
    
                                         so nothing has changed here.
                                         
                                         However, some stuff has changed here.
                                         
                                         So we had no class yesterday.
                                         
                                         It was supposed to be a Q&A anyway,
                                         
                                         so this is not that bad.
                                         
                                         But since I didn't finish multi-core, or we didn't
                                         
                                         get as far as I thought, we're just going to do multi-core today. And that means everything else,
                                         
                                         not everything, but we'll have a bit of a shift again. So we're going to talk about locking next
                                         
    
                                         week and then NUMA. The NUMA session, as announced, will be in this small lecture hall,
                                         
                                         which should be just above here somewhere.
                                         
                                         I'm always confused about the building, but I think it should really just be above here,
                                         
                                         or maybe somewhere over there.
                                         
                                         That's basically shifted. This also means persistent memory is shifted,
                                         
                                         and storage and networking,
                                         
                                         and then we would have had a Q&A session,
                                         
                                         which basically got moved out.
                                         
    
                                         But if you have questions, of course,
                                         
always feel free to also ask, either during the task session, after the lecture, during the lecture, or offline
                                         
                                         through Moodle, etc. So just so you are aware, basically, there's a bit of a shift here and there.
                                         
                                         And today we're basically going to continue with multi-core parallelism. So I've talked about the parallelism basics.
                                         
                                         So why do we need parallelism in the first place?
                                         
                                         And all of you should be able to say right now why.
                                         
                                         So why do we need parallelism today in our database?
                                         
What, for our case, is the single most

important reason if we're looking at hardware?
                                         
                                         There's some discussion.
                                         
                                         We'll discuss with your neighbors.
                                         
                                         Feel free to discuss.
                                         
Yes?
                                         
                                         Because the clock speed isn't rising anymore?
                                         
                                         Exactly. And what happens instead?
                                         
                                         With more cores.
                                         
    
                                         More cores, yes.
                                         
                                         In order to utilize our hardware, the current hardware, we need parallelism.
                                         
                                         So we don't have single-core systems anymore, we have multi-core systems.
                                         
                                         If we want to utilize multi-core systems,
                                         
                                         we need parallelism.
                                         
                                         Otherwise, no chance of ever utilizing a many-core system.
                                         
                                         I mean, two cores, maybe, right?
                                         
                                         But three, four cores is going to be hard already.
                                         
    
                                         40 cores, no chance.
                                         
                                         So you really need a parallel system in order to properly utilize this.
                                         
                                         So this is why we're looking at this.
                                         
                                         And this is also why you need to, I mean, you generally need this.
                                         
                                         If you want to have your algorithm, your system, whatever,
                                         
                                         run efficiently on a modern server and fully utilize
                                         
this, you need parallelism. Period. There's no way around it. And this is why we talk about this also in this
                                         
                                         lecture. I mean, of course, you can also be efficient on a single core, and you want to be
                                         
    
                                         efficient on a single core. But unless you have many different tasks that you can do, you will have to do this.
                                         
                                         You will have to distribute your tasks and parallelize them.
                                         
                                         So with that, I'm going to skip the whole introduction
                                         
                                         to parallelism and we'll dive right into the parallelism
                                         
                                         in database management systems.
                                         
So there are basically two ways of parallelizing database workloads, and these are inter-query parallelism and intra-query parallelism.
                                         
                                         And because these are so close,
                                         
                                         what'd you say, the edit distance is so close,
                                         
                                         I'm always having a hard time pronouncing them properly.
                                         
                                         But inter means in between two different queries.
                                         
                                         Intra means within a single query.
                                         
And inter-query parallelism means we're doing something concurrently: we're taking multiple queries at the same time and we're running them in parallel.
                                         
So this is cool if we have an OLTP system and an OLTP workload where, say, our student course registration system, on the first day of the semester, when everybody wants to sign up for courses, will have potentially hundreds of queries in parallel.

So this means we can actually utilize many cores in parallel, because there's so much going on, so many individual queries which we can schedule separately.

Typically these kinds of workloads, OLTP workloads, are characterized by simple queries. So just lookups, simple lookups, simple updates, etc.
                                         
If we have analytics, so we have a few people actually trying to figure out how we can improve HPI, I mean, on the one hand, the marketing strategy, on the other hand, make sure that everybody just gets straight A's, so how do we have to improve our teaching? These will be long-running queries, and we're not going to
                                         
                                         run thousands of those in parallel. We're going to have a few of those. And then maybe
                                         
                                         based on the insights that we get, we're going to further drill down.
                                         
    
                                         And so for this kind of workload, this inter-query parallelism doesn't really help us because
                                         
                                         we have few queries and there's not much to be run in parallel. And so for this, we need the so-called intra-query parallelism.
                                         
                                         And I mean, this is sort of in OLAP world,
                                         
                                         so online analytical processing,
                                         
                                         where we have these long-running queries,
                                         
                                         this is sort of a must.
                                         
                                         I mean, of course, we can also run it differently,
                                         
                                         but I mean, we really want to use this.
                                         
    
                                         If we want to use the system efficiently,
                                         
                                         our hardware efficiently, then we
                                         
                                         need to parallelize each individual query.
                                         
                                         Of course, we're still in the space where we say, well,
                                         
maybe there are multiple users of our system.
                                         
                                         So we're not going to be able to just utilize or just use
                                         
                                         the system for a single query.
                                         
                                         But we should be able to run multiple queries in parallel.
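Just to make intra-query parallelism concrete, here is a minimal sketch in C++ (not from the lecture slides; the names parallel_sum and column are made up): a single query, a SUM over one column, is split into per-thread partitions, and the partial results are combined at the end.

```cpp
// Minimal sketch of intra-query parallelism: one query (a SUM over a column)
// is split into per-thread partitions. Names like parallel_sum and column are
// illustrative only, not taken from any particular system.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <numeric>
#include <thread>
#include <vector>

int64_t parallel_sum(const std::vector<int32_t>& column, unsigned num_threads) {
    std::vector<int64_t> partial(num_threads, 0);   // one slot per worker
    std::vector<std::thread> workers;
    const size_t chunk = (column.size() + num_threads - 1) / num_threads;

    for (unsigned t = 0; t < num_threads; ++t) {
        workers.emplace_back([&, t] {
            const size_t begin = t * chunk;
            const size_t end = std::min(column.size(), begin + chunk);
            // Each worker scans only its own partition of the column.
            // (In practice you would pad the partial slots to avoid false sharing.)
            for (size_t i = begin; i < end; ++i) partial[t] += column[i];
        });
    }
    for (auto& w : workers) w.join();
    // Combine the per-thread partial results into the final aggregate.
    return std::accumulate(partial.begin(), partial.end(), int64_t{0});
}

int main() {
    std::vector<int32_t> column(1'000'000, 1);
    const unsigned n = std::max(1u, std::thread::hardware_concurrency());
    std::cout << parallel_sum(column, n) << "\n";   // prints 1000000
}
```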
                                         
    
                                         And maybe one question regarding this,
                                         
                                         why does this make sense anyway?
                                         
So even if we could somehow have enough tasks, like split up our query into such small pieces that we can use all of the threads, why would it still make sense to have different kinds of queries run in parallel, maybe somehow shifted relative to each other, looking at the individual cores?
                                         
                                         So do we have, like, what does a core do?
                                         
                                         I mean, does it just do one single operation at a time?
                                         
    
                                         Does it, I mean, remember what we talked about,
                                         
                                         like how instructions are actually executed in there?
                                         
                                         What happens if we're just doing the same stuff over and over
                                         
                                         in the core?
                                         
                                         Well, it gets pipelined.
                                         
                                         It gets pipelined, yeah. We can use caching.
                                         
                                         Sorry, say again?
                                         
                                         We can use caching.
                                         
    
                                         We can use caching, yes.
                                         
                                         I mean, this is good.
                                         
                                         This speaks for just using a single query, right?
                                         
I mean, or if the queries touch the same data,
                                         
                                         well, then already caching for multiple queries
                                         
                                         would also improve stuff.
                                         
                                         The other thing is functional units, right?
                                         
                                         So there's many different functional units.
                                         
    
                                         So if we have different kind of tasks that we can schedule on a single core,
                                         
                                         we can actually improve the utilization of the single core
                                         
                                         because we can utilize these different functional units. So talking about our SIMD
                                         
                                         units, talking about our floating point units, our ALU
                                         
                                         load and store units, etc.
                                         
                                         So if there's more different stuff to do for a single core,
                                         
                                         we're actually going to be able to get a better utilization.
                                         
                                         Of course, there's a trade-off.
                                         
    
Because in query processing, it's all about data, right?

So if we're all of a sudden fighting for caches in the core, that's not so great.
                                         
                                         But if we can somehow use the different units in parallel
                                         
                                         through different queries,
                                         
                                         then actually we're going to get a better utilization.
                                         
And anyway, if we have long-running queries, we still want to be able to kind of make progress
                                         
    
                                         on multiple queries at a time.
                                         
                                         Otherwise users will be unhappy if the system is blocked by one long-running query forever.
                                         
                                         Okay, so how do we parallelize?
                                         
And there are different ways of parallelizing on a single server.
                                         
                                         And I mean, in general, it's processes, right?
                                         
So that's kind of the very basic construct: basically, for each individual program that we're running,
                                         
                                         we will have a separate process,
                                         
    
                                         and there we have a separate memory space
                                         
                                         or address space, as we know, right?
                                         
                                         So we have this virtual address space.
                                         
Each process will have its own.
                                         
                                         And they have their own memory, and they're isolated.
                                         
                                         So basically this is a good thing
                                         
                                         because they cannot interfere with each other.
                                         
                                         At the same time, it's a heavyweight thing.
                                         
    
So a process actually takes some time to be scheduled.

There's a lot of context switching.

We cannot really reuse the translation lookaside buffer: if there are different address spaces, it will have to be reloaded, etc.
                                         
                                         So, switching between the processes is actually costly.
                                         
                                         And because it is costly, there's other things, and you already know this, right?
                                         
                                         There's threads.
                                         
So, threads are basically something that runs inside a process: multiple parallel loci of control inside a running program.
                                         
    
                                         So they have the same virtual address space,
                                         
                                         and they are kind of an abstraction of a virtual processor.
                                         
                                         It's like a kernel thread is still scheduled
                                         
                                         by the kernel and also a process is scheduled by the kernel, right? So if you
                                         
                                         have multiple processes and you will always have multiple processes
                                         
typically running in your system, then the kernel will basically switch those and assign them to the cores. You have some influence on this, but still the kernel every now and then will use its own processes to run kernel stuff, OS stuff.
                                         
                                         However, for the threads within a process, there's a bit more control that you can have.
                                         
    
And we don't need all this switching of memory spaces, etc.

We can also have user-level threads. They act just like threads.

They will need some kind of thread infrastructure behind them.
                                         
                                         Also, we need all this process infrastructure behind, right?
                                         
                                         In order to have something running.
                                         
                                         So we need our address space, et cetera.
                                         
                                         But then we can use our own threads.
                                         
    
                                         And we can schedule them.
                                         
Like if we're implementing our own thread abstraction, we can basically have a lot of control over how they are scheduled, whether they're scheduled cooperatively or preemptively, meaning threads are kicking each other out or helping each other to run. These will be invisible to the kernel.
                                         
                                         So the kernel won't do much about them. So I mean, the kernel might still just move out the process
                                         
                                         unless we're basically blocking parts
                                         
    
                                         of the system for our threads and our processes.
                                         
                                         But here, we have much more fine-grained control.
                                         
                                         And then finally, even a lighter abstraction
                                         
                                         would be so-called fibers.
                                         
                                         And this is also typically implemented in user space
                                         
and they also share the address space. But here this is completely cooperatively scheduled. This means that within one thread there will always only be one fiber running at a time.

And the fibers run inside threads.
                                         
    
And basically, if we're scheduling another fiber, then the currently running fiber will just yield
                                         
                                         and make space for the next fiber.
                                         
And this is because the fibers are running inside threads.
                                         
                                         So it's kind of a super lightweight abstraction.
                                         
                                         But this means there's no parallelism per se by just using fibers.
                                         
                                         So fibers are just like a unit of scheduling,
                                         
                                         and we will need multiple threads where we schedule the fibers inside.
                                         
    
                                         Otherwise, if we're just using fibers inside a single thread,
                                         
                                         we'll just have a single thread, and we don't have any parallelism.
                                         
It's just an additional abstraction to have this running, making the task switching, et cetera, more efficient and cheaper.
                                         
                                         And so this is something that exists in Windows,
                                         
                                         but it also exists in other spaces, so Linux, Unix,
                                         
                                         for example, through the Boost library.
                                         
                                         So the Boost library offers these kind of fibers, for example.
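To make the fiber idea a bit more concrete, here is a minimal sketch with Boost.Fiber, assuming Boost with the Fiber library is available (you'd link against boost_fiber and boost_context): both fibers live inside the single main thread, so there is no parallelism, and control only changes hands at the explicit yield calls.

```cpp
// Minimal sketch of cooperative scheduling with Boost.Fiber (assumes Boost
// with the Fiber library is installed; link with -lboost_fiber -lboost_context).
// Both fibers run inside the single main thread, so there is no parallelism:
// one fiber runs until it explicitly yields to the other.
#include <boost/fiber/all.hpp>
#include <iostream>

int main() {
    boost::fibers::fiber producer([] {
        for (int i = 0; i < 3; ++i) {
            std::cout << "producer step " << i << "\n";
            boost::this_fiber::yield();   // cooperatively hand control over
        }
    });
    boost::fibers::fiber consumer([] {
        for (int i = 0; i < 3; ++i) {
            std::cout << "consumer step " << i << "\n";
            boost::this_fiber::yield();
        }
    });
    producer.join();   // the main context is itself a fiber and lets the others run
    consumer.join();
}
```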
                                         
    
Okay, so given that we have these different kinds of units of parallelization, so be it processes, be it threads or fibers,
                                         
                                         we can now start scheduling them in the system and we can now try to somehow split up our work in the system
                                         
                                         in order to make our work parallel.
                                         
And a very simple way of doing this, or let's say the most classical way, and the way that some systems still do it because they've just been around since before there were threads, is one OS process per DBMS worker.

So this means every database management system worker, so everything that does some processing for a query, will have its own process.

So that's the classical architecture, and it exists because old systems were built when there was nothing else they could use.
                                         
                                         And of course here, as I said, like the scheduling, etc.
                                         
                                         The task switching is just very costly.
                                         
And because of that, there's also the more recent and typically used setup
                                         
                                         where you have one thread per worker.
                                         
And then you have a single multi-threaded process that hosts all of the worker activity.

And then you have some kind of dispatcher thread that listens for new connections.

And each connection, for example, could then be allocated to a new thread. This would work very well, for example, in a simple way for this inter-query parallelism, where for each query you basically open up a new thread that will execute this separate query. And then, of course, because we don't necessarily want this task switching, etc. to be handled by the OS, if we think we know better, then we can also use our own threads.
                                         
                                         So we can use the database system threads, these user space threading constructs, which replace the need for the OS threads. And there we have kind of cheaper task switching,
                                         
                                         but at the expense that we have to deal with this all ourselves.
                                         
                                         So we basically have to make sure
                                         
                                         that our task switching is good, that the placement of the threads
                                         
                                         is good, et cetera, because the OS won't see them anymore and cannot do anything anymore here.
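As a toy sketch of this "dispatcher plus one thread per query" idea, here the incoming connections are faked as a vector of query strings and handle_query is just a placeholder; none of the names come from a real system.

```cpp
// Toy sketch of the "dispatcher + one thread per query" setup. The incoming
// "connections" are faked as a vector of query strings; handle_query is a
// placeholder for actual query execution.
#include <iostream>
#include <string>
#include <thread>
#include <vector>

void handle_query(const std::string& sql) {
    // Placeholder: a real worker would parse, optimize, and execute the query here.
    std::cout << "executing: " << sql << "\n";
}

int main() {
    const std::vector<std::string> incoming = {
        "SELECT * FROM courses",
        "SELECT count(*) FROM registrations",
        "SELECT avg(grade) FROM grades"};

    std::vector<std::thread> workers;
    // Dispatcher loop: for every new "connection", spawn a worker thread.
    for (const auto& sql : incoming) {
        workers.emplace_back(handle_query, sql);
    }
    for (auto& w : workers) w.join();   // wait for all queries to finish
}
```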
                                         
                                         Okay.
                                         
                                         So, and of course, we want to have a multi-threaded model today.
                                         
    
I already sort of elaborated on this, because we have less overhead for context switches. We have the same address space, and that means all of this address lookup, remember how that works on the CPU and in the caches, all of that will basically stay the same within these threads.
                                         
                                         So we don't need to change that.
                                         
                                         We don't need to change the caches
                                         
                                         or invalidate the caches, et cetera.
                                         
                                         I mean, of course, there's some techniques also
                                         
    
                                         to change this if we have multiple threads
                                         
                                         or multiple processes,
                                         
                                         but here this will just be lightweight. So we have the same address space.
                                         
                                         We have the same addresses. We can use the same translation look-aside buffer.
                                         
                                         Everything will work efficiently on the CPU here.
                                         
                                         And we don't have to change anything while switching between the different
                                         
                                         threads.
                                         
                                         The disadvantage is, however,
                                         
    
                                         if one of the threads crashes,
                                         
                                         this means the whole system is down.
                                         
So a thread crash usually means that the whole process crashes. I mean, there are some ways to deal with it, but that's what typically happens.
                                         
                                         And then the system is down.
                                         
                                         This is also a problem if you have a system that doesn't have its own server, right?
                                         
    
                                         Think about something like, what's it called?
                                         
                                         The small SQLite, for example. So SQLite doesn't have its own server.
                                         
                                         It's just in process, right?
                                         
                                         So you're using the same process as the application.
                                         
                                         If SQLite crashes, the application crashes.
                                         
                                         Or think about DuckDB, same problem, right?
                                         
                                         So if you're using DuckDB,
                                         
                                         if DuckDB crashes, it will kill your application.
                                         
    
                                         And I mean, of course, as a person who builds the system, you don't want that actually to
                                         
                                         happen, right?
                                         
                                         You don't want your users to be unhappy because their application crashes because of your
                                         
                                         database library, for example.
                                         
                                         So that's kind of a bit of a problem.
                                         
And I mean, here I have a list of things that are per process and things that are per thread.

So you can see that per thread, we have a program counter, registers, a stack, and the state.

Which is expected, right?
                                         
                                         Because each thread will execute part of the program,
                                         
                                         so this is what we need.
                                         
                                         However, we don't have a separate address space.
                                         
We don't have separate global variables, open files, child processes, etc.
                                         
    
                                         So this is all per process, right?
                                         
                                         And so all of this, if we're using like individual processes for our system,
                                         
                                         then all of this will always need to be exchanged for each context switch. So meaning if we have
                                         
                                         multiple processes, many queries running at the same time, then all of this will have to change
                                         
                                         every time when we're switching in between
                                         
                                         queries, for example, or in between different tasks.
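To illustrate that per-process versus per-thread split, here is a small sketch using only standard C++: the global counter exists once per process and is shared by all threads, while a thread_local variable and the stack variables are private to each thread.

```cpp
// Small sketch of what is shared per process vs. private per thread:
// shared_counter lives once in the process and is visible to all threads;
// tl_counter is thread-local, so every thread gets its own copy;
// local variables like on_my_stack live on each thread's private stack.
#include <atomic>
#include <iostream>
#include <thread>

std::atomic<int> shared_counter{0};   // per process: one copy, shared
thread_local int tl_counter = 0;      // per thread: each thread has its own

void work() {
    int on_my_stack = 0;              // per thread: lives on this thread's stack
    for (int i = 0; i < 1000; ++i) {
        ++shared_counter;             // all threads update the same variable
        ++tl_counter;                 // each thread updates only its own copy
        ++on_my_stack;
    }
    std::cout << "thread-local count: " << tl_counter << "\n";   // prints 1000
}

int main() {
    std::thread t1(work), t2(work);
    t1.join();
    t2.join();
    std::cout << "shared count: " << shared_counter << "\n";     // prints 2000
}
```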
                                         
                                         Okay.
                                         
                                         So, well, so with this, we basically know, okay, we want to have a multi-threaded model.
                                         
    
                                         And this is also what current systems all do.
                                         
                                         And then we need to do some scheduling.
                                         
                                         And this is kind of where there's a lot of space,
                                         
                                         how we can deal with this.
                                         
                                         So what kind of task do we give to the different kind
                                         
                                         of threads?
                                         
                                         And I mean, scheduling is not only,
                                         
                                         I mean, there's two different ways or levels of scheduling.
                                         
    
                                         One is scheduling subtasks of the query or work of the database management system to individual threads.
                                         
                                         And then, of course, the question is how do we schedule these individual threads on
                                         
                                         the on the cpu so that's i mean this mostly is done by the cpu somehow however we also have
                                         
                                         influence on that right so we can also somehow deal with or additionally say okay i want to have this thread running here for example I want to have this thread running
                                         
                                         here.
                                         
                                         For example, I want to have this thread running here.
                                         
                                         So this is kind of the second level of scheduling
                                         
                                         that we would have to think about.
                                         
    
But first, in terms of scheduling, we'll look into how we split up the work in the database management system into chunks that we can give to the threads.

And so there the question then is, okay, which thread will execute which work?

Well, typically the database management system will do this itself. I mean, of course, you can use the OS for dealing with all of that. So you're just spinning up multiple threads,
                                         
                                         and then let the OS do its thing.
                                         
                                         And it won't do a very bad job, but it also
                                         
    
                                         won't do a perfect job, because the OS is not really used
                                         
                                         to pure database workloads.
                                         
So the OS is optimized for heterogeneous workloads with many short-running tasks, etc. Database workloads typically are very long-running tasks, at least OLAP tasks.
                                         
                                         And so database management systems will use something like a dispatcher thread to assign
                                         
                                         individual tasks to workers. So this is called a push model. It could also be the other way around,
                                         
                                         that we have multiple worker threads that just pull additional work from something like a work
                                         
    
queue. But this kind of push model is used by DB2, SQL Server, MySQL, Oracle, HANA, etc. The other way, as I said, would be pull-based. So push-based basically means the dispatcher pushes or gives the tasks to the workers; pull-based means we have a work queue and the workers will just pull their parts.
                                         
Okay, so with that, the question is: with a dispatcher, how does it work?

So this is kind of what a web server would do, and it's similar, but a bit more complicated.
                                         
    
So for a web server, well, you have network connections,
                                         
                                         you have individual incoming requests for certain websites,
                                         
                                         and then you have worker threads,
                                         
and the dispatcher thread will basically say,
                                         
                                         well, I have this request incoming here, worker thread, please look up this page
                                         
                                         and give me the result back, or give the result back
                                         
                                         to the user.
                                         
                                         And most of this, typically, like all of these threads,
                                         
    
                                         et cetera, would be handled in user space.
                                         
So we don't have to deal with the kernel threads.
                                         
                                         For a query, for a database system, then for every query plan, the database management system
                                         
                                         basically has to decide where, when, and how to execute this in an optimal manner. A lot of this, of course, is query optimization.
                                         
                                         So we have to basically think about given a SQL query,
                                         
                                         how do we translate this into operators?
                                         
                                         How do we split up the operators, et cetera?
                                         
                                         And I mean, or which order
                                         
    
                                         and what kind of operators will we use?
                                         
                                         And this is, I mean, all of the query optimization,
                                         
                                         as I said, is not really part of this.
                                         
                                         So this will be dealt with in database systems too.
                                         
                                         However, if we already know this is kind of a query plan that we actually want to execute,
                                         
                                         then we have to figure out how many tasks are we going to use?
                                         
How do we basically split up the query plan into tasks? Which cores do we want to place them on? And there's basically again a trade-off. On the one hand, we want to have them in such a way that the caches work efficiently; on the other hand, we don't want them to be fighting for certain cached items.
                                         hand we don't want them to be kind of fighting for certain cached items.
                                         
                                         So if they basically are iterating on the same cache lines, that would be bad, for example.
                                         
                                         Then we might have some contention there.
                                         
And then, of course, also where should the results go? Or rather, where should the individual intermediate results typically go?
                                         
And of course, just as with efficient execution of individual queries, et cetera, and using the CPU well, the scheduling is also key to efficient execution, and especially to parallelism.

So in order to use the cores efficiently, we need to schedule the threads accordingly in order to get good parallelism.
                                         
                                         Okay, so we said there's two ways.
                                         
                                         Either we do push, which means we have a dispatcher,
                                         
                                         which basically knows what's going on and monitors the progress
                                         
    
                                         and then has like a global control of where we are in our query
                                         
                                         and in our tasks, like what the progress is.
                                         
                                         
                                         The other approach is pull.
                                         
                                         So we just have a queue and then there's again,
                                         
                                         some kind of dispatcher.
                                         
                                         We need something that fills up the queue,
                                         
                                         but then the workers just pull the individual subtasks
                                         
    
and process them and return to get the next task. The task will also tell them where to put the result, etc., and as soon as they're done, they're going to pick up the next task. And here we don't have control over where things will be executed, right? Because the workers will just pull their task, which core this will go to, etc., we don't actually know in detail. Well, if we have global control, we can actually say, well, this core does this, or this thread, which is co-located with this other thread on the core, should do this. In the pull-based approach, this is not going to happen. So there's a trade-off.
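To make this concrete, here is a minimal sketch (not any particular engine's API; the task payloads are just placeholder lambdas) of pull-based scheduling in C++: a dispatcher fills a shared queue, and the workers pull the next subtask whenever they're free, so we don't control which core a task ends up on.

```cpp
// Minimal sketch of pull-based scheduling: a dispatcher fills a queue,
// workers pull tasks themselves (illustrative only).
#include <algorithm>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

int main() {
    std::queue<std::function<void()>> tasks;   // filled by the "dispatcher"
    std::mutex m;

    // Dispatcher side: enqueue 100 small subtasks of a query.
    for (int i = 0; i < 100; ++i)
        tasks.push([i] { /* process subtask i, write result to its output slot */ });

    // Worker side: each worker repeatedly pulls the next task until the queue is empty.
    auto worker = [&] {
        while (true) {
            std::function<void()> task;
            {
                std::lock_guard<std::mutex> guard(m);
                if (tasks.empty()) return;     // nothing left to pull
                task = std::move(tasks.front());
                tasks.pop();
            }
            task();                            // runs on whatever core the OS put us on
        }
    };

    std::vector<std::thread> workers;
    unsigned n = std::max(1u, std::thread::hardware_concurrency());
    for (unsigned t = 0; t < n; ++t) workers.emplace_back(worker);
    for (auto& w : workers) w.join();
}
```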
                                         
    
                                         And then we have to figure out how to allocate the workers.
                                         
                                         And there's, again, multiple approaches.
                                         
                                         Of course, at least we want one worker per core.
                                         
                                         Otherwise, we'll have empty cores
                                         
                                         that are not going to be utilized.
                                         
We might reserve a core for, say, this dispatcher thread, etc. Or, if you think of a rack-scale setup, we might reserve something for a primary server, in order to make sure the general scheduling is not interfering with the workers, and the workers are not interfering with the scheduling, etc.
                                         
                                         But in general, for everything where work should be done,
                                         
                                         every core should have a worker.
                                         
                                         And then we can hard assign this.
                                         
                                         We can say this thread should constantly be
                                         
                                         running here by setting an affinity. So we're pinning the thread to a certain core and then
                                         
                                         this thread will basically keep on running on this certain core. Of course, the OS might
                                         
                                         still schedule something in between, but this thread should be running on that core.
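As a small illustration of such hard assignment, here is a sketch of pinning a thread to one core on Linux; it assumes a glibc/pthreads platform, and core id 2 is just an arbitrary example.

```cpp
// Minimal sketch (Linux/glibc): pin the calling thread to a single core.
#include <pthread.h>
#include <sched.h>
#include <iostream>
#include <thread>

void pin_to_core(int core_id) {
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(core_id, &cpuset);  // allow this thread to run only on core_id
    int rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
    if (rc != 0) std::cerr << "setaffinity failed: " << rc << "\n";
}

int main() {
    std::thread worker([] {
        pin_to_core(2);         // hard-assign this worker to core 2 (arbitrary choice)
        // ... the worker loop would run here, staying on core 2 ...
    });
    worker.join();
}
```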
                                         
    
                                         An alternative to that is we have multiple workers per core,
                                         
                                         and this kind of gets back to what I asked you earlier.
                                         
                                         So if we have multiple,
                                         
                                         or we know we have multiple units on a single core,
                                         
then scheduling multiple threads on a single core
                                         
                                         actually makes sense,
                                         
                                         because if these threads have different kind of things to do,
                                         
                                         then they actually might be utilizing this core better
                                         
    
                                         than just a single thread.
                                         
                                         Like a single thread might just be waiting for data
                                         
                                         coming from the caches or from memory, right?
                                         
But if we have another thread or another worker that does something else in between,
                                         
                                         then the core can be better utilized and it can basically use this hyper-threading.
                                         
                                         However, there's a trade-off, right?
                                         
                                         So we have task switching.
                                         
                                         So, I mean, even though this is cheaper than processes,
                                         
    
                                         we still need to do some switching.
                                         
                                         We still need to change the stack, et cetera.
                                         
                                         So that basically means we get a better utilization,
                                         
                                         but we're paying in scheduling.
                                         
At a certain point, if we have too many threads,
                                         
                                         we're not going to get better performance anymore because we're just busy with task switching.
                                         
                                         However, the one thing that actually might be good is using multiple functional units.
                                         
                                         So this is kind of, if you think about this here as kind of different functional units,
                                         
    
                                         so say we have four different functional units here and we have two instruction streams,
                                         
                                         having two instruction streams will help us better utilize these functional units
                                         
than having just a single instruction stream,
                                         
                                         which might have some contention here or there,
                                         
                                         or just use certain units for a certain amount of time.
                                         
                                         So another stream, we might be able to somehow fill this in.
                                         
                                         So having, say, for example, two workers per core,
                                         
                                         four workers per core,
                                         
    
                                         might still give you better performance. And I'm saying might, because if you look at a lot of
                                         
                                         experiments, we often see that using a single worker per core gives you close to the best
                                         
                                         performance. Then having a few more, I mean, having twice as many
                                         
                                         might slightly improve, but it's not a lot.
                                         
                                         So we get a bit of better utilization,
                                         
                                         but the scheduling often is already a problem.
                                         
                                         Four or 16, something like that, there
                                         
                                         we're not going to get any benefit anymore, typically.
                                         
    
                                         But it really depends.
                                         
                                         And it also depends on hardware.
                                         
                                         Some hardware is better at scheduling and task switching.
                                         
So this is something you have to basically try out. Some processors really cannot be efficiently utilized by just a single worker per core. Other hardware is basically fully utilized with one. And this is a lot about instruction scheduling internally.
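If you want to try this on your own machine, a toy micro-benchmark along these lines (not from the lecture; the data size and the summation workload are arbitrary) can show whether 1, 2, or 4 workers per hardware thread make a difference for a simple memory-bound loop.

```cpp
// Toy micro-benchmark: same total work, executed with 1, 2, or 4 workers
// per hardware thread, to see whether oversubscription helps or hurts.
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <numeric>
#include <thread>
#include <vector>

int main() {
    std::vector<uint64_t> data(1 << 24);                  // ~16M elements
    std::iota(data.begin(), data.end(), 0);
    unsigned hw = std::max(1u, std::thread::hardware_concurrency());

    for (unsigned per_core : {1u, 2u, 4u}) {
        unsigned workers = hw * per_core;
        std::vector<uint64_t> sums(workers, 0);
        auto start = std::chrono::steady_clock::now();

        std::vector<std::thread> ts;
        for (unsigned w = 0; w < workers; ++w) {
            ts.emplace_back([&, w] {
                size_t chunk = data.size() / workers;
                size_t begin = w * chunk;
                size_t end = (w + 1 == workers) ? data.size() : begin + chunk;
                uint64_t local = 0;
                for (size_t i = begin; i < end; ++i) local += data[i];
                sums[w] = local;                          // one write per worker
            });
        }
        for (auto& t : ts) t.join();

        auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                      std::chrono::steady_clock::now() - start).count();
        std::cout << per_core << " worker(s) per hardware thread: " << ms << " ms\n";
    }
}
```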
                                         
    
Okay, so more details about intra-query parallelism. So how do we get intra-query parallelism?
                                         
                                         So we have to somehow split our work, our query into tasks.
                                         
                                         So let's look at a simple query.
                                         
                                         So we have, yeah, select from where, right?
                                         
                                         Two tables, there's a join, and we want to parallelize this.
                                         
                                         What we have to do is we basically have to split up
                                         
                                         our tables into subparts and execute on these subparts separately.
                                         
    
                                         And so then we can basically, I mean, for selections, for projections, for example, this is quite simple, right?
                                         
                                         We can just do this in parallel on subpartitions of the table.
                                         
                                         So this is just an example.
                                         
                                         In this case, we're splitting up our table into three partitions,
                                         
                                         and each of those partitions could be run on a separate core
                                         
                                         or executed on a separate core.
                                         
                                         Then we can do in parallel the selection and the projection.
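As a tiny, purely illustrative sketch of this (the predicate, the column contents, and the choice of three partitions are all made up), the following splits a column into three partitions, runs the same selection on each partition in its own thread, and concatenates the partial results at the end.

```cpp
// Illustrative sketch: parallel selection over three horizontal partitions,
// one worker thread per partition, with thread-local result buffers.
#include <cstdint>
#include <thread>
#include <vector>

int main() {
    std::vector<int32_t> column(1'000'000);
    for (size_t i = 0; i < column.size(); ++i) column[i] = static_cast<int32_t>(i % 100);

    const size_t num_partitions = 3;                       // e.g. one per core
    const size_t chunk = column.size() / num_partitions;

    // Each worker writes qualifying row ids into its own local result,
    // so there is no contention on shared state during the scan.
    std::vector<std::vector<size_t>> local_results(num_partitions);
    std::vector<std::thread> workers;

    for (size_t p = 0; p < num_partitions; ++p) {
        size_t begin = p * chunk;
        size_t end = (p + 1 == num_partitions) ? column.size() : begin + chunk;
        workers.emplace_back([&, p, begin, end] {
            for (size_t i = begin; i < end; ++i)
                if (column[i] < 10)                        // the selection predicate
                    local_results[p].push_back(i);
        });
    }
    for (auto& w : workers) w.join();

    // "Exchange" step: concatenate the partial results into one output.
    std::vector<size_t> result;
    for (auto& part : local_results)
        result.insert(result.end(), part.begin(), part.end());
}
```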
                                         
                                         For example, if we do a join, so in this case,
                                         
    
                                         for example, a hash join, then we can in parallel
                                         
                                         build the hash table.
                                         
                                         We can in parallel probe the hash table
                                         
                                         and exchange the results in order
                                         
                                         to get the final result. So for this,
                                         
                                         by splitting this up into multiple parts,
                                         
                                         we can actually
                                         
                                         utilize the system and we will get performance improvement. It won't be perfect because there
                                         
    
                                         is some interchange, so we have some communication. This is shown here by these exchange operators,
                                         
but we'll get performance. And now let's look at these exchange operators. So this is based on the Volcano-style parallelism.
                                         
So Volcano, as you remember, is this kind of traditional way of executing queries, with this iterator model, right? And this already had an idea, or an approach, to parallelism. And so the idea in Volcano is basically splitting up our query plan.
                                         
                                         So this is basically, this here is our query plan, right?
                                         
    
                                         This part.
                                         
And we can basically add additional operators here that give us parallelism, that basically encapsulate this internally. And this is the so-called exchange operator.
                                         
And I mean, for intra-query parallelism, we have pipeline parallelism available. Of course, the query plan has multiple steps.
                                         
                                         So you remember, right?
                                         
    
                                         Pipeline parallelism would mean we're doing a scan,
                                         
                                         we're doing a selection, we're doing a join,
                                         
we're doing a projection, for example.
                                         
                                         This is separate operators.
                                         
                                         We might even be able to break them down into smaller operators.
                                         
                                         And we can basically run them in parallel instances or in instances next to each other.
                                         
                                         Separate threads, basically.
                                         
                                         One thread does the scan, one thread does the selection, etc.
                                         
    
                                         
                                         So we get some parallelism, which will improve our throughput.
                                         
                                         It won't improve our latency,
                                         
                                         because we still need to go through the whole pipeline.
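A minimal sketch of such pipeline parallelism (with made-up operator bodies) could look like this: one thread acts as the scan, another as the selection, connected by a small blocking queue, so both stages run concurrently even though every tuple still passes through both of them.

```cpp
// Illustrative sketch: two operators (scan, selection) as separate threads
// connected by a blocking channel -> better throughput, unchanged latency.
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <vector>

template <typename T>
class Channel {                       // simple blocking queue between two operators
    std::queue<T> q_;
    std::mutex m_;
    std::condition_variable cv_;
    bool closed_ = false;
public:
    void push(T v) {
        { std::lock_guard<std::mutex> g(m_); q_.push(std::move(v)); }
        cv_.notify_one();
    }
    void close() {
        { std::lock_guard<std::mutex> g(m_); closed_ = true; }
        cv_.notify_all();
    }
    std::optional<T> pop() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [&] { return !q_.empty() || closed_; });
        if (q_.empty()) return std::nullopt;   // closed and fully drained
        T v = std::move(q_.front());
        q_.pop();
        return v;
    }
};

int main() {
    std::vector<int> table(100000, 7);
    Channel<int> scan_to_select;
    long long selected = 0;

    std::thread scan([&] {                     // operator 1: scan
        for (int value : table) scan_to_select.push(value);
        scan_to_select.close();
    });
    std::thread select([&] {                   // operator 2: selection
        while (auto v = scan_to_select.pop())
            if (*v > 5) ++selected;
    });
    scan.join();
    select.join();
}
```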
                                         
                                         And then we can basically partition,
                                         
                                         and we can do this again, if you look not at the query but at the operator level.
                                         
We can have intra-operator parallelism and we can have inter-operator parallelism. So breaking our query into parts, a simple way of parallelizing could be: now we have multiple operators.
                                         
                                         So let's run these operators one at a time,
                                         
    
                                         or in parallel, and this is good if we have something like a bushy tree.
                                         
                                         So, say for example, we have two tables, we're scanning the two tables, these are two operators,
                                         
                                         we can run them in parallel, no problem.
                                         
                                         However, we further want to break this down typically, and then we're getting into
                                         
                                         intra-operator parallelism. So similar to queries, right, rather than running two operators next to
                                         
                                         each other, we're splitting up the operator into multiple parallel instances. And this, of course,
                                         
                                         a parallel scan is very simple. We're splitting up the table into different partitions, but we
                                         
                                         also have parallel joins or parallel sorting.
                                         
    
                                         And this is exactly what we're going to look at later on.
                                         
And in Volcano-style parallelism, we want to have all of it, including setup, teardown, and runtime logic, encapsulated in a separate operator.
                                         
And this is called the exchange operator. Basically, you put this exchange operator into your query plan, or this data flow, and this will basically say: well, the operator before this exchange operator can now be executed in parallel, and the exchange operator will basically deal with all the distribution of the data, such that we have a good output on the next level again.
                                         
                                         So if we're say, for example,
                                         
                                         we're doing a scan and a selection,
                                         
    
                                         then the exchange won't have to do much
                                         
                                         because we just directly connect them basically.
                                         
                                         But say, for example, if we want to do a join,
                                         
                                         then the exchange operator would
                                         
                                         make sure that all the join partners see each other again.
                                         
                                         So we're basically creating an intermediate result again that will be correct for everything.
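As a rough illustration (not any particular system's exchange implementation; the tuple layout and fan-out are made up), such a repartitioning exchange could hash each tuple's join key to decide which output partition, and therefore which consumer instance, it goes to, so matching join partners always meet again.

```cpp
// Illustrative sketch: a hash-partitioning "exchange" that redistributes tuples
// so that all tuples with the same join key land in the same output partition.
#include <cstdint>
#include <functional>
#include <vector>

struct Tuple { int64_t key; int64_t payload; };

// Repartition the output of one producer into `fanout` partitions by join key.
// Each consumer instance then reads only "its" partition from every producer.
std::vector<std::vector<Tuple>> exchange_partition(const std::vector<Tuple>& input,
                                                   size_t fanout) {
    std::vector<std::vector<Tuple>> partitions(fanout);
    for (const Tuple& t : input) {
        size_t p = std::hash<int64_t>{}(t.key) % fanout;   // same key -> same partition
        partitions[p].push_back(t);
    }
    return partitions;
}

int main() {
    std::vector<Tuple> producer_output = {{1, 10}, {2, 20}, {9, 90}};
    auto parts = exchange_partition(producer_output, 4);   // e.g. 4 parallel consumers
    (void)parts;
}
```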
                                         
                                         And so basically this is done on a plan level. So we're building our query plan, an ideal query plan for a query.
                                         
                                         And then the optimizer also figures out what's the degree of parallelism that I would want
                                         
    
                                         to have for each individual operator.
                                         
                                         So the operator, of course, needs to be parallelizable. And so this means we need to be able to run
                                         
                                         on separate chunks of data and split up the operators.
                                         
And then basically, the optimizer will instantiate one query operator plan for each thread.
                                         
                                         And these operators will then be connected
                                         
                                         with the exchange operator,
                                         
    
                                         which basically manages or encapsulates the parallelism
                                         
and manages threads.
                                         
                                         So this is actually also some management.
                                         
                                         And this basically, like in a simple way,
                                         
                                         this could basically be in between each of the individual operators, right?
                                         
                                         So here, say after the scan, we have an exchange.
                                         
                                         After the join, here we don't have one, then it would be a multi-way join.
                                         
                                         We could also have one here, for example.
                                         
    
                                         And then after the join, we'll have an exchange which will collect the final output and will
                                         
                                         print the final result. And this is a simple model
                                         
                                         or an elegant model, which is actually used by many systems.
                                         
                                         So we're basically having this separate operator which
                                         
                                         handles the parallelism.
                                         
                                         And you can think of it similarly to scheduling this
                                         
                                         in the MapReduce system.
                                         
                                         So if we're breaking down our SQL query into MapReduce steps,
                                         
    
                                         then of course, the MapReduce system
                                         
                                         will basically give a certain degree of parallelism.
                                         
                                         Typically, you would do this yourself,
                                         
                                         but the optimizer could also do this
                                         
                                         if you have a more advanced system.
                                         
                                         Then something like Hive would give you some kind of parallelism automatically.
                                         
And it also does this exchange, right? The steps in between, this is basically the exchange operator. So we're producing these intermediate results. If we can pipeline, if we can merge different operators,
                                         
                                         we don't need to do exchange, but this can also be decided basically on the optimizer
                                         
    
                                         level.
                                         
                                         And well, the cool thing is, well, it's good, right?
                                         
                                         It's quite easy.
                                         
                                         The operators are oblivious to the parallelism, so the operators do the same thing that they did before.
                                         
                                         We just need to ensure that we can run them on the subsets
                                         
                                         and still get correct results.
                                         
                                         The bad thing is that, well, if we're doing this at compile time,
                                         
                                         when we're optimizing the query,
                                         
    
                                         we probably don't know the actual exact data statistics.
                                         
                                         And that means we'll most likely not get a good utilization or we will have some
                                         
                                         load imbalances. So some parts
                                         
                                         will run faster than others and then between these exchange operators, we basically have to wait
                                         
                                         and we're not going to make any progress.
                                         
                                         Also, if we're doing this at compile time,
                                         
                                         we cannot change this at runtime.
                                         
                                         We cannot really change the parallelism at runtime,
                                         
    
                                         at least not efficiently.
                                         
                                         So this might have some additional overhead.
                                         
And, well, then there's, of course, potential overhead. The exchange operator might create some additional copies of the tuples by copying them around, some additional
                                         
                                         intermediate results. And if we're creating too many threads, well, then we have too much
                                         
                                         context switching, which of course, I mean, we also know, right?
                                         
So this is basically done in classical systems, similar to the iterator model, right? But modern systems will do something different.
                                         
                                         So they will basically have some kind of query coordinator that does the parallel execution,
                                         
    
                                         at least has some influence at runtime. So it will basically, based on the number of parallel
                                         
                                         servers, it will determine the granularity and the load distribution. And there's parallelism within and between operators.
                                         
                                         So, say for example, we have a scan and a group by,
                                         
                                         then we typically have a producer-consumer pair.
                                         
                                         So, the scan would produce the data, the group by would then consume.
                                         
                                         So, we get this kind of pipeline because they're communicating to each other,
                                         
                                         where we can use pipeline parallelism.
                                         
                                         And we can do this on separate subparts.
                                         
    
                                         So our initial table would be split up into multiple parts.
                                         
And then we can do this in parallel.
                                         
                                         We don't really have to split it up.
                                         
                                         We basically just tell the different threads,
                                         
                                         read different parts of the data from memory, read these memory regions in parallel.
                                         
                                         And then we need, of course, some communication.
                                         
                                         If we're doing grouping, for example,
                                         
                                         this means some form of sorting or hashing
                                         
    
                                         in order to get the correct sets.
                                         
                                         And then the results will basically
                                         
                                         be sent to some kind of coordinator that gives back the result to the user.
                                         
                                         And an important part, of course, is the degree of parallelism,
                                         
                                         which is the number of threads in the end that we're executing this on,
                                         
                                         or the maximum number of threads that we can use.
                                         
                                         And this can be chosen manually.
                                         
                                         So it could be chosen like in the volcano style
                                         
    
                                         statically at compile time.
                                         
                                         It could be chosen statically at system setup.
                                         
                                         Or it can be done dynamically as the system is running.
                                         
And it can be adapted also based on how many queries we
                                         
                                         have in the system, for example.
                                         
                                         Because of course, if we have only one query,
                                         
                                         we always want to utilize the whole system.
                                         
                                         If we have multiple queries, we might
                                         
    
                                         reduce the degree of parallelism of a single query
                                         
                                         in order to be able to execute multiple queries in parallel.
                                         
So, now the question is how do we parallelize an operator? Well, typically we just have to split up the work into some fractions. And for us, this means either it's an operator or it's a subset of the data. So we basically can split up the query based on the operators, which is this inter-operator parallelism, and then we can split up on sub-parts of the data that we're processing, which would then be a part of the total work as well. And of course, we typically want to create more tasks
                                         
    
                                         than there are threads.
                                         
Because otherwise, if we have fewer tasks, we basically cannot fully utilize the cores.
                                         
                                         Getting the exact right number of tasks
                                         
                                         will typically be hard.
                                         
                                         And also, if we split them up, I mean, of course, we know how many cores we have, so
                                         
                                         we could just split up in the number of cores.
                                         
    
                                         But then most likely we will have some imbalance in between the different tasks.
                                         
So we typically want to have more tasks than that, in order to have some flexibility at runtime when some tasks, or some subsets of the work, are finished faster than others. And so this basically means, if at runtime we see that a certain task is too much work,
                                         
                                         then we might even be able to split it up further.
                                         
And this happens due to skew.
                                         
                                         So if you've seen big data systems, their typical problem in MapReduce systems is that
                                         
                                         we have one reducer that gets
                                         
    
                                         all of the work.
                                         
                                         And MapReduce can't really handle that, so then basically one reducer will forever continue
                                         
                                         working on this.
                                         
                                         And in a modern database system, this will further be split up, right?
                                         
                                         So if we see all of the work goes into a single task, well, let's split up this task, make it more parallel
                                         
                                         and try to have a good load balancing.
                                         
                                         And here's an example of this.
                                         
So this is out of a paper by Balkesen et al.,
                                         
    
                                         where they use a main memory hash join.
                                         
                                         And we'll talk about this kind of join a bit later.
                                         
                                         And here, for example, this is a Radix join.
                                         
                                         
                                         So here in a Radix join, we're first doing the partitioning, or we're first partitioning
                                         
                                         the data, and then we're building hash tables and doing classical hash join, basically.
                                         
                                         And what we can see, if we're just splitting this up into separate steps or into multiple tasks based on the number of threads,
                                         
                                         we can have very different load.
                                         
    
                                         And what happens is that even if the tasks are smaller
                                         
                                         and we have a static scheduling,
                                         
                                         we still will run into the same problem,
                                         
                                         that we have very different kind of load
                                         
                                         based on the work that the subtasks get.
                                         
                                         And so here, for example, you can see that
                                         
                                         the histograms computation,
                                         
                                         so in order to see how we want to partition,
                                         
    
                                         can be very different.
                                         
                                         And then the actual partitioning can be very different.
                                         
                                         In the end, then we have hash tables,
                                         
                                         so that might actually work.
                                         
However, if we do some additional task decomposition for large tasks, basically splitting up very large tasks and rescheduling them, then we can get a much nicer distribution of work
                                         
                                         and we can basically also get much nicer total runtime for the whole thing.
                                         
    
So this means that while in a setup where we have some skew, a lot of the tasks, or a lot of the threads or workers, will wait for certain tasks that are still running, with finer-grained tasks we can evenly distribute and fully utilize everything. So simple queuing will basically lead to poor load balancing and threads just waiting for different stuff.
                                         
And with fine-grained scheduling, we can get additional performance. So now we somehow have to partition the data in order to get this.
                                         
                                         So based on this experiment, we see, okay, we need some fine-grained partitioning.
                                         
    
                                         Of course, this leads to additional scheduling,
                                         
but having finer-grained partitioning and a better load balance will still improve
                                         
                                         our total throughput.
                                         
                                         We need fine partitions and we somehow want to dynamically adjust them.
                                         
Rather than doing a static partitioning, which we would think about in a push-based model, right? In a push-based model, we typically would say, okay, I'm statically partitioning my work, and then I'm pushing this to the workers; the dispatcher will basically say, okay, this thread will do this and this, etc. It's better to be more dynamic about this. Since there is a lot of imbalance, because there is skew in the data and we don't really know what the data distributions are, it actually makes sense to be as dynamic as possible about this.
                                         
How we can do this on an engine level is what's called morsel-driven parallelization, which is also implemented in HyPer.
                                         
And the idea here is that we're basically splitting up our work into very fine blocks, and of course it's not three tuples, but, I don't know, a thousand tuples, something like this, small blocks that we will be individually executing. And the dispatcher thread will basically say,
                                         
                                         okay, we're breaking up the task
                                         
                                         or the query into these small tasks,
                                         
    
                                         and then the individual threads will basically run on these individual tasks.
                                         
                                         These are constant sized work units, they're called morsels, so it's a small bite of work
                                         
                                         basically.
                                         
The dispatcher will basically put these into a queue for the worker threads,
                                         
                                         which will then work on these individual morsels.
                                         
                                         And with this, we get like a very dynamic balancing, right?
                                         
                                         So we're just basically continuing to work on these morsels,
                                         
    
                                         small bite, small size chunks.
                                         
                                         And these are large enough such that the scheduling
                                         
                                         doesn't hurt us that much, but they're also small enough
                                         
                                         that we have a good dynamic scheduling.
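A very small sketch of this morsel-at-a-time dispatching (with an assumed morsel size of 1000 rows and a single input table) could use a shared atomic cursor that each worker advances to claim its next morsel, which is what gives the dynamic load balancing.

```cpp
// Illustrative sketch: workers claim fixed-size morsels via an atomic cursor.
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

int main() {
    const size_t table_size = 10'000'000;
    const size_t morsel_size = 1000;            // "a small bite of work"
    std::atomic<size_t> next_row{0};            // shared morsel cursor

    auto worker = [&] {
        while (true) {
            size_t begin = next_row.fetch_add(morsel_size);
            if (begin >= table_size) break;     // no morsels left
            size_t end = std::min(begin + morsel_size, table_size);
            for (size_t i = begin; i < end; ++i) {
                // run the pipeline (scan, filter, probe, ...) on row i
            }
        }
    };

    std::vector<std::thread> threads;
    unsigned n = std::max(1u, std::thread::hardware_concurrency());
    for (unsigned t = 0; t < n; ++t) threads.emplace_back(worker);
    for (auto& th : threads) th.join();
}
```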
                                         
And in this setup, so in HyPer, for example,
                                         
                                         the number of worker threads is equal to the number of hardware threads.
                                         
                                         
                                         As I told you, like twice as many
                                         
    
                                         might give you slight performance improvement,
                                         
                                         might cost you a bit more.
                                         
                                         It really depends on the hardware, et cetera.
                                         
                                         And of course, then each individual operator
                                         
                                         needs to be designed for parallel execution.
                                         
                                         So let's look at this in a bit more detail.
                                         
                                         So if we have this kind of pipeline, where we have two, or this kind of query, where
                                         
                                         we have two joins, three tables, each of those have some kind of selection, of course, projection,
                                         
    
                                         et cetera, could also be in there.
                                         
What happens is that we're breaking this query up at pipeline breakers.
                                         
                                         So the join would be a pipeline breaker if we're doing a hash join, because we have to
                                         
                                         build the hash table that we're then probing.
                                         
                                         And otherwise, everything else, meaning building these two hash tables for the join, is basically
                                         
                                         one thing that we need to do.
                                         
                                         So we do this separately.
                                         
                                         These are two pipelines.
                                         
    
                                         And once these hash tables are built, we can just use them and have a single pipeline. So we can do
                                         
                                         this in a single pipeline fashion. And the work for doing this, we can do in parallel. So building
                                         
up this hash table, we can again do based on these small bite-sized morsels. So we're basically having multiple scans in parallel: individual pages that will be run as individual morsels, executed in parallel on multiple threads that will build up the hash table.
                                         
                                         And both, of course, we can do in parallel or we can do one after the other.
                                         
                                         And once these are done, then we can basically schedule the last pipeline.
                                         
    
So this would be this pipeline, which then, in a parallel, multi-threaded fashion, can probe both hash tables and, basically completely in parallel for individual morsels, get the final results.
                                         
                                         So this is kind of a modern way
                                         
                                         of executing multiple threads
                                         
                                         and also parallelizing the whole query.
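To sketch how the build and probe phases can be parallelized morsel-wise, here is an illustrative example (not HyPer's actual implementation; the sharded hash table and the tiny inputs are my own assumptions): build-side inserts go into a hash table sharded by key hash, each shard with its own lock, so different threads rarely contend, while the probe phase is read-only and needs no locking at all.

```cpp
// Illustrative sketch: morsel-wise parallel hash-join build and probe
// over a lock-sharded hash table.
#include <cstdint>
#include <functional>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <vector>

struct Row { int64_t key; int64_t payload; };

constexpr size_t kShards = 64;
std::unordered_multimap<int64_t, int64_t> shards[kShards];
std::mutex shard_locks[kShards];

// Build phase: insert one morsel [begin, end) of the build side.
void build(const std::vector<Row>& build_side, size_t begin, size_t end) {
    for (size_t i = begin; i < end; ++i) {
        size_t s = std::hash<int64_t>{}(build_side[i].key) % kShards;
        std::lock_guard<std::mutex> g(shard_locks[s]);
        shards[s].emplace(build_side[i].key, build_side[i].payload);
    }
}

// Probe phase: count matches for one morsel [begin, end) of the probe side.
size_t probe(const std::vector<Row>& probe_side, size_t begin, size_t end) {
    size_t matches = 0;
    for (size_t i = begin; i < end; ++i) {
        size_t s = std::hash<int64_t>{}(probe_side[i].key) % kShards;
        auto range = shards[s].equal_range(probe_side[i].key);
        for (auto it = range.first; it != range.second; ++it) ++matches;
    }
    return matches;
}

int main() {
    std::vector<Row> r = {{1, 10}, {2, 20}, {3, 30}, {1, 11}};
    std::vector<Row> s = {{1, 100}, {3, 300}, {5, 500}};

    // Build: two threads, each handling one "morsel" of the build side.
    std::thread b1(build, std::cref(r), 0, r.size() / 2);
    std::thread b2(build, std::cref(r), r.size() / 2, r.size());
    b1.join(); b2.join();

    // Probe (read-only) could likewise be split into morsels across threads.
    size_t matches = probe(s, 0, s.size());
    (void)matches;  // 3 matches here: key 1 (twice) and key 3
}
```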
                                         
                                         And we'll look at how to basically split up the joining, etc., into parallel
                                         
    
                                         parts in just a few minutes after a short break. But do we have questions so far? No? Well, let's do a three-minute break,
                                         
then we'll look at parallel joins. Before we continue, quick question. Who
                                         
                                         would be interested in a project seminar in the winter term on hardware based stuff.
                                         
                                         More than 50%. That's great.
                                         
                                         Because we're thinking if we should have a follow up on this,
                                         
                                         we're going to do big data systems in the winter term
                                         
                                         again and some other stuff.
                                         
                                         But I thought maybe have some hardware related stuff would
                                         
    
                                         also be fun.
                                         
So if we have enough people, and it seems like it; I mean we are not everybody here, I know there are more people behind the camera, so those we can basically also ask, but it looks like we could have a decently sized group.
                                         
                                         OK, cool.
                                         
Then back on track, parallel joins.
                                         
                                         I'm spending too much time already anyway.
                                         
So here's some statistics by Andy Pavlo.
                                         
    
                                         And this is really one type of workload. This is not necessarily,
                                         
                                         let's say, enterprise real-world data. This is TPC-H on Cloudera Impala. This is basically
                                         
measuring how much time is spent in which query operator. And if you're running TPC-H on Cloudera Impala,
                                         
                                         then most of the time is actually spent
                                         
                                         in the join operator.
                                         
                                         So this will take a lot of time.
                                         
                                         There's a lot of research or also like survey papers
                                         
    
                                         that look at real-world workloads
                                         
                                         and there you'll see the join is not necessarily always the most expensive part, but it is a significant part.
                                         
                                         So this is something we want to somehow parallelize.
                                         
And especially in OLAP workloads, looking at SAP HANA for example, you will have tens to hundreds of joins per query.
                                         
                                         So this needs to be efficient.
                                         
                                         Of course, also join order is important.
                                         
                                         Again, this is something that we're not looking at.
                                         
                                         This is something we're expecting to already get.
                                         
    
                                         But we want to do joins.
                                         
We want to do them in parallel using multiple threads within the join operator. So we want to parallelize the join operator.
                                         
                                         And I already showed you to some degree
                                         
through this morsel-driven parallelism how this can be done.
                                         
                                         But there's two main approaches.
                                         
                                         So one is parallel hash join, and the other one
                                         
                                         is a parallel sort merge join that will typically
                                         
    
be implemented in current systems. Of course, you can also do these MapReduce-style joins, where then within the partitions you can do all kinds of different join operators, but from, let's say, an overall setup, these are the two main approaches that we want to look at.
                                         
                                         And we're going to look at the hash join first and then at the sort merge join.
                                         
In a hash join, you might remember, the very simple setup is basically that we're building a hash table on the inner join relation, typically the smaller table, because we want to have a small hash table. That's the build phase, so first we're building the hash table. And then we're scanning, or reading, through the outer relation and probe the hash table; that's the probe phase. So this is
                                         
like very simple: we're looking at basically two for loops, where in the first we're inserting everything and in the second loop we're probing everything.
                                         
                                         And we need to do this first.
                                         
                                         So we need to have the full hash table.
                                         
                                         At least we need to know that we have everything in the hash table that we need
                                         
                                         to find for the tuples that we're probing with.
                                         
    
                                         So this kind of blocks us for then doing the probing.
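As a rough reminder of what those two loops look like, here is a minimal single-threaded sketch; the `Tuple` layout and the use of `std::unordered_multimap` are placeholder choices for illustration, not how a real engine would implement its hash table.

```cpp
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <utility>
#include <vector>

struct Tuple { std::uint64_t key; std::uint64_t payload; };

// Build phase: insert every tuple of the (smaller) inner relation.
// Probe phase: look up every tuple of the outer relation.
std::vector<std::pair<Tuple, Tuple>>
hash_join(const std::vector<Tuple>& inner, const std::vector<Tuple>& outer) {
    std::unordered_multimap<std::uint64_t, Tuple> ht;
    ht.reserve(inner.size());
    for (const Tuple& t : inner)          // first loop: build
        ht.emplace(t.key, t);

    std::vector<std::pair<Tuple, Tuple>> result;
    for (const Tuple& t : outer) {        // second loop: probe
        auto [lo, hi] = ht.equal_range(t.key);
        for (auto it = lo; it != hi; ++it)
            result.emplace_back(it->second, t);
    }
    return result;
}
```

The probe loop can only start once the build loop has finished, which is exactly the blocking behavior described above.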
                                         
And in parallel, or graphically, this basically means first we're scanning and building the hash table, then we're scanning again and probing. And in a parallel fashion, we can basically have a shared
                                         
                                         hash table, meaning we're having multiple sub-parts, we're partitioning our relations,
                                         
                                         and we're basically scanning these partitions in parallel.
                                         
                                         Again, we have to do the build first in order to have the full hash table there
                                         
                                         and then in the second step we can basically do the probing and we can do
                                         
                                         the probing also in parallel but only after we've filled this shared hash
                                         
    
                                         table. Otherwise we will not see all of the tuples
                                         
                                         and we'll lose some data.
                                         
                                         So that's why we basically need this two-step approach.
                                         
                                         Now the question is,
                                         
                                         how do we deal with this hash table?
                                         
                                         So we can have one global big hash table
                                         
                                         or we can break it down into multiple smaller hash tables.
                                         
                                         And then there's multiple different ways of how to deal with this.
                                         
    
                                         So in a non-partitioning-based join,
                                         
                                         we're basically using multiple threads to build up a global hash table.
                                         
                                         And then we can use multiple threads to basically probe this global hash table.
                                         
You could even have some copies of the hash table or something like that. But this basically means we have one instance of a hash table, one single global data structure that all of the threads have to touch. And because that might cause some contention, there's also the idea of basically partitioning everything.
                                         
                                         And so in this case, we're basically building sub hash tables.
                                         
We're partitioning the input relations in a way that we know where the tuples go; say modulo, for example: all even keys will go to our first hash table and all odd keys will go to our second hash table in the build phase. And then we're doing the same partitioning in the probe phase.
                                         
                                         And then we have two sub-hash tables,
                                         
                                         which we can probe in parallel.
                                         
And now you can already think: maybe you remember the ART, right? The radix trie. We can basically use the same kind of techniques for the partitioning: we can take the binary representation of our key and use it to split up our data into multiple partitions, using these radixes. So in a partition-based hash join
                                         
we first have a partition phase, which basically means we divide the tuples into these individual subpartitions based on the join key, and then we can build the hash tables in parallel. So once we have nice, small, and hopefully not too skewed subpartitions, all of the threads can basically run in parallel, each building the hash table for its subpartition. And then we can, in parallel, also probe. And we always only have to look up the correct hash table that fits.
                                         
                                         Basically, we use the same kind of partitioning for the parallel probe,
                                         
                                         so we automatically know which hash table to look up.
                                         
                                         And so this is kind of the three phases of a parallel partition-based hash join.
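Putting the three phases together, here is a hedged, single-threaded sketch; partitioning on the low bits of the key, the `Tuple` layout, and counting matches instead of producing output tuples are illustrative assumptions, and in a real system each per-partition iteration would be handed to a different thread.

```cpp
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <vector>

struct Tuple { std::uint64_t key; std::uint64_t payload; };

// Phase 1: partition both inputs on the low bits of the join key.
// Phase 2: build one small (ideally cache-sized) hash table per inner partition.
// Phase 3: probe each partition pair independently -- each iteration of the
//          outer loop below could run on a different thread.
std::size_t partitioned_hash_join(const std::vector<Tuple>& inner,
                                  const std::vector<Tuple>& outer,
                                  unsigned bits) {
    const std::uint64_t fanout = 1ull << bits;
    const std::uint64_t mask = fanout - 1;
    std::vector<std::vector<Tuple>> in_parts(fanout), out_parts(fanout);
    for (const Tuple& t : inner) in_parts[t.key & mask].push_back(t);
    for (const Tuple& t : outer) out_parts[t.key & mask].push_back(t);

    std::size_t matches = 0;
    for (std::uint64_t p = 0; p < fanout; ++p) {
        std::unordered_multimap<std::uint64_t, std::uint64_t> ht;
        for (const Tuple& t : in_parts[p]) ht.emplace(t.key, t.payload);
        for (const Tuple& t : out_parts[p]) matches += ht.count(t.key);
    }
    return matches;
}
```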
                                         
    
                                         And, well, the idea again would be here
                                         
                                         to make sure that everything somehow fits into the cache.
                                         
Because if we have a large global hash table, the problem is that, for both the building and the probing, every individual access will most likely be a cache miss.
                                         
                                         Right, so this means every single probe here
                                         
                                         will actually be slow because our hash table is huge.
                                         
    
                                         If we have a good hash function,
                                         
                                         the accesses will be uniform across the hash table
                                         
                                         because like every bucket basically has the same probability,
                                         
                                         so every single lookup will be expensive.
                                         
In order to make this better, we want to partition the hash table into small disjoint chunks that are of cache size.
                                         
And then, when we're doing the lookups in hash tables that fit into the cache, we don't have a cache miss, because the hash table can be cached.
                                         
                                         And again, in the partitioning phase, now we have to deal with this. So now we have to somehow partition the data in parallel.
                                         
                                         And this means we need to write separate partitions.
                                         
                                         And these partitions,
                                         
                                         either we do them shared again,
                                         
                                         meaning we're writing to a single partition,
                                         
                                         like all of the even numbers to one partition,
                                         
    
or we're saying, for each thread, let's have a separate partition, and then somehow synchronize and merge afterwards.
                                         
                                         So if we have a shared partition,
                                         
                                         then we need some kind of lock.
                                         
                                         We'll talk about locking next time.
                                         
                                         But meaning we need to know, OK, we
                                         
    
                                         have to write to the end of the partition.
                                         
                                         We need to know where this is.
                                         
                                         For this, we need some kind of information.
                                         
                                         We need something that makes sure that our multiple threads
                                         
                                         don't overwrite each other's data.
                                         
                                         So this is some kind of latch.
                                         
                                         Or latch basically means lock in this case.
                                         
                                         Or we let every thread do their stuff individually
                                         
    
                                         and then combine afterwards.
                                         
                                         So this is kind of this private approach.
                                         
                                         So in the shared partition way, meaning we have our table,
                                         
                                         we're scanning through our table or each processor,
                                         
                                         each thread basically does their subset
                                         
                                         and does the partitioning and basically needs to know,
                                         
                                         okay, where does the data go, right?
                                         
So it does the partitioning, or basically the hashing into the right partition, and because the other threads also write, we need to synchronize on the partitions where we actually want to write. So if, say for example, this thread wants to write into this bucket of this partition, then while writing, it needs to somehow lock it. It needs to make sure that no other thread writes into the same position here.
                                         
                                         But in the end, we already have the full final partitions
                                         
                                         correctly in memory.
                                         
                                         The other way would be every thread basically
                                         
    
                                         gets their own partition.
                                         
                                         So every thread has a separate memory set or memory region
                                         
                                         where it partitions to.
                                         
And then we need a separate combination step where we combine these small subpartitions into larger partitions on which we can then do the hashing, et cetera.
                                         
                                         OK.
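As a minimal sketch of this private-partition variant, the following splits the keys into per-thread buffers first and only combines them afterwards; the chunking scheme, the fan-out, and the modulo partitioning function are assumptions for illustration.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <thread>
#include <vector>

// Each thread partitions its own chunk of the input into private buffers
// (no latches needed), and a final combination step concatenates the
// per-thread buffers into the global partitions.
std::vector<std::vector<std::uint64_t>>
partition_private(const std::vector<std::uint64_t>& keys,
                  unsigned num_threads, std::uint64_t fanout) {
    // local[t][p] = keys of partition p seen by thread t
    std::vector<std::vector<std::vector<std::uint64_t>>> local(
        num_threads, std::vector<std::vector<std::uint64_t>>(fanout));

    const std::size_t chunk = (keys.size() + num_threads - 1) / num_threads;
    std::vector<std::thread> workers;
    for (unsigned t = 0; t < num_threads; ++t) {
        workers.emplace_back([&, t] {
            std::size_t begin = t * chunk;
            std::size_t end = std::min(begin + chunk, keys.size());
            for (std::size_t i = begin; i < end; ++i)
                local[t][keys[i] % fanout].push_back(keys[i]);
        });
    }
    for (auto& w : workers) w.join();

    // Combination step: merge the thread-local buffers per partition.
    std::vector<std::vector<std::uint64_t>> parts(fanout);
    for (std::uint64_t p = 0; p < fanout; ++p)
        for (unsigned t = 0; t < num_threads; ++t)
            parts[p].insert(parts[p].end(),
                            local[t][p].begin(), local[t][p].end());
    return parts;
}
```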
                                         
So this is basically one single pass; we go through the whole data a single time. But this would mean, on the one hand, that we have many small partitions, because we need kind of a high fan-out. We need to make sure that we have enough subtasks for our threads that later
                                         
                                         will do the hashing and building the hash tables. And because we want to divide our full table,
                                         
                                         like, I don't know, TPC-H line item table,
                                         
                                         we want to divide it into cache-sized partitions.
                                         
                                         So this means there's going to be a huge number
                                         
                                         of individual partitions,
                                         
    
                                         and that might actually be quite expensive.
                                         
So another approach is using multi-pass partitioning, and this is called radix join. So, meaning rather than splitting everything up in a single pass, we're incrementally splitting up the data in such a way that we get the correct number of partitions, basically using the binary representation of the keys. And because of this radix, or this multi-pass partitioning on the binary representation, we also call this the radix join.
                                         
And so we're looking at the different bits of the key, just like we did in the ART, right? So we're starting with the most significant bit. With the most significant bit, we can get two partitions; with the two most significant bits, we can get four partitions; with three, we can get eight partitions, and so on. We can do this in multiple passes until we have the desired number of individual partitions. We can say exactly how many partitions we will have per pass: powers of two, right? So we cannot do three, that won't work, but we can do four, eight, 16, 32, etc.
                                         
And so this basically means we're limiting the number of partitions per pass to the number of entries in the translation look-aside buffer (TLB). That basically helps us make the lookup much faster. This again depends on your hardware, but typically this would be something like 64 or 512 entries. Modern systems or different chips like the M1 have larger translation look-aside buffers.
                                         
                                         But using this, basically, we'll find the addresses much quicker
                                         
    
                                         than if we have to actually look this up.
                                         
So basically, in the first pass, we'll split everything up into, say, 512 partitions. And then in the second pass, we split each of those again, until we have the correct number of partitions, which basically means the right size of partitions for our cache size.
                                         
                                         And having the translation look-aside buffer help us with the look-up, right?
                                         
                                         Making this so much faster
                                         
    
                                         will also compensate for these multiple steps
                                         
                                         that we're actually paying extra for.
                                         
                                         Okay.
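As a hedged sketch, one radix pass with a limited fan-out (say 6 bits, i.e. 64 partitions, to stay within typical TLB limits) might look like the following; for simplicity it works from the low-order bits rather than the most significant ones as in the lecture's example, but the multi-pass idea is the same.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// One radix pass: split `keys` on `bits` bits starting at `shift`
// (fan-out of 2^bits), then recurse on partitions that are still
// larger than the target (roughly cache-sized) partition size.
void radix_partition(std::vector<std::uint64_t> keys, unsigned shift,
                     unsigned bits, std::size_t target_size,
                     std::vector<std::vector<std::uint64_t>>& out) {
    if (keys.size() <= target_size || shift >= 64) {
        out.push_back(std::move(keys));   // small enough: final partition
        return;
    }
    const std::uint64_t fanout = 1ull << bits;
    std::vector<std::vector<std::uint64_t>> parts(fanout);
    for (std::uint64_t k : keys)
        parts[(k >> shift) & (fanout - 1)].push_back(k);
    for (auto& p : parts) {               // next pass looks at the next bits
        if (p.empty()) continue;
        radix_partition(std::move(p), shift + bits, bits, target_size, out);
    }
}
```

With, say, 6 bits per pass, two passes already give up to 4096 partitions while each individual pass only writes to 64 output partitions at a time.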
                                         
And then, basically, once we have these small partitions, we can actually do the same as we did before, right? We can do the hashing, building the hash tables, and doing the probing in the same way. Okay, so in the last five minutes I'll give an overview of the sort merge join. Who knows the sort merge join?
                                         
                                         No? OK, cool.
                                         
    
                                         Well, sort merge join, very simple.
                                         
                                         It's a super cool join if your data is already sorted,
                                         
                                         because then you basically skip the first phase.
                                         
                                         So first phase is you're sorting both your input tables.
                                         
And in the second phase, you're just scanning through the input tables. And because they're sorted, you can say: okay, I'm at key number five; on the other table, I'm at key number three, so I'm going to continue to key number five. If there's a key number five, I have a match. If there's no key number five, I skip ahead; say I'm at key number six now, so I continue on the other table until I'm at key number six there, for example.
                                         
                                         I have a match again, so I can basically alternate between the two tables.
                                         
And with a linear scan through both tables, I can get all the join results.
                                         
                                         But I need to sort first.
                                         
                                         So the data needs to be sorted.
                                         
                                         So if my data is sorted, sort merge join is always the fastest.
                                         
    
                                         Let me rephrase this.
                                         
                                         It's typically the fastest.
                                         
If I have very small data, a nested loop join might even be faster than having lots of complex code. But if the data is sorted, then a sort merge join is actually a good idea.
                                         
                                         And typically, if the data is sorted, and even if the tables do not fit into memory,
                                         
                                         we can actually just scan the data once.
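Here is a minimal sketch of just the merge phase over two already-sorted key columns; it only counts matches and ignores payloads, which a real join would of course carry along, but it shows the alternating scan described above.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Merge phase of a sort merge join on two *sorted* key vectors.
// Handles duplicate keys on both sides by pairing up equal-key runs.
std::size_t merge_join(const std::vector<std::uint64_t>& r,
                       const std::vector<std::uint64_t>& s) {
    std::size_t matches = 0, i = 0, j = 0;
    while (i < r.size() && j < s.size()) {
        if (r[i] < s[j])      ++i;          // advance the side with the smaller key
        else if (r[i] > s[j]) ++j;
        else {                               // equal keys: pair up the equal runs
            std::size_t i_end = i, j_end = j;
            while (i_end < r.size() && r[i_end] == r[i]) ++i_end;
            while (j_end < s.size() && s[j_end] == s[j]) ++j_end;
            matches += (i_end - i) * (j_end - j);
            i = i_end; j = j_end;
        }
    }
    return matches;
}
```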
                                         
So what this looks like in basically two phases is: well, we have the two tables, and first we basically have to sort them.
                                         
                                         And then we can just simply merge them.
                                         
                                         And this representation should already show you, right?
                                         
So I mean, the white ones will basically match the black ones; you can imagine we can just scan through the two in a basic merge fashion. And we can also do this in kind of subsets, right? So if we're splitting this up here, for example doing range partitioning, then we can do the merging also in parallel.
                                         
For doing this in parallel, it's mostly about the sorting, right? The merging, if we have range-based partitions, is quite simple. I mean, you can
                                         
                                         basically do this in parallel for all of the individual partitions.
                                         
                                         But the sorting is expensive and it's kind of also hard to parallelize because there is communication that you need to do.
                                         
So, well, we basically want to do this as parallel as possible and we want to utilize the hardware as well as possible. And so one way is also to use
                                         
                                         SIMD instructions. And you already know all about this, fortunately. So if we're doing a parallel
                                         
                                         sort, basically what we're doing is again, we're splitting this up into sub parts. So we're basically sorting smaller sets of data and then again merging, right? So
                                         
                                         basically this is what parallel sorting does. We're basically taking a small chunk of the data,
                                         
                                         we're sorting this small chunk and then we're merging it with larger amounts of data. And again,
                                         
    
                                         we want to be smart about it. We want to utilize the hardware.
                                         
                                         So we want to merge, I mean, we want to sort within caches.
                                         
                                         So we're basically sorting data that fits nicely
                                         
                                         into caches because that's going to be fast.
                                         
Then we want to merge within caches, and one way of doing this is a bitonic merge. So we're building up a merge network.
                                         
                                         And then we need to do this multi-way merge.
                                         
                                         So we basically have multiple partitions
                                         
                                         that we're going to merge again.
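A rough sketch of this sort-then-merge idea, with cache-sized runs and pairwise merging, might look like the following; the run size is a placeholder you would tune to your cache, and in a parallel version each run sort and each merge would be its own task.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <vector>

// Sketch of run generation plus merging: sort cache-sized chunks ("runs")
// independently, then merge the sorted runs pairwise until one remains.
std::vector<std::uint64_t> sort_via_runs(std::vector<std::uint64_t> data,
                                         std::size_t run_size) {
    if (run_size == 0) run_size = 1;
    std::vector<std::vector<std::uint64_t>> runs;
    for (std::size_t i = 0; i < data.size(); i += run_size) {
        std::size_t end = std::min(i + run_size, data.size());
        std::vector<std::uint64_t> run(data.begin() + i, data.begin() + end);
        std::sort(run.begin(), run.end());   // each run could be its own task
        runs.push_back(std::move(run));
    }
    while (runs.size() > 1) {                // merge the runs pairwise
        std::vector<std::vector<std::uint64_t>> next;
        for (std::size_t i = 0; i + 1 < runs.size(); i += 2) {
            std::vector<std::uint64_t> merged;
            std::merge(runs[i].begin(), runs[i].end(),
                       runs[i + 1].begin(), runs[i + 1].end(),
                       std::back_inserter(merged));
            next.push_back(std::move(merged));
        }
        if (runs.size() % 2) next.push_back(std::move(runs.back()));
        runs = std::move(next);
    }
    return runs.empty() ? std::vector<std::uint64_t>{} : std::move(runs[0]);
}
```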
                                         
And I don't have all that much time left today. I'm going to show you the run generation in the sort phase, and then we'll finish up the rest next week, which is not much more.
                                         
    
                                         So in order to generate what we call runs, which
                                         
                                         is basically this first subpart that we want to build.
                                         
                                         So basically, we have unsorted data, we're splitting it up into
                                         
                                         many small pieces and then we're sorting them. And this is what we call a run then,
                                         
                                         because this is already a sorted set which then will be merged.
                                         
For this we can use a sorting network. A sorting network is something that you can implement in hardware.
                                         
                                         If you have an FPGA, for example, you can have a sorting network.
                                         
                                         You can also build this in software by just having a fixed path for the elements.
                                         
    
                                         And this is efficient because it doesn't need any branches.
                                         
                                         Basically we're saying, okay, our data will always run through the sorting network, all of the data, all of the time,
                                         
                                         just by basically comparing elements and swapping elements
                                         
                                         if they're larger or smaller.
                                         
                                         And in the end, I'll get the final result.
                                         
                                         I'll show you in a second, basically.
                                         
                                         And one way of implementing this is using min-max.
                                         
So say, for example, I have a, b, c, d that I want to sort, so just four elements; then this would be, for example, a sorting network. It's hard to read, but there are basically 10 steps that I need to do, and with these 10 steps I will have the correct output in w, x, y, z, just by doing these min-max comparisons.
                                         
                                         Basically, I'm always saying which one is smaller,
                                         
                                         which one is larger.
                                         
                                         And if they're already in the correct order,
                                         
                                         they will be staying in this correct order.
                                         
    
                                         If they're not, I'll be swapping them.
                                         
                                         And this I can implement without any branching
                                         
                                         with the difference or zero operator, for example.
                                         
So the operator basically tells me: if x is larger than y, I get the difference; if x is smaller, I get zero.
                                         
                                         And because I can always calculate this, there's no branching.
                                         
                                         This would just produce an output.
                                         
    
And if I add this difference-or-zero to y, for example, I get the maximum: either I get x back, if x is larger, or I get y back, if y is larger. Or the other way around for the minimum: I subtract the difference-or-zero from x. If x is less, the difference is zero and I get x back; if y is less, I get y back.
                                         
On the slide, it's a bit hard to read, but believe me, this is a way to implement min and max, and we can see there's no branching here, right?
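A minimal sketch of this difference-or-zero trick for 32-bit integers could look like the following; it assumes the subtraction does not overflow and that right-shifting a negative value is an arithmetic shift, which holds on mainstream compilers but is an assumption here.

```cpp
#include <cstdint>

// "Difference or zero": d0(x, y) = x - y if x > y, else 0.
// Implemented without a branch via the sign mask of (x - y).
// Assumes x - y does not overflow and >> on negative ints is arithmetic.
inline std::int32_t diff_or_zero(std::int32_t x, std::int32_t y) {
    std::int32_t d = x - y;
    std::int32_t sign = d >> 31;       // all ones if d < 0, else all zeros
    return d & ~sign;                  // keep d only when it is positive
}

inline std::int32_t branchless_max(std::int32_t x, std::int32_t y) {
    return y + diff_or_zero(x, y);     // y + (x - y) = x when x > y, else y
}

inline std::int32_t branchless_min(std::int32_t x, std::int32_t y) {
    return x - diff_or_zero(x, y);     // x - (x - y) = y when x > y, else x
}
```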
                                         
And by using this, what I get is something like this network here, right? And some of these comparisons I can actually do in parallel: the ones that don't depend on each other,
                                         
                                         so E, F, G, H, for example,
                                         
                                         I can do in parallel all four of them, these comparisons.
                                         
                                         Then I can do Y, I and J, I can do in parallel.
                                         
                                         And then again, I think I should be able to do
                                         
    
                                         these last four in parallel.
                                         
                                         And this is basically what you can see here, right?
                                         
                                         So first I'm comparing these two and these two.
                                         
                                         If they are already in order, they will stay.
                                         
                                         If not, they will be swapped.
                                         
                                         Then I'm comparing these two.
                                         
                                         Again, if they're already in order, they will stay.
                                         
                                         If not, they're swapped.
                                         
    
                                         And finally, I only have to basically compare these two.
                                         
                                         So in the end, I'm comparing all to all.
                                         
                                         And I'm only swapping if I need to.
                                         
                                         Otherwise, they will stay the same.
                                         
                                         So after running through this network, I will have a correct output.
                                         
                                         I don't need to do any branching.
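For reference, here is a hedged sketch of the standard five-comparator network for four elements (five compare-exchanges, i.e. ten min/max operations); it uses std::min and std::max, which compilers typically lower to branch-free instructions, and you could equally plug in the branchless min/max from above.

```cpp
#include <algorithm>
#include <array>
#include <cstdint>

// Compare-exchange: after the call, a holds the smaller and b the larger
// value. std::min/std::max typically compile to branch-free cmov/min/max.
inline void cmp_swap(std::int32_t& a, std::int32_t& b) {
    std::int32_t lo = std::min(a, b), hi = std::max(a, b);
    a = lo; b = hi;
}

// Five-comparator sorting network for four elements: every input always
// takes the same path, so there is no data-dependent branching,
// regardless of the initial order of the values.
inline void sort4(std::array<std::int32_t, 4>& v) {
    cmp_swap(v[0], v[1]);   // layer 1: these two are independent
    cmp_swap(v[2], v[3]);   //          and could run in parallel
    cmp_swap(v[0], v[2]);   // layer 2
    cmp_swap(v[1], v[3]);
    cmp_swap(v[1], v[2]);   // layer 3
}
```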
                                         
                                         And I can implement this in SIMD.
                                         
                                         And this is what I'm going to show you next time,
                                         
    
                                         because unfortunately, time is up.
                                         
                                         Questions so far?
                                         
                                         No?
                                         
                                         Well, then thanks a lot.
                                         
                                         Then we're going to talk about locking next time.
                                         
So this, I will definitely finish up next time, and I will have enough time for all the locking fun. This will be next week Tuesday. Thank you very much.
                                         
