Skip to content

Commit ee03666

Browse files
authoredDec 1, 2017
Merge pull request #136 from arssher/foreign_copy_from
Foreign copy from for pg_shardman (PGPRO)
2 parents c963933 + d7520bb commit ee03666

File tree

3 files changed

+121
-78
lines changed

3 files changed

+121
-78
lines changed
 

‎src/include/partition_filter.h

+7-9
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,15 @@ typedef struct
4343
} ResultRelInfoHolder;
4444

4545

46-
/* Forward declaration (for on_new_rri_holder()) */
46+
/* Forward declaration (for on_rri_holder()) */
4747
struct ResultPartsStorage;
4848
typedef struct ResultPartsStorage ResultPartsStorage;
4949

5050
/*
51-
* Callback to be fired at rri_holder creation.
51+
* Callback to be fired at rri_holder creation/destruction.
5252
*/
53-
typedef void (*on_new_rri_holder)(EState *estate,
54-
ResultRelInfoHolder *rri_holder,
55-
const ResultPartsStorage *rps_storage,
56-
void *arg);
53+
typedef void (*on_rri_holder)(ResultRelInfoHolder *rri_holder,
54+
const ResultPartsStorage *rps_storage);
5755

5856
/*
5957
* Cached ResultRelInfos of partitions.
@@ -66,7 +64,7 @@ struct ResultPartsStorage
6664

6765
bool speculative_inserts; /* for ExecOpenIndices() */
6866

69-
on_new_rri_holder on_new_rri_holder_callback;
67+
on_rri_holder on_new_rri_holder_callback;
7068
void *callback_arg;
7169

7270
EState *estate; /* pointer to executor's state */
@@ -116,11 +114,11 @@ void init_result_parts_storage(ResultPartsStorage *parts_storage,
116114
EState *estate,
117115
bool speculative_inserts,
118116
Size table_entry_size,
119-
on_new_rri_holder on_new_rri_holder_cb,
117+
on_rri_holder on_new_rri_holder_cb,
120118
void *on_new_rri_holder_cb_arg);
121119

122120
void fini_result_parts_storage(ResultPartsStorage *parts_storage,
123-
bool close_rels);
121+
bool close_rels, on_rri_holder hook);
124122

125123
ResultRelInfoHolder * scan_result_parts_storage(Oid partid,
126124
ResultPartsStorage *storage);

‎src/partition_filter.c

+37-45
Original file line numberDiff line numberDiff line change
@@ -68,18 +68,12 @@ CustomScanMethods partition_filter_plan_methods;
6868
CustomExecMethods partition_filter_exec_methods;
6969

7070

71-
static void prepare_rri_for_insert(EState *estate,
72-
ResultRelInfoHolder *rri_holder,
73-
const ResultPartsStorage *rps_storage,
74-
void *arg);
75-
static void prepare_rri_returning_for_insert(EState *estate,
76-
ResultRelInfoHolder *rri_holder,
77-
const ResultPartsStorage *rps_storage,
78-
void *arg);
79-
static void prepare_rri_fdw_for_insert(EState *estate,
80-
ResultRelInfoHolder *rri_holder,
81-
const ResultPartsStorage *rps_storage,
82-
void *arg);
71+
static void prepare_rri_for_insert(ResultRelInfoHolder *rri_holder,
72+
const ResultPartsStorage *rps_storage);
73+
static void prepare_rri_returning_for_insert(ResultRelInfoHolder *rri_holder,
74+
const ResultPartsStorage *rps_storage);
75+
static void prepare_rri_fdw_for_insert(ResultRelInfoHolder *rri_holder,
76+
const ResultPartsStorage *rps_storage);
8377
static Node *fix_returning_list_mutator(Node *node, void *state);
8478

8579
static Index append_rte_to_estate(EState *estate, RangeTblEntry *rte);
@@ -143,7 +137,7 @@ init_result_parts_storage(ResultPartsStorage *parts_storage,
143137
EState *estate,
144138
bool speculative_inserts,
145139
Size table_entry_size,
146-
on_new_rri_holder on_new_rri_holder_cb,
140+
on_rri_holder on_new_rri_holder_cb,
147141
void *on_new_rri_holder_cb_arg)
148142
{
149143
HASHCTL *result_rels_table_config = &parts_storage->result_rels_table_config;
@@ -177,16 +171,21 @@ init_result_parts_storage(ResultPartsStorage *parts_storage,
177171

178172
/* Free ResultPartsStorage (close relations etc) */
179173
void
180-
fini_result_parts_storage(ResultPartsStorage *parts_storage, bool close_rels)
174+
fini_result_parts_storage(ResultPartsStorage *parts_storage, bool close_rels,
175+
on_rri_holder hook)
181176
{
182177
HASH_SEQ_STATUS stat;
183178
ResultRelInfoHolder *rri_holder; /* ResultRelInfo holder */
184179

185-
/* Close partitions and free free conversion-related stuff */
186-
if (close_rels)
180+
hash_seq_init(&stat, parts_storage->result_rels_table);
181+
while ((rri_holder = (ResultRelInfoHolder *) hash_seq_search(&stat)) != NULL)
187182
{
188-
hash_seq_init(&stat, parts_storage->result_rels_table);
189-
while ((rri_holder = (ResultRelInfoHolder *) hash_seq_search(&stat)) != NULL)
183+
/* Call destruction hook, if needed */
184+
if (hook != NULL)
185+
hook(rri_holder, parts_storage);
186+
187+
/* Close partitions and free free conversion-related stuff */
188+
if (close_rels)
190189
{
191190
ExecCloseIndices(rri_holder->result_rel_info);
192191

@@ -202,13 +201,8 @@ fini_result_parts_storage(ResultPartsStorage *parts_storage, bool close_rels)
202201

203202
free_conversion_map(rri_holder->tuple_map);
204203
}
205-
}
206-
207-
/* Else just free conversion-related stuff */
208-
else
209-
{
210-
hash_seq_init(&stat, parts_storage->result_rels_table);
211-
while ((rri_holder = (ResultRelInfoHolder *) hash_seq_search(&stat)) != NULL)
204+
/* Else just free conversion-related stuff */
205+
else
212206
{
213207
/* Skip if there's no map */
214208
if (!rri_holder->tuple_map)
@@ -329,10 +323,8 @@ scan_result_parts_storage(Oid partid, ResultPartsStorage *parts_storage)
329323

330324
/* Call on_new_rri_holder_callback() if needed */
331325
if (parts_storage->on_new_rri_holder_callback)
332-
parts_storage->on_new_rri_holder_callback(parts_storage->estate,
333-
rri_holder,
334-
parts_storage,
335-
parts_storage->callback_arg);
326+
parts_storage->on_new_rri_holder_callback(rri_holder,
327+
parts_storage);
336328

337329
/* Finally append ResultRelInfo to storage->es_alloc_result_rels */
338330
append_rri_to_estate(parts_storage->estate, child_result_rel_info);
@@ -702,7 +694,7 @@ partition_filter_end(CustomScanState *node)
702694
PartitionFilterState *state = (PartitionFilterState *) node;
703695

704696
/* Executor will close rels via estate->es_result_relations */
705-
fini_result_parts_storage(&state->result_parts, false);
697+
fini_result_parts_storage(&state->result_parts, false, NULL);
706698

707699
Assert(list_length(node->custom_ps) == 1);
708700
ExecEndNode((PlanState *) linitial(node->custom_ps));
@@ -793,34 +785,33 @@ pfilter_build_tlist(Relation parent_rel, List *tlist)
793785

794786
/* Main trigger */
795787
static void
796-
prepare_rri_for_insert(EState *estate,
797-
ResultRelInfoHolder *rri_holder,
798-
const ResultPartsStorage *rps_storage,
799-
void *arg)
788+
prepare_rri_for_insert(ResultRelInfoHolder *rri_holder,
789+
const ResultPartsStorage *rps_storage)
800790
{
801-
prepare_rri_returning_for_insert(estate, rri_holder, rps_storage, arg);
802-
prepare_rri_fdw_for_insert(estate, rri_holder, rps_storage, arg);
791+
prepare_rri_returning_for_insert(rri_holder, rps_storage);
792+
prepare_rri_fdw_for_insert(rri_holder, rps_storage);
803793
}
804794

805795
/* Prepare 'RETURNING *' tlist & projection */
806796
static void
807-
prepare_rri_returning_for_insert(EState *estate,
808-
ResultRelInfoHolder *rri_holder,
809-
const ResultPartsStorage *rps_storage,
810-
void *arg)
797+
prepare_rri_returning_for_insert(ResultRelInfoHolder *rri_holder,
798+
const ResultPartsStorage *rps_storage)
811799
{
812800
PartitionFilterState *pfstate;
813801
List *returning_list;
814802
ResultRelInfo *child_rri,
815803
*parent_rri;
816804
Index parent_rt_idx;
817805
TupleTableSlot *result_slot;
806+
EState *estate;
807+
808+
estate = rps_storage->estate;
818809

819810
/* We don't need to do anything ff there's no map */
820811
if (!rri_holder->tuple_map)
821812
return;
822813

823-
pfstate = (PartitionFilterState *) arg;
814+
pfstate = (PartitionFilterState *) rps_storage->callback_arg;
824815
returning_list = pfstate->returning_list;
825816

826817
/* Exit if there's no RETURNING list */
@@ -857,14 +848,15 @@ prepare_rri_returning_for_insert(EState *estate,
857848

858849
/* Prepare FDW access structs */
859850
static void
860-
prepare_rri_fdw_for_insert(EState *estate,
861-
ResultRelInfoHolder *rri_holder,
862-
const ResultPartsStorage *rps_storage,
863-
void *arg)
851+
prepare_rri_fdw_for_insert(ResultRelInfoHolder *rri_holder,
852+
const ResultPartsStorage *rps_storage)
864853
{
865854
ResultRelInfo *rri = rri_holder->result_rel_info;
866855
FdwRoutine *fdw_routine = rri->ri_FdwRoutine;
867856
Oid partid;
857+
EState *estate;
858+
859+
estate = rps_storage->estate;
868860

869861
/* Nothing to do if not FDW */
870862
if (fdw_routine == NULL)

‎src/utility_stmt_hooking.c

+77-24
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "access/xact.h"
2323
#include "catalog/namespace.h"
2424
#include "commands/copy.h"
25+
#include "commands/defrem.h"
2526
#include "commands/trigger.h"
2627
#include "commands/tablecmds.h"
2728
#include "foreign/fdwapi.h"
@@ -64,10 +65,10 @@ static uint64 PathmanCopyFrom(CopyState cstate,
6465
List *range_table,
6566
bool old_protocol);
6667

67-
static void prepare_rri_for_copy(EState *estate,
68-
ResultRelInfoHolder *rri_holder,
69-
const ResultPartsStorage *rps_storage,
70-
void *arg);
68+
static void prepare_rri_for_copy(ResultRelInfoHolder *rri_holder,
69+
const ResultPartsStorage *rps_storage);
70+
static void finish_rri_copy(ResultRelInfoHolder *rri_holder,
71+
const ResultPartsStorage *rps_storage);
7172

7273

7374
/*
@@ -110,12 +111,18 @@ is_pathman_related_copy(Node *parsetree)
110111
/* Analyze options list */
111112
foreach (lc, copy_stmt->options)
112113
{
113-
DefElem *defel = (DefElem *) lfirst(lc);
114-
115-
Assert(IsA(defel, DefElem));
114+
DefElem *defel = lfirst_node(DefElem, lc);
116115

117116
/* We do not support freeze */
118-
if (strcmp(defel->defname, "freeze") == 0)
117+
/*
118+
* It would be great to allow copy.c extract option value and
119+
* check it ready. However, there is no possibility (hooks) to do
120+
* that before messaging 'ok, begin streaming data' to the client,
121+
* which is ugly and confusing: e.g. it would require us to
122+
* actually send something in regression tests before we notice
123+
* the error.
124+
*/
125+
if (strcmp(defel->defname, "freeze") == 0 && defGetBoolean(defel))
119126
elog(ERROR, "freeze is not supported for partitioned tables");
120127
}
121128

@@ -481,7 +488,6 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel,
481488

482489
uint64 processed = 0;
483490

484-
485491
tupDesc = RelationGetDescr(parent_rel);
486492

487493
parent_result_rel = makeNode(ResultRelInfo);
@@ -499,7 +505,7 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel,
499505
/* Initialize ResultPartsStorage */
500506
init_result_parts_storage(&parts_storage, estate, false,
501507
ResultPartsStorageStandard,
502-
prepare_rri_for_copy, NULL);
508+
prepare_rri_for_copy, cstate);
503509
parts_storage.saved_rel_info = parent_result_rel;
504510

505511
/* Set up a tuple slot too */
@@ -634,13 +640,22 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel,
634640
/* Check the constraints of the tuple */
635641
if (child_result_rel->ri_RelationDesc->rd_att->constr)
636642
ExecConstraints(child_result_rel, slot, estate);
643+
if (!child_result_rel->ri_FdwRoutine)
644+
{
645+
/* OK, store the tuple and create index entries for it */
646+
simple_heap_insert(child_result_rel->ri_RelationDesc, tuple);
637647

638-
/* OK, store the tuple and create index entries for it */
639-
simple_heap_insert(child_result_rel->ri_RelationDesc, tuple);
640-
641-
if (child_result_rel->ri_NumIndices > 0)
642-
recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
643-
estate, false, NULL, NIL);
648+
if (child_result_rel->ri_NumIndices > 0)
649+
recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
650+
estate, false, NULL, NIL);
651+
}
652+
#ifdef PG_SHARDMAN
653+
else /* FDW table */
654+
{
655+
child_result_rel->ri_FdwRoutine->ForeignNextCopyFrom(
656+
estate, child_result_rel, cstate);
657+
}
658+
#endif
644659

645660
/* AFTER ROW INSERT Triggers (FIXME: NULL transition) */
646661
ExecARInsertTriggersCompat(estate, child_result_rel, tuple,
@@ -678,7 +693,7 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel,
678693
ExecResetTupleTable(estate->es_tupleTable, false);
679694

680695
/* Close partitions and destroy hash table */
681-
fini_result_parts_storage(&parts_storage, true);
696+
fini_result_parts_storage(&parts_storage, true, finish_rri_copy);
682697

683698
/* Close parent's indices */
684699
ExecCloseIndices(parent_result_rel);
@@ -689,20 +704,58 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel,
689704
}
690705

691706
/*
692-
* COPY FROM does not support FDWs, emit ERROR.
707+
* Init COPY FROM, if supported.
693708
*/
694709
static void
695-
prepare_rri_for_copy(EState *estate,
696-
ResultRelInfoHolder *rri_holder,
697-
const ResultPartsStorage *rps_storage,
698-
void *arg)
710+
prepare_rri_for_copy(ResultRelInfoHolder *rri_holder,
711+
const ResultPartsStorage *rps_storage)
699712
{
700-
ResultRelInfo *rri = rri_holder->result_rel_info;
701-
FdwRoutine *fdw_routine = rri->ri_FdwRoutine;
713+
ResultRelInfo *rri = rri_holder->result_rel_info;
714+
FdwRoutine *fdw_routine = rri->ri_FdwRoutine;
702715

703716
if (fdw_routine != NULL)
717+
{
718+
/*
719+
* If this Postgres has no idea about shardman, behave as usual:
720+
* vanilla Postgres doesn't support COPY FROM to foreign partitions.
721+
* However, shardman patches to core extend FDW API to allow it.
722+
*/
723+
#ifdef PG_SHARDMAN
724+
/* shardman COPY FROM requested? */
725+
if (*find_rendezvous_variable(
726+
"shardman_pathman_copy_from_rendezvous") != NULL &&
727+
FdwCopyFromIsSupported(fdw_routine))
728+
{
729+
CopyState cstate = (CopyState) rps_storage->callback_arg;
730+
ResultRelInfo *parent_rri = rps_storage->saved_rel_info;
731+
EState *estate = rps_storage->estate;
732+
733+
fdw_routine->BeginForeignCopyFrom(estate, rri, cstate, parent_rri);
734+
return;
735+
}
736+
#endif
737+
704738
elog(ERROR, "cannot copy to foreign partition \"%s\"",
705739
get_rel_name(RelationGetRelid(rri->ri_RelationDesc)));
740+
}
741+
}
742+
743+
/*
744+
* Shut down FDWs.
745+
*/
746+
static void
747+
finish_rri_copy(ResultRelInfoHolder *rri_holder,
748+
const ResultPartsStorage *rps_storage)
749+
{
750+
#ifdef PG_SHARDMAN
751+
ResultRelInfo *resultRelInfo = rri_holder->result_rel_info;
752+
753+
if (resultRelInfo->ri_FdwRoutine)
754+
{
755+
resultRelInfo->ri_FdwRoutine->EndForeignCopyFrom(
756+
rps_storage->estate, resultRelInfo);
757+
}
758+
#endif
706759
}
707760

708761
/*

0 commit comments

Comments
 (0)
Please sign in to comment.