Skip to content

Commit 1145acc

Browse files
committed
Add a barrier primitive for synchronizing backends.
Provide support for dynamic or static parties of processes to wait for all processes to reach point in the code before continuing. This is similar to the mechanism of the same name in POSIX threads and MPI, though has explicit phasing and dynamic party support like the Java core library's Phaser. This will be used by an upcoming patch adding support for parallel hash joins. Author: Thomas Munro Reviewed-By: Andres Freund Discussion: https://postgr.es/m/CAEepm=2_y7oi01OjA_wLvYcWMc9_d=LaoxrY3eiROCZkB_qakA@mail.gmail.com
1 parent fa330f9 commit 1145acc

File tree

3 files changed

+357
-1
lines changed

3 files changed

+357
-1
lines changed

src/backend/storage/ipc/Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ subdir = src/backend/storage/ipc
88
top_builddir = ../../../..
99
include $(top_builddir)/src/Makefile.global
1010

11-
OBJS = dsm_impl.o dsm.o ipc.o ipci.o latch.o pmsignal.o procarray.o \
11+
OBJS = barrier.o dsm_impl.o dsm.o ipc.o ipci.o latch.o pmsignal.o procarray.o \
1212
procsignal.o shmem.o shmqueue.o shm_mq.o shm_toc.o sinval.o \
1313
sinvaladt.o standby.o
1414

src/backend/storage/ipc/barrier.c

+311
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,311 @@
1+
/*-------------------------------------------------------------------------
2+
*
3+
* barrier.c
4+
* Barriers for synchronizing cooperating processes.
5+
*
6+
* Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7+
* Portions Copyright (c) 1994, Regents of the University of California
8+
*
9+
* From Wikipedia[1]: "In parallel computing, a barrier is a type of
10+
* synchronization method. A barrier for a group of threads or processes in
11+
* the source code means any thread/process must stop at this point and cannot
12+
* proceed until all other threads/processes reach this barrier."
13+
*
14+
* This implementation of barriers allows for static sets of participants
15+
* known up front, or dynamic sets of participants which processes can join or
16+
* leave at any time. In the dynamic case, a phase number can be used to
17+
* track progress through a parallel algorithm, and may be necessary to
18+
* synchronize with the current phase of a multi-phase algorithm when a new
19+
* participant joins. In the static case, the phase number is used
20+
* internally, but it isn't strictly necessary for client code to access it
21+
* because the phase can only advance when the declared number of participants
22+
* reaches the barrier, so client code should be in no doubt about the current
23+
* phase of computation at all times.
24+
*
25+
* Consider a parallel algorithm that involves separate phases of computation
26+
* A, B and C where the output of each phase is needed before the next phase
27+
* can begin.
28+
*
29+
* In the case of a static barrier initialized with 4 participants, each
30+
* participant works on phase A, then calls BarrierArriveAndWait to wait until
31+
* all 4 participants have reached that point. When BarrierArriveAndWait
32+
* returns control, each participant can work on B, and so on. Because the
33+
* barrier knows how many participants to expect, the phases of computation
34+
* don't need labels or numbers, since each process's program counter implies
35+
* the current phase. Even if some of the processes are slow to start up and
36+
* begin running phase A, the other participants are expecting them and will
37+
* patiently wait at the barrier. The code could be written as follows:
38+
*
39+
* perform_a();
40+
* BarrierArriveAndWait(&barrier, ...);
41+
* perform_b();
42+
* BarrierArriveAndWait(&barrier, ...);
43+
* perform_c();
44+
* BarrierArriveAndWait(&barrier, ...);
45+
*
46+
* If the number of participants is not known up front, then a dynamic barrier
47+
* is needed and the number should be set to zero at initialization. New
48+
* complications arise because the number necessarily changes over time as
49+
* participants attach and detach, and therefore phases B, C or even the end
50+
* of processing may be reached before any given participant has started
51+
* running and attached. Therefore the client code must perform an initial
52+
* test of the phase number after attaching, because it needs to find out
53+
* which phase of the algorithm has been reached by any participants that are
54+
* already attached in order to synchronize with that work. Once the program
55+
* counter or some other representation of current progress is synchronized
56+
* with the barrier's phase, normal control flow can be used just as in the
57+
* static case. Our example could be written using a switch statement with
58+
* cases that fall-through, as follows:
59+
*
60+
* phase = BarrierAttach(&barrier);
61+
* switch (phase)
62+
* {
63+
* case PHASE_A:
64+
* perform_a();
65+
* BarrierArriveAndWait(&barrier, ...);
66+
* case PHASE_B:
67+
* perform_b();
68+
* BarrierArriveAndWait(&barrier, ...);
69+
* case PHASE_C:
70+
* perform_c();
71+
* BarrierArriveAndWait(&barrier, ...);
72+
* }
73+
* BarrierDetach(&barrier);
74+
*
75+
* Static barriers behave similarly to POSIX's pthread_barrier_t. Dynamic
76+
* barriers behave similarly to Java's java.util.concurrent.Phaser.
77+
*
78+
* [1] https://en.wikipedia.org/wiki/Barrier_(computer_science)
79+
*
80+
* IDENTIFICATION
81+
* src/backend/storage/ipc/barrier.c
82+
*
83+
*-------------------------------------------------------------------------
84+
*/
85+
86+
#include "postgres.h"
87+
#include "storage/barrier.h"
88+
89+
static inline bool BarrierDetachImpl(Barrier *barrier, bool arrive);
90+
91+
/*
92+
* Initialize this barrier. To use a static party size, provide the number of
93+
* participants to wait for at each phase indicating that that number of
94+
* backends is implicitly attached. To use a dynamic party size, specify zero
95+
* here and then use BarrierAttach() and
96+
* BarrierDetach()/BarrierArriveAndDetach() to register and deregister
97+
* participants explicitly.
98+
*/
99+
void
100+
BarrierInit(Barrier *barrier, int participants)
101+
{
102+
SpinLockInit(&barrier->mutex);
103+
barrier->participants = participants;
104+
barrier->arrived = 0;
105+
barrier->phase = 0;
106+
barrier->elected = 0;
107+
barrier->static_party = participants > 0;
108+
ConditionVariableInit(&barrier->condition_variable);
109+
}
110+
111+
/*
112+
* Arrive at this barrier, wait for all other attached participants to arrive
113+
* too and then return. Increments the current phase. The caller must be
114+
* attached.
115+
*
116+
* While waiting, pg_stat_activity shows a wait_event_class and wait_event
117+
* controlled by the wait_event_info passed in, which should be a value from
118+
* from one of the WaitEventXXX enums defined in pgstat.h.
119+
*
120+
* Return true in one arbitrarily chosen participant. Return false in all
121+
* others. The return code can be used to elect one participant to execute a
122+
* phase of work that must be done serially while other participants wait.
123+
*/
124+
bool
125+
BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
126+
{
127+
bool release = false;
128+
bool elected;
129+
int start_phase;
130+
int next_phase;
131+
132+
SpinLockAcquire(&barrier->mutex);
133+
start_phase = barrier->phase;
134+
next_phase = start_phase + 1;
135+
++barrier->arrived;
136+
if (barrier->arrived == barrier->participants)
137+
{
138+
release = true;
139+
barrier->arrived = 0;
140+
barrier->phase = next_phase;
141+
barrier->elected = next_phase;
142+
}
143+
SpinLockRelease(&barrier->mutex);
144+
145+
/*
146+
* If we were the last expected participant to arrive, we can release our
147+
* peers and return true to indicate that this backend has been elected to
148+
* perform any serial work.
149+
*/
150+
if (release)
151+
{
152+
ConditionVariableBroadcast(&barrier->condition_variable);
153+
154+
return true;
155+
}
156+
157+
/*
158+
* Otherwise we have to wait for the last participant to arrive and
159+
* advance the phase.
160+
*/
161+
elected = false;
162+
ConditionVariablePrepareToSleep(&barrier->condition_variable);
163+
for (;;)
164+
{
165+
/*
166+
* We know that phase must either be start_phase, indicating that we
167+
* need to keep waiting, or next_phase, indicating that the last
168+
* participant that we were waiting for has either arrived or detached
169+
* so that the next phase has begun. The phase cannot advance any
170+
* further than that without this backend's participation, because
171+
* this backend is attached.
172+
*/
173+
SpinLockAcquire(&barrier->mutex);
174+
Assert(barrier->phase == start_phase || barrier->phase == next_phase);
175+
release = barrier->phase == next_phase;
176+
if (release && barrier->elected != next_phase)
177+
{
178+
/*
179+
* Usually the backend that arrives last and releases the other
180+
* backends is elected to return true (see above), so that it can
181+
* begin processing serial work while it has a CPU timeslice.
182+
* However, if the barrier advanced because someone detached, then
183+
* one of the backends that is awoken will need to be elected.
184+
*/
185+
barrier->elected = barrier->phase;
186+
elected = true;
187+
}
188+
SpinLockRelease(&barrier->mutex);
189+
if (release)
190+
break;
191+
ConditionVariableSleep(&barrier->condition_variable, wait_event_info);
192+
}
193+
ConditionVariableCancelSleep();
194+
195+
return elected;
196+
}
197+
198+
/*
199+
* Arrive at this barrier, but detach rather than waiting. Returns true if
200+
* the caller was the last to detach.
201+
*/
202+
bool
203+
BarrierArriveAndDetach(Barrier *barrier)
204+
{
205+
return BarrierDetachImpl(barrier, true);
206+
}
207+
208+
/*
209+
* Attach to a barrier. All waiting participants will now wait for this
210+
* participant to call BarrierArriveAndWait(), BarrierDetach() or
211+
* BarrierArriveAndDetach(). Return the current phase.
212+
*/
213+
int
214+
BarrierAttach(Barrier *barrier)
215+
{
216+
int phase;
217+
218+
Assert(!barrier->static_party);
219+
220+
SpinLockAcquire(&barrier->mutex);
221+
++barrier->participants;
222+
phase = barrier->phase;
223+
SpinLockRelease(&barrier->mutex);
224+
225+
return phase;
226+
}
227+
228+
/*
229+
* Detach from a barrier. This may release other waiters from BarrierWait and
230+
* advance the phase if they were only waiting for this backend. Return true
231+
* if this participant was the last to detach.
232+
*/
233+
bool
234+
BarrierDetach(Barrier *barrier)
235+
{
236+
return BarrierDetachImpl(barrier, false);
237+
}
238+
239+
/*
240+
* Return the current phase of a barrier. The caller must be attached.
241+
*/
242+
int
243+
BarrierPhase(Barrier *barrier)
244+
{
245+
/*
246+
* It is OK to read barrier->phase without locking, because it can't
247+
* change without us (we are attached to it), and we executed a memory
248+
* barrier when we either attached or participated in changing it last
249+
* time.
250+
*/
251+
return barrier->phase;
252+
}
253+
254+
/*
255+
* Return an instantaneous snapshot of the number of participants currently
256+
* attached to this barrier. For debugging purposes only.
257+
*/
258+
int
259+
BarrierParticipants(Barrier *barrier)
260+
{
261+
int participants;
262+
263+
SpinLockAcquire(&barrier->mutex);
264+
participants = barrier->participants;
265+
SpinLockRelease(&barrier->mutex);
266+
267+
return participants;
268+
}
269+
270+
/*
271+
* Detach from a barrier. If 'arrive' is true then also increment the phase
272+
* if there are no other participants. If there are other participants
273+
* waiting, then the phase will be advanced and they'll be released if they
274+
* were only waiting for the caller. Return true if this participant was the
275+
* last to detach.
276+
*/
277+
static inline bool
278+
BarrierDetachImpl(Barrier *barrier, bool arrive)
279+
{
280+
bool release;
281+
bool last;
282+
283+
Assert(!barrier->static_party);
284+
285+
SpinLockAcquire(&barrier->mutex);
286+
Assert(barrier->participants > 0);
287+
--barrier->participants;
288+
289+
/*
290+
* If any other participants are waiting and we were the last participant
291+
* waited for, release them. If no other participants are waiting, but
292+
* this is a BarrierArriveAndDetach() call, then advance the phase too.
293+
*/
294+
if ((arrive || barrier->participants > 0) &&
295+
barrier->arrived == barrier->participants)
296+
{
297+
release = true;
298+
barrier->arrived = 0;
299+
++barrier->phase;
300+
}
301+
else
302+
release = false;
303+
304+
last = barrier->participants == 0;
305+
SpinLockRelease(&barrier->mutex);
306+
307+
if (release)
308+
ConditionVariableBroadcast(&barrier->condition_variable);
309+
310+
return last;
311+
}

src/include/storage/barrier.h

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*-------------------------------------------------------------------------
2+
*
3+
* barrier.h
4+
* Barriers for synchronizing cooperating processes.
5+
*
6+
* Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7+
* Portions Copyright (c) 1994, Regents of the University of California
8+
*
9+
* src/include/storage/barrier.h
10+
*
11+
*-------------------------------------------------------------------------
12+
*/
13+
#ifndef BARRIER_H
14+
#define BARRIER_H
15+
16+
/*
17+
* For the header previously known as "barrier.h", please include
18+
* "port/atomics.h", which deals with atomics, compiler barriers and memory
19+
* barriers.
20+
*/
21+
22+
#include "storage/condition_variable.h"
23+
#include "storage/spin.h"
24+
25+
typedef struct Barrier
26+
{
27+
slock_t mutex;
28+
int phase; /* phase counter */
29+
int participants; /* the number of participants attached */
30+
int arrived; /* the number of participants that have
31+
* arrived */
32+
int elected; /* highest phase elected */
33+
bool static_party; /* used only for assertions */
34+
ConditionVariable condition_variable;
35+
} Barrier;
36+
37+
extern void BarrierInit(Barrier *barrier, int num_workers);
38+
extern bool BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info);
39+
extern bool BarrierArriveAndDetach(Barrier *barrier);
40+
extern int BarrierAttach(Barrier *barrier);
41+
extern bool BarrierDetach(Barrier *barrier);
42+
extern int BarrierPhase(Barrier *barrier);
43+
extern int BarrierParticipants(Barrier *barrier);
44+
45+
#endif /* BARRIER_H */

0 commit comments

Comments
 (0)