Hardware-Conscious Data Processing (ST 2024) - tele-TASK - Multicore Parallelism
Episode Date: May 22, 2024...
Transcript
Okay, so the number of students is decreasing.
As soon as we hit zero, I'm not going to do the lecture here.
Then you can watch the videos, just as a heads up.
So as long as somebody is showing up, I'm happy. Thanks a lot for coming.
If there's nobody here in the classroom, there's recordings from previous years.
This is mostly for the people enjoying this online.
Okay, so today we're going to talk about hardware-conscious data processing
and multi-core processing or parallelism in particular.
So here I have a nice picture of a Xeon Phi,
so one of the first many core processors.
And you can nicely see that there's lots and lots of similarly shaped processor cores on
here, I think up to 24.
And you can see they're not all exactly the same size, but more or less you can really
recognize the pattern.
And we've seen this multiple times. So how to use this efficiently is what we will talk about today. Before we do so, I have two announcements. Today, later in the afternoon, we'll have a talk by Vasia - Vasiliki Kalavri - from Boston University about secure and expressive time series analytics
in untrusted clouds.
So trusted enclaves and stuff like this.
So that should be interesting.
It's not super relevant - well, it's somewhat relevant to this lecture, because it's a specific type of hardware used in the talk that we'll not cover in this lecture. And then, even more relevant to this lecture, we'll
have a presentation by Goetz Graefe - you might also have seen this in the public announcements. He'll talk about modifying an existing sort order with offset-value codes. Goetz is one of the well-established database researchers from Germany - well, not in Germany, he's been in the US forever and is now at Google, and has been in different places before. And we'll talk about one of his works later in this lecture; actually, you've seen it two sessions ago: the Volcano-style processing and the Volcano optimizer. This is basically what he did. So, let's say, all of the fundamental traditional query execution plans - this is based on his work. And you can enjoy him in person on a Friday at one in this lecture hall. I would really appreciate it if some of you can show up, because it's great to have somebody like him here giving a presentation. You can ask questions, and he'll be there for a bit longer, so if you're interested, reach out, and then we can also have a longer Q&A session or something like this afterwards.
Okay, so where are we? We're close to finishing off the on-socket, let's say in-core stuff. So we're on the CPU still, and now looking into how, within the single socket, we can deal with parallelism. Not so much across sockets yet - well, it's also kind of similar to that, but still within the package, let's say.
And we'll talk more about this later, then also across different CPUs, right, different sockets on the same system. And you can see we're basically halfway through. So this is where we are. Next time we'll talk about the next topic, and this will be query compilation. This will be on Tuesday. Wednesday, we'll finish up with the multicore part. And then, after the next two sessions on locking and NUMA, we'll actually go out of the package into the peripherals, so storage, all of the interfaces, et cetera.
So that should be, well, that's also fun.
Everything I like actually here.
Okay, so I'll start off with parallelism.
We've talked a bit about parallelism already.
So you know some of this already, but still, this is really what it's all about in order to get to high performance. So we'll talk about this in more detail yet again: some parallelism basics, some scalability basics, etc., inter- and intra-query parallelism. And then we'll start with joins, probably today, and finish up with joins next time - so how to parallelize joins efficiently. And this is all based on my dear colleagues' presentations and lectures. Okay, so you've already seen this figure here,
right? So I've discussed this at length at the very beginning and several times, and you will continue to see this, because it's really the trends that we're fighting or working with. I mean, for us it's actually good, because we have more and more stuff to do as long as things are changing and there are changing setups. For people who want to use their old hardware or old software, it's not so great, because this really means we have to adapt.
So right now, this is basically what we're looking at. The number of cores is increasing. And as soon as we're at a number of cores where there are not just many different things to do, like maintenance and whatever tasks, and the single cores and single threads are not really getting faster anymore - which is this here; I mean, we get some performance, but not that much - at this stage, we somehow have to employ parallelism within our system.
We cannot basically just say, well, there's
different things anyway, right? So I have like a user that somehow needs to be
dealt with. I have some other maintenance tasks that the OS does, etc.
So at a certain point, we need to have so much parallelism that we have to
program for it. This is what we're looking at. And this is an example of an older system.
This is already NUMA, but here you can basically see,
we can get, like, an Ice Lake architecture.
We can get up to eight CPUs, each with up to 40 cores.
So how much does this make?
240 cores.
So quite a bit of parallelism that we somehow
need to deal with.
So this is not going to be like different tasks
that we are doing.
And like a significant amount of RAM, right?
So up to six terabytes of RAM.
If you're adding persistent memory to this, then triple that, or even more.
So this is basically the amount of data
that we can work with and the amount of threads
that we can work with.
And this is somehow what we have to program against.
Okay, so the question now is how do we utilize this, right? So we have up to, I mean, 240 in this older generation - so you will get tens to hundreds of cores in a system. How can we utilize this efficiently? And well,
this means we can have multiple tasks to be running concurrently. So this is some kind of concurrency or multi-programming.
And this is kind of the easy way of doing things.
In a database system, this means inter-query parallelism.
So we have multiple different queries and we parallelize.
We basically just let them run in parallel, so each query individually can or cannot be run in parallel, or can have some parallelism inside or not. I mean, of course, we can still do whatever is in the core - within the single core we will have some parallelism if we're not making things very stupid or very inefficient; the CPU will try to do it and the compiler will try to deal with this anyway. However, if you just program it single-threaded per query, then each query will stick to one core and be executed in a single thread.
Still, an easy way - sort of easy way - of getting some parallelism is just having multiple queries at the same time. And this means basically we're increasing the throughput of the query stream. We're not improving the latency of a single query. So a single
query is the same if we just run it isolated or if we run multiple at the same time
unless we're overloading the system.
So if we're running, like, say we have 10 cores,
if we're running one query, it will take X amount of time.
If we're running two queries at the same time,
hopefully, if there's not too much contention on certain things,
it will also take X for each of those, right?
Same kind of thing. But instead of running one query in X amount of time, we run two queries in X amount of time. So,
the throughput gets higher, the latency stays the same. And this is called interquery parallelism,
because it's parallel in between these queries, not within a query. And the opposite, or let's say the more fine-grained
parallelism, is basically implementing or parallelizing a single task, a single query.
So in this case, and this is where we need parallel programming. So somehow we have to
split up the work that a single query does in multiple smaller items of work,
maybe need some kind of communication in between, and then we can execute a single query in parallel
either intra-operator or inter-operator. So if we think about pipeline parallelism, for example, this would be inter-operator parallelism, right?
So we can parallelize just because we have like a long query
with individual operators.
We can parallelize these operators, let them run in parallel.
Then we get pipeline parallelism,
but each operator itself is still single-threaded,
so not parallelized.
Or we can actually parallelize each individual operator,
so getting even more parallelism there. Then we have intra-operator parallelism.
And this is basically, we're decreasing the response time of a single query.
And this is important for OLAP queries. So as soon as we're looking at very large amounts of data,
very large queries, then this is where we want to somehow make use
of the parallelism. So in an OLAP setup, so an online analytical processing setup,
the assumption is we have few but large queries. So we process a lot of data. We do some analytics
on the data. So this means it takes a long time and somebody is waiting for the answer for this.
So this is supposed to be fast, right?
So we want fast response times.
And for this, we want some parallelism.
And in an OLTP setup, the assumption is
we have small updates and requests.
So just accessing a few individual records in our tables.
So there, while it also might make sense to parallelize, or while it also might give us
some speed up, the assumption is, since we're just touching a few records, probably this
is fast enough, so we don't necessarily need to parallelize in there.
Okay, so, but the general idea is somehow
we want to split the queries into smaller tasks
or somehow split the query workload into tasks
and execute these tasks in parallel on multiple cores.
This can be done with inter or intra-query parallelism.
So we need from a parallelism point of view, we need multiple tasks that can
somehow be put on these multiple cores. Okay, and there's a few basic concepts. So one concept is
work partitioning. So we want to somehow express the parallelism. So we're splitting up the work that needs to be done in parallel tasks.
Or this is also known as domain decomposition.
So if you ever do like an HPC lecture or some HPC work,
this is what you're going to get across.
And HPC is all about partitioning and scheduling.
So if you have a large supercomputer,
or if you want to do some AI stuff later on,
and you want to be efficient about it,
training your neural networks, et cetera,
then it's all about basically partitioning the work
and scheduling it efficiently,
so that your GPUs will run with work all the time,
and you have different kinds of tasks
that you can parallelize.
Then there's the scheduling - I already said that. So somehow we have the different work partitions, and then these work partitions, or the tasks, need to be mapped onto the execution contexts.
And of course, this is really different
depending on the application.
So meaning if we have a long running query,
we can schedule more freely.
If we have short running queries,
we need short latency, right?
So in OLTP setup, this means we really need to make sure
that the scheduling doesn't take too much time.
So if this is an offline setting, so an HPC setting,
often, or we have analytical tasks that run overnight,
often we were very flexible to come up
with a very nice bin packing architecture and schedule that
somehow optimizes how all of the resources are utilized.
But that takes time.
In an online session, we just want to make sure that everything gets scheduled as fast as possible
and nothing gets lost in between, right?
So nothing starves.
Yeah, and then there's, of course, the task granularity.
So this is the amount of work per task.
If it's too little, then scheduling a task, like doing all the scheduling,
et cetera, the partitioning, et cetera, takes too much time.
So there's a lot of overhead.
If it's too much, if we have very large tasks,
then the load balancing will get inefficient most likely.
So some tasks will run longer, some tasks will be faster,
and we'll have some resources that are underutilized
and other resources which are overutilized there.
So we're waiting a lot.
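To make this granularity trade-off concrete, here is a minimal sketch (my own, not from the lecture) of dispatching work in fixed-size chunks from a shared atomic cursor; chunk_size is exactly the knob just described - too small and the fetch_add plus per-task overhead dominates, too large and the last chunks cause load imbalance. All names here are illustrative.

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

// Run fn(begin, end) over [0, num_rows) in chunks pulled from a shared cursor.
// chunk_size is the granularity knob: too small -> the atomic fetch_add and
// call overhead dominate; too large -> the last chunks cause load imbalance.
template <typename Fn>
void parallel_for_chunks(std::size_t num_rows, std::size_t chunk_size,
                         unsigned num_threads, Fn fn) {
  std::atomic<std::size_t> next{0};  // shared work cursor
  std::vector<std::thread> workers;
  for (unsigned t = 0; t < num_threads; ++t) {
    workers.emplace_back([&] {
      for (;;) {
        std::size_t begin = next.fetch_add(chunk_size);
        if (begin >= num_rows) break;
        fn(begin, std::min(begin + chunk_size, num_rows));
      }
    });
  }
  for (auto& w : workers) w.join();
}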
And of course, especially in the database context,
we're always about correctness, right?
So all of our queries need to be correct.
So that's kind of the contract with the database system: we write a SQL query, we give it to the database, and however it executes this, whatever algorithms it uses, et cetera, the result should always be the exact same.
So given a certain order of transactions, order of queries that are sent to
the system, we always want to have the exact same results and all of these results should also be
correct. Although you'd be surprised how often this is not the case. So if you think about the TPC-H benchmarks, et cetera: for the very large scale factors, the results are only somewhat deterministic, let's say, and they are also not necessarily always correctly checked. So every now and then people come up and see, oh, there was actually a bug here or there. In a very large data set, often it's not super harmful, but it somehow breaks this contract and this idea.
So the basic assumption, or let's say the basic contract, says we always get the exact same result.
And that's also what should happen.
Another source is actually lots of small hardware bugs, right? Every now and then people figure this out. There are many small problems here and there in these large-scale systems - silent errors in the CPUs, etc.
This you can only see if you're operating at cloud scale.
All of a sudden you will find, oh, there's some problem here or there.
And there's lots of stuff that people do in order to somehow figure this out
and work against it.
OK.
So what we want with parallelism or with running our stuff
in parallel is somehow scale or make our workloads scalable,
our execution.
And this is an overloaded concept, right?
So scalability, there's lots of different ways what we can think about if we say scalability.
So somehow what we want is to figure out how well does our system react to increased load
or to increased resources, right?
So what happens if we give more load to our system?
What happens if we give more resources
to our system essentially?
And there's something called speed up.
So this is basically how much faster is my query execution
if I add more resources, right?
So what if my CPU has two times the number of cores,
for example, with roundabout the same kind of per thread
efficiency, same kind of frequency,
or some kind of per thread throughput?
Then our speedup is basically given the new hardware,
the increased number of cores.
How much faster will this be?
I mean, this is very important in cloud setups, of course.
What if I run it on one machine or one virtual machine
versus two virtual machines versus four, et cetera?
So do I get two, four?
Probably not.
So usually this is a bit lower than the number
that we would have, like the new resources.
So ideally, this would be a linear function.
If we have two cores, then we get two times the performance,
four cores, four times the performance, etc. This somehow works well with this interquery parallelism, assuming that we constantly have
load, which also does not necessarily happen.
Also we don't necessarily want this.
We don't want to overload our system continuously, making sure that we have a long queue of queries all the time.
So usually, the speedup is more from a theoretical setting.
If we have the same data set size, the same problem size,
this is what we call strong scaling.
And initially, for me, this was somewhat confusing that we call this strong scaling,
but it's actually the harder problem.
So if we have one gigabyte of data
and say one large query,
we run this on a single core,
we run this on two cores, on four cores,
on eight cores, et cetera.
We should, if we have a linear function,
then we have perfect strong scalability
because basically we need to split up the work
in ever smaller portions and still be able
to get the same kind of throughput.
In contrast to that, we have scale up,
which is basically what if we have more load, right?
We have the same kind of problem, but we're just increasing the problem size.
In essence, basically having, say, instead of one gigabyte, we have two gigabyte, have four gigabyte, have eight gigabyte.
And our query actually touches two, four, eight gigabytes, right?
Or instead of 10 queries, we have 20 queries,
40 queries, 80 queries, right?
Same kind of idea with maybe an increasing data set size.
So here, basically, let's say the throughput or the latency should stay constant, right? So if we have one large query that touches the same amount of data, or the same ratio of data, in one, two, four, eight gigabytes - say we're doing a complete scan, a simple basic example. So if we completely scan one, two, four, eight gigabytes on one, two, four, eight cores, ideally the amount of time should stay constant.
And that's called weak scaling
because by scaling the problem size,
typically we get more opportunities for parallelism.
So we can basically more fine-grainedly split up the work.
And then ideally, or this is what we would call a scale up.
So we can nicely scale up the system.
We can just shoot more work at it.
And this is also what we often need in practice.
So in practice, in many cases,
we don't need to get like ever faster performance, but we need to make sure that
we sustain an ever increasing load.
So we have more transactions that need to be run.
We have larger data sets that need to be executed.
Then this is basically what we want.
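To summarize the two notions in symbols (my notation, not from the slides), with T(p, N) being the runtime on p cores for problem size N:

  strong scaling (N fixed):       speedup S(p) = T(1, N) / T(p, N),      ideally S(p) = p
  weak scaling (N grows with p):  efficiency E(p) = T(1, N) / T(p, p*N), ideally E(p) = 1, i.e. constant runtime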
And well, I've basically mixed the two already.
So the scale up is basically using larger machines.
Scale out is using more machines.
So sorry about this.
Again, it should be the same.
I mean, it's basically the same idea,
except for instead of using one larger machine
with more memory, more cores,
we're using many more machines.
So using more VMs, that would be the scale out,
scale up is larger machines.
And there's always this kind of back and forth
between the two.
So if you look at cloud for a long time,
people were just like investing in scale out.
So making sure that our systems are ever more scalable, like adding more VMs should be faster.
But this has a certain limitation, right, in terms of just how many VMs can we get, right?
So at a certain point, you hit the limit of number of VMs
within one cloud region.
So if you're at 10,000 VMs, all of a sudden things get thin - maybe at 100,000. But then there is a physical limit. So bringing it back to more highly integrated systems, like the scale-up systems, all of a sudden makes sense again.
And there's also something that has to do with efficiency, right? So people often confuse performance and scalability with efficiency. And that's not the same thing, right? Also, just because I have one algorithm that's faster than another, it doesn't necessarily mean it's efficient.
Because if I need a much larger system to do this, or have much higher constant overhead,
I'm not efficient.
Right?
If my problem size is small, then maybe running this on a single thread might actually be
much more efficient.
That's just important to keep in mind.
So a single thread or a single node
is often surprisingly fast.
And there's a nice paper about this
that was published by Frank McSherry and friends
at HotOS in 2015.
So here, this is on graph processing.
So here, basically, he took a Twitter graph and some other kind of click-graph data set. One was 5 gigabytes, one was 14 gigabytes. And he looked at all of the graph processing frameworks that were out there at the time. So there were many publications about how to process graphs on large numbers of cores and nodes in a scale-out fashion. So you have Spark, Giraph, GraphLab, GraphX, etc.
This is 2015, so this means 9 years ago. Today these systems are more efficient.
Still, it shows you something that's quite interesting.
So this is basically numbers that were reported.
Let me find my pointer.
So here in Spark, we have 128 cores.
I think it was, I don't remember,
a certain number of machines.
So it's a scaled out system with each node
having a certain number of cores, so eight nodes or
something like this.
And you can see how long this took on the two datasets, right?
So for Spark, Giraph, GraphLab, it's in the hundreds - so eight hundred to thousands of seconds for these datasets.
And these datasets, I mean, they're not small,
but they're also not too large
to not fit on a single laptop, right?
So what Frank did is basically took his laptop,
implemented the same algorithm in a reasonable,
but single threaded fashion, and ran this on his laptop.
And like the initial run, just running this on the laptop.
And at that time you wouldn't have that much memory in your laptop.
Like the first run was 300 seconds and 600 seconds.
And with a bit of optimization, or a faster SSD, I think it was, he got it down to 110 seconds on a single thread - in contrast to 128 threads.
And this is something you should keep in mind, right?
So if you're doing your own scale out, scale up, whatever solutions,
run it against a good baseline,
meaning a smart single thread implementation,
and that's what you have to beat, right?
And usually, with two threads, you won't necessarily get a speedup in the first place, just because all of a sudden there's so much overhead in scheduling, et cetera.
Or if you have a good single node implementation
and your workload fits on the single node,
with two nodes, it's unlikely,
unless everything is embarrassingly parallel,
it's unlikely that you actually get
a good performance improvement.
Graph processing is a bit special because, of course, if you have a large graph, there's lots of communication that you need to do. Inherently, you cannot nicely - at least not fully - partition the workload. But this is something where you can see: 128 threads versus one thread, and the one thread is still faster.
So this is orders of magnitude more efficient, even though these systems are scalable, right?
So if you give them like 256 nodes, most likely they will be again faster. So they claim scalability, but still they're not
going to be as fast as a single thread. So they're scalable, but the baseline is just very bad. And
having a bad baseline makes scalability very easy. Okay. So let's talk a bit more about speedup.
Again, so this was just an excursion, because I think it's very important to keep that in mind.
So be smart about the numbers that you see, and think about what you get. Like the back-of-the-envelope calculations we did - this is something that I always try to do in order to check: does this make sense, right?
So, and I mean, we also see this with all the installations,
like all the software that we're using
in the data center, et cetera.
A lot of it is just really, really slow.
And then you can think of it and then see,
okay, there's lots of things that you can do
already on the application level to fix things.
Okay, so our speedup is basically: if we have a sequential execution time and a parallel execution time, what's the ratio in between? So how much faster is the parallel execution compared to the single-threaded or sequential execution?
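In symbols - the standard definition, not specific to this lecture - the speedup with n workers is

  speedup(n) = T_sequential / T_parallel(n)

and in the ideal case T_parallel(n) = T_sequential / n, so speedup(n) = n. For example, two threads that cut the runtime from X to X/2 give a speedup of X / (X/2) = 2.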
Ideally, as we said, this should be linear.
So if we have two threads, two cores, two CPUs, two VMs,
then we should get like half of the execution time, meaning a speedup of two.
So that would be perfect.
Usually we have a sublinear speedup.
This is due to Amdahl's law - we'll talk a bit about this in a minute - and to some other things where we basically lose some time along the way due to communication, etc. So this is why we typically have a sublinear speedup. However, sometimes we also see a superlinear speedup.
So that also happens.
Anybody have an idea why we could see that?
So basically I'm using two threads or two cores
and all of a sudden I'm faster than with one.
We already have an idea.
Also, access to faster memory.
Access to faster memory - so what do you mean by access to faster memory?
Maybe an I/O limitation.
Yes, that's the right direction.
So the answer was access to faster memory.
But if we have two threads or two cores,
why would that give us access to faster memory?
Because we have more caches.
Yes, exactly.
So this is usually due to caching effects.
So typically, if we have a strong scaling problem, all of a sudden, if we have more resources, our problem might fit into caches. Initially, it might just fit into RAM, which gives us a huge boost: rather than having everything on disk, or having to access disk every now and then, we can fit everything in RAM, and this will basically make everything much faster. Then it might fit into caches, which makes everything much faster again. Or the address translation might all of a sudden fit into the translation lookaside buffer - this will make everything much faster again. So basically, as soon as our problem size per core, or per unit, shrinks, things might fit into caches, and we get superlinear speedup. And that's possible; it's not the usual case. I mean, if you have an arbitrarily large problem size and
you stay within the same kind of hardware ranges,
like you're not all of a sudden skipping certain levels,
then your speedup will not be super linear.
If your speedup is sublinear, typically you might have
or you have some problems.
So some parts of your problem might not
have enough parallelism.
And well, if there is something in your program that's
inherently sequential, you will always
end up with a sublinear speedup at a certain point.
That's just always going to happen. The parallelization itself has some overheads - just establishing everything, setting up the context, et cetera, some kind of synchronization and communication with each other - so that costs you some time. And then there might be architectural limitations. So on a single thread, you get a certain share of the maximum memory bandwidth that you can achieve, and this will grow linearly up to a certain number of threads: you still get more memory bandwidth just by using more parallelism towards the memory. But this stops at a certain point, and it typically stops before you're actually reaching the maximum number of threads.
All of a sudden, your memory will be limited.
Your memory bandwidth will be saturated,
so you're not getting any more performance
just by adding more threads.
Your speedup will basically flatten out, as an example.
And similarly with other kinds of things in there.
And there's Amdahl's Law.
Who's heard of Amdahl's law? No? Okay, very good. So we can talk
about it. So we can basically split up a program, or a task, from an abstract point of view into a parallel and a sequential part, right? So say, for example, we need to synchronize -
that's something we need to do sequentially. If we just read data from memory for example,
that's basically something we can do completely in parallel. There's nothing like we need to do
one after the other. If we need to summarize, so we do an aggregation over a whole data set, then
we can do individual aggregations in parallel. In the end, at a certain point, we'll need
to summarize everything to a single value, and this cannot be done before the other stuff.
So this is not completely parallelizable, so we can split up the work in a parallel portion
and in a non-parallel portion, again,
from an abstract point of view.
If you have different kind of algorithms in your program,
basically some could be completely parallel,
some could be sequential, then you
might be able to do a clear cut here.
And depending on the amount of parallel portion
and non-parallel portion, you can determine the maximum
achievable speedup.
And this is basically the maximum achievable speedup
is basically saying we're pushing down
the parallel portion,
like we're endlessly parallelizing the parallel portion
such that this will take no time at all anymore, basically.
Because we have unlimited parallel resources,
this is unlimitedly fast, so this is basically going to zero.
And then what remains is the sequential portion
of the program that will always kind of need
to be run sequentially.
And say we have a 50% parallel portion of the program. So this means 50% we can arbitrarily parallelize; the rest we cannot parallelize. This means the maximum speedup that we can ever achieve is 2.
That makes sense, basically, because the first part of the program, the 50% that are arbitrarily parallelizable - we put an unlimited amount of parallelism or resources on this, so the amount of time this will take is basically approaching zero.
We remain with 50% of the workload that needs to be done sequentially.
That will still be there.
So our speedup is just like the runtime will be half
because the rest is basically already processed
or is processed in zero time.
So the speedup is two.
And I mean, this is not really great.
And also, even if the 50% that are parallelizable are perfectly parallelizable - we get a linear speedup on that part - then say we take two nodes, or two resources, whatever, to parallelize this. This means this part will take half of the time, so our speedup won't be two, but 1.5, because this part will be twice as fast and the rest will still take the regular time. So from this, you can basically directly calculate what our speedup will be. But very quickly, we're not getting any additional performance improvements just by adding more resources here.
But similarly, we can do this for 95%, right? So if we have a 95% parallel portion, then we can only approach a speedup of 20. And here, basically, we can calculate this: the speedup, you can calculate with this formula if you want to.
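For reference, the formula on the slide is presumably the standard form of Amdahl's law. With parallel fraction p and n parallel resources:

  S(n) = 1 / ((1 - p) + p / n),    and for n going to infinity:  S_max = 1 / (1 - p)

For p = 0.95 this gives S_max = 1 / 0.05 = 20, and S(256) = 1 / (0.05 + 0.95 / 256), which is roughly 18.6 - matching the "speedup of about 18 at around 256 nodes" mentioned next.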
And if you have 95%, then 5% will be sequential, which is 1/20th of the workload, so the maximum speedup that we can get is 20. We're approaching this: initially we have a good linear speedup, but very quickly we're basically approaching this limit. These 5% become really critical, and we need quite a number of resources to ever get close to it. So you can see, if we want something like a speedup of 18, then we're at 128 nodes already, or even beyond that - 256 is where we would get this,
if the parallel portion is perfectly scalable.
We can get perfect speedup for the parallel portion.
So just to give you a sense: here, if 90% of the work is parallelized, the maximum speedup is 10.
And this typically holds for strong scaling,
but not necessarily for weak scaling.
And this is also important.
Because if we would basically stick to this, then at a certain point - well, if we say this is the so-and-so algorithm with a 95% parallel portion - then scaling beyond that, or using more resources, really doesn't make that much sense anymore, right? But typically, as soon as we have larger problem sizes, we can actually find a larger parallel portion. So it actually makes sense to parallelize other stuff than before.
So just by redesigning kind of the way of the execution,
by further parallelizing the sequential part,
we get more speed up here.
So think about this kind of aggregation example.
If we do this over way more data,
then the sequential part actually decreases,
because we have much more part that
can be run in parallel.
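A minimal sketch of that aggregation example (my code, not the lecture's): the per-chunk sums are the arbitrarily parallelizable part, the final combine is the part that stays sequential - and its share shrinks as the data grows.

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <thread>
#include <vector>

// Sum `values` with `num_threads` workers: parallel partial sums,
// then a short sequential combine step at the end.
std::int64_t parallel_sum(const std::vector<std::int64_t>& values,
                          unsigned num_threads) {
  std::vector<std::int64_t> partial(num_threads, 0);
  std::vector<std::thread> workers;
  const std::size_t chunk = (values.size() + num_threads - 1) / num_threads;
  for (unsigned t = 0; t < num_threads; ++t) {
    workers.emplace_back([&, t] {
      const std::size_t begin = t * chunk;
      const std::size_t end = std::min(values.size(), begin + chunk);
      // Parallel portion: each thread aggregates its own chunk.
      for (std::size_t i = begin; i < end; ++i) partial[t] += values[i];
    });
  }
  for (auto& w : workers) w.join();
  // Sequential portion: combine the num_threads partial results.
  return std::accumulate(partial.begin(), partial.end(), std::int64_t{0});
}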
If we don't get this kind of speed up,
or we're kind of stuck somewhere,
then often the problem might be that we have a non-scalable algorithm. So say searching a tree - let me ask you, what can we parallelize here?
We actually talked about this in the SIMD part already.
So if we have a certain algorithm,
there's ways we can run it in parallel or more easily run it in parallel,
and other ways might be not as parallel.
So think about the search tree.
What is easier, breadth-first search or depth-first search? It depends. Think about the SIMD part: how did we parallelize that one? I'll let you think about it; let's leave it open.
Then often another problem is load imbalance.
So if we cannot perfectly split up the work, right?
So I mean, just scanning data usually is easily parallelizable.
We can just break up the data into equally sized chunks,
and we're scanning these data parts, these data parts in parallel.
Other things might not be as easily parallelizable.
So if we have some kind of skewed distribution and we're running some filtering on it, and in certain parts all of the tuples match while in other parts none of the tuples match, we end up with some portions of the data set, or some tasks, that have much more to do than others.
So then we somehow need to rebalance,
we need to reschedule, et cetera,
so in order to not have a completely imbalanced load.
Because if some resources are much more utilized than others, then we don't get our perfect
speedup again, or we might not get a speedup at all.
In MapReduce, a typical problem is this: the map phase is usually super parallel, but in the reduce phase, if all of the data ends up in a single reducer, just because of a skewed load, well, then we're screwed, right? The reduce phase is then essentially sequential - no parallelism there at all. And large-scale, let's say scale-out, execution engines often use this MapReduce-like kind of execution.
And then, of course, the task overhead can be a problem.
If our tasks are too small, so we end up with one, two tuples
or even a few tuples within each individual task,
well, then just scheduling the task, starting the task
might just become way too much overhead.
Okay, so with this,
let's do a break here.
Four minutes break and then talk about how we can do this parallelism, use this parallelism in a database.
Efficiently, hopefully.
Questions so far? Okay, parallelism in database management systems. So we talked about parallelism so far, now let's see how we can parallelize database
workloads. I touched on that already. So there's inter-query parallelism, meaning we're running multiple queries in parallel. And if we want to do that, the first thing we need to have is multiple queries at the same time - if you have a single query at a time, inter-query parallelism doesn't give us anything, right? So think about your smartphone.
So you have databases on your smartphones,
typically SQLite.
If you're the only one actually using this,
inter-query parallelism won't give you any parallelism
on this database.
Although I think SQLite is anyway single threaded,
but just as a comparison.
So this means we need a sufficient number
of co-running queries all of the time,
as long as we like, if we want to actually continuously,
yeah, utilize parallelism in one way.
This typically works well in OLTP workloads, which
usually have many simple queries, because then an individual query doesn't need to
be parallelized, just because like an individual query is basically fast
enough, if we're sticking to just interquery parallelism. And data
analytics and OLAP are typically resource heavy,
meaning we do lots of computations also on the data.
We read a lot of data, and there inter-query parallelism alone won't help us with an individual query.
So that's probably not what we want.
What we want then is intraquery parallelism.
So this is basically in an OLAP setup.
We definitely want to have this, right?
So our system has multiple cores, has multiple CPUs, maybe multiple machines.
So this basically needs to be parallelized in one way or the other.
However, we still also want to be able to run
multiple queries in parallel, so in a database system. Think about your MapReduce system again,
or Flink, or whatever. These systems schedule an individual query. Of course, they have a resource manager, etc., that should do that for them. But if you just have your cluster, and you just have your scale-out system on there, it will try to utilize the full cluster just for the single query, and nothing else can be run in parallel, unless you're keeping some resources on the side. That's also not good for an OLAP setup, right? So assume each query takes a couple of seconds or a couple of minutes. If you have 10 users that are doing stuff in parallel, all of a sudden, if we're in the couple-of-seconds range, we're in a couple-of-minutes range until our individual query is scheduled, which is too slow. So typically we want to make sure,
also in terms of efficiency again,
that we can run stuff in parallel, multiple queries.
So in order to do this, we need to program for this, right?
So there is no way around this.
In order to get parallelism on the single-query level, and also on the multi-threaded level beyond a single core, this is a manual effort.
I mean, unless you have some kind of library that
does this for you, but still you need to use this library.
So you need to program against this.
And there's different constructs that you can use in order to actually address multiple cores.
So if you're on a single CPU, then either you can use multiple processes,
you can use multiple threads, or different, let's say, levels of threads.
So there's kernel threads, there's user level threads, and there's fibers, all different
kind of abstractions.
And the process is the most heavy one.
So this is basically meant for isolation.
If you have different programs, they basically have their own address space, their own - we talked about this - virtual memory; they have their private memory, basically. There's no interaction, and you have additional communication overhead if you talk across different processes.
Essentially, a process gives you your own virtual
computer. It's a nice abstraction because all of a sudden you're not getting in
the way with other processes, right? So you can schedule multiple queries in that
way, but it's scheduled by the kernel. It's fairly heavyweight. A thread, on the other hand, is like an abstraction or a control, a unit of control
within a running program, so within a process. So this means it's more of a virtual abstraction of
a processor. Again, individual threads are scheduled by the kernel, but they
share the same memory. And that's a very different concept, right? So, think about
virtual memory again. If you have two processes with different virtual memory, the translation works differently, right? So you might have different addresses for the same kind of memory, or you might use the same kind of memory with different addresses, etc. With threads, you can basically reuse this: multiple threads can access the same thing. You will have better cache locality, etc., and you don't need to switch in between these different virtual memory tables. So this is all already fixed for threads within the same process.
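A tiny illustration (mine, not from the lecture) of what "threads share the same memory" means in practice: both threads see the same counter object at the same address, so they only need an atomic, not inter-process communication.

#include <atomic>
#include <cstdio>
#include <thread>

int main() {
  std::atomic<int> counter{0};  // lives once in the shared address space

  auto work = [&counter] {
    for (int i = 0; i < 1000; ++i) counter.fetch_add(1);
  };

  // Two kernel threads in the same process: same page tables, same heap,
  // same `counter` address - no message passing or shared-memory setup needed.
  std::thread t1(work), t2(work);
  t1.join();
  t2.join();

  std::printf("counter = %d\n", counter.load());  // prints 2000
  return 0;
}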
There's also user-level threads. So, a regular thread that the OS gives you, the OS will also take care of scheduling, et cetera.
And it might also take care of this
at times when you don't want this, right?
So all of a sudden the OS thinks,
well, now I want this other thread to run here,
or I have this additional task that every now and then,
like cron jobs, whatever, or some other stuff
that the OS wants to do every now and then, file system
management, et cetera, it will just schedule this by itself.
And it will maybe unschedule your thread
at a certain point in time.
Your thread is still safe, but it might just not be running
at the time when you want it.
So for this kind of stuff, you have user-level threads
that are invisible to the kernel,
so the kernel cannot really do anything about it,
cannot interfere with your scheduling.
So then you can still get this parallelization, but you basically control the scheduling yourself, and the kernel won't interfere with it.
And then there's also something called fibers.
I've actually never used this.
This is library-supported, where basically a fiber yields itself when another fiber is run. So this is very cooperative.
They run inside threads one at a time.
So a single thread can have multiple fibers, but there will always only be one fiber running at a time.
And this is basically yet another level of abstraction
to have multiple tasks running within a single thread.
So you can, of course, also program your way around this.
But this is basically a library abstraction for this.
Yes, there's a question.
[Question inaudible.]
Well, it's just an abstraction.
So basically, you have a scheduling abstraction
for running multiple tasks within a thread.
And basically, if you program against fibers and you run an additional fiber, the other fiber will be stopped; you don't have to deal with that switching yourself.
And it can basically run again, et cetera.
So it's an additional thread-like construct
within the thread.
Again, I have not really dealt with this.
But it's yet another abstraction to get more fine-grained
scheduling primitives.
Now, given, of course, we need one process at least.
So I mean, we can only have threads if we have a process.
So the process basically gives us this computer or whole system
abstraction with our virtual memory, et cetera.
So we'll have one process.
And now we have to think about what's
our abstraction for this process?
How do we use it?
And there's different ways and classically in
older systems you would have like one process per DBMS worker. So basically all
database workers are directly mapped to OS processes,
which is kind of heavy weight, right?
So they're completely separate, individual processes, et cetera. And again, the question - we will come to this - is: what does an individual DB worker do?
So there's different things.
But say, for example, think about executing a single query.
That might be a typical task.
If we're parallelizing inter-query
or we're using inter-query parallelization,
then each database worker gets its own query, basically.
And the heavyweight, traditional way of doing so is basically one process per each of these workers: we directly have a separate OS process for each, and the OS will then do all the heavyweight management and scheduling for this.
Then we also could have an OS thread per worker.
So this is basically, we have one single process
that have all of the threads within
and all of the database workers within.
And then say, for example, we have a dispatcher thread
that listens in to new connections.
So meaning new user interactions,
some kind of new queries coming in,
and then each connection could be allocated a new thread,
or we might have some kind of worker pool. Or we can directly have database management system threads, so user-space threads, which are replacing these OS threads. With this, we then have faster switching in between the threads.
So if we have an OS thread, the OS will basically take care of switching
and deploying the individual threads to the CPU.
So if we have hyper-threading or we have more threads
than we actually have on cores or hyperthreads on our system,
then the OS will basically need to do the scheduling for us.
If we have DBMS, so we're implementing our threading
constructs ourselves, we'll just have basically OS threads
permanently mapped, and then we basically schedule stuff
ourselves in there.
But of course, we need to replicate some of the OS logic for the task switching, for the thread state management, for the scheduling, etc. So this might be faster but needs more programming. And I don't have it here, but there is a trend, or some research going on right now, where people actually say, well, if we're already doing this,
So if we're actually in a cloud kind of environment
or like a single server setup
where we're not doing anything except for our database,
and this is, in many cases, this is the assumption, right?
So we have hardware or let's say some kind of resource
which is exclusive to the database.
Well, then maybe we don't wanna replicate everything
that the OS does, but we're just gonna wanna replace
everything that the OS does with only the logic
that we need.
But that's maybe stuff for another lecture at some point.
Okay, so a multi-threaded architecture has advantages. There's less overhead per context switch, because we don't need to switch, for example, the address space. This is the major thing, actually.
So switching the address space means basically doing a lot of maintenance work
within the OS in order to change all these tables, the virtual memory address space, etc.
So if you think about what an OS typically has, per process you have an address space,
you have global variables, we have open files, child processes, alarm signals, handlers, account information, etc.
Per thread, you have the program counter, registers, stack, and state - which you also have per process, but here you have it per thread. So if you're just running processes, every switch involves all of these things, the thread state and the process state. If we're switching in between processes, we have to switch all of this. If we're switching in between threads, we only have to switch the per-thread part, right?
So we have to basically fix the registers,
the program counter, the stack and the state of the current
thread, but not the address space, not the global variables, not the open files. This is all stuff
that the OS needs to basically fix if we're switching in between processes.
The only downside is here on the bottom. So if we have one single thread that's not behaving very well,
a single thread basically can kill our process.
And if we have a single process database,
then we have no database after a single thread basically crashed.
Because that will kill the whole database.
This is actually also a problem.
There's in-process databases like SQLite or DuckDB.
If they have a problem, they will also basically
crash the whole application.
If you have like a separate process with your database,
if the database has some issue, well, the database process
might crash, but the application is
still there, so you can maybe revive this.
As soon as everything is interleaved within the same process, well, everything breaks.
So you've got to be much more careful if you're within the same kind of process.
Okay, so now let's talk about scheduling.
So scheduling is the process to distribute tasks, and the question is how do we distribute
the work, right?
So which thread executes which work?
And DBMSs typically manage this themselves.
So often there might be a dispatcher thread that assigns tasks
to workers, so meaning that we have a push model, so we have many tasks, and then we have one thread
that basically says, thread number one, do this, thread number two, do this, etc. So that's why
it's called push. So the dispatcher pushes this work, and this is true for DB2, SQL Server, MySQL, Oracle, HANA, etc.
The other model would be pull-based, meaning we have many workers and the workers do their work
and then they say, give me work, right? So this is a pull model. So thread two did its job, then it pulls yet another work out of the work queue.
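A condensed sketch of that pull model (my own, simplified; shutdown handling omitted): a dispatcher pushes query tasks into a shared queue, and worker threads pull the next task whenever they finish one. In the push model, the dispatcher would instead assign each task to a specific worker.

#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

class WorkQueue {
 public:
  // Dispatcher side: add a task (e.g. "execute this query") to the queue.
  void push(std::function<void()> task) {
    { std::lock_guard<std::mutex> lk(m_); tasks_.push(std::move(task)); }
    cv_.notify_one();
  }
  // Worker side: "give me work" - blocks until a task is available.
  std::function<void()> pull() {
    std::unique_lock<std::mutex> lk(m_);
    cv_.wait(lk, [&] { return !tasks_.empty(); });
    auto task = std::move(tasks_.front());
    tasks_.pop();
    return task;
  }
 private:
  std::mutex m_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> tasks_;
};

void worker_loop(WorkQueue& queue) {
  for (;;) {
    auto task = queue.pull();  // worker finished, pulls the next piece of work
    task();                    // e.g. run one query or one plan fragment
  }
}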
And if you have a dispatcher, it's a bit more complex than for a web server, but this is basically what a web server would look like. And I mean, you can think about it in a similar way,
because I mean, of course, the database management has more tasks to do, more management internally in terms of locking and managing multiple accesses because we have interaction in between users.
On a web server, we're assuming we're just sending back some information.
So this is basically read-only in terms of like sending the information.
Well, I mean, these days, of course, this gets more complicated as well.
But if we have a dispatcher thread, so we have kind of our web server process.
If we have a new network connection coming in, then the dispatcher thread will basically take one of the worker threads and tell it, okay, please answer this request.
The worker thread will then basically load the information from the web page cache
or recalculate whatever is there, for example, and then send it back.
And if there are too many connections, well, then the dispatcher thread will send back something saying: too many connections right now.
And in contrast to that, the database management system has to decide where, when, and how to execute in an optimal manner. In a web server environment, hopefully - unless we have very complex applications - this is very homogeneous.
Like every web page access basically hits the web page cache
or computes the website.
Might be differently complex,
but here we have data in different portions,
different parts.
We have to optimize this, et cetera.
The queries might be very different in terms of size.
So if we have an HTAP system, so hybrid transactional and analytical processing, we might have some queries that are OLTP-style and very small, and we might have some queries that are very large. So we need to figure out how many resources we give to each, where we should actually store the data, etc. So this is some stuff that additionally needs to be figured out.
And here then, and this is something like an HPC person would say, right, scheduling is key,
like it's the key in terms of efficiency and parallelism.
And of course, I mean, it's one of the things that need to be done efficiently.
But many systems will use this kind of pull-based way of doing so. So let's keep it dynamic, essentially - especially if we have a very dynamic system, then dynamic scheduling basically is key to efficiency and parallelism.
We already briefly hinted on that,
so there's a push approach.
Basically, we have a central dispatcher
that pushes the tasks.
This means there's global control on who gets what,
or we have a pull approach, where we basically
have some kind of work queue, which would then be filled by some kind of dispatcher thread, and the workers basically just say, OK,
what's going on next?
What can I do?
So we might also have different kind of queues,
depending on where to schedule it, etc.
And this is kind of decentralized and somewhat more dynamic.
So some systems take push, some systems take pull approach.
And it really depends on the kind of workload, which one works better.
And then, of course, we need to think about the worker allocation. So if we have multiple workers, which we need in order to get parallelism,
we can either say we have one worker per core,
meaning one thread or one process per core,
then typically one thread, of course, so we can pin that with sched_setaffinity or pthread_setaffinity_np.
So we can actually directly pin a thread to some core and then say you stay there forever and do your work there.
And that's good because there's no scheduling overhead or we have clear resources for something, we might have some good cache
efficiency if the thread stays there and does similar work all the time. But we might also
not fully utilize the core very well if the thread only does a certain thing all the time.
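For reference, this is roughly how the pinning mentioned above looks with pthread_setaffinity_np (a Linux/glibc-specific sketch; error handling is minimal, and the core ID is just the OS's logical CPU number, chosen arbitrarily here):

#include <pthread.h>
#include <sched.h>
#include <cstdio>
#include <thread>

// Pin the calling thread to one logical core so it stays there.
bool pin_to_core(int core_id) {
  cpu_set_t cpuset;
  CPU_ZERO(&cpuset);
  CPU_SET(core_id, &cpuset);
  // pthread_self() works here because std::thread wraps a pthread on Linux.
  return pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset) == 0;
}

int main() {
  std::thread worker([] {
    if (!pin_to_core(2)) std::printf("pinning failed\n");
    // ... do this worker's scan/join/aggregation work on core 2 ...
  });
  worker.join();
  return 0;
}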
So instead of that, we can also have multiple workers per core. It's kind of the same as before.
We're still scheduling the thread or pinning the thread to a core.
But maybe let's pin multiple threads to the same core.
And with that, we might get better use of the functional units, right?
Think back on how we basically execute within a single core: if we have a single thread running on the single core that just does scanning, for example, it will not fully saturate all of the functional units on this core. So the core will do work - it's not going to be idle - but it's not going to utilize all of the functional units efficiently.
So it will basically have probably a lot of stalls because of loading data from memory,
while all of the logical units and SIMD units are maybe not doing anything.
So because of that, leveraging the hyperthreading, especially if our workloads are somewhat heterogeneous,
makes a lot of sense.
If we're doing scanning across everything anyway,
it doesn't make a difference, right? Because then, if I'm loading more than a single thread there, we're just going to do more task switching, like thread switching; we're not going to get any better performance. But if the threads utilize different kinds of functional units, we can better utilize the core through the internal reordering within the CPU, within the core. Remember, there's a queue for the instructions - the reorder buffer - and in there, we can reorder something.
So there, we can get some additional performance just by getting a higher IPC - instructions per cycle - than one.
And I mean, if we're just scanning, again,
this will be even lower than one,
because we're just going to be memory bound.
Okay.
So now we know, okay, let's parallelize, right?
So now we have to somehow think about what to parallelize, right?
So of course we can do individual queries, but we want to have intra-query parallelism.
So we want to break up the query into multiple parts. And this means we're basically breaking the query into separate
parts. So thinking about a simple query here, where we have a table or tables A and B with a join with some selection criteria, two filters. So we have select A ID
and B value from A and B over the join criteria and with some filter criteria. So it's very simple.
It looks like this. Of course, already optimized. We do a selection push down.
We do the projection towards the end.
And we do the join.
We could also push down the projection a bit more.
But we're not dealing with this.
Well, we actually do some of the projection here already.
And each of these operators, we can basically parallelize and break into smaller tasks.
So rather than doing the complete table A, for example,
we can break down the table into separate partitions
and execute these separate partitions on separate cores,
separate threads, separate processes, etc.
And then we can do the same thing with B.
So here, for example, we're splitting up table A into three partitions,
table B into three partitions.
We're running selection and projection in parallel,
and we're running a parallel join operator.
So here this is a hash join operator.
We'll go through this in a bit more.
Maybe this time, maybe next time. And then we basically have some additional parallelization or exchange constructs - something where we communicate across different workers in order to hide the parallelism. So that's one way of doing this. But you can already
see, building a hash table, or at least some parts of the hash table, we can do in parallel.
We can do projections in parallel. We can do selections in parallel. We can also do a join
in parallel if we know how to do this. Now remember back: what's the Volcano execution strategy? Do you remember?
Okay, Volcano is basically tuple-at-a-time, right? So it's an iterator-based way of executing the query.
Each operator essentially becomes an iterator,
and we keep asking for the next tuple
until we can basically output something.
So the way we process this: at the output node,
the sink, essentially, so probably the last projection,
we ask the last projection,
please give me the next tuple,
and the projection then asks the next operator,
so in our example here, the join,
so let's keep it at the simple plan, right?
So this projection here
is our last operator. The projection asks the join for the next tuple. The join actually needs to do
a bit more to get the first tuple, right? So it will ask both sides to do something; depending on the type of join, this might include going through one of the tables completely
already, or, like in a nested loops join for example,
we start basically running through one of them: say we're picking one tuple
in A and we're going through the tuples of B until we find the first hit, and so on and so forth.
Of course, we're not going through A directly.
First, we have to ask the selection every time:
give me the next tuple, right?
And if the filter matches, we get the tuple.
If the filter doesn't match, the selection
will ask the scan operator for the next tuple and try again.
So that's the volcano style of execution.
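For illustration, here is a minimal sketch of this iterator interface, under assumed tuple and predicate types rather than the lecture's actual code: each operator implements next() and keeps pulling from its child until it can return a tuple or signals that it is exhausted.

```cpp
// Sketch of Volcano-style (tuple-at-a-time) iterators. Tuple and the
// predicate type are assumptions; a real engine also has open()/close().
#include <cstdint>
#include <functional>
#include <optional>
#include <vector>

using Tuple = std::vector<int64_t>;

struct Operator {
  virtual std::optional<Tuple> next() = 0;  // nullopt means "exhausted"
  virtual ~Operator() = default;
};

struct Scan : Operator {
  const std::vector<Tuple>& table;
  size_t pos = 0;
  explicit Scan(const std::vector<Tuple>& t) : table(t) {}
  std::optional<Tuple> next() override {
    if (pos == table.size()) return std::nullopt;
    return table[pos++];
  }
};

struct Selection : Operator {
  Operator& child;
  std::function<bool(const Tuple&)> pred;
  Selection(Operator& c, std::function<bool(const Tuple&)> p)
      : child(c), pred(std::move(p)) {}
  std::optional<Tuple> next() override {
    // Keep asking the child until a tuple passes the filter.
    while (auto t = child.next())
      if (pred(*t)) return t;
    return std::nullopt;
  }
};
```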
And now, in order to do parallelization,
and you can see the reference, I put it at the bottom, right? Goetz Graefe, this is what he came up with way back when,
when disks were slow and networks were slow.
So that basically made a lot of sense in 1994.
And so here, rather than having parallelism all over the place
and having to deal with it everywhere, the idea was: well,
maybe let's encapsulate this in a query operator by itself and not in the query plan
infrastructure. So rather than having to deal with parallelism everywhere,
let's have something like an operator that deals with the communication for us.
And in terms of parallelism, we have two kinds.
We talked about this.
We have pipeline parallelism and we have partitioning.
We can have inter or intra operator parallelism.
So inter operator means we have two operators
and we can run them in parallel.
So in a bushy tree.
So going back here, maybe you remember, right?
We had this left-deep tree, for example.
Now say we have another join here;
if we have the joins next to each other rather than in a row,
so let's make it three joins,
otherwise it's not super bushy.
So we have another join here and a third join here.
If we have the two joins basically on the same level,
this is called a bushy tree.
And then we have parallelism in between these joins.
We have parallelism in between all of the individual operators.
If we have this in a row, right, so in the left-deep tree, then we can only pipeline.
We don't have parallelism in between the joins. We still have parallelism
in between the selections, for example. But we can also have it within each individual
operator: say we have a parallel hash join, parallel sorting,
or a parallel scan.
Very simple.
And of course, we want it all.
So in order to be efficient, we're
not going to stick to just inter-operator or pipeline
parallelism or just partitioning.
We want all of the parallelism all of the time,
including setup, teardown, runtime logic, et cetera,
in a clean, encapsulated way.
And that's the exchange operator.
So this is basically an operator that you
put into your data-flow graph as desired, which
is anonymous to all of the other operators
and basically a synchronization point.
So all of your data flows in there,
you deal with redistributing the data in there,
and you get your parallelism.
So it's basically plan-driven:
the optimizer, at compilation time
or query-planning time,
determines a degree of parallelism
and instantiates one query operator plan for each thread.
And each of these operator plans is basically then connected
with this exchange operator, which basically manages
the communication in between, right, and manages the threads. And this is an elegant model,
which is used by many systems. So in the end, this will look something like this, right? So we have
our scan, and the scan can be parallel. And then we have the exchange operator, which basically hides the fact that we somehow have to
get the data to the right parts of the join,
so to get this kind of partitioning. We exchange the data, we might partition the data, in a way
such that then in the join we can get some parallelism in there as well again. And then of course we need some kind of exchange towards the end again,
because each of these operators in here, like the join in here, the scan in here,
these then are not just a single operator, but physically this will be multiple parallel
instances of the operator and the exchange operator says, OK, here we
need to actually communicate.
Sometimes the data might already be nicely partitioned,
and then the exchange operator basically just forwards
the data to the next operator or set of operators.
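To make the exchange idea tangible, here is a minimal sketch, my own simplification rather than Graefe's implementation: producers hash-partition tuples into per-consumer queues, so each downstream operator instance only ever sees its own partition. Tuple, the key extraction, and the queue type are assumptions for the example.

```cpp
// Sketch of a hash-partitioning exchange: producers route tuples into
// per-consumer queues by a hash of the key column. End-of-stream,
// back-pressure, and thread management are omitted.
#include <cstdint>
#include <deque>
#include <functional>
#include <mutex>
#include <vector>

using Tuple = std::vector<int64_t>;

struct PartitionQueue {
  std::mutex m;
  std::deque<Tuple> q;
  void push(Tuple t) {
    std::lock_guard<std::mutex> guard(m);
    q.push_back(std::move(t));
  }
};

class Exchange {
 public:
  explicit Exchange(size_t num_consumers) : queues_(num_consumers) {}

  // Called by any producer thread: route the tuple to the consumer
  // that owns its hash partition, so the parallel operator instance
  // on the other side only sees the keys it is responsible for.
  void send(const Tuple& t, size_t key_column) {
    size_t target = std::hash<int64_t>{}(t[key_column]) % queues_.size();
    queues_[target].push(t);
  }

  PartitionQueue& partition(size_t consumer) { return queues_[consumer]; }

 private:
  std::vector<PartitionQueue> queues_;  // one inbox per consumer thread
};
```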
On the positive side, the operators are largely oblivious to the parallelism, so the operators
can just run on their data partition efficiently.
However, you have a static partitioning, and this can and will lead to imbalance. And you cannot really change the degree
of parallelism mid-query.
So if you figure out this was too parallel
or not parallel enough, we cannot really change anything.
And there can be different kinds of overheads in there.
And as a last step for today, rather than going too deep here, we'll look at how modern systems do that. So today, a query coordinator manages parallel execution.
So it basically checks how many parallel servers, threads,
et cetera, are there, and then determines, based on this,
some kind of partitioning and load distribution.
The partitioning, again, could be static.
So we could say,
well, we always use a thousand tuples, for example, or something like this.
Or we can be more flexible based on how much data we actually have, right? Some tables might not
have a thousand tuples, or the chunks might overlap awkwardly, so then we want to deal with this and be a bit more flexible.
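As an illustration of the more flexible variant, here is a sketch under my own assumptions rather than any particular system's code: worker threads repeatedly claim the next fixed-size chunk of rows from a shared atomic cursor, so the load balances itself even when chunks take different amounts of time.

```cpp
// Sketch: dynamic chunk dispatch instead of a fixed static assignment.
// Workers repeatedly claim the next chunk of rows from an atomic cursor,
// so slow and fast chunks balance out automatically.
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

void parallel_for_chunks(std::size_t num_rows, std::size_t chunk_size,
                         int num_threads,
                         const std::function<void(std::size_t, std::size_t)>& body) {
  std::atomic<std::size_t> cursor{0};
  std::vector<std::thread> workers;
  for (int t = 0; t < num_threads; ++t) {
    workers.emplace_back([&] {
      while (true) {
        std::size_t begin = cursor.fetch_add(chunk_size);
        if (begin >= num_rows) break;                      // nothing left to claim
        std::size_t end = std::min(num_rows, begin + chunk_size);
        body(begin, end);                                  // process rows [begin, end)
      }
    });
  }
  for (auto& w : workers) w.join();
}
```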
And then there's parallelism within and between the operators, and there might be a producer-consumer
pair, say here, like an example with a parallel scan and a group-by. So then we have our large table,
and we're scanning it in parallel.
And then again, we need to repartition the data, right?
So if we do a parallel group by with like a key aggregation,
so not just like on a global single group,
but on multiple groups, then we need to redistribute the data.
And then we can do the separate aggregations
or the separate groupings in parallel again
and output all of the resulting information to our user process.
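A minimal sketch of this producer/consumer pattern, with assumed key and value types and not tied to any particular system: scan threads pre-aggregate their chunks locally, the partial results are redistributed by hash of the key, and one thread per partition merges them.

```cpp
// Sketch of a parallel group-by (sum per key) in two phases:
// 1) each scan thread pre-aggregates its chunk into a local hash map,
// 2) local maps are redistributed by hash of the key and merged,
//    one partition per merge thread.
#include <algorithm>
#include <cstdint>
#include <functional>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

using Groups = std::unordered_map<int64_t, int64_t>;  // key -> aggregated sum

Groups parallel_group_by(const std::vector<std::pair<int64_t, int64_t>>& rows,
                         int num_threads) {
  // Phase 1: thread-local pre-aggregation over row chunks (parallel scan).
  std::vector<Groups> locals(num_threads);
  std::vector<std::thread> scanners;
  const size_t chunk = (rows.size() + num_threads - 1) / num_threads;
  for (int t = 0; t < num_threads; ++t)
    scanners.emplace_back([&, t] {
      size_t begin = t * chunk, end = std::min(rows.size(), begin + chunk);
      for (size_t i = begin; i < end; ++i)
        locals[t][rows[i].first] += rows[i].second;
    });
  for (auto& s : scanners) s.join();

  // Phase 2: redistribute by hash partition and merge, one partition per thread.
  std::vector<Groups> merged(num_threads);
  std::vector<std::thread> mergers;
  for (int p = 0; p < num_threads; ++p)
    mergers.emplace_back([&, p] {
      for (const auto& local : locals)
        for (const auto& [key, sum] : local)
          if (std::hash<int64_t>{}(key) % num_threads == static_cast<size_t>(p))
            merged[p][key] += sum;
    });
  for (auto& m : mergers) m.join();

  // Collect the partitions into one result for the user process.
  Groups result;
  for (auto& part : merged) result.insert(part.begin(), part.end());
  return result;
}
```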
So say, for example, here we have an example
where we have a large table, we have a parallel scan,
and then we're ordering.
With the scan,
we can also already do a partitioning
such that we get ranges of data;
then these can be, say, ordered, grouped,
or processed in some other way.
And the coordinator for the parallel ordering
will deal with the rest.
And the coordinator will deal with the degree of parallelism,
so the number of parallel threads.
This can be chosen manually or automatically and can be
adaptive depending on the load of the system,
depending on the amount of data that is seen.
So if we have a filter in here, somewhere in between,
or we have a join in there, we might end
up with very different kinds of distributions;
then we might want to further parallelize here
or reduce the parallelism just because the groups are so small.
Okay, and with that, I would say, let me see.
My watch stopped working.
Yeah, so let's stop here.
Do we have questions?
Up to here.
No questions?
Very good.
Then we'll continue with multi-core parallelism next time.
Look into a couple of different kinds of parallel joins.
So how to parallelize hash joins.
And before that, we'll have the task presentation on Tuesday.
Thank you very much for coming.