-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathmultimaster.h
281 lines (234 loc) · 7.72 KB
/
multimaster.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
/*-------------------------------------------------------------------------
*
* multimaster.h
* General definitions shared across the multimaster extension
*
*
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*-------------------------------------------------------------------------
*/
#ifndef MULTIMASTER_H
#define MULTIMASTER_H
#include "postgres.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "access/xact.h"
#include "bgwpool.h"
#include "dmq.h" /* DmqDestinationId */
#include "resolver.h" /* XXX: rework message and get rid of this */
/* Fixed identifiers used by the extension. */
#define MULTIMASTER_SCHEMA_NAME "mtm"
#define MULTIMASTER_NAME "multimaster"
/*
* The *_PATTERN and *_FMT macros below are printf-style templates taking a
* single int argument (presumably a node id -- TODO confirm against callers).
*/
#define MULTIMASTER_SLOT_PATTERN "mtm_slot_%d"
/* XXX: change to one NODENAME_FMT */
#define MTM_SUBNAME_FMT "mtm_sub_%d"
#define MTM_DMQNAME_FMT "node%d"
#define MULTIMASTER_FILTER_SLOT_PATTERN "mtm_filter_slot_%d"
#define MULTIMASTER_LOCAL_TABLES_TABLE "local_tables"
/* Size limits; presumably buffer sizes in bytes -- TODO confirm whether they include the terminating NUL. */
#define MULTIMASTER_MAX_CONN_STR_SIZE 128
#define MULTIMASTER_MAX_HOST_NAME_SIZE 64
#define MULTIMASTER_MAX_CTL_STR_SIZE 256
#define MULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
#define MULTIMASTER_ADMIN "mtm_admin"
#define MULTIMASTER_PRECOMMITTED "precommitted"
#define MULTIMASTER_PREABORTED "preaborted"
/*
* Length of the per-node arrays declared in this header (MtmConfig.nodes,
* MtmShared.peers, MtmShared.pools).
*/
#define MTM_MAX_NODES 16
/* Bitmask over nodes, manipulated with the BIT_* macros in this header. */
typedef uint64 nodemask_t;
/* printf conversion for a 64-bit integer printed as upper-case hexadecimal. */
#define LSN_FMT "%" INT64_MODIFIER "X"
/*
* NOTE(review): MAX_NODES (64) differs from MTM_MAX_NODES (16), which is what
* sizes the arrays in this header. 64 presumably reflects the bit width of
* nodemask_t -- confirm which limit callers are expected to use.
*/
#define MAX_NODES 64
/*
* Bit helpers for nodemask_t.
*
* 'bit' is a zero-based bit position and must be smaller than the width of
* nodemask_t: shifting by the full width or more is undefined behavior in C.
* BIT_CHECK yields 1 if the bit is set and 0 otherwise.
* BIT_SET and BIT_CLEAR update 'mask' in place, so it must be a modifiable
* lvalue; their value is the updated mask.
* Every macro argument is parenthesized so that any expression can be passed.
*/
#define BIT_CHECK(mask, bit) (((mask) & ((nodemask_t)1 << (bit))) != 0)
#define BIT_CLEAR(mask, bit) ((mask) &= ~((nodemask_t)1 << (bit)))
#define BIT_SET(mask, bit) ((mask) |= ((nodemask_t)1 << (bit)))
/*
* Mask with every bit set. The complement is taken after widening to
* nodemask_t, so the result does not rely on how a negative int converts.
*/
#define ALL_BITS (~(nodemask_t)0)
/*
* True when two GlobalTransactionId values name the same transaction, i.e.
* both the node and xid members match. The my_xid member is not compared.
* Both arguments are evaluated more than once, so avoid side effects in them.
*/
#define EQUAL_GTID(x,y) ((x).node == (y).node && (x).xid == (y).xid)
/*
* Definitions for the "mtm.cluster_nodes" table.
*
* Natts_* is the attribute count and Anum_* are one-based attribute numbers.
*
* NOTE(review): Natts_mtm_nodes is 4 but only three Anum_* constants are
* defined here -- confirm whether a fourth attribute number is missing.
* NOTE(review): "connifo" looks like a misspelling of "conninfo"; renaming
* would require updating every user of the macro.
*/
#define MTM_NODES "mtm.cluster_nodes"
#define Natts_mtm_nodes 4
#define Anum_mtm_nodes_id 1 /* node_id, same across cluster */
#define Anum_mtm_nodes_connifo 2 /* connection string */
#define Anum_mtm_nodes_is_self 3 /* is that tuple for our node? */
/*
* Definitions for the "mtm.cluster_status" type.
*
* Natts_mtm_status is the attribute count; the Anum_* constants are the
* one-based attribute numbers, numbered 1..7 without gaps.
*/
#define Natts_mtm_status 7
#define Anum_mtm_status_node_id 1
#define Anum_mtm_status_status 2
#define Anum_mtm_status_connected 3
#define Anum_mtm_status_gen_num 4
#define Anum_mtm_status_gen_members 5
#define Anum_mtm_status_gen_members_online 6
#define Anum_mtm_status_gen_configured 7
/*
* Definitions for the "mtm.node_info" type.
*
* Natts_mtm_node_info is the attribute count; the Anum_* constants are the
* one-based attribute numbers, numbered 1..6 without gaps.
*/
#define Natts_mtm_node_info 6
#define Anum_mtm_node_info_enabled 1
#define Anum_mtm_node_info_connected 2
#define Anum_mtm_node_info_sender_pid 3
#define Anum_mtm_node_info_receiver_pid 4
#define Anum_mtm_node_info_n_workers 5
#define Anum_mtm_node_info_receiver_mode 6
/*
* Identifier of global transaction.
*
* EQUAL_GTID() compares only the node and xid members; my_xid takes no part
* in that comparison.
*/
typedef struct
{
int node; /* One based id of node initiating transaction */
TransactionId xid; /* Transaction ID at origin node */
TransactionId my_xid; /* Transaction ID at our node */
} GlobalTransactionId;
/*
* Per-transaction flags; this is the type of the MtmTx global declared in
* this header.
*
* NOTE(review): going by the names, the flags record whether the transaction
* contained DDL, contained DML, and is distributed -- confirm where they are
* set and reset.
*/
typedef struct
{
bool contains_ddl;
bool contains_dml;
bool distributed;
} MtmCurrentTrans;
/*
* Pairs an object OID with a 64-bit position.
*
* NOTE(review): presumably seqid is a sequence's OID and next is the next
* value to hand out -- confirm against the code that fills this in.
*/
typedef struct MtmSeqPosition
{
Oid seqid;
int64 next;
} MtmSeqPosition;
/*
* Transaction event kinds.
*
* Enumerators have no explicit values, so they are numbered 0..5 in
* declaration order; inserting or reordering entries changes those numbers.
* NOTE(review): if these values are sent between nodes, they are part of the
* wire protocol -- confirm before changing the order.
*/
typedef enum
{
PGLOGICAL_COMMIT,
/* can be sent if prepared xact is aborted during decoding */
PGLOGICAL_ABORT,
PGLOGICAL_PREPARE,
PGLOGICAL_COMMIT_PREPARED,
PGLOGICAL_ABORT_PREPARED,
PGLOGICAL_PREPARE_PHASE2A
} PGLOGICAL_EVENT;
/*
* Names a table by OID together with a set of nodes.
*
* NOTE(review): presumably a request to copy sourceTable to every node whose
* bit is set in targetNodes -- confirm against the code that consumes it.
*/
typedef struct
{
Oid sourceTable;
nodemask_t targetNodes;
} MtmCopyRequest;
/*
* Node id value that denotes "no node". GlobalTransactionId documents node
* ids as one based, which leaves 0 free for this purpose.
*/
#define MtmInvalidNodeId 0
/*
* One entry of MtmConfig.nodes[].
*
* NOTE(review): conninfo is a pointer, so the string is stored outside this
* struct -- confirm who allocates and frees it (MtmConfigFree is declared in
* this header and is the likely owner).
*/
typedef struct
{
int node_id;
char *conninfo;
RepOriginId origin_id;
bool init_done;
} MtmNode;
/*
* Cluster configuration: our own node id, backup-related fields, and the
* list of nodes in both array and bitmask form.
*
* NOTE(review): the n_nodes comment says nodes[] *does* include the local
* node, while the comment on 'mask' says that, unlike nodes[], the mask
* includes it. Both cannot be true -- confirm against MtmLoadConfig and fix
* whichever comment is wrong.
*/
typedef struct
{
int n_nodes; /* num of entries in nodes[]. *Does* include myself. */
int my_node_id;
int backup_node_id;
XLogRecPtr backup_end_lsn;
/* only the first n_nodes entries are meaningful; capacity is MTM_MAX_NODES */
MtmNode nodes[MTM_MAX_NODES];
/*
* Some users prefer nodemask representation of nodes[]. Moreover, unlike
* nodes[] it *includes* myself.
*/
nodemask_t mask;
} MtmConfig;
/*
* Configuration pointer and its validity flag; declared here, defined in
* another file.
* NOTE(review): presumably the receiver's cached config, with the flag
* cleared when it needs reloading -- confirm.
*/
extern MtmConfig *receiver_mtm_cfg;
extern bool receiver_mtm_cfg_valid;
/*
* Cluster-wide state reached through the global 'Mtm' pointer.
*
* NOTE(review): the name, the LWLocks, atomics and condition variable
* suggest this lives in shared memory and is accessed by several processes
* -- confirm where it is allocated.
*/
typedef struct
{
LWLock *lock;
LWLock *syncpoint_lock;
/*
* We do not need to go into the database every time that we resolve
* deadlocks.
*/
bool IsEnabled;
Oid DatabaseId;
int my_node_id;
/* configured members (initially + add/rm), maintained by monitor */
pg_atomic_uint64 configured_mask;
XLogRecPtr latestSyncpoint;
bool localTablesHashLoaded; /* Whether data from local_tables
* table is loaded in shared memory
* hash table */
ConditionVariable receiver_barrier_cv;
/*
* These counters ensure that a receiver in REPLMODE_RECOVERY excludes any
* other receiver (to avoid applying the same record twice). Counters are
* protected by Mtm->lock; it could be separate as well, but there is no
* contention.
*/
int nreceivers_recovery;
int nreceivers_normal;
pid_t resolver_pid;
/*
* Per-peer state, MTM_MAX_NODES entries.
* NOTE(review): confirm how a node id maps to an index here.
*/
struct
{
/*
* mode in which receiver is actually running (or has been running the
* last time it was alive)
*/
MtmReplicationMode receiver_mode;
pid_t walsender_pid;
PGPROC *walreceiver_proc;
int dmq_dest_id;
pid_t dmq_receiver_pid;
/*
* which LSN we should ack to this node? caches get_recovery_horizon
*/
pg_atomic_uint64 horizon;
} peers[MTM_MAX_NODES];
/*
* per-node data, MTM_MAX_NODES entries.
* NOTE(review): this comment used to read "[Mtm->nAllNodes]", but no
* nAllNodes member exists in this struct.
*/
BgwPool pools[MTM_MAX_NODES];
/* for debugging/monitoring purposes */
nodemask_t walsenders_mask;
nodemask_t walreceivers_mask;
bool monitor_loaded;
bool replier_loaded;
} MtmShared;
/* Pointer to the MtmShared instance; declared here, defined in another file. */
extern MtmShared *Mtm;
/* XXX: to delete */
extern MtmCurrentTrans MtmTx;
extern int MtmTxAtxLevel;
extern MemoryContext MtmApplyContext;
/*
* bgworker identities
*
* NOTE(review): presumably each flag is set in a process that plays the
* named role, so code can branch on what kind of worker it is running in --
* confirm where they are assigned.
*/
extern bool MtmBackgroundWorker;
extern bool MtmIsLogicalReceiver;
extern bool MtmIsReceiver;
extern bool MtmIsPoolWorker;
extern bool MtmIsMonitorWorker;
/*
* GUCs
*
* Only declarations live here; the variables are defined in another file.
*/

/* integer settings */
extern int	MtmTransSpillThreshold;
extern int	MtmHeartbeatSendTimeout;
extern int	MtmHeartbeatRecvTimeout;
extern int	MtmMaxWorkers;

/* boolean settings */
extern bool MtmBreakConnection;
extern bool MtmWaitPeerCommits;
extern bool MtmNo3PC;
extern bool MtmBinaryBasetypes;

/* string settings */
extern char *MtmRefereeConnStr;

/*
* Evaluates to 1 when MtmRefereeConnStr is both non-NULL and non-empty, and
* to 0 otherwise.
*/
#define IS_REFEREE_ENABLED() \
	(MtmRefereeConnStr != NULL && MtmRefereeConnStr[0] != '\0')
/*
* Miscellaneous helpers; only the prototypes are visible here.
*
* MtmSleep: NOTE(review): the unit of 'interval' is not stated in this
* header -- confirm at the definition.
* MtmGetIncreasingTimestamp: NOTE(review): the name suggests successive
* calls never return a smaller value -- confirm.
*/
extern void MtmSleep(int64 interval);
extern TimestampTz MtmGetIncreasingTimestamp(void);
extern bool MtmAllApplyWorkersFinished(void);
/*
* Loading, reloading and freeing an MtmConfig.
*
* elevel_on_absent: NOTE(review): presumably the elog level to report at
* when no configuration is found -- confirm at the definition.
*/
extern MtmConfig *MtmLoadConfig(int elevel_on_absent);
/*
* Callback type taken by MtmReloadConfig for both node_add_cb and
* node_drop_cb. It receives a node id, the new configuration, and the opaque
* 'arg' Datum.
*/
typedef void (*mtm_cfg_change_cb) (int node_id, MtmConfig *new_cfg, Datum arg);
/*
* NOTE(review): presumably compares old_cfg with a freshly loaded config,
* calls node_add_cb / node_drop_cb for nodes that appeared / disappeared
* (passing 'arg' through), and returns the new config. Confirm this, and
* whether old_cfg is freed.
*/
extern MtmConfig *MtmReloadConfig(MtmConfig *old_cfg,
mtm_cfg_change_cb node_add_cb,
mtm_cfg_change_cb node_drop_cb,
Datum arg,
int elevel_on_absent);
extern void MtmConfigFree(MtmConfig *cfg);
/*
* Quorum checks.
* NOTE(review): presumably true when nvotes is a majority of ntotal (or of
* the nodes in mtm_cfg) -- the exact rule is in the definitions; confirm.
*/
extern bool Quorum(int ntotal, int nvotes);
extern bool MtmQuorum(MtmConfig *mtm_cfg, int nvotes);
/*
* Look up a node in cfg by id.
* NOTE(review): behavior for an unknown id (NULL vs. error) is not visible
* here -- confirm.
*/
extern MtmNode *MtmNodeById(MtmConfig *cfg, int node_id);
extern bool MtmIsEnabled(void);
extern void MtmToggleReplication(void);
/*
* nodemask_t stuff
*
* NOTE(review): semantics below are inferred from the names; confirm at the
* definitions. In particular, check what first_set_bit returns for an empty
* mask and who frees the string returned by maskToString.
*/
extern int popcount(nodemask_t mask);
extern int first_set_bit(nodemask_t mask);
extern bool is_submask(nodemask_t submask, nodemask_t mask);
extern char *maskToString(nodemask_t mask);
struct MtmMessage; /* forward declaration for gather prototype */
/* Per-message predicate passed to gather() together with its Datum argument. */
typedef bool (*gather_hook_t)(struct MtmMessage *anymsg, Datum arg);
/*
* NOTE(review): presumably collects messages from the nodes in
* 'participants', storing them in 'messages', their origins in 'senders' and
* the count in '*msg_count', with msg_ok(msg, msg_ok_arg) deciding whether a
* message is accepted. The meaning of the return value, 'sendconn_cnt' and
* 'gen_num', and the required capacity of the output arrays, are not visible
* in this header -- confirm at the definition.
*/
extern bool gather(nodemask_t participants,
struct MtmMessage **messages, int *senders, int *msg_count,
gather_hook_t msg_ok, Datum msg_ok_arg,
int *sendconn_cnt, uint64 gen_num);
/*
* boilerplate for config updates in bgws
*
* mtm_attach_node and mtm_detach_node have the mtm_cfg_change_cb signature,
* so they can be passed to MtmReloadConfig as node_add_cb / node_drop_cb.
* NOTE(review): mtm_pubsub_change_cb has the shape of a syscache
* invalidation callback and presumably clears mtm_config_valid -- confirm.
*/
extern bool mtm_config_valid;
extern void mtm_pubsub_change_cb(Datum arg, int cacheid, uint32 hashvalue);
extern void mtm_attach_node(int node_id, MtmConfig *new_cfg, Datum arg);
extern void mtm_detach_node(int node_id, MtmConfig *new_cfg, Datum arg);
#endif /* MULTIMASTER_H */