Skip to content

Foreign copy from for pg_shardman #136

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions src/include/partition_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,15 @@ typedef struct
} ResultRelInfoHolder;


/* Forward declaration (for on_new_rri_holder()) */
/* Forward declaration (for on_rri_holder()) */
Copy link
Collaborator

@funbringer funbringer Dec 1, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a necessary change? This callback is intended to be called when a new ResultRelInfo holder is created.

struct ResultPartsStorage;
typedef struct ResultPartsStorage ResultPartsStorage;

/*
* Callback to be fired at rri_holder creation.
* Callback to be fired at rri_holder creation/destruction.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO there should be 2 callbacks: one for creation, and one for destruction. Why would we want to mix these roles?

*/
typedef void (*on_new_rri_holder)(EState *estate,
ResultRelInfoHolder *rri_holder,
const ResultPartsStorage *rps_storage,
void *arg);
typedef void (*on_rri_holder)(ResultRelInfoHolder *rri_holder,
const ResultPartsStorage *rps_storage);

/*
* Cached ResultRelInfos of partitions.
Expand All @@ -66,7 +64,7 @@ struct ResultPartsStorage

bool speculative_inserts; /* for ExecOpenIndices() */

on_new_rri_holder on_new_rri_holder_callback;
on_rri_holder on_new_rri_holder_callback;
void *callback_arg;

EState *estate; /* pointer to executor's state */
Expand Down Expand Up @@ -116,11 +114,11 @@ void init_result_parts_storage(ResultPartsStorage *parts_storage,
EState *estate,
bool speculative_inserts,
Size table_entry_size,
on_new_rri_holder on_new_rri_holder_cb,
on_rri_holder on_new_rri_holder_cb,
void *on_new_rri_holder_cb_arg);

void fini_result_parts_storage(ResultPartsStorage *parts_storage,
bool close_rels);
bool close_rels, on_rri_holder hook);

ResultRelInfoHolder * scan_result_parts_storage(Oid partid,
ResultPartsStorage *storage);
Expand Down
82 changes: 37 additions & 45 deletions src/partition_filter.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,12 @@ CustomScanMethods partition_filter_plan_methods;
CustomExecMethods partition_filter_exec_methods;


static void prepare_rri_for_insert(EState *estate,
ResultRelInfoHolder *rri_holder,
const ResultPartsStorage *rps_storage,
void *arg);
static void prepare_rri_returning_for_insert(EState *estate,
ResultRelInfoHolder *rri_holder,
const ResultPartsStorage *rps_storage,
void *arg);
static void prepare_rri_fdw_for_insert(EState *estate,
ResultRelInfoHolder *rri_holder,
const ResultPartsStorage *rps_storage,
void *arg);
static void prepare_rri_for_insert(ResultRelInfoHolder *rri_holder,
const ResultPartsStorage *rps_storage);
static void prepare_rri_returning_for_insert(ResultRelInfoHolder *rri_holder,
const ResultPartsStorage *rps_storage);
static void prepare_rri_fdw_for_insert(ResultRelInfoHolder *rri_holder,
const ResultPartsStorage *rps_storage);
static Node *fix_returning_list_mutator(Node *node, void *state);

static Index append_rte_to_estate(EState *estate, RangeTblEntry *rte);
Expand Down Expand Up @@ -143,7 +137,7 @@ init_result_parts_storage(ResultPartsStorage *parts_storage,
EState *estate,
bool speculative_inserts,
Size table_entry_size,
on_new_rri_holder on_new_rri_holder_cb,
on_rri_holder on_new_rri_holder_cb,
void *on_new_rri_holder_cb_arg)
{
HASHCTL *result_rels_table_config = &parts_storage->result_rels_table_config;
Expand Down Expand Up @@ -177,16 +171,21 @@ init_result_parts_storage(ResultPartsStorage *parts_storage,

/* Free ResultPartsStorage (close relations etc) */
void
fini_result_parts_storage(ResultPartsStorage *parts_storage, bool close_rels)
fini_result_parts_storage(ResultPartsStorage *parts_storage, bool close_rels,
on_rri_holder hook)
{
HASH_SEQ_STATUS stat;
ResultRelInfoHolder *rri_holder; /* ResultRelInfo holder */

/* Close partitions and free conversion-related stuff */
if (close_rels)
hash_seq_init(&stat, parts_storage->result_rels_table);
while ((rri_holder = (ResultRelInfoHolder *) hash_seq_search(&stat)) != NULL)
{
hash_seq_init(&stat, parts_storage->result_rels_table);
while ((rri_holder = (ResultRelInfoHolder *) hash_seq_search(&stat)) != NULL)
/* Call destruction hook, if needed */
if (hook != NULL)
hook(rri_holder, parts_storage);

/* Close partitions and free conversion-related stuff */
if (close_rels)
{
ExecCloseIndices(rri_holder->result_rel_info);

Expand All @@ -202,13 +201,8 @@ fini_result_parts_storage(ResultPartsStorage *parts_storage, bool close_rels)

free_conversion_map(rri_holder->tuple_map);
}
}

/* Else just free conversion-related stuff */
else
{
hash_seq_init(&stat, parts_storage->result_rels_table);
while ((rri_holder = (ResultRelInfoHolder *) hash_seq_search(&stat)) != NULL)
/* Else just free conversion-related stuff */
else
{
/* Skip if there's no map */
if (!rri_holder->tuple_map)
Expand Down Expand Up @@ -329,10 +323,8 @@ scan_result_parts_storage(Oid partid, ResultPartsStorage *parts_storage)

/* Call on_new_rri_holder_callback() if needed */
if (parts_storage->on_new_rri_holder_callback)
parts_storage->on_new_rri_holder_callback(parts_storage->estate,
rri_holder,
parts_storage,
parts_storage->callback_arg);
parts_storage->on_new_rri_holder_callback(rri_holder,
parts_storage);

/* Finally append ResultRelInfo to storage->es_alloc_result_rels */
append_rri_to_estate(parts_storage->estate, child_result_rel_info);
Expand Down Expand Up @@ -702,7 +694,7 @@ partition_filter_end(CustomScanState *node)
PartitionFilterState *state = (PartitionFilterState *) node;

/* Executor will close rels via estate->es_result_relations */
fini_result_parts_storage(&state->result_parts, false);
fini_result_parts_storage(&state->result_parts, false, NULL);

Assert(list_length(node->custom_ps) == 1);
ExecEndNode((PlanState *) linitial(node->custom_ps));
Expand Down Expand Up @@ -793,34 +785,33 @@ pfilter_build_tlist(Relation parent_rel, List *tlist)

/* Main trigger */
static void
prepare_rri_for_insert(EState *estate,
ResultRelInfoHolder *rri_holder,
const ResultPartsStorage *rps_storage,
void *arg)
prepare_rri_for_insert(ResultRelInfoHolder *rri_holder,
const ResultPartsStorage *rps_storage)
{
prepare_rri_returning_for_insert(estate, rri_holder, rps_storage, arg);
prepare_rri_fdw_for_insert(estate, rri_holder, rps_storage, arg);
prepare_rri_returning_for_insert(rri_holder, rps_storage);
prepare_rri_fdw_for_insert(rri_holder, rps_storage);
}

/* Prepare 'RETURNING *' tlist & projection */
static void
prepare_rri_returning_for_insert(EState *estate,
ResultRelInfoHolder *rri_holder,
const ResultPartsStorage *rps_storage,
void *arg)
prepare_rri_returning_for_insert(ResultRelInfoHolder *rri_holder,
const ResultPartsStorage *rps_storage)
{
PartitionFilterState *pfstate;
List *returning_list;
ResultRelInfo *child_rri,
*parent_rri;
Index parent_rt_idx;
TupleTableSlot *result_slot;
EState *estate;

estate = rps_storage->estate;

/* We don't need to do anything if there's no map */
if (!rri_holder->tuple_map)
return;

pfstate = (PartitionFilterState *) arg;
pfstate = (PartitionFilterState *) rps_storage->callback_arg;
returning_list = pfstate->returning_list;

/* Exit if there's no RETURNING list */
Expand Down Expand Up @@ -857,14 +848,15 @@ prepare_rri_returning_for_insert(EState *estate,

/* Prepare FDW access structs */
static void
prepare_rri_fdw_for_insert(EState *estate,
ResultRelInfoHolder *rri_holder,
const ResultPartsStorage *rps_storage,
void *arg)
prepare_rri_fdw_for_insert(ResultRelInfoHolder *rri_holder,
const ResultPartsStorage *rps_storage)
{
ResultRelInfo *rri = rri_holder->result_rel_info;
FdwRoutine *fdw_routine = rri->ri_FdwRoutine;
Oid partid;
EState *estate;

estate = rps_storage->estate;

/* Nothing to do if not FDW */
if (fdw_routine == NULL)
Expand Down
101 changes: 77 additions & 24 deletions src/utility_stmt_hooking.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "access/xact.h"
#include "catalog/namespace.h"
#include "commands/copy.h"
#include "commands/defrem.h"
#include "commands/trigger.h"
#include "commands/tablecmds.h"
#include "foreign/fdwapi.h"
Expand Down Expand Up @@ -64,10 +65,10 @@ static uint64 PathmanCopyFrom(CopyState cstate,
List *range_table,
bool old_protocol);

static void prepare_rri_for_copy(EState *estate,
ResultRelInfoHolder *rri_holder,
const ResultPartsStorage *rps_storage,
void *arg);
static void prepare_rri_for_copy(ResultRelInfoHolder *rri_holder,
const ResultPartsStorage *rps_storage);
static void finish_rri_copy(ResultRelInfoHolder *rri_holder,
const ResultPartsStorage *rps_storage);


/*
Expand Down Expand Up @@ -110,12 +111,18 @@ is_pathman_related_copy(Node *parsetree)
/* Analyze options list */
foreach (lc, copy_stmt->options)
{
DefElem *defel = (DefElem *) lfirst(lc);

Assert(IsA(defel, DefElem));
DefElem *defel = lfirst_node(DefElem, lc);

/* We do not support freeze */
if (strcmp(defel->defname, "freeze") == 0)
/*
* It would be better to let copy.c extract the option value and
* check the ready-made result. However, there are no hooks that
* would let us do that before the 'ok, begin streaming data' message
* is sent to the client, which is ugly and confusing: e.g. it would
* require us to actually send some data in regression tests before
* we notice the error.
*/
if (strcmp(defel->defname, "freeze") == 0 && defGetBoolean(defel))
elog(ERROR, "freeze is not supported for partitioned tables");
}

Expand Down Expand Up @@ -481,7 +488,6 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel,

uint64 processed = 0;


tupDesc = RelationGetDescr(parent_rel);

parent_result_rel = makeNode(ResultRelInfo);
Expand All @@ -499,7 +505,7 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel,
/* Initialize ResultPartsStorage */
init_result_parts_storage(&parts_storage, estate, false,
ResultPartsStorageStandard,
prepare_rri_for_copy, NULL);
prepare_rri_for_copy, cstate);
parts_storage.saved_rel_info = parent_result_rel;

/* Set up a tuple slot too */
Expand Down Expand Up @@ -634,13 +640,22 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel,
/* Check the constraints of the tuple */
if (child_result_rel->ri_RelationDesc->rd_att->constr)
ExecConstraints(child_result_rel, slot, estate);
if (!child_result_rel->ri_FdwRoutine)
{
/* OK, store the tuple and create index entries for it */
simple_heap_insert(child_result_rel->ri_RelationDesc, tuple);

/* OK, store the tuple and create index entries for it */
simple_heap_insert(child_result_rel->ri_RelationDesc, tuple);

if (child_result_rel->ri_NumIndices > 0)
recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
estate, false, NULL, NIL);
if (child_result_rel->ri_NumIndices > 0)
recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
estate, false, NULL, NIL);
}
#ifdef PG_SHARDMAN
else /* FDW table */
{
child_result_rel->ri_FdwRoutine->ForeignNextCopyFrom(
estate, child_result_rel, cstate);
}
#endif

/* AFTER ROW INSERT Triggers (FIXME: NULL transition) */
ExecARInsertTriggersCompat(estate, child_result_rel, tuple,
Expand Down Expand Up @@ -678,7 +693,7 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel,
ExecResetTupleTable(estate->es_tupleTable, false);

/* Close partitions and destroy hash table */
fini_result_parts_storage(&parts_storage, true);
fini_result_parts_storage(&parts_storage, true, finish_rri_copy);

/* Close parent's indices */
ExecCloseIndices(parent_result_rel);
Expand All @@ -689,20 +704,58 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel,
}

/*
* COPY FROM does not support FDWs, emit ERROR.
* Init COPY FROM, if supported.
*/
static void
prepare_rri_for_copy(EState *estate,
ResultRelInfoHolder *rri_holder,
const ResultPartsStorage *rps_storage,
void *arg)
prepare_rri_for_copy(ResultRelInfoHolder *rri_holder,
const ResultPartsStorage *rps_storage)
{
ResultRelInfo *rri = rri_holder->result_rel_info;
FdwRoutine *fdw_routine = rri->ri_FdwRoutine;
ResultRelInfo *rri = rri_holder->result_rel_info;
FdwRoutine *fdw_routine = rri->ri_FdwRoutine;

if (fdw_routine != NULL)
{
/*
* If this Postgres has no idea about shardman, behave as usual:
* vanilla Postgres doesn't support COPY FROM to foreign partitions.
* However, shardman patches to core extend FDW API to allow it.
*/
#ifdef PG_SHARDMAN
/* shardman COPY FROM requested? */
if (*find_rendezvous_variable(
"shardman_pathman_copy_from_rendezvous") != NULL &&
FdwCopyFromIsSupported(fdw_routine))
{
CopyState cstate = (CopyState) rps_storage->callback_arg;
ResultRelInfo *parent_rri = rps_storage->saved_rel_info;
EState *estate = rps_storage->estate;

fdw_routine->BeginForeignCopyFrom(estate, rri, cstate, parent_rri);
return;
}
#endif

elog(ERROR, "cannot copy to foreign partition \"%s\"",
get_rel_name(RelationGetRelid(rri->ri_RelationDesc)));
}
}

/*
* Shut down FDWs.
*/
static void
finish_rri_copy(ResultRelInfoHolder *rri_holder,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be passed as a "delete callback". Maybe we should call them rri_init_callback & rri_fini_callback?

const ResultPartsStorage *rps_storage)
{
#ifdef PG_SHARDMAN
ResultRelInfo *resultRelInfo = rri_holder->result_rel_info;

if (resultRelInfo->ri_FdwRoutine)
{
resultRelInfo->ri_FdwRoutine->EndForeignCopyFrom(
rps_storage->estate, resultRelInfo);
}
#endif
}

/*
Expand Down