2424#include "utils/guc.h"
2525#include "utils/rel.h"
2626
27+
28+ #define MTHackField (mt_state , field ) ( (mt_state)->field )
29+
30+
2731bool pg_pathman_enable_partition_router = true;
2832
2933CustomScanMethods partition_router_plan_methods ;
3034CustomExecMethods partition_router_exec_methods ;
3135
3236
37+ /* FIXME: replace this magic with a CustomScan */
38+ static ExecProcNodeMtd mt_method = NULL ;
39+
40+
41+ static TupleTableSlot * router_run_modify_table (PlanState * state );
42+
43+ static TupleTableSlot * router_set_slot (PartitionRouterState * state ,
44+ TupleTableSlot * slot ,
45+ CmdType operation );
46+ static TupleTableSlot * router_get_slot (PartitionRouterState * state ,
47+ bool * should_process );
48+
3349static void router_lazy_init_junkfilter (PartitionRouterState * state , EState * estate );
3450static void router_lazy_init_constraint (PartitionRouterState * state );
3551
@@ -110,6 +126,7 @@ prepare_modify_table_for_partition_router(PlanState *state, void *context)
110126 if (IsA (state , ModifyTableState ))
111127 {
112128 ModifyTableState * mt_state = (ModifyTableState * ) state ;
129+ bool changed_method = false;
113130 int i ;
114131
115132 for (i = 0 ; i < mt_state -> mt_nplans ; i ++ )
@@ -121,8 +138,19 @@ prepare_modify_table_for_partition_router(PlanState *state, void *context)
121138 if (IsPartitionFilterState (pf_state ) &&
122139 IsPartitionRouterState (pr_state = linitial (pf_state -> custom_ps )))
123140 {
124- /* HACK: PartitionRouter might change ModifyTable's state */
141+ /* HACK: point to ModifyTable in PartitionRouter */
125142 pr_state -> mt_state = mt_state ;
143+
144+ if (!changed_method )
145+ {
146+ if (!mt_method )
147+ mt_method = state -> ExecProcNodeReal ;
148+
149+ /* HACK: replace ModifyTable's execution method */
150+ ExecSetExecProcNode (state , router_run_modify_table );
151+
152+ changed_method = true;
153+ }
126154 }
127155 }
128156 }
@@ -166,17 +194,18 @@ partition_router_begin(CustomScanState *node, EState *estate, int eflags)
166194TupleTableSlot *
167195partition_router_exec (CustomScanState * node )
168196{
169- EState * estate = node -> ss .ps .state ;
170- PlanState * child_ps = (PlanState * ) linitial ( node -> custom_ps ) ;
171- PartitionRouterState * state = ( PartitionRouterState * ) node ;
172- TupleTableSlot * slot ;
197+ EState * estate = node -> ss .ps .state ;
198+ PartitionRouterState * state = (PartitionRouterState * ) node ;
199+ TupleTableSlot * slot ;
200+ bool should_process ;
173201
174202take_next_tuple :
175- /* execute PartitionFilter child node */
176- slot = ExecProcNode ( child_ps );
203+ /* Get next tuple for processing */
204+ slot = router_get_slot ( state , & should_process );
177205
178- if (! TupIsNull ( slot ) )
206+ if (should_process )
179207 {
208+ CmdType new_cmd ;
180209 bool deleted ;
181210 ItemPointerData ctid ;
182211
@@ -203,13 +232,14 @@ partition_router_exec(CustomScanState *node)
203232 if (TupIsNull (slot ))
204233 goto take_next_tuple ;
205234
206- /* HACK: change command type in ModifyTable */
207- state -> mt_state -> operation = deleted ? CMD_INSERT : CMD_UPDATE ;
235+ /* Should we use UPDATE or DELETE + INSERT? */
236+ new_cmd = deleted ? CMD_INSERT : CMD_UPDATE ;
208237
209- return slot ;
238+ /* Alter ModifyTable's state and return */
239+ return router_set_slot (state , slot , new_cmd );
210240 }
211241
212- return NULL ;
242+ return slot ;
213243}
214244
215245void
@@ -218,15 +248,20 @@ partition_router_end(CustomScanState *node)
218248 PartitionRouterState * state = (PartitionRouterState * ) node ;
219249
220250 Assert (list_length (node -> custom_ps ) == 1 );
221- EvalPlanQualEnd (& state -> epqstate );
222251 ExecEndNode ((PlanState * ) linitial (node -> custom_ps ));
252+
253+ EvalPlanQualEnd (& state -> epqstate );
223254}
224255
225256void
226257partition_router_rescan (CustomScanState * node )
227258{
259+ PartitionRouterState * state = (PartitionRouterState * ) node ;
260+
228261 Assert (list_length (node -> custom_ps ) == 1 );
229262 ExecReScan ((PlanState * ) linitial (node -> custom_ps ));
263+
264+ state -> saved_slot = NULL ;
230265}
231266
232267void
@@ -236,6 +271,87 @@ partition_router_explain(CustomScanState *node, List *ancestors, ExplainState *e
236271}
237272
238273
274+ static TupleTableSlot *
275+ router_run_modify_table (PlanState * state )
276+ {
277+ ModifyTableState * mt_state ;
278+ TupleTableSlot * slot ;
279+ int mt_plans_old ,
280+ mt_plans_new ;
281+
282+ mt_state = (ModifyTableState * ) state ;
283+
284+ mt_plans_old = MTHackField (mt_state , mt_nplans );
285+
286+ /* Fetch next tuple */
287+ slot = mt_method (state );
288+
289+ mt_plans_new = MTHackField (mt_state , mt_nplans );
290+
291+ /* PartitionRouter asked us to restart */
292+ if (mt_plans_new != mt_plans_old )
293+ {
294+ int state_idx = mt_state -> mt_whichplan - 1 ;
295+
296+ /* HACK: partially restore ModifyTable's state */
297+ MTHackField (mt_state , mt_done ) = false;
298+ MTHackField (mt_state , mt_nplans ) = mt_plans_old ;
299+ MTHackField (mt_state , mt_whichplan ) = state_idx ;
300+
301+ /* Restart ModifyTable */
302+ return mt_method (state );
303+ }
304+
305+ return slot ;
306+ }
307+
308+ static TupleTableSlot *
309+ router_set_slot (PartitionRouterState * state ,
310+ TupleTableSlot * slot ,
311+ CmdType operation )
312+ {
313+ ModifyTableState * mt_state = state -> mt_state ;
314+
315+ Assert (!TupIsNull (slot ));
316+
317+ if (mt_state -> operation == operation )
318+ return slot ;
319+
320+ /* HACK: alter ModifyTable's state */
321+ MTHackField (mt_state , mt_nplans ) = - mt_state -> mt_whichplan ;
322+ MTHackField (mt_state , operation ) = operation ;
323+
324+ /* Set saved_slot and yield */
325+ state -> saved_slot = slot ;
326+ return NULL ;
327+ }
328+
329+ static TupleTableSlot *
330+ router_get_slot (PartitionRouterState * state ,
331+ bool * should_process )
332+ {
333+ TupleTableSlot * slot ;
334+
335+ if (!TupIsNull (state -> saved_slot ))
336+ {
337+ /* Reset saved_slot */
338+ slot = state -> saved_slot ;
339+ state -> saved_slot = NULL ;
340+
341+ /* We shouldn't process preserved slot... */
342+ * should_process = false;
343+ }
344+ else
345+ {
346+ slot = ExecProcNode ((PlanState * ) linitial (state -> css .custom_ps ));
347+
348+ /* But we have to process non-empty slot */
349+ * should_process = !TupIsNull (slot );
350+ }
351+
352+ return slot ;
353+ }
354+
239355static void
240356router_lazy_init_junkfilter (PartitionRouterState * state , EState * estate )
241357{
0 commit comments