Hardware-Conscious Data Processing (ST 2023) - tele-TASK - Multicore Parallelism (2)
Episode Date: June 7, 2023...
Transcript
So we'll continue to talk about multi-core parallelism today.
And I've decided this is all we're going to do,
because otherwise it's going to be a rush.
And a few announcements.
So thanks for your feedback.
It was quite positive feedback. I'm quite happy.
We got a few things that we'll try to incorporate into the lecture.
And there was some feedback for the tasks; that part Lawrence will discuss with you in the next task session.
So there were some questions and we'll answer this.
Then the foundation, so the foundation that finances and funds all the stuff here, this building, etc., is looking for student assistants as IT admins.
If you're curious what the foundation does and want to see, maybe peek inside through being a system admin, then you might want to look at their website.
So I've put the link here.
The slides are not updated in Moodle,
but I'll try not to forget to do this later.
And if you don't find it, just feel free to reach out to me.
I'll be happy to forward this to you.
And if you're not so much into the foundation,
there's also one for the Barberini.
So for the museum, they also look for a student assistant
for IT management there.
And finally, last announcement that I could think of
while preparing this.
Today, we'll have a seminar again
with our joint seminar with TU Darmstadt.
We'll have a presentation by Danica Porobic from Oracle.
And she's going to talk about
autonomous database performance insights.
So Danica is a specialist in hardware and, of course, Oracle things like the engine parts.
So that should be quite interesting actually.
So that's why I'm also, I think, yeah, well, have a look.
I think we also have an announcement in Moodle.
So even if this is not updated in Moodle yet, the slides,
you can still go to the announcement
and see the link there.
So this is gonna be at 4:15 today in Zoom.
And she's gonna talk from Zurich.
So I mean, Zoom is the best you get anyway,
unless you're watching this remotely.
Then, well, we're still in multi-core parallelism,
so nothing has changed here.
However, some stuff has changed here.
So we had no class yesterday.
It was supposed to be a Q&A anyway,
so this is not that bad.
But since I didn't finish multi-core, or we didn't
get as far as I thought, we're just going to do multi-core today. And that means everything else,
not everything, but we'll have a bit of a shift again. So we're going to talk about locking next
week and then NUMA. The NUMA session, as announced, will be in this small lecture hall,
which should be just above here somewhere.
I'm always confused about the building, but I think it should really just be above here,
or maybe somewhere over there.
That's basically shifted. This also means persistent memory is shifted,
and storage and networking,
and then we would have had a Q&A session,
which basically got moved out.
But if you have questions, of course,
always feel free to also ask either during the task or after the lecture or during the lecture or offline
through Moodle, etc. So just so you are aware, basically, there's a bit of a shift here and there.
And today we're basically going to continue with multi-core parallelism. So I've talked about the parallelism basics.
So why do we need parallelism in the first place?
And all of you should be able to say right now why.
So why do we need parallelism today in our database?
What, for our case, is the single most
important reason if we're looking at hardware?
There's some discussion.
Discuss with your neighbors, feel free.
Yes?
Because the clock speed isn't rising anymore?
Exactly. And what happens instead?
With more cores.
More cores, yes.
In order to utilize our hardware, the current hardware, we need parallelism.
So we don't have single-core systems anymore, we have multi-core systems.
If we want to utilize multi-core systems,
we need parallelism.
Otherwise, no chance of ever utilizing a many-core system.
I mean, two cores, maybe, right?
But three, four cores is going to be hard already.
40 cores, no chance.
So you really need a parallel system in order to properly utilize this.
So this is why we're looking at this.
And this is also why you need to, I mean, you generally need this.
If you want to have your algorithm, your system, whatever,
run efficiently on a modern server and fully utilize
this, you need parallelism, period; there's no way around it. And this is why we talk about this also in this
lecture. I mean, of course, you can also be efficient on a single core, and you want to be
efficient on a single core. But unless you have many different tasks that you can do, you will have to do this.
You will have to distribute your tasks and parallelize them.
So with that, I'm going to skip the whole introduction
to parallelism and we'll dive right into the parallelism
in database management systems.
So there are basically two ways
of parallelizing database workloads,
and this is interquery parallelism
and intraquery parallelism.
And because these are so close,
what'd you say, the edit distance is so close,
I'm always having a hard time pronouncing them properly.
But inter means in between two different queries.
Intra means within a single query.
And inter-query parallelism means we're doing something concurrently: we're taking multiple queries at the same time and running them in parallel.
So this is cool if we have an OLTP system and an OLTP workload where, say, our student information and registration system on the first day of the semester, when everybody wants to sign up for courses, will have potentially hundreds of queries in parallel. This means we can actually utilize many cores in parallel, because there's so much going on, so many individual queries which we can schedule separately. Typically these kinds of workloads, OLTP workloads, are characterized by simple queries. So just lookups, simple lookups, simple updates, etc.
If we have analytics, so we have a few people actually trying to figure out how we can improve HPI and see, on the one hand, the marketing strategy, on the other hand, make sure that everybody just gets straight A's, so how do we have to improve our teaching? These will be long-running queries, and we're not going to
run thousands of those in parallel. We're going to have a few of those. And then maybe
based on the insights that we get, we're going to further drill down.
And so for this kind of workload, this inter-query parallelism doesn't really help us because
we have few queries and there's not much to be run in parallel. And so for this, we need the so-called intra-query parallelism.
And I mean, this is sort of in OLAP world,
so online analytical processing,
where we have these long-running queries,
this is sort of a must.
I mean, of course, we can also run it differently,
but I mean, we really want to use this.
If we want to use the system efficiently,
our hardware efficiently, then we
need to parallelize each individual query.
Of course, we're still in the space where we say, well,
maybe there are multiple users of our system.
So we're not going to be able to just utilize or just use
the system for a single query.
But we should be able to run multiple queries in parallel.
And maybe one question regarding this,
why does this make sense anyway?
So even if we could somehow have enough tasks, like split up our query into such small pieces that we can use all of the threads, why would it still make sense to have different kinds of queries run in parallel, maybe somehow shifted relative to each other, looking at the individual cores?
So do we have, like, what does a core do?
I mean, does it just do one single operation at a time?
Does it, I mean, remember what we talked about,
like how instructions are actually executed in there?
What happens if we're just doing the same stuff over and over
in the core?
Well, it gets pipelined.
It gets pipelined, yeah. We can use caching.
Sorry, say again?
We can use caching.
We can use caching, yes.
I mean, this is good.
This speaks for just using a single query, right?
I mean, or if the queries touch the same data,
well, then already caching for multiple queries
would also improve stuff.
The other thing is functional units, right?
So there's many different functional units.
So if we have different kind of tasks that we can schedule on a single core,
we can actually improve the utilization of the single core
because we can utilize these different functional units. So talking about our SIMD
units, talking about our floating point units, our ALU
load and store units, etc.
So if there's more different stuff to do for a single core,
we're actually going to be able to get a better utilization.
Of course, there's a trade-off.
If all of a sudden, and in query processing,
it's all about data, right?
So if we're fighting for caches in the core,
no, not so great.
But if we can somehow use the different units in parallel
through different queries,
then actually we're going to get a better utilization.
And anyway, if we have long-running queries, we still want to be able to kind of make progress
on multiple queries at a time.
Otherwise users will be unhappy if the system is blocked by one long-running query forever.
Okay, so how do we parallelize?
And there's different ways in parallelization on a single server.
And I mean, in general, it's processes, right?
So if we have, that's kind of the very basic constructs,
that's basically for each individual program that we're running,
we will have a separate process,
and there we have a separate memory space
or address space, as we know, right?
So we have this virtual address space.
Each process will have this, their own.
And they have their own memory, and they're isolated.
So basically this is a good thing
because they cannot interfere with each other.
At the same time, it's a heavyweight thing.
So process actually, it takes some time to be scheduled.
There's a lot of context switching.
We cannot really use the translation lookaside buffer if there are different address spaces; it will have to be reloaded, etc.
So, switching between the processes is actually costly.
And because it is costly, there's other things, and you already know this, right?
There's threads.
So, threads are basically something that runs inside a process: multiple parallel loci of control inside a running program.
So they have the same virtual address space,
and they are kind of an abstraction of a virtual processor.
A kernel thread is still scheduled by the kernel, and a process is also scheduled by the kernel, right? So if you have multiple processes, and you will typically always have multiple processes running in your system, then the kernel will basically switch those and assign them to cores. You have some influence on this, but still, the kernel every now and then will use its own processes to run kernel stuff, OS stuff.
However, for the threads within a process, there's a bit more control that you can have.
And we don't need all this switching of memory spaces, etc.
We can also have user-level threads. They act just like threads, but they need some kind of thread infrastructure behind them. We still need all the process infrastructure, right, in order to have something running, so we need our address space, et cetera.
But then we can use our own threads.
And we can schedule them.
If we're implementing our own thread abstraction, we can really have a lot of control over how they are scheduled, whether they're scheduled cooperatively or preemptively, meaning threads are kicking each other out or helping each other to be run. These will be invisible to the kernel, so the kernel won't do much about them. I mean, the kernel might still just move out the process, unless we're basically blocking parts of the system for our threads and our processes.
But here, we have much more fine-grained control.
And then finally, even a lighter abstraction
would be so-called fibers.
And this is also typically implemented in user space
and they also share the address space. But here, this is completely cooperatively scheduled. This means within a thread there will always only be one fiber running at a time, and the fibers run inside threads. Basically, if we're scheduling another fiber, then the currently running fiber will just yield and make space for the next fiber. And because the fibers are running inside threads, it's kind of a super lightweight abstraction. But this also means there's no parallelism per se by just using fibers.
So fibers are just like a unit of scheduling,
and we will need multiple threads where we schedule the fibers inside.
Otherwise, if we're just using fibers inside a single thread,
we'll just have a single thread, and we don't have any parallelism.
It's just like an additional abstraction to have this running, like having the kind of task
switching, et cetera, more efficiently and cheaper.
And so this is something that exists in Windows, but it also exists on other platforms, so Linux, Unix, for example, through the Boost library. So the Boost library offers these kinds of fibers, for example.
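As a small illustration of what cooperative scheduling looks like in code, here is a minimal sketch using Boost.Fiber, the library just mentioned. Both fibers live inside one OS thread and switch only when they explicitly yield; the build command in the comment is an assumption about a typical Linux setup.

```cpp
// Minimal sketch of cooperatively scheduled fibers with Boost.Fiber.
// Assumed build on Linux: g++ -std=c++17 fibers.cpp -lboost_fiber -lboost_context
#include <boost/fiber/all.hpp>
#include <iostream>

int main() {
    // Two fibers inside one OS thread: only one runs at a time, and a fiber
    // gives up control explicitly by yielding (no kernel involvement).
    boost::fibers::fiber producer([] {
        for (int i = 0; i < 3; ++i) {
            std::cout << "producer step " << i << "\n";
            boost::this_fiber::yield();
        }
    });
    boost::fibers::fiber consumer([] {
        for (int i = 0; i < 3; ++i) {
            std::cout << "consumer step " << i << "\n";
            boost::this_fiber::yield();
        }
    });
    producer.join();
    consumer.join();
    return 0;
}
```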
Okay, so given that we have these different kind of units of parallelization, so be it processes, be it threads or fibers,
we can now start scheduling them in the system and we can now try to somehow split up our work in the system
in order to make our work parallel.
And a very simple way of doing this, or let's say the most classical way, and the way some systems still do it because they have just been around since before there were threads, is one OS process per DBMS worker. So this means every database management system worker, so every process that does some processing for a query,
will have its own process.
So that's the classical architecture, and it exists because in old systems there was just nothing else; there was nothing else they could do. And of course here, as I said, the scheduling, the task switching, is just very costly.
Because of that, there's also the more recent and typically used setup where you have one thread per worker. Then you have a single multi-threaded process that hosts all of the worker activity, and some kind of dispatcher thread that listens for new connections. Each connection, for example, could then be allocated to a new thread. This would work very well and be simple for inter-query parallelism, where for each query you basically open up a new thread that will execute this separate query. And then, of course, because we don't necessarily want this task switching, etc. to be handled by the OS, if we think we know better, we can also use our own threads. So we can use the database system's threads, these user-space threading constructs, which replace the need for the OS threads. There we have cheaper task switching, but at the expense that we have to deal with all of this ourselves. So we basically have to make sure that our task switching is good, that the placement of the threads is good, et cetera, because the OS won't see them anymore and cannot do anything here anymore.
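To make the one-thread-per-query setup with a dispatcher concrete, here is a hypothetical minimal sketch: a dispatcher thread takes "incoming" queries from a placeholder queue and hands each one to a fresh worker thread. The queue, the query strings, and execute_query are made up for illustration; a real system would accept network connections instead.

```cpp
// Hypothetical sketch: dispatcher thread + one worker thread per query.
// The queue of query strings stands in for incoming client connections.
#include <iostream>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

void execute_query(std::string sql) {
    // Placeholder for the actual per-query work of one worker.
    std::cout << "executing: " << sql << "\n";
}

int main() {
    std::queue<std::string> incoming;   // pretend these arrived over the network
    incoming.push("SELECT 1");
    incoming.push("SELECT 2");
    incoming.push("SELECT 3");

    std::vector<std::thread> workers;

    // Dispatcher: for every incoming query, spin up a dedicated worker thread
    // (inter-query parallelism: independent queries run concurrently).
    std::thread dispatcher([&] {
        while (!incoming.empty()) {
            std::string q = std::move(incoming.front());
            incoming.pop();
            workers.emplace_back(execute_query, std::move(q));
        }
    });

    dispatcher.join();
    for (auto& w : workers) w.join();
    return 0;
}
```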
Okay.
So, and of course, we want to have a multi-threaded model today.
I already sort of elaborated on this: we have less overhead for context switches. We have the same address space, and that will make all of the address lookups, remember how that works on the CPU and in the caches, basically the same within these threads.
So we don't need to change that.
We don't need to change the caches
or invalidate the caches, et cetera.
I mean, of course, there's some techniques also
to change this if we have multiple threads
or multiple processes,
but here this will just be lightweight. So we have the same address space.
We have the same addresses. We can use the same translation look-aside buffer.
Everything will work efficiently on the CPU here.
And we don't have to change anything while switching between the different
threads.
The disadvantage is, however,
if one of the threads crashes,
this means the whole system is down.
So this typically means that the process crashes. If a thread crashes, I mean, there are some ways to deal with it, but often that's what happens.
And then the system is down.
This is also a problem if you have a system that doesn't have its own server, right?
Think about something like, what's it called?
The small SQLite, for example. So SQLite doesn't have its own server.
It's just in process, right?
So you're using the same process as the application.
If SQLite crashes, the application crashes.
Or think about DuckDB, same problem, right?
So if you're using DuckDB,
if DuckDB crashes, it will kill your application.
And I mean, of course, as a person who builds the system, you don't want that actually to
happen, right?
You don't want your users to be unhappy because their application crashes because of your
database library, for example.
So that's kind of a bit of a problem.
And here I have a list of things that are per process and that are per thread. So you can see that per thread, we have a program counter, registers, stack, and the state.
So which is expected, right?
Because each thread will execute part of the program,
so this is what we need.
However, we don't have a separate address space.
We don't have separate global variables,
open files, child processes, etc.
So this is all per process, right?
And so all of this, if we're using like individual processes for our system,
then all of this will always need to be exchanged for each context switch. So meaning if we have
multiple processes, many queries running at the same time, then all of this will have to change
every time when we're switching in between
queries, for example, or in between different tasks.
Okay.
So, well, so with this, we basically know, okay, we want to have a multi-threaded model.
And this is also what current systems all do.
And then we need to do some scheduling.
And this is kind of where there's a lot of space,
how we can deal with this.
So what kind of tasks do we give to the different threads? And scheduling is not only that; there are two different levels of scheduling.
One is scheduling subtasks of the query or work of the database management system to individual threads.
And then, of course, the question is how we schedule these individual threads on the CPU. That's mostly done by the OS somehow; however, we also have influence on that, right? So we can additionally say, okay, I want to have this thread running here, for example. So this is kind of the second level of scheduling that we would have to think about.
But first, in terms of scheduling, we'll look into how we split up the work in the database management system into chunks that we can give to the threads. And the question then is, okay, which thread will execute which work? Well, typically the database management system will do this itself. I mean, of course, you can use the OS for dealing with all of that, so you're just spinning up multiple threads,
and then let the OS do its thing.
And it won't do a very bad job, but it also
won't do a perfect job, because the OS is not really used
to pure database workloads.
So the OS is optimized for heterogeneous workloads
with many short-running tasks, etc. Database workloads
typically are very long running tasks, at least OLAP tasks.
And so database management systems will use something like a dispatcher thread to assign
individual tasks to workers. So this is called a push model. It could also be the other way around,
that we have multiple worker threads that just pull additional work from something like a work
queue. But this kind of push model is used by DB2, SQL Server, MySQL, Oracle, HANA, etc. To be clear: push-based basically means the dispatcher pushes or gives the tasks to the workers, and pull-based means we have a work queue and the workers will just pull their parts.
Okay, so with that, the question is: with a dispatcher, how does it work? This is kind of what a web server would do; it's similar, but a bit more complicated.
and it's similar, but a bit more complicated.
So for a web server, well, you have network connection,
you have individual incoming requests for certain websites,
and then you have worker threads,
and basically the dispatcher thread will basically say,
well, I have this request incoming here, worker thread, please look up this page
and give me the result back, or give the result back
to the user.
And most of this, typically all of these threads, et cetera, would be handled in user space, so we don't have to deal with kernel threads.
For a query, for a database system, then for every query plan, the database management system
basically has to decide where, when, and how to execute this in an optimal manner. A lot of this, of course, is query optimization.
So we have to basically think about given a SQL query,
how do we translate this into operators?
How do we split up the operators, et cetera?
And I mean, or which order
and what kind of operators will we use?
And this is, I mean, all of the query optimization,
as I said, is not really part of this.
So this will be dealt with in database systems too.
However, if we already know this is kind of a query plan that we actually want to execute,
then we have to figure out: how many tasks are we going to use? How do we basically split up the query plan into tasks, and which cores do we want to place them on? And there's basically again a trade-off. On the one hand, we want to place them in such a way that the caches work efficiently; on the other hand, we don't want them to be fighting for certain cached items.
So if they basically are iterating on the same cache lines, that would be bad, for example.
Then we might have some contention there.
And then, of course, also where should the results go?
Or the individual, like intermediate results typically go.
And of course, just like the efficient execution of individual queries and using the CPU well, the scheduling is also key to efficient execution and especially to parallelism. So in order to use the cores efficiently, we need to schedule the threads accordingly in order to get good parallelism.
Okay, so we said there are two ways. Either we do push, which means we have a dispatcher that knows what's going on, monitors the progress, and has a kind of global control of where we are in our query and in our tasks, like what the progress is. The other approach is pull.
So we just have a queue, and then there's again some kind of dispatcher; we need something that fills up the queue. But then the workers just pull the individual subtasks, process them, and return to get the next task. The task will also tell them where to put the result, etc., and as soon as they're done, they're going to pick up the next task. And here we don't have control over where things will be executed, right? Because the workers will just pull the tasks, we don't actually know in detail which core this will go to, etc. If we have global control, we can actually say, well, this core does this, or this thread, which is co-located with this other thread on the core, should do this. In the pull-based approach, this is not going to happen.
So there's a trade-off.
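A minimal sketch of the pull model just described: workers repeatedly take tasks from a shared work queue until it is empty. The task bodies and the mutex-based synchronization are placeholders; a real system would use a more efficient queue and tasks that carry their own result locations.

```cpp
// Minimal sketch of pull-based scheduling: workers grab tasks from a shared queue
// until it is empty. Task bodies and the task count are placeholders.
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

int main() {
    std::queue<std::function<void()>> tasks;
    for (int i = 0; i < 16; ++i)
        tasks.push([i] { std::cout << "task " << i << " done\n"; });

    std::mutex m;  // protects the shared work queue
    auto worker = [&] {
        for (;;) {
            std::function<void()> task;
            {
                std::lock_guard<std::mutex> lock(m);
                if (tasks.empty()) return;        // nothing left to pull
                task = std::move(tasks.front());
                tasks.pop();
            }
            task();   // each worker decides on its own when to pull the next task
        }
    };

    unsigned n = std::thread::hardware_concurrency();
    if (n == 0) n = 4;                            // fallback if unknown
    std::vector<std::thread> pool;
    for (unsigned i = 0; i < n; ++i) pool.emplace_back(worker);
    for (auto& t : pool) t.join();
    return 0;
}
```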
And then we have to figure out how to allocate the workers.
And there's, again, multiple approaches.
Of course, at least we want one worker per core.
Otherwise, we'll have empty cores
that are not going to be utilized.
We might reserve a core for the dispatcher thread, etc. Or, if you think of a rack-scale setup, we might reserve something for a primary server, in order to make sure the workers are not interfering with the general scheduling, etc.
But in general, for everything where work should be done,
every core should have a worker.
And then we can hard assign this.
We can say this thread should constantly be
running here by setting an affinity. So we're pinning the thread to a certain core and then
this thread will basically keep on running on this certain core. Of course, the OS might
still schedule something in between, but this thread should be running on that core.
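A minimal sketch of pinning a worker thread to a core by setting its affinity, assuming Linux (pthread_setaffinity_np is a Linux-specific call); the choice of core 0 is arbitrary and error handling is omitted.

```cpp
// Minimal sketch of pinning a worker thread to one core, assuming Linux
// (pthread_setaffinity_np is Linux-specific); error handling is omitted.
#include <pthread.h>
#include <sched.h>
#include <iostream>
#include <thread>

void pin_to_core(std::thread& t, int core_id) {
    cpu_set_t set;
    CPU_ZERO(&set);
    CPU_SET(core_id, &set);                        // allow only this one core
    pthread_setaffinity_np(t.native_handle(),      // the OS thread behind std::thread
                           sizeof(cpu_set_t), &set);
}

int main() {
    std::thread worker([] {
        // The worker loop would go here; it now stays on the chosen core.
        std::cout << "worker running\n";
    });
    pin_to_core(worker, 0);                        // hard-assign this worker to core 0
    worker.join();
    return 0;
}
```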
An alternative to that is to have multiple workers per core, and this gets back to what I asked you earlier. We know we have multiple units on a single core, so scheduling multiple threads on a single core actually makes sense, because if these threads have different kinds of things to do, then they might actually utilize this core better than just a single thread.
Like a single thread might just be waiting for data
coming from the caches or from memory, right?
But if we have another thread or another worker that does something else in between,
then the core can be better utilized and it can basically use this hyper-threading.
However, there's a trade-off, right?
So we have task switching.
So, I mean, even though this is cheaper than processes,
we still need to do some switching.
We still need to change the stack, et cetera.
So that basically means we get a better utilization,
but we're paying in scheduling.
At a certain point, if we have too many threads,
we're not going to get better performance anymore because we're just busy with task switching.
However, the one thing that actually might be good is using multiple functional units.
So this is kind of, if you think about this here as kind of different functional units,
so say we have four different functional units here and we have two instruction streams,
having two instruction streams will help us better utilize these functional units
or then having just a single instruction stream,
which might have some contention here or there,
or just use certain units for a certain amount of time.
So another stream, we might be able to somehow fill this in.
So having, say, for example, two workers per core,
four workers per core,
might still give you better performance. And I'm saying might, because if you look at a lot of
experiments, we often see that using a single worker per core gives you close to the best
performance. Then having a few more, I mean, having twice as many
might slightly improve, but it's not a lot.
So we get a bit of better utilization,
but the scheduling often is already a problem.
Four or 16, something like that, there
we're not going to get any benefit anymore, typically.
But it really depends.
And it also depends on hardware.
Some hardware is better at scheduling and task switching.
So this is something you basically have to try out. Some processors really cannot be efficiently utilized by just a single worker per core; other hardware is basically fully utilized that way. And this is a lot about instruction scheduling internally.
Okay, so more details about intra-query parallelism.
So how do we get intra-query parallelism?
So we have to somehow split our work, our query into tasks.
So let's look at a simple query.
So we have, yeah, select from where, right?
Two tables, there's a join, and we want to parallelize this.
What we have to do is we basically have to split up
our tables into subparts and execute on these subparts separately.
And so then we can basically, I mean, for selections, for projections, for example, this is quite simple, right?
We can just do this in parallel on subpartitions of the table.
So this is just an example.
In this case, we're splitting up our table into three partitions,
and each of those partitions could be run on a separate core
or executed on a separate core.
Then we can do in parallel the selection and the projection.
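As a small illustration of this simple case, here is a minimal sketch that splits a single-column "table" into three partitions and runs the selection on each partition in its own thread, merging the per-partition results at the end. The table contents and the predicate are placeholders.

```cpp
// Minimal sketch of a parallel selection over three horizontal partitions of a
// one-column "table"; contents and predicate are placeholders.
#include <cstddef>
#include <iostream>
#include <numeric>
#include <thread>
#include <vector>

int main() {
    std::vector<int> table(1'000'000);
    std::iota(table.begin(), table.end(), 0);      // fill with 0, 1, 2, ...

    const unsigned parts = 3;                      // three partitions, as on the slide
    std::vector<std::vector<int>> local(parts);    // one result buffer per thread
    std::vector<std::thread> threads;

    for (unsigned p = 0; p < parts; ++p) {
        threads.emplace_back([&, p] {
            std::size_t begin = p * table.size() / parts;
            std::size_t end   = (p + 1) * table.size() / parts;
            for (std::size_t i = begin; i < end; ++i)
                if (table[i] % 7 == 0)             // the selection predicate
                    local[p].push_back(table[i]);  // a projection would pick columns here
        });
    }
    for (auto& t : threads) t.join();

    // "Exchange" step: combine the per-partition results into one output.
    std::size_t total = 0;
    for (auto& v : local) total += v.size();
    std::cout << "qualifying tuples: " << total << "\n";
    return 0;
}
```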
For example, if we do a join, so in this case,
for example, a hash join, then we can in parallel
build the hash table.
We can in parallel probe the hash table
and exchange the results in order
to get the final result. So for this,
by splitting this up into multiple parts,
we can actually
utilize the system, and we will get a performance improvement. It won't be perfect, because there is some data interchange, so we have some communication; this is shown here by these exchange operators. But we'll get performance. And now let's look at these exchange operators. So this is based on Volcano-style parallelism.
So Volcano, as you remember, is this kind of traditional way of executing queries.
So with this iterator model, right?
And this already had an idea or had an approach to parallelism.
And so the idea in Volcano is the following. This here is our query plan, right, this part. And we can basically add additional operators here that give us parallelism, so that the parallelism is encapsulated internally. And this is the so-called exchange operator.
And for intra-query parallelism, we have pipeline parallelism available. Of course, the query plan has multiple steps.
So you remember, right?
Pipeline parallelism would mean we're doing a scan, we're doing a selection, we're doing a join, we're doing a projection, for example. These are separate operators; we might even be able to break them down into smaller operators. And we can basically run them in instances next to each other, in separate threads: one thread does the scan, one thread does the selection, etc. So we get some parallelism, which will improve our throughput. It won't improve our latency, because we still need to go through the whole pipeline.
And then we can basically partition, and we can do this again if we look not at the query but at the operator level. We can have intra-operator parallelism and we can have inter-operator parallelism. Breaking our query into parts, a simple way of parallelizing could be: now we have multiple operators, so let's run these operators next to each other, in parallel, and this is good if we have something like a bushy tree.
So, say for example, we have two tables, we're scanning the two tables, these are two operators,
we can run them in parallel, no problem.
However, we further want to break this down typically, and then we're getting into
intra-operator parallelism. So similar to queries, right, rather than running two operators next to
each other, we're splitting up the operator into multiple parallel instances. And this, of course,
a parallel scan is very simple. We're splitting up the table into different partitions, but we
also have parallel joins or parallel sorting.
And this is exactly what we're going to look at later on.
And in Volcano-style parallelism, we want to have all of it, including setup, teardown, and runtime logic, encapsulated in a separate operator.
And this is called the exchange operator. Basically, you put this exchange operator into your query plan, or this data flow, and it will say: well, the operator before this exchange operator can now be executed in parallel, and the exchange operator will deal with all the distribution of the data such that we have a good output on the next level again.
So if we're say, for example,
we're doing a scan and a selection,
then the exchange won't have to do much
because we just directly connect them basically.
But say, for example, if we want to do a join,
then the exchange operator would
make sure that all the join partners see each other again.
So we're basically creating an intermediate result again that will be correct for everything.
And so basically this is done on a plan level. So we're building our query plan, an ideal query plan for a query.
And then the optimizer also figures out what's the degree of parallelism that I would want
to have for each individual operator.
So the operator, of course, needs to be parallelizable. And so this means we need to be able to run
on separate chunks of data and split up the operators.
And then basically, the optimizer will instantiate one query operator plan for each thread. And these operators will then be connected with the exchange operator, which basically manages or encapsulates the parallelism and manages the threads.
So this is actually also some management.
And this basically, like in a simple way,
this could basically be in between each of the individual operators, right?
So here, say after the scan, we have an exchange.
After the join, here we don't have one, then it would be a multi-way join.
We could also have one here, for example.
And then after the join, we'll have an exchange which will collect the final output and will
print the final result. And this is a simple model
or an elegant model, which is actually used by many systems.
So we're basically having this separate operator which
handles the parallelism.
And you can think of it similarly to scheduling this
in the MapReduce system.
So if we're breaking down our SQL query into MapReduce steps,
then of course, the MapReduce system
will basically give a certain degree of parallelism.
Typically, you would do this yourself,
but the optimizer could also do this
if you have a more advanced system.
Then something like Hive would give you some kind of parallelism automatically.
And it also does this exchange, right? The steps in between, this is basically the exchange operator, so we're producing these intermediate results. If we can pipeline, if we can merge different operators, we don't need to do an exchange, but this can also be decided on the optimizer level.
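A minimal sketch of the exchange idea, under the assumption that a synchronized queue is a good enough stand-in for the real operator: several producer threads "scan" their partition and push tuples into the exchange, while the consumer just iterates over whatever comes out and stays oblivious to the parallelism. The class and method names (Exchange, push, next) are made up; a real exchange would also handle batching and repartitioning.

```cpp
// Minimal sketch of the exchange idea: a synchronized queue stands in for the
// operator; producers run in parallel, the consumer stays oblivious to that.
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <vector>

class Exchange {
    std::queue<int> buffer_;
    std::mutex m_;
    std::condition_variable cv_;
    int open_producers_;
public:
    explicit Exchange(int producers) : open_producers_(producers) {}
    void push(int tuple) {                        // called by the parallel producers
        { std::lock_guard<std::mutex> l(m_); buffer_.push(tuple); }
        cv_.notify_one();
    }
    void close_one() {                            // a producer instance is done
        { std::lock_guard<std::mutex> l(m_); --open_producers_; }
        cv_.notify_all();
    }
    std::optional<int> next() {                   // iterator-style call for the consumer
        std::unique_lock<std::mutex> l(m_);
        cv_.wait(l, [&] { return !buffer_.empty() || open_producers_ == 0; });
        if (buffer_.empty()) return std::nullopt; // all producers finished
        int t = buffer_.front();
        buffer_.pop();
        return t;
    }
};

int main() {
    Exchange ex(2);
    std::vector<std::thread> producers;
    for (int p = 0; p < 2; ++p)
        producers.emplace_back([&ex, p] {         // each producer "scans" its partition
            for (int i = 0; i < 5; ++i) ex.push(p * 100 + i);
            ex.close_one();
        });

    long sum = 0;                                 // consumer: plain sequential code
    while (auto t = ex.next()) sum += *t;
    for (auto& p : producers) p.join();
    std::cout << "sum = " << sum << "\n";
    return 0;
}
```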
And well, the cool thing is, well, it's good, right?
It's quite easy.
The operators are oblivious to the parallelism, so the operators do the same thing that they did before.
We just need to ensure that we can run them on the subsets
and still get correct results.
The bad thing is that, well, if we're doing this at compile time,
when we're optimizing the query,
we probably don't know the actual exact data statistics.
And that means we'll most likely not get a good utilization or we will have some
load imbalances. So some parts
will run faster than others and then between these exchange operators, we basically have to wait
and we're not going to make any progress.
Also, if we're doing this at compile time,
we cannot change this at runtime.
We cannot really change the parallelism at runtime,
at least not efficiently.
So this might have some additional overhead.
And then there's, of course, potential overhead. The exchange operator might create some additional copies of the tuples by copying them around, some additional intermediate results. And if we're creating too many threads, well, then we have too much
context switching, which of course, I mean, we also know, right?
So this is how it has been done in classical systems, similar to the iterator model, but modern systems will do something different.
So they will basically have some kind of query coordinator that does the parallel execution,
at least has some influence at runtime. So it will basically, based on the number of parallel
servers, it will determine the granularity and the load distribution. And there's parallelism within and between operators.
So, say for example, we have a scan and a group by,
then we typically have a producer-consumer pair.
So, the scan would produce the data, the group by would then consume.
So, we get this kind of pipeline because they're communicating to each other,
where we can use pipeline parallelism.
And we can do this on separate subparts.
So our initial table would be split up into multiple parts.
And then we can do this in parallel.
We don't really have to split it up.
We basically just tell the different threads,
read different parts of the data from memory, read these memory regions in parallel.
And then we need, of course, some communication.
If we're doing grouping, for example,
this means some form of sorting or hashing
in order to get the correct sets.
And then the results will basically
be sent to some kind of coordinator that gives back the result to the user.
And an important part, of course, is the degree of parallelism,
which is the number of threads in the end that we're executing this on,
or the maximum number of threads that we can use.
And this can be chosen manually.
So it could be chosen like in the volcano style
statically at compile time.
It could be chosen statically at system setup.
Or it can be done dynamically as the system is running.
And it'd be adapted also based on how many queries do we
have in the system, for example.
Because of course, if we have only one query,
we always want to utilize the whole system.
If we have multiple queries, we might
reduce the degree of parallelism of a single query
in order to be able to execute multiple queries in parallel.
So, now the question is how we parallelize an operator. Well, typically we just have to split up the work into fractions, and for us this means either it's an operator or it's a subset of the data. So we can split up the query based on the operators, which is this inter-operator parallelism, and then we can split up on sub-parts of the data that we're processing,
which would then be a part of the total work as well. And of course, we want typically to create more tasks
than there are threads.
Because otherwise, if we have fewer tasks than threads, we basically cannot fully utilize them.
Getting the exact right number of tasks
will typically be hard.
And also, if we split them up, I mean, of course, we know how many cores we have, so
we could just split up in the number of cores.
But then most likely we will have some imbalance in between the different tasks.
So we typically want to have more tasks than that, in order to have some flexibility at runtime when some tasks or some subsets of the work finish faster than others. And this means that if at runtime we see that a certain task is too much work,
then we might even be able to split it up further.
And this happens due to skew.
So if you've seen big data systems, their typical problem in MapReduce systems is that
we have one reducer that gets
all of the work.
And MapReduce can't really handle that, so then basically one reducer will forever continue
working on this.
And in a modern database system, this will further be split up, right?
So if we see all of the work goes into a single task, well, let's split up this task, make it more parallel
and try to have a good load balancing.
And here's an example of this.
So this is out of a paper by Balkesen et al.,
where they use a main memory hash join.
And we'll talk about this kind of join a bit later.
And here, for example, this is a Radix join.
So here in a Radix join.
So here in a Radix join, we're first doing the partitioning, or we're first partitioning
the data, and then we're building hash tables and doing classical hash join, basically.
And what we can see, if we're just splitting this up into separate steps or into multiple tasks based on the number of threads,
we can have very different load.
And what happens is that even if the tasks are smaller
and we have a static scheduling,
we still will run into the same problem,
that we have very different kind of load
based on the work that the subtasks get.
And so here, for example, you can see that
the histograms computation,
so in order to see how we want to partition,
can be very different.
And then the actual partitioning can be very different.
In the end, then we have hash tables,
so that might actually work.
However, if we do some additional task decomposition, basically splitting up very large tasks and rescheduling them, then we can get a much nicer distribution of work and also a much nicer total runtime for the whole thing.
So this means that while in a setup where we have some skew, a lot of the tasks, or a lot of the threads and workers, will wait for certain tasks that are still running, here we can evenly distribute and fully utilize everything. So simple static queuing will basically lead to poor load balancing and threads just waiting for different stuff.
And with fine-grained tasks, we can get additional performance. So now we somehow have to partition the data in order to get this.
So based on this experiment, we see, okay, we need some fine-grained partitioning.
Of course, this leads to additional scheduling,
but having fine-grained partitioning and a better load balance will still improve
our total throughput.
We need fine partitions and we somehow want to dynamically adjust them.
Rather than doing a static partitioning, which is what we would think about in a push-based model, where we typically would say, okay, I'm statically partitioning my work and then pushing it to the workers, and the dispatcher says, okay, this thread will do this and this, etc., it's better to be more dynamic about this. Because there is a lot of imbalance, because there is skew in the data and we don't really know what the data distributions are, it actually makes sense to be as dynamic as possible about this.
How we can do this on an engine level is what's called morsel-driven parallelization, which is also implemented in HyPer.
And the idea here is that we're basically splitting up our work into very fine-grained chunks, and of course it's not three, but, I don't know, a thousand tuples, something like this, so small blocks that we will be executing individually. And the dispatcher thread will basically say, okay, we're breaking up the task or the query into these small tasks, and then the individual threads will run on these individual tasks.
These are constant sized work units, they're called morsels, so it's a small bite of work
basically.
The dispatcher will basically put these into a queue for the worker threads, which will then work on these individual morsels.
And with this, we get like a very dynamic balancing, right?
So we're just basically continuing to work on these morsels,
small bite, small size chunks.
And these are large enough such that the scheduling
doesn't hurt us that much, but they're also small enough
that we have a good dynamic scheduling.
And in this setup, so in HyPer, for example, the number of worker threads is equal to the number of hardware threads. As I told you, twice as many might give you a slight performance improvement, but might also cost you a bit more. It really depends on the hardware, et cetera.
And of course, then each individual operator
needs to be designed for parallel execution.
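A minimal sketch of the morsel idea, not HyPer's actual implementation: an atomic cursor over the input stands in for the dispatcher, and each worker repeatedly grabs the next fixed-size morsel until the table is consumed. The morsel size and the per-morsel work (a simple sum) are placeholders.

```cpp
// Minimal sketch of morsel-wise scheduling: an atomic cursor hands out
// fixed-size morsels; the morsel size and the per-morsel work are placeholders.
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <iostream>
#include <thread>
#include <vector>

int main() {
    std::vector<int> table(10'000'000, 1);
    const std::size_t kMorselSize = 10'000;        // "a small bite of work"
    std::atomic<std::size_t> cursor{0};            // next unprocessed offset
    std::atomic<long long> sum{0};

    auto worker = [&] {
        long long local = 0;
        for (;;) {
            std::size_t begin = cursor.fetch_add(kMorselSize);  // pull the next morsel
            if (begin >= table.size()) break;                   // table fully consumed
            std::size_t end = std::min(begin + kMorselSize, table.size());
            for (std::size_t i = begin; i < end; ++i) local += table[i];
        }
        sum += local;                              // one synchronized update per worker
    };

    unsigned n = std::thread::hardware_concurrency();  // one worker per hardware thread
    if (n == 0) n = 4;
    std::vector<std::thread> workers;
    for (unsigned i = 0; i < n; ++i) workers.emplace_back(worker);
    for (auto& w : workers) w.join();

    std::cout << "sum = " << sum.load() << "\n";
    return 0;
}
```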
So let's look at this in a bit more detail.
So if we have this kind of pipeline, where we have two, or this kind of query, where
we have two joins, three tables, each of those have some kind of selection, of course, projection,
et cetera, could also be in there.
What happens is that we're breaking this query up at pipeline breakers.
So the join would be a pipeline breaker if we're doing a hash join, because we have to
build the hash table that we're then probing.
And otherwise, everything else, meaning building these two hash tables for the join, is basically
one thing that we need to do.
So we do this separately.
These are two pipelines.
And once these hash tables are built, we can just use them and have a single pipeline. So we can do
this in a single pipeline fashion. And the work for doing this, we can do in parallel. So building
up this hash table, we can again do based on these small, bite-sized morsels. So we basically have multiple scans in parallel, individual pages that run as individual morsels, executed in parallel on multiple threads, that will build up the hash table.
And both, of course, we can do in parallel or we can do one after the other.
And once these are done, then we can basically schedule the last pipeline.
So this would be this pipeline, which can then, in a parallel, multi-threaded fashion, probe both hash tables and, completely in parallel for individual morsels, get the final results.
So this is kind of a modern way
of executing multiple threads
and also parallelizing the whole query.
And we'll look at how to basically split up the joins, etc., into parallel parts in just a few minutes, after a short break. But do we have questions so far? No? Well, let's do a three-minute break, then we'll look at parallel joins. Before we continue, a quick question: who would be interested in a project seminar in the winter term on hardware-based stuff?
More than 50%. That's great.
Because we're thinking if we should have a follow up on this,
we're going to do big data systems in the winter term
again and some other stuff.
But I thought maybe have some hardware related stuff would
also be fun.
So it seems like we have enough people. I mean, we are not everybody; I know there are more people behind the camera, so those we can basically also ask, but it looks like we could have a decently sized group.
OK, cool.
Then back on track, parallel joins.
I'm spending too much time already anyway.
So here are some statistics by Andy Pavlo.
And this is really one type of workload. This is not necessarily,
let's say, enterprise real-world data. This is TPC-H on Cloudera Impala. This is basically
measuring how much time is spent in which query operator. And if you're running TPC-H on Cloudera Impala, then most of the time is actually spent in the join operator.
So this will take a lot of time.
There's a lot of research or also like survey papers
that look at real-world workloads
and there you'll see the join is not necessarily always the most expensive part, but it is a significant part.
So this is something we want to somehow parallelize.
And especially in OLAP workloads, looking at SAP HANA for example, you will have tens of joins per query, tens to hundreds of joins per query.
So this needs to be efficient.
Of course, also join order is important.
Again, this is something that we're not looking at.
This is something we're expecting to already get.
But we want to do joins.
We want to do them in parallel, using multiple threads within the join operator. So we want to parallelize the join operator. And I already showed you to some degree, through this morsel parallelism, how this can be done.
But there's two main approaches.
So one is the parallel hash join, and the other one is the parallel sort-merge join; these are what's typically implemented in current systems. Of course, you can also do MapReduce-style joins, where within the partitions you can then do all kinds of different join operators, but from an overall setup, these are the two main approaches that we want to look at.
And we're going to look at the hash join first and then at the sort merge join.
In a hash join, you might remember, the very simple setup is basically that we're building a hash table on the inner join relation, typically the smaller table, because we want to have a small hash table; that's the build phase. So first we're building the hash table, and then we're scanning or reading through the outer relation and probing the hash table; that's the probe phase. So this is very simple: we're looking at basically two for loops,
The second loop, we're probing everything.
And we need to do this first.
So we need to have the full hash table.
At least we need to know that we have everything in the hash table that we need
to find for the tuples that we're probing with.
So this kind of blocks us for then doing the probing.
And in parallel this basically means, or graphically this basically means first we're scanning,
we're building the hash table, then we're probing, we're scanning. And in a parallel fashion, we can basically have a shared
hash table, meaning we're having multiple sub-parts, we're partitioning our relations,
and we're basically scanning these partitions in parallel.
Again, we have to do the build first in order to have the full hash table there
and then in the second step we can basically do the probing and we can do
the probing also in parallel but only after we've filled this shared hash
table. Otherwise we will not see all of the tuples
and we'll lose some data.
So that's why we basically need this two-step approach.
Now the question is,
how do we deal with this hash table?
So we can have one global big hash table
or we can break it down into multiple smaller hash tables.
And then there's multiple different ways of how to deal with this.
So in a non-partitioning-based join,
we're basically using multiple threads to build up a global hash table.
And then we can use multiple threads to basically probe this global hash table.
You could even have some copies of the hash table or something like that. But this is basically meaning we have one instance of a hash table, one single global
data structure that all of the information or everybody, basically all the threads have to touch.
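A minimal sketch of this non-partitioned variant with one shared hash table, assuming a coarse mutex as the latch: two threads build first, and only after both have finished do the probe threads start, reading the table without any latch. Real systems use finer-grained latching or lock-free tables; the relations here are toy data.

```cpp
// Minimal sketch of a non-partitioned parallel hash join: one shared hash table,
// a coarse mutex as the latch, build completely before any probe starts.
#include <atomic>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <vector>

int main() {
    std::vector<int> build_rel{1, 2, 3, 4, 5, 6, 7, 8};   // inner (smaller) relation
    std::vector<int> probe_rel{2, 4, 4, 6, 8, 10, 12};    // outer relation

    std::unordered_multimap<int, int> ht;                 // join key -> row id
    std::mutex latch;                                     // protects the shared table

    auto build = [&](std::size_t begin, std::size_t end) {
        for (std::size_t i = begin; i < end; ++i) {
            std::lock_guard<std::mutex> l(latch);
            ht.emplace(build_rel[i], static_cast<int>(i));
        }
    };
    std::thread b1(build, std::size_t{0}, build_rel.size() / 2);
    std::thread b2(build, build_rel.size() / 2, build_rel.size());
    b1.join(); b2.join();            // the build must finish before any probe starts

    std::atomic<int> matches{0};
    auto probe = [&](std::size_t begin, std::size_t end) {
        for (std::size_t i = begin; i < end; ++i) {
            auto range = ht.equal_range(probe_rel[i]);    // read-only, no latch needed
            for (auto it = range.first; it != range.second; ++it) ++matches;
        }
    };
    std::thread p1(probe, std::size_t{0}, probe_rel.size() / 2);
    std::thread p2(probe, probe_rel.size() / 2, probe_rel.size());
    p1.join(); p2.join();

    std::cout << "join matches: " << matches.load() << "\n";
    return 0;
}
```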
And because there might be some contention, there's also the idea of partitioning everything. So in this case, we're building sub-hash tables. We're partitioning the input relations in a way where we know, say modulo two for example, that all even keys will go to our first hash table and all odd keys will go to our second hash table in the build phase.
And then we have two sub-hash tables,
which we can probe in parallel.
And now you can already think, maybe remember the ART, right, the radix trie: we can use the same kind of techniques for the partitioning. So we can use a binary representation of our key and use it to split up our data into multiple partitions, using these radixes, basically. So in a partition-based hash join, we first have a partition phase, which means we divide the tuples into these individual sub-partitions based on the join key, and then we can build the hash tables in parallel. Once we have nice, small, and hopefully not too skewed sub-partitions, all of the threads can run in parallel on their partitions. And then we can, in parallel, also probe, and we always only have to look up the correct hash table that fits. Basically, we use the same kind of partitioning for the parallel probe,
so we automatically know which hash table to look up.
And so this is kind of the three phases of a parallel partition-based hash join.
And, well, the idea again would be here
to make sure that everything somehow fits into the cache.
Because if we have a global hash table, so if we have a large global hash table,
the problem is that, I mean, the building,
the probing here, every individual probe
will most likely be a cache miss.
Right, so this means every single probe here
will actually be slow because our hash table is huge.
If we have a good hash function,
the accesses will be uniform across the hash table
because like every bucket basically has the same probability,
so every single lookup will be expensive.
In order to make this better, we want to partition the hash tables into small disjoint chunks that are of cache size. And then, when we're doing the lookups in hash tables that fit into the cache, we don't have a cache miss, because the hash table can be cached.
And again, in the partitioning phase, now we have to deal with this. So now we have to somehow partition the data in parallel.
And this means we need to write separate partitions.
And these partitions,
either we do them shared again,
meaning we're writing to a single partition,
like all of the even numbers to one partition,
or we're saying per each thread,
let's have a separate partition,
and then somehow synchronize,
and then somehow merge.
So if we have a shared partition,
then we need some kind of lock.
We'll talk about locking next time.
But meaning we need to know, OK, we
have to write to the end of the partition.
We need to know where this is.
For this, we need some kind of information.
We need something that makes sure that our multiple threads
don't overwrite each other's data.
So this is some kind of latch.
Or latch basically means lock in this case.
Or we let every thread do their stuff individually
and then combine afterwards.
So this is kind of this private approach.
So in the shared partition way, we have our table, we're scanning through it, and each processor, each thread, basically does its subset, does the partitioning, and needs to know, okay, where does the data go, right? So it does the hashing into the right partition, and because the other threads also write, we basically need to synchronize on the partition where I actually want to write. So if, say, this thread wants to write into this bucket of this partition, then while writing, it needs to somehow lock this.
It needs to make sure that no other thread writes
into the same position here.
But in the end, we already have the full final partitions
correctly in memory.
The other way would be that every thread basically gets its own partition. So every thread has a separate memory region that it partitions into. And then we need a separate combination step, where we combine these small sub-partitions into larger partitions on which we then do the hashing, et cetera.
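A minimal sketch of the shared-partition variant, where an atomic write cursor per partition stands in for the latch: every thread scatters its chunk of the input directly into the shared output partitions by claiming the next free slot. The partition count, the "hash" function, and the generously pre-sized buffers are simplifications.

```cpp
// Minimal sketch of shared output partitions with an atomic write cursor per
// partition standing in for the latch; sizes and the "hash" are placeholders.
#include <atomic>
#include <cstddef>
#include <iostream>
#include <thread>
#include <vector>

int main() {
    const int kPartitions = 4;
    std::vector<int> input(1'000'000);
    for (std::size_t i = 0; i < input.size(); ++i)
        input[i] = static_cast<int>((i * 7919) % 100000);   // some spread-out keys

    // Output partitions sized generously for the sketch, plus one write cursor each.
    std::vector<std::vector<int>> parts(kPartitions, std::vector<int>(input.size()));
    std::vector<std::atomic<std::size_t>> cursor(kPartitions);
    for (auto& c : cursor) c.store(0);

    auto scatter = [&](std::size_t begin, std::size_t end) {
        for (std::size_t i = begin; i < end; ++i) {
            int p = input[i] & (kPartitions - 1);            // "hash": low bits of the key
            std::size_t pos = cursor[p].fetch_add(1);        // claim the next free slot
            parts[p][pos] = input[i];                        // no other thread writes here
        }
    };

    const unsigned n = 4;                                    // scanning threads
    std::vector<std::thread> threads;
    for (unsigned t = 0; t < n; ++t)
        threads.emplace_back(scatter, t * input.size() / n, (t + 1) * input.size() / n);
    for (auto& t : threads) t.join();

    for (int p = 0; p < kPartitions; ++p)
        std::cout << "partition " << p << ": " << cursor[p].load() << " tuples\n";
    return 0;
}
```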
OK.
So this is basically one single pass; we go through the whole data a single time. But this would mean, on the one hand, that we have many small partitions, because we need
kind of a high fan out. We need to make sure that we have enough subtasks for our threads that later
will do the hashing and building the hash tables. And because we want to divide our full table,
like, I don't know, TPC-H line item table,
we want to divide it into cache-sized partitions.
So this means there's going to be a huge number
of individual partitions,
and that might actually be quite expensive.
So another approach is using a multi-pass partitioning and this is called Radix Join.
So, meaning rather than splitting everything up in a single pass, we're incrementally
splitting up the data in such a way that we get the correct number of partitions.
And basically using this binary representation of the keys.
And because of this radix or this multipath partitioning of this binary representation,
we also call this the radix join.
And so we're looking at the different bits of the key,
just like we did in the art tree, right?
So we're starting by the most significant bit.
With the most significant bit, we can get two partitions, basically.
With the two most significant bits, we can get four partitions,
three, we can get eight partitions, three, we can get eight
partitions and so on.
We can do this in multiple passes until we have the desired number of individual partitions.
We can exactly say how many partitions will we have per pass, factors of by the power of two, right? So we cannot do
three, that won't work, but we can do four, we can do eight, we can do 16, 32, etc.
And so this basically means we're limiting the number of partitions per pass to the number of translation lookaside buffer entries. That basically helps us to make the lookup much faster. This again depends on your hardware, but typically this would be something like 64 or 512 entries. Modern systems or different chips like the M1 have larger translation lookaside buffers. But staying within this limit, we'll basically find the addresses much quicker than if we actually had to look the translation up.
So basically, in the first pass, we'll split everything up into, say, 512 partitions. And then in the second pass, we split each of those again, until we have the correct number of partitions, which basically means the right partition size for our cache size. And having the translation lookaside buffer help us with the lookup, making this so much faster, will also compensate for the multiple passes that we're paying extra for.
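A small scalar sketch of one radix-partitioning pass may help here. It takes a group of bits of the key, so the fan-out is a power of two and can be kept at or below the TLB size; the function name, the histogram step, and the parameter choices are illustrative assumptions, not the lecture's exact code.

```cpp
#include <cstdint>
#include <vector>

// One radix-partitioning pass: look at `bits` bits of the key starting at
// `shift`, so the fan-out is 2^bits (e.g. 64 or 512). Each resulting
// partition is refined in the next pass with the next group of bits until
// the partitions are cache-sized.
std::vector<std::vector<uint64_t>>
radix_pass(const std::vector<uint64_t>& keys, unsigned shift, unsigned bits) {
    const size_t   fanout = size_t(1) << bits;
    const uint64_t mask   = fanout - 1;

    // Histogram pass: count keys per partition so the buffers can be sized exactly.
    std::vector<size_t> hist(fanout, 0);
    for (uint64_t k : keys) ++hist[(k >> shift) & mask];

    std::vector<std::vector<uint64_t>> parts(fanout);
    for (size_t p = 0; p < fanout; ++p) parts[p].reserve(hist[p]);

    // Scatter pass: with at most `fanout` open output buffers, their address
    // translations stay resident in the TLB, avoiding costly TLB misses.
    for (uint64_t k : keys) parts[(k >> shift) & mask].push_back(k);
    return parts;
}

// Usage idea: radix_pass(keys, 55, 9) splits 64-bit keys by their 9 most
// significant bits into 512 partitions; a second pass on each partition
// then uses the next 9 bits, and so on.
```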
Okay.
And then, basically, once we have these small partitions,
we can actually do the same as we did before, right?
We can do the hashing, building the hash tables, and doing the probing in the same way. Okay, so in the last five minutes I'll give an overview of the sort-merge join. Who knows the sort-merge join?
No? OK, cool.
Well, sort merge join, very simple.
It's a super cool join if your data is already sorted,
because then you basically skip the first phase.
So the first phase is you're sorting both your input tables. And in the second phase, you're just scanning through the input tables. And because they're basically sorted, you can say: okay, I'm at key number five on one table; on the other table I'm at key number three, so I continue to key number five. If there's a key number five, I have a match. If it's skipped and there's no key number five, I continue to key number six and go back to the other table. There I'm at key number six, for example, so I have a match again. So I can basically alternate between the two tables. And with a linear scan through both tables, I can get all the join results.
But I need to sort first.
So the data needs to be sorted.
So if my data is sorted, sort merge join is always the fastest.
Let me rephrase this.
It's typically the fastest.
If I have very small data, a nested-loop join might even be faster than having lots of complex code for something. But if the data is sorted, then the sort-merge join is actually a good idea.
And typically, if the data is sorted, and even if the tables do not fit into memory,
we can actually just scan the data once.
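Here is a minimal sketch of the merge phase on already-sorted integer keys; the function name and the way duplicates are handled (emitting all combinations of equal runs) are my illustrative choices, not necessarily what the lecture's slides show.

```cpp
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Merge phase of a sort-merge join: both inputs are assumed to be sorted.
// The output holds matching (index in r, index in s) pairs.
std::vector<std::pair<size_t, size_t>>
merge_join(const std::vector<uint64_t>& r, const std::vector<uint64_t>& s) {
    std::vector<std::pair<size_t, size_t>> out;
    size_t i = 0, j = 0;
    while (i < r.size() && j < s.size()) {
        if (r[i] < s[j]) {
            ++i;                      // advance the side with the smaller key
        } else if (r[i] > s[j]) {
            ++j;
        } else {
            const uint64_t key = r[i];
            size_t j_end = j;
            while (j_end < s.size() && s[j_end] == key) ++j_end;
            for (; i < r.size() && r[i] == key; ++i)  // all r rows with this key
                for (size_t k = j; k < j_end; ++k)    // times all s rows with it
                    out.emplace_back(i, k);
            j = j_end;                // both equal runs are consumed, keep scanning
        }
    }
    return out;
}
```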
So what this looks like in basically two phases is, well, first we have the two tables.
First we basically have to sort them.
And then we can just simply merge them.
And this representation should already show you, right? So I mean, imagine the white ones will basically match the black ones; we can just scan through the two in a basic merge fashion. And we can also do this in kind of subsets, right? So if we're splitting this up, for example by doing range partitioning here, then we can do the merging also in parallel.
For doing this in parallel, it's mostly about the sorting, right? The merging, if we have range-based partitions, is quite simple. I mean, you can basically do it in parallel for all of the individual partitions.
But the sorting is expensive and it's kind of also hard to parallelize because there is communication that you need to do.
So, well, we basically want to do this as parallel as possible, and we want to utilize the hardware as well as possible. And so one way is also to use
SIMD instructions. And you already know all about this, fortunately. So if we're doing a parallel
sort, basically what we're doing is, again, splitting this up into subparts. So we're basically sorting smaller sets of data and then again merging, right? So
basically this is what parallel sorting does. We're basically taking a small chunk of the data,
we're sorting this small chunk and then we're merging it with larger amounts of data. And again,
we want to be smart about it. We want to utilize the hardware.
So we want to merge, I mean, we want to sort within caches.
So we're basically sorting data that fits nicely
into caches because that's going to be fast.
Then we want to merge within caches, and one way of doing this is called a bitonic merge. So we're building up a merge network.
And then we need to do this multi-way merge.
So we basically have multiple partitions
that we're going to merge again.
And I don't have that much time left today. I'm going to show you the run generation in the sort phase, and then we'll finish up the rest next week, which is not much more than that.
So we want to generate what we call runs, which is basically this first subpart that we want to build. Basically, we have unsorted data, we're splitting it up into many small pieces, and then we're sorting them. And each of these is what we call a run, because it's already a sorted set which will then be merged.
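A minimal run-generation sketch, just to fix the idea: split the input into small chunks and sort each one independently. std::sort is only a placeholder for the per-run sort, and spawning one thread per run is a simplification; the lecture's approach sorts each run with the branch-free sorting network described next, plus SIMD.

```cpp
#include <algorithm>
#include <cstdint>
#include <thread>
#include <vector>

// Split the unsorted input into runs of `run_size` elements, sort every run
// independently, and return the sorted runs for the merge phase.
std::vector<std::vector<uint64_t>>
generate_runs(const std::vector<uint64_t>& input, size_t run_size) {
    std::vector<std::vector<uint64_t>> runs;
    for (size_t off = 0; off < input.size(); off += run_size) {
        size_t end = std::min(off + run_size, input.size());
        runs.emplace_back(input.begin() + off, input.begin() + end);
    }

    std::vector<std::thread> workers;
    for (auto& run : runs)                              // sort all runs in parallel
        workers.emplace_back([&run] { std::sort(run.begin(), run.end()); });
    for (auto& w : workers) w.join();
    return runs;
}
```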
For this per-run sorting we can use a sorting network. A sorting network is something that you can implement in hardware.
If you have an FPGA, for example, you can have a sorting network.
You can also build this in software by just having a fixed path for the elements.
And this is efficient because it doesn't need any branches.
Basically we're saying, okay, our data will always run through the sorting network, all of the data, all of the time,
just by basically comparing elements and swapping elements
if they're larger or smaller.
And in the end, I'll get the final result.
I'll show you in a second, basically.
And one way of implementing this is using min and max. So say, for example, I have a, b, c, d that I want to sort, so just four elements. Then this would be, say for example, a sorting network. It's hard to read, but I'll show you: there are basically ten steps that I need to do, and after these ten steps, in w, x, y, z I will have the correct output, just by doing these min-max comparisons.
Basically, I'm always saying which one is smaller,
which one is larger.
And if they're already in the correct order,
they will be staying in this correct order.
If they're not, I'll be swapping them.
And this I can implement without any branching, with the difference-or-zero operator, for example. The operator tells me: if x is larger than y, I get the difference x minus y; if x is smaller or equal, I get zero. And because I can always calculate this, there's no branching. It just always produces an output. And using this, getting the maximum is simple: I take y and add the difference-or-zero, so I either get x back, when x is larger, or I get y back. And the other way around for the minimum: I take x and subtract the difference-or-zero, so I get y back if y is less, and x back otherwise. So this is a way to implement min and max, and we can see there's no branching here, right? And by using this, basically what I get is something like this network here, right? So these comparisons,
basically what I get is something like this network here, right? So I do these comparisons,
I actually can do in parallel, right? So the ones that don't depend on each other,
so E, F, G, H, for example,
I can do in parallel all four of them, these comparisons.
Then I and J, I can do in parallel.
And then again, I think I should be able to do
these last four in parallel.
And this is basically what you can see here, right?
So first I'm comparing these two and these two.
If they are already in order, they will stay.
If not, they will be swapped.
Then I'm comparing these two.
Again, if they're already in order, they will stay.
If not, they're swapped.
And finally, I only have to basically compare these two.
So in the end, I'm comparing all to all.
And I'm only swapping if I need to.
Otherwise, they will stay the same.
So after running through this network, I will have a correct output.
I don't need to do any branching.
And I can implement this in SIMD.
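To make the difference-or-zero idea and the network concrete, here is a small scalar sketch, assuming unsigned 32-bit keys. The helper names are made up, the stage grouping may differ from the slide's network, and this is the plain scalar form, not the SIMD variant shown next time (where the difference-or-zero maps to saturating subtraction).

```cpp
#include <cstdint>

// Difference-or-zero: returns x - y if x > y, and 0 otherwise,
// computed with a comparison mask instead of a branch.
static inline uint32_t diff_or_zero(uint32_t x, uint32_t y) {
    uint32_t mask = 0u - static_cast<uint32_t>(x > y); // all ones iff x > y
    return (x - y) & mask;
}

static inline uint32_t min_u(uint32_t x, uint32_t y) { return x - diff_or_zero(x, y); }
static inline uint32_t max_u(uint32_t x, uint32_t y) { return y + diff_or_zero(x, y); }

// One compare-and-swap element of the network: afterwards a <= b holds.
static inline void cas(uint32_t& a, uint32_t& b) {
    uint32_t lo = min_u(a, b);
    uint32_t hi = max_u(a, b);
    a = lo;
    b = hi;
}

// A standard four-element sorting network: five compare-and-swaps, i.e. ten
// min/max operations in total. The compare-and-swaps within a stage are
// independent of each other and could run in different SIMD lanes.
static inline void sort4(uint32_t& a, uint32_t& b, uint32_t& c, uint32_t& d) {
    cas(a, b); cas(c, d); // stage 1: two independent pairs
    cas(a, c); cas(b, d); // stage 2: two independent pairs
    cas(b, c);            // stage 3: final comparison in the middle
}
```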
And the SIMD version of this is what I'm going to show you next time, because unfortunately, time is up.
Questions so far?
No?
Well, then thanks a lot.
Then we're going to talk about locking next time.
So this, I will definitely finish up this next time,
and I will have enough time for all the locking fun. And this will be next week, Tuesday. Thank you very much.