Skip to content

Add support for nesting temporary namespaces for ATX transactions. #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion expected/regression_ee.diff
Original file line number Diff line number Diff line change
Expand Up @@ -570,14 +570,43 @@ diff ../../../src/test/regress/expected/rowsecurity.out ../tmp_check/regress_out
diff ../../../src/test/regress/expected/atx.out ../tmp_check/regress_outdir/results/atx.out
--- ../../../src/test/regress/expected/atx.out CENSORED
+++ ../tmp_check/regress_outdir/results/atx.out CENSORED
@@ -1143,6 +1143,7 @@
@@ -1139,6 +1139,7 @@
RESET client_min_messages;
create database regression_atx_test_database;
ALTER DATABASE "regression_atx_test_database" SET lc_messages TO 'C';
+ERROR: [MTM] failed to prepare transaction at peer node
\c regression_atx_test_database
create table atx_test as select 1 as id;
begin;
diff ../../../src/test/regress/expected/atx4.out ../tmp_check/regress_outdir/results/atx4.out
--- ../../../src/test/regress/expected/atx4.out CENSORED
+++ ../tmp_check/regress_outdir/results/atx4.out CENSORED
@@ -142,8 +142,10 @@
(1 row)

commit autonomous;
+ERROR: [MTM] failed to prepare transaction at peer node
-- Multimaster: t2 table will not be created due to pg_temp_N not found on replicas
commit;
+WARNING: there is no transaction in progress
begin;
-- create temp table in top level temptable but abort
begin autonomous;
@@ -213,11 +215,9 @@
commit;
-- Multimaster: t2 were not created
select * from t2;
- a
-----
- hi
-(1 row)
-
+ERROR: relation "t2" does not exist
+LINE 1: select * from t2;
+ ^
select * from t3;
ERROR: relation "t3" does not exist
LINE 1: select * from t3;
diff ../../../src/test/regress/expected/atx5.out ../tmp_check/regress_outdir/results/atx5.out
--- ../../../src/test/regress/expected/atx5.out CENSORED
+++ ../tmp_check/regress_outdir/results/atx5.out CENSORED
Expand Down
4 changes: 4 additions & 0 deletions multimaster--1.0.sql
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ CREATE FUNCTION mtm.set_temp_schema(nsp text) RETURNS void
AS 'MODULE_PATHNAME','mtm_set_temp_schema'
LANGUAGE C;

CREATE FUNCTION mtm.set_temp_schema(nsp text, force bool) RETURNS void
AS 'MODULE_PATHNAME','mtm_set_temp_schema'
LANGUAGE C;

CREATE TABLE mtm.local_tables(
rel_schema name,
rel_name name,
Expand Down
4 changes: 4 additions & 0 deletions src/commit.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ static bool inside_mtm_begin;
static MtmConfig *mtm_cfg;

MtmCurrentTrans MtmTx;
int MtmTxAtxLevel = 0;

/* holds state defining cleanup actions in case of failure during commit */
static struct MtmCommitState
Expand Down Expand Up @@ -400,6 +401,9 @@ MtmTwoPhaseCommit(void)
StartTransactionCommand();
}

if (MtmTxAtxLevel > 0)
temp_schema_reset(true);

/* prepare for cleanup */
mtm_commit_state.gtx = NULL;
mtm_commit_state.inside_commit_sequence = true;
Expand Down
139 changes: 109 additions & 30 deletions src/ddl.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ MtmDDLInProgress DDLApplyInProgress;

static char MtmTempSchema[NAMEDATALEN];
static bool TempDropRegistered;
static int TempDropAtxLevel;

static void const *MtmDDLStatement;

Expand Down Expand Up @@ -247,9 +248,11 @@ temp_schema_reset_all(int my_node_id)
" nsp record; "
"begin "
" reset session_authorization; "
" for nsp in select nspname from pg_namespace where nspname ~ '^mtm_tmp_%d_.*' loop "
" for nsp in select nspname from pg_namespace where "
" nspname ~ '^mtm_tmp_%d_.*' and"
" nspname !~ '_toast$' loop "
" perform mtm.set_temp_schema(nsp.nspname); "
" execute format('drop schema if exists %%I cascade', format('%%s_toast', nsp.nspname)); "
" execute format('drop schema if exists %%I cascade', nsp.nspname||'_toast'); "
" execute format('drop schema if exists %%I cascade', nsp.nspname); "
" end loop; "
"end $$; ",
Expand All @@ -258,26 +261,27 @@ temp_schema_reset_all(int my_node_id)
}

/* Drop temp schemas on peer nodes */
static void
temp_schema_reset(void)
void
temp_schema_reset(bool transactional)
{
Assert(TempDropRegistered);
Assert(TempDropAtxLevel == MtmTxAtxLevel);

/*
* reset session_authorization restores permissions if previous ddl
* dropped them; set_temp_schema allows us to see temporary objects,
* otherwise they can't be dropped
*
* It is important to run it as 'V', otherwise it might interfere with
* later (if drop is due to DISCARD) or earlier command using the schema.
* If drop is due to DISCARD, it is important to run it as 'V', otherwise
* it might interfere with later or earlier command using the schema.
*/
MtmProcessDDLCommand(
psprintf("RESET session_authorization; "
"select mtm.set_temp_schema('%s'); "
"select mtm.set_temp_schema('%s', false); "
"DROP SCHEMA IF EXISTS %s_toast CASCADE; "
"DROP SCHEMA IF EXISTS %s CASCADE;",
MtmTempSchema, MtmTempSchema, MtmTempSchema),
false,
transactional,
false
);
MtmFinishDDLCommand();
Expand All @@ -290,52 +294,127 @@ temp_schema_at_exit(int status, Datum arg)
Assert(TempDropRegistered);
AbortOutOfAnyTransaction();
StartTransactionCommand();
temp_schema_reset();
for (; MtmTxAtxLevel >= 0; MtmTxAtxLevel--)
{
temp_schema_init();
temp_schema_reset(false);
}
CommitTransactionCommand();
}

/* Register cleanup callback and generate temp schema name */
static void
void
temp_schema_init(void)
{
if (!TempDropRegistered)
{
char *temp_schema;

/*
* NB: namespace.c:isMtmTemp() assumes 'mtm_tmp_' prefix for mtm temp
* tables to defuse autovacuum.
*/
temp_schema = psprintf("mtm_tmp_%d_%d",
Mtm->my_node_id, MyBackendId);
memcpy(&MtmTempSchema, temp_schema, strlen(temp_schema) + 1);
before_shmem_exit(temp_schema_at_exit, (Datum) 0);
TempDropRegistered = true;
pfree(temp_schema);
before_shmem_exit(temp_schema_at_exit, (Datum) 0);
}
if (MtmTxAtxLevel == 0)
snprintf(MtmTempSchema, sizeof(MtmTempSchema),
"mtm_tmp_%d_%d", Mtm->my_node_id, MyBackendId);
else
snprintf(MtmTempSchema, sizeof(MtmTempSchema),
"mtm_tmp_%d_%d_%d", Mtm->my_node_id, MyBackendId, MtmTxAtxLevel);
TempDropAtxLevel = MtmTxAtxLevel;
}

/*
 * temp_schema_valid checks the format of a multimaster temp schema name.
 *
 * A valid name is "mtm_tmp_<digits>_<digits>" or, when built with PGPRO_EE,
 * "mtm_tmp_<digits>_<digits>_<digits>" for a non-zero atx level.  The name
 * must also be short enough that "<name>_toast" plus the terminating NUL
 * fits into NAMEDATALEN bytes.
 *
 * Returns true if the name is well-formed.  In that case *atx_level points
 * at the underscore preceding the atx level component, or is NULL when the
 * name has no such component.  Returns false otherwise, with *atx_level set
 * to NULL.
 */
static bool
temp_schema_valid(const char *temp_namespace, const char **atx_level)
{
	static const char prefix[] = "mtm_tmp_";
	const size_t prefix_len = sizeof(prefix) - 1;
	const char *level_sep = NULL;
	const char *c;
	int			underscores = 0;
	bool		need_digit = true;

	*atx_level = NULL;

	/* leave room for the "_toast" suffix of the companion schema */
	if (strlen(temp_namespace) + strlen("_toast") + 1 > NAMEDATALEN)
		return false;

	/*
	 * Reject a wrong prefix before scanning: a name shorter than the prefix
	 * must never be indexed past its terminator.
	 */
	if (strncmp(temp_namespace, prefix, prefix_len) != 0)
		return false;

	/* the rest must be digit groups separated by single underscores */
	for (c = temp_namespace + prefix_len; *c != '\0'; c++)
	{
		if (!need_digit && *c == '_')
		{
			underscores++;
			if (underscores == 2)
				level_sep = c;
			need_digit = true;
		}
		else if (*c >= '0' && *c <= '9')
			need_digit = false;
		else
			return false;
	}

	/* no empty trailing group; two or three digit groups in total */
	if (need_digit || underscores < 1 || underscores > 2)
		return false;
#ifndef PGPRO_EE
	/* without PGPRO_EE a third (atx level) component is not accepted */
	if (underscores == 2)
		return false;
#endif

	*atx_level = level_sep;
	return true;
}

Datum
mtm_set_temp_schema(PG_FUNCTION_ARGS)
{
char *temp_namespace = text_to_cstring(PG_GETARG_TEXT_P(0));
char *temp_toast_namespace = psprintf("%s_toast", temp_namespace);
Oid nsp_oid;
Oid toast_nsp_oid;
bool force = PG_NARGS() > 1 ? PG_GETARG_BOOL(1) : true;
char temp_toast_namespace[NAMEDATALEN] = {0};
Oid nsp_oid = InvalidOid;
Oid toast_nsp_oid = InvalidOid;
const char *atx_level_start = NULL;
#ifdef PGPRO_EE
char top_temp_namespace[NAMEDATALEN] = {0};
Oid top_nsp_oid = InvalidOid;
Oid top_toast_nsp_oid = InvalidOid;
#endif

if (!temp_schema_valid(temp_namespace, &atx_level_start))
mtm_log(ERROR, "mtm_set_temp_schema: wrong namespace name '%s'",
temp_namespace);

if (!SearchSysCacheExists1(NAMESPACENAME, PointerGetDatum(temp_namespace)))
snprintf(temp_toast_namespace, NAMEDATALEN, "%s_toast", temp_namespace);
if (SearchSysCacheExists1(NAMESPACENAME, PointerGetDatum(temp_namespace)))
{
nsp_oid = get_namespace_oid(temp_namespace, false);
toast_nsp_oid = get_namespace_oid(temp_toast_namespace, false);
}
else if (force)
{
nsp_oid = NamespaceCreate(temp_namespace, BOOTSTRAP_SUPERUSERID, true);
toast_nsp_oid = NamespaceCreate(temp_toast_namespace, BOOTSTRAP_SUPERUSERID, true);
CommandCounterIncrement();
}
else

#ifdef PGPRO_EE
if (atx_level_start != NULL)
{
nsp_oid = get_namespace_oid(temp_namespace, false);
toast_nsp_oid = get_namespace_oid(temp_toast_namespace, false);
memcpy(top_temp_namespace, temp_namespace, atx_level_start - temp_namespace);

if (SearchSysCacheExists1(NAMESPACENAME, PointerGetDatum(top_temp_namespace)))
{
top_nsp_oid = get_namespace_oid(top_temp_namespace, false);
strlcat(top_temp_namespace, "_toast", NAMEDATALEN);
top_toast_nsp_oid = get_namespace_oid(top_temp_namespace, false);
}
}

SetTempNamespaceState(nsp_oid, toast_nsp_oid);
SetTempNamespaceForMultimaster();
SetTempNamespaceStateEx(nsp_oid, toast_nsp_oid,
top_nsp_oid, top_toast_nsp_oid,
atx_level_start != NULL);
#else
SetTempNamespace(nsp_oid, toast_nsp_oid);
#endif
PG_RETURN_VOID();
}

Expand Down Expand Up @@ -1030,7 +1109,7 @@ MtmProcessUtilitySender(PlannedStmt *pstmt, const char *queryString,
{
/* nothing to do if temp schema wasn't created at all */
if (TempDropRegistered)
temp_schema_reset();
temp_schema_reset(false);
SkipCommand(true);
MtmGucDiscard();
}
Expand Down
2 changes: 2 additions & 0 deletions src/include/ddl.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ extern MtmDDLInProgress DDLApplyInProgress;
extern void MtmDDLReplicationInit(void);
extern void MtmDDLReplicationShmemStartup(void);
extern void temp_schema_reset_all(int my_node_id);
extern void temp_schema_reset(bool transactional);
extern void temp_schema_init(void);
extern bool MtmIsRelationLocal(Relation rel);
extern void MtmDDLResetStatement(void);
extern void MtmApplyDDLMessage(const char *messageBody, bool transactional);
Expand Down
1 change: 1 addition & 0 deletions src/include/multimaster.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ extern MtmShared *Mtm;

/* XXX: to delete */
extern MtmCurrentTrans MtmTx;
extern int MtmTxAtxLevel;
extern MemoryContext MtmApplyContext;

/* bgworker identities */
Expand Down
4 changes: 4 additions & 0 deletions src/multimaster.c
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,8 @@ MtmSuspendTransaction(void)
MtmCurrentTrans *ctx = malloc(sizeof(MtmCurrentTrans));

*ctx = MtmTx;
MtmTxAtxLevel++;
temp_schema_init();
CallXactCallbacks(XACT_EVENT_START);
return ctx;
}
Expand All @@ -378,6 +380,8 @@ MtmResumeTransaction(void *ctx)
{
MtmTx = *(MtmCurrentTrans *) ctx;
free(ctx);
MtmTxAtxLevel--;
temp_schema_init();
}
#endif

Expand Down
17 changes: 15 additions & 2 deletions src/state.c
Original file line number Diff line number Diff line change
Expand Up @@ -3920,6 +3920,7 @@ MtmMonitor(Datum arg)
*/
{
int rc;
uint64 nfuncs;

StartTransactionCommand();
if (SPI_connect() != SPI_OK_CONNECT)
Expand Down Expand Up @@ -3955,15 +3956,27 @@ MtmMonitor(Datum arg)
true, 0);
if (rc < 0 || rc != SPI_OK_SELECT)
mtm_log(ERROR, "Failed to query pg_proc");
if (SPI_processed == 0)
nfuncs = SPI_processed;
if (nfuncs == 0)
{
rc = SPI_execute("CREATE FUNCTION mtm.set_temp_schema(nsp text) RETURNS void "
"AS '$libdir/multimaster','mtm_set_temp_schema' "
"LANGUAGE C; ", false, 0);
if (rc < 0 || rc != SPI_OK_UTILITY)
mtm_log(ERROR, "Failed to create mtm.set_temp_schema()");

mtm_log(LOG, "Creating mtm.set_temp_schema()");
mtm_log(LOG, "Creating mtm.set_temp_schema(nsp)");
}

if (nfuncs <= 1)
{
rc = SPI_execute("CREATE FUNCTION mtm.set_temp_schema(nsp text, force bool) RETURNS void "
"AS '$libdir/multimaster','mtm_set_temp_schema' "
"LANGUAGE C; ", false, 0);
if (rc < 0 || rc != SPI_OK_UTILITY)
mtm_log(ERROR, "Failed to create mtm.set_temp_schema()");

mtm_log(LOG, "Creating mtm.set_temp_schema(nsp, force)");
}

SPI_finish();
Expand Down