forked from zhangh43/vectorize_engine
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnodeAgg.c
4143 lines (3660 loc) · 127 KB
/
nodeAgg.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*-------------------------------------------------------------------------
*
* nodeAgg.c
* Routines to handle aggregate nodes.
*
* ExecAgg normally evaluates each aggregate in the following steps:
*
* transvalue = initcond
* foreach input_tuple do
* transvalue = transfunc(transvalue, input_value(s))
* result = finalfunc(transvalue, direct_argument(s))
*
* If a finalfunc is not supplied then the result is just the ending
* value of transvalue.
*
* Other behaviors can be selected by the "aggsplit" mode, which exists
* to support partial aggregation. It is possible to:
* * Skip running the finalfunc, so that the output is always the
* final transvalue state.
* * Substitute the combinefunc for the transfunc, so that transvalue
* states (propagated up from a child partial-aggregation step) are merged
* rather than processing raw input rows. (The statements below about
* the transfunc apply equally to the combinefunc, when it's selected.)
* * Apply the serializefunc to the output values (this only makes sense
* when skipping the finalfunc, since the serializefunc works on the
* transvalue data type).
* * Apply the deserializefunc to the input values (this only makes sense
* when using the combinefunc, for similar reasons).
* It is the planner's responsibility to connect up Agg nodes using these
* alternate behaviors in a way that makes sense, with partial aggregation
* results being fed to nodes that expect them.
*
* If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the
* input tuples and eliminate duplicates (if required) before performing
* the above-depicted process. (However, we don't do that for ordered-set
* aggregates; their "ORDER BY" inputs are ordinary aggregate arguments
* so far as this module is concerned.) Note that partial aggregation
* is not supported in these cases, since we couldn't ensure global
* ordering or distinctness of the inputs.
*
* If transfunc is marked "strict" in pg_proc and initcond is NULL,
* then the first non-NULL input_value is assigned directly to transvalue,
* and transfunc isn't applied until the second non-NULL input_value.
* The agg's first input type and transtype must be the same in this case!
*
* If transfunc is marked "strict" then NULL input_values are skipped,
* keeping the previous transvalue. If transfunc is not strict then it
* is called for every input tuple and must deal with NULL initcond
* or NULL input_values for itself.
*
* If finalfunc is marked "strict" then it is not called when the
* ending transvalue is NULL, instead a NULL result is created
* automatically (this is just the usual handling of strict functions,
* of course). A non-strict finalfunc can make its own choice of
* what to return for a NULL ending transvalue.
*
* Ordered-set aggregates are treated specially in one other way: we
* evaluate any "direct" arguments and pass them to the finalfunc along
* with the transition value.
*
* A finalfunc can have additional arguments beyond the transvalue and
* any "direct" arguments, corresponding to the input arguments of the
* aggregate. These are always just passed as NULL. Such arguments may be
* needed to allow resolution of a polymorphic aggregate's result type.
*
* We compute aggregate input expressions and run the transition functions
* in a temporary econtext (aggstate->tmpcontext). This is reset at least
* once per input tuple, so when the transvalue datatype is
* pass-by-reference, we have to be careful to copy it into a longer-lived
* memory context, and free the prior value to avoid memory leakage. We
* store transvalues in another set of econtexts, aggstate->aggcontexts
* (one per grouping set, see below), which are also used for the hashtable
* structures in AGG_HASHED mode. These econtexts are rescanned, not just
* reset, at group boundaries so that aggregate transition functions can
* register shutdown callbacks via AggRegisterCallback.
*
* The node's regular econtext (aggstate->ss.ps.ps_ExprContext) is used to
* run finalize functions and compute the output tuple; this context can be
* reset once per output tuple.
*
* The executor's AggState node is passed as the fmgr "context" value in
* all transfunc and finalfunc calls. It is not recommended that the
* transition functions look at the AggState node directly, but they can
* use AggCheckCallContext() to verify that they are being called by
* nodeAgg.c (and not as ordinary SQL functions). The main reason a
* transition function might want to know this is so that it can avoid
* palloc'ing a fixed-size pass-by-ref transition value on every call:
* it can instead just scribble on and return its left input. Ordinarily
* it is completely forbidden for functions to modify pass-by-ref inputs,
* but in the aggregate case we know the left input is either the initial
* transition value or a previous function result, and in either case its
* value need not be preserved. See int8inc() for an example. Notice that
* advance_transition_function() is coded to avoid a data copy step when
* the previous transition value pointer is returned. It is also possible
* to avoid repeated data copying when the transition value is an expanded
* object: to do that, the transition function must take care to return
* an expanded object that is in a child context of the memory context
* returned by AggCheckCallContext(). Also, some transition functions want
* to store working state in addition to the nominal transition value; they
* can use the memory context returned by AggCheckCallContext() to do that.
*
* Note: AggCheckCallContext() is available as of PostgreSQL 9.0. The
* AggState is available as context in earlier releases (back to 8.1),
* but direct examination of the node is needed to use it before 9.0.
*
* As of 9.4, aggregate transition functions can also use AggGetAggref()
* to get hold of the Aggref expression node for their aggregate call.
* This is mainly intended for ordered-set aggregates, which are not
* supported as window functions. (A regular aggregate function would
* need some fallback logic to use this, since there's no Aggref node
* for a window function.)
*
* Grouping sets:
*
* A list of grouping sets which is structurally equivalent to a ROLLUP
* clause (e.g. (a,b,c), (a,b), (a)) can be processed in a single pass over
* ordered data. We do this by keeping a separate set of transition values
* for each grouping set being concurrently processed; for each input tuple
* we update them all, and on group boundaries we reset those states
* (starting at the front of the list) whose grouping values have changed
* (the list of grouping sets is ordered from most specific to least
* specific).
*
* Where more complex grouping sets are used, we break them down into
* "phases", where each phase has a different sort order. During each
* phase but the last, the input tuples are additionally stored in a
* tuplesort which is keyed to the next phase's sort order; during each
* phase but the first, the input tuples are drawn from the previously
* sorted data. (The sorting of the data for the first phase is handled by
* the planner, as it might be satisfied by underlying nodes.)
*
* From the perspective of aggregate transition and final functions, the
* only issue regarding grouping sets is this: a single call site (flinfo)
* of an aggregate function may be used for updating several different
* transition values in turn. So the function must not cache in the flinfo
* anything which logically belongs as part of the transition value (most
* importantly, the memory context in which the transition value exists).
* The support API functions (AggCheckCallContext, AggRegisterCallback) are
* sensitive to the grouping set for which the aggregate function is
* currently being called.
*
* TODO: AGG_HASHED doesn't support multiple grouping sets yet.
*
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/executor/nodeAgg.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/htup_details.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "executor/executor.h"
#include "executor/nodeAgg.h"
#include "miscadmin.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/clauses.h"
#include "optimizer/tlist.h"
#include "parser/parse_agg.h"
#include "parser/parse_coerce.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/tuplesort.h"
#include "utils/datum.h"
/*
* AggStatePerTransData - per aggregate state value information
*
* Working state for updating the aggregate's state value, by calling the
* transition function with an input row. This struct does not store the
* information needed to produce the final aggregate result from the transition
* state, that's stored in AggStatePerAggData instead. This separation allows
* multiple aggregate results to be produced from a single state value.
*/
typedef struct AggStatePerTransData
{
/*
 * These values are set up during ExecInitAgg() and do not change
 * thereafter:
 */
/*
 * Link to an Aggref expr this state value is for.
 *
 * There can be multiple Aggref's sharing the same state value, as long as
 * the inputs and transition function are identical. This points to the
 * first one of them.
 */
Aggref *aggref;
/*
 * Nominal number of arguments for aggregate function. For plain aggs,
 * this excludes any ORDER BY expressions. For ordered-set aggs, this
 * counts both the direct and aggregated (ORDER BY) arguments.
 */
int numArguments;
/*
 * Number of aggregated input columns. This includes ORDER BY expressions
 * in both the plain-agg and ordered-set cases. Ordered-set direct args
 * are not counted, though.
 *
 * In the sort-based path of Vadvance_aggregates, a value of exactly 1
 * selects the single-datum tuplesort interface; anything else stores the
 * whole projected slot.
 */
int numInputs;
/*
 * Number of aggregated input columns to pass to the transfn. This
 * includes the ORDER BY columns for ordered-set aggs, but not for plain
 * aggs. (This doesn't count the transition state value!)
 */
int numTransInputs;
/* Oid of the state transition or combine function */
Oid transfn_oid;
/* Oid of the serialization function or InvalidOid */
Oid serialfn_oid;
/* Oid of the deserialization function or InvalidOid */
Oid deserialfn_oid;
/* Oid of state value's datatype */
Oid aggtranstype;
/*
 * ExprStates of the FILTER and argument expressions.
 *
 * When aggfilter is set, Vadvance_aggregates evaluates it in
 * aggstate->tmpcontext and skips this aggregate for the current input if
 * the result is NULL or false.
 */
ExprState *aggfilter; /* state of FILTER expression, if any */
List *args; /* states of aggregated-argument expressions */
List *aggdirectargs; /* states of direct-argument expressions */
/*
 * fmgr lookup data for transition function or combine function. Note in
 * particular that the fn_strict flag is kept here.
 */
FmgrInfo transfn;
/* fmgr lookup data for serialization function */
FmgrInfo serialfn;
/* fmgr lookup data for deserialization function */
FmgrInfo deserialfn;
/* Input collation derived for aggregate */
Oid aggCollation;
/*
 * number of sorting columns; a value greater than zero makes
 * Vadvance_aggregates feed the inputs to a tuplesort instead of calling
 * the transition function right away
 */
int numSortCols;
/* number of sorting columns to consider in DISTINCT comparisons */
/* (this is either zero or the same as numSortCols) */
int numDistinctCols;
/* deconstructed sorting information (arrays of length numSortCols) */
AttrNumber *sortColIdx;
Oid *sortOperators;
Oid *sortCollations;
bool *sortNullsFirst;
/*
 * fmgr lookup data for input columns' equality operators --- only
 * set/used when aggregate has DISTINCT flag. Note that these are in
 * order of sort column index, not parameter index.
 */
FmgrInfo *equalfns; /* array of length numDistinctCols */
/*
 * initial value from pg_aggregate entry
 */
Datum initValue;
bool initValueIsNull;
/*
 * We need the len and byval info for the agg's input and transition data
 * types in order to know how to copy/delete values.
 *
 * Note that the info for the input type is used only when handling
 * DISTINCT aggs with just one argument, so there is only one input type.
 */
int16 inputtypeLen,
transtypeLen;
bool inputtypeByVal,
transtypeByVal;
/*
 * Stuff for evaluation of inputs. We used to just use ExecEvalExpr, but
 * with the addition of ORDER BY we now need at least a slot for passing
 * data to the sort object, which requires a tupledesc, so we might as
 * well go whole hog and use ExecProject too.
 *
 * Vadvance_aggregates runs evalproj through ExecProject() once per
 * aggregate and reads the inputs from the slot it returns.
 */
TupleDesc evaldesc; /* descriptor of input tuples */
ProjectionInfo *evalproj; /* projection machinery */
/*
 * Slots for holding the evaluated input arguments. These are set up
 * during ExecInitAgg() and then used for each input row.
 */
TupleTableSlot *evalslot; /* current input tuple */
TupleTableSlot *uniqslot; /* used for multi-column DISTINCT */
/*
 * These values are working state that is initialized at the start of an
 * input tuple group and updated for each input tuple.
 *
 * For a simple (non DISTINCT/ORDER BY) aggregate, we just feed the input
 * values straight to the transition function. If it's DISTINCT or
 * requires ORDER BY, we pass the input values into a Tuplesort object;
 * then at completion of the input tuple group, we scan the sorted values,
 * eliminate duplicates if needed, and run the transition function on the
 * rest.
 *
 * We need a separate tuplesort for each grouping set; the array is
 * indexed by grouping-set number.
 */
Tuplesortstate **sortstates; /* sort objects, if DISTINCT or ORDER BY */
/*
 * This field is a pre-initialized FunctionCallInfo struct used for
 * calling this aggregate's transfn. We save a few cycles per row by not
 * re-initializing the unchanging fields; which isn't much, but it seems
 * worth the extra space consumption.
 *
 * Argument layout used by the vectorized path in this file:
 *   arg[0]   batch of hash entries (set by Vadvance_transition_function)
 *   arg[1]   int32 byte offset of the per-group state inside a hash entry
 *            (set by Vadvance_aggregates)
 *   arg[2..] aggregated input values (set by Vadvance_aggregates)
 */
FunctionCallInfoData transfn_fcinfo;
/* Likewise for serialization and deserialization functions */
FunctionCallInfoData serialfn_fcinfo;
FunctionCallInfoData deserialfn_fcinfo;
} AggStatePerTransData;
/*
* AggStatePerAggData - per-aggregate information
*
* This contains the information needed to call the final function, to produce
* a final aggregate result from the state value. If there are multiple
* identical Aggrefs in the query, they can all share the same per-agg data.
*
* These values are set up during ExecInitAgg() and do not change thereafter.
*/
typedef struct AggStatePerAggData
{
/*
 * Link to an Aggref expr this state value is for.
 *
 * There can be multiple identical Aggref's sharing the same per-agg. This
 * points to the first one of them.
 */
Aggref *aggref;
/* index to the state value which this agg should use */
int transno;
/* Optional Oid of final function (may be InvalidOid) */
Oid finalfn_oid;
/*
 * fmgr lookup data for final function --- only valid when finalfn_oid
 * is not InvalidOid.
 */
FmgrInfo finalfn;
/*
 * Number of arguments to pass to the finalfn. This is always at least 1
 * (the transition state value) plus any ordered-set direct args. If the
 * finalfn wants extra args then we pass nulls corresponding to the
 * aggregated input columns.
 */
int numFinalArgs;
/*
 * We need the len and byval info for the agg's result data type in order
 * to know how to copy/delete values.
 */
int16 resulttypeLen;
bool resulttypeByVal;
} AggStatePerAggData;
/*
* AggStatePerGroupData - per-aggregate-per-group working state
*
* These values are working state that is initialized at the start of
* an input tuple group and updated for each input tuple.
*
* In AGG_PLAIN and AGG_SORTED modes, we have a single array of these
* structs (pointed to by aggstate->pergroup); we re-use the array for
* each input group, if it's AGG_SORTED mode. In AGG_HASHED mode, the
* hash table contains an array of these structs for each tuple group.
*
* Logically, the sortstate field belongs in this struct, but we do not
* keep it here for space reasons: we don't support DISTINCT aggregates
* in AGG_HASHED mode, so there's no reason to use up a pointer field
* in every entry of the hashtable.
*/
/*
 * NOTE: Vadvance_aggregates computes byte offsets into arrays of this struct
 * (the pergroup[] member of AggHashEntryData) and hands them to the
 * transition function, so the size and layout of this struct are part of
 * that calling convention.
 */
typedef struct AggStatePerGroupData
{
Datum transValue; /* current transition value */
bool transValueIsNull;
bool noTransValue; /* true if transValue not set yet */
/*
 * Note: noTransValue initially has the same value as transValueIsNull,
 * and if true both are cleared to false at the same time. They are not
 * the same though: if transfn later returns a NULL, we want to keep that
 * NULL and not auto-replace it with a later input value. Only the first
 * non-NULL input will be auto-substituted.
 */
} AggStatePerGroupData;
/*
* AggStatePerPhaseData - per-grouping-set-phase state
*
* Grouping sets are divided into "phases", where a single phase can be
* processed in one pass over the input. If there is more than one phase, then
* at the end of input from the current phase, state is reset and another pass
* taken over the data which has been re-sorted in the mean time.
*
* Accordingly, each phase specifies a list of grouping sets and group clause
* information, plus each phase after the first also has a sort order.
*/
typedef struct AggStatePerPhaseData
{
/*
 * numsets may be 0; Vadvance_aggregates treats that as a single grouping
 * set by using Max(numsets, 1).
 */
int numsets; /* number of grouping sets (or 0) */
int *gset_lengths; /* lengths of grouping sets */
Bitmapset **grouped_cols; /* column groupings for rollup */
FmgrInfo *eqfunctions; /* per-grouping-field equality fns */
Agg *aggnode; /* Agg node for phase data */
/*
 * initialize_phase reads the sortnode of the *next* phase to set up the
 * tuplesort that collects input for it.
 */
Sort *sortnode; /* Sort node for input ordering for phase */
} AggStatePerPhaseData;
/*
* To implement hashed aggregation, we need a hashtable that stores a
* representative tuple and an array of AggStatePerGroup structs for each
* distinct set of GROUP BY column values. We compute the hash key from
* the GROUP BY columns.
*/
/*
 * AggHashEntry is a pointer to one hash table entry. The vectorized code in
 * this file passes arrays of such pointers (AggHashEntry *) together with a
 * byte offset, measured from the start of the entry, that locates one
 * element of pergroup[].
 */
typedef struct AggHashEntryData *AggHashEntry;
typedef struct AggHashEntryData
{
TupleHashEntryData shared; /* common header for hash table entries */
/* per-aggregate transition status array */
AggStatePerGroupData pergroup[FLEXIBLE_ARRAY_MEMBER];
} AggHashEntryData;
/*
 * Prototypes for file-local (static) helpers of the aggregate executor.
 * initialize_phase() and fetch_input_tuple() are defined further down; the
 * remaining definitions are expected elsewhere in this file.
 */

/* phase switching and input fetching */
static void initialize_phase(AggState *aggstate, int newphase);
static TupleTableSlot *fetch_input_tuple(AggState *aggstate);
/* per-group initialization and transition/combine steps */
static void initialize_aggregates(AggState *aggstate,
AggStatePerGroup pergroup,
int numReset);
static void advance_transition_function(AggState *aggstate,
AggStatePerTrans pertrans,
AggStatePerGroup pergroupstate);
static void advance_aggregates(AggState *aggstate, AggStatePerGroup pergroup);
static void advance_combine_function(AggState *aggstate,
AggStatePerTrans pertrans,
AggStatePerGroup pergroupstate);
static void combine_aggregates(AggState *aggstate, AggStatePerGroup pergroup);
/* DISTINCT / ORDER BY aggregates */
static void process_ordered_aggregate_single(AggState *aggstate,
AggStatePerTrans pertrans,
AggStatePerGroup pergroupstate);
static void process_ordered_aggregate_multi(AggState *aggstate,
AggStatePerTrans pertrans,
AggStatePerGroup pergroupstate);
/* finalization and projection of results */
static void finalize_aggregate(AggState *aggstate,
AggStatePerAgg peragg,
AggStatePerGroup pergroupstate,
Datum *resultVal, bool *resultIsNull);
static void finalize_partialaggregate(AggState *aggstate,
AggStatePerAgg peragg,
AggStatePerGroup pergroupstate,
Datum *resultVal, bool *resultIsNull);
static void prepare_projection_slot(AggState *aggstate,
TupleTableSlot *slot,
int currentSet);
static void finalize_aggregates(AggState *aggstate,
AggStatePerAgg peragg,
AggStatePerGroup pergroup,
int currentSet);
static TupleTableSlot *project_aggregates(AggState *aggstate);
static Bitmapset *find_unaggregated_cols(AggState *aggstate);
static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos);
/* hashed aggregation */
static void build_hash_table(AggState *aggstate);
static void agg_fill_hash_table(AggState *aggstate);
/* setup helpers */
static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
static void build_pertrans_for_aggref(AggStatePerTrans pertrans,
AggState *aggstate, EState *estate,
Aggref *aggref, Oid aggtransfn, Oid aggtranstype,
Oid aggserialfn, Oid aggdeserialfn,
Datum initValue, bool initValueIsNull,
Oid *inputTypes, int numArguments);
static int find_compatible_peragg(Aggref *newagg, AggState *aggstate,
int lastaggno, List **same_input_transnos);
static int find_compatible_pertrans(AggState *aggstate, Aggref *newagg,
Oid aggtransfn, Oid aggtranstype,
Oid aggserialfn, Oid aggdeserialfn,
Datum initValue, bool initValueIsNull,
List *transnos);
/*-------------------------- Vectorize part of nodeAgg ---------------------------------*/
#include "nodeAgg.h"
#include "execTuples.h"
#include "utils.h"
#include "nodes/extensible.h"
#include "vectorTupleSlot.h"
/* CustomScanMethods callback: builds the executor state node */
static Node *CreateVectorAggState(CustomScan *custom_plan);
/* CustomExecMethods callbacks: thin wrappers around the VExec* routines */
static void BeginVectorAgg(CustomScanState *node, EState *estate, int eflags);
static TupleTableSlot *ExecVectorAgg(CustomScanState *node);
static void EndVectorAgg(CustomScanState *node);
/* vectorized counterparts of the aggregate executor entry points */
static AggState *VExecInitAgg(Agg *node, EState *estate, int eflags);
static TupleTableSlot *VExecAgg(VectorAggState *node);
static void VExecEndAgg(VectorAggState *node);
static void InitAggResultSlot(VectorAggState *vas, EState *estate);
/* batch-at-a-time transition steps; they operate on a batch of hash entries */
static void Vadvance_aggregates(AggState *aggstate, AggHashEntry *entries);
static void Vadvance_transition_function(AggState *aggstate,
AggStatePerTrans pertrans,
AggHashEntry *entries);
/* lookup_hash_entry now returns a batch of hash entries. */
static AggHashEntry *lookup_hash_entry(AggState *aggstate,
TupleTableSlot *inputslot);
static TupleTableSlot *agg_retrieve_hash_table(VectorAggState *aggstate);
static TupleTableSlot *agg_retrieve_direct(VectorAggState *vas);
/*
 * Plan-level method table for the "vectoragg" custom scan. It is attached to
 * plan nodes by MakeCustomScanForAgg() and registered by InitVectorAgg().
 */
static CustomScanMethods vectoragg_scan_methods = {
"vectoragg", /* CustomName */
CreateVectorAggState, /* CreateCustomScanState */
};
/*
 * Executor-level method table for the "vectoragg" custom scan, installed on
 * the state node by CreateVectorAggState(). Only begin/exec/end are
 * provided; the rescan, mark/restore, DSM/worker and explain callbacks are
 * left NULL.
 */
static CustomExecMethods vectoragg_exec_methods = {
"vectoragg", /* CustomName */
BeginVectorAgg, /* BeginCustomScan */
ExecVectorAgg, /* ExecCustomScan */
EndVectorAgg, /* EndCustomScan */
NULL, /* ReScanCustomScan */
NULL, /* MarkPosCustomScan */
NULL, /* RestrPosCustomScan */
NULL, /* EstimateDSMCustomScan */
NULL, /* InitializeDSMCustomScan */
NULL, /* InitializeWorkerCustomScan */
NULL, /* ExplainCustomScan */
};
/*
 * CreateVectorAggState
 *
 * CreateCustomScanState callback of the "vectoragg" custom scan. Allocates
 * a zero-filled VectorAggState in CurTransactionContext, tags it as a
 * CustomScanState node and attaches the vectoragg executor method table.
 *
 * custom_plan is not consulted here.
 *
 * Returns the new state node as a generic Node pointer.
 */
static Node *
CreateVectorAggState(CustomScan *custom_plan)
{
	VectorAggState *state;

	state = MemoryContextAllocZero(CurTransactionContext,
								   sizeof(VectorAggState));

	/* Mark the node type, then hook up the executor callbacks */
	NodeSetTag(state, T_CustomScanState);
	state->css.methods = &vectoragg_exec_methods;

	return (Node *) state;
}
/*
 * BeginVectorAgg
 *
 * BeginCustomScan callback. Initializes the vectorized aggregate state for
 * the Agg plan stored as the first element of the CustomScan's custom_plans
 * list, builds the batch result slot, and publishes the aggregate's result
 * slot as the result slot of the custom scan node.
 *
 * css    - the custom scan state; it is really a VectorAggState
 * estate - executor state, passed on to the initialization routines
 * eflags - executor flags, passed on to VExecInitAgg
 */
static void
BeginVectorAgg(CustomScanState *css, EState *estate, int eflags)
{
	VectorAggState *vas = (VectorAggState *) css;
	Agg		   *aggplan;

	/* Throw away the state that ExecInitCustomScan initialized */
	ClearCustomScanState(css);

	/* The Agg plan to run is the first entry of custom_plans */
	aggplan = (Agg *) linitial(((CustomScan *) css->ss.ps.plan)->custom_plans);

	vas->aggstate = VExecInitAgg(aggplan, estate, eflags);
	InitAggResultSlot(vas, estate);

	/* Share the aggregate's result slot with the custom scan node */
	vas->css.ss.ps.ps_ResultTupleSlot = vas->aggstate->ss.ps.ps_ResultTupleSlot;
}
/*
 * ExecVectorAgg
 *
 * ExecCustomScan callback: hands the node, viewed as a VectorAggState, to
 * VExecAgg() and returns whatever slot that produces.
 */
static TupleTableSlot *
ExecVectorAgg(CustomScanState *node)
{
	VectorAggState *vas = (VectorAggState *) node;

	return VExecAgg(vas);
}
/*
 * EndVectorAgg
 *
 * EndCustomScan callback: hands the node, viewed as a VectorAggState, to
 * VExecEndAgg() for shutdown.
 */
static void
EndVectorAgg(CustomScanState *node)
{
	VectorAggState *vas = (VectorAggState *) node;

	VExecEndAgg(vas);
}
/*
 * InitAggResultSlot
 *
 * Create vas->resultSlot and prepare it to hold a batch of result rows.
 * The slot gets the tuple descriptor of the aggregate's result slot, and
 * every attribute is given a freshly built vtype column of BATCHSIZE
 * entries, stored in tts_values as a pointer datum.
 *
 * vas    - vector aggregate state; vas->aggstate must already be set up
 * estate - executor state in which the extra slot is created
 */
static void
InitAggResultSlot(VectorAggState *vas, EState *estate)
{
	VectorTupleSlot *vslot;
	TupleDesc	aggdesc;
	int			attno;

	vas->resultSlot = VExecInitExtraTupleSlot(estate);
	vslot = (VectorTupleSlot *) vas->resultSlot;

	/* The batch slot uses the descriptor of the aggregate's result slot */
	aggdesc = vas->aggstate->ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor;
	ExecSetSlotDescriptor(vas->resultSlot, aggdesc);

	/* Initialize the tuple batch: one vtype column per attribute */
	for (attno = 0; attno < aggdesc->natts; attno++)
	{
		Oid			coltype;

		coltype = vas->resultSlot->tts_tupleDescriptor->attrs[attno]->atttypid;
		vas->resultSlot->tts_values[attno] =
			PointerGetDatum(buildvtype(coltype, BATCHSIZE, vslot->skip));

		/* tts_isnull is not used yet */
		vas->resultSlot->tts_isnull[attno] = false;
	}
}
/*
 * Vadvance_aggregates
 *
 * Advance each aggregate transition state for the current input. The input
 * is expected to have been stored in tmpcontext->ecxt_outertuple, so that it
 * is accessible to ExecEvalExpr. "entries" is the batch of hash entries
 * whose per-group states are to be updated; it is passed through unchanged
 * to the transition function.
 *
 * For each aggregate:
 *	- if it has a FILTER that yields NULL or false, it is skipped;
 *	- if it has sort columns (DISTINCT and/or ORDER BY), the projected input
 *	  is stored in the tuplesort of every grouping set;
 *	- otherwise the inputs are loaded into transfn_fcinfo and the transition
 *	  function is invoked once per grouping set.
 *
 * Calling convention for the transition function in the last case:
 *	arg[0]	 batch of hash entries (set in Vadvance_transition_function)
 *	arg[1]	 int32 byte offset, from the start of a hash entry, of the
 *			 AggStatePerGroupData belonging to this aggregate/grouping set
 *	arg[2..] the aggregated input values
 *
 * When called, CurrentMemoryContext should be the per-query context.
 */
static void
Vadvance_aggregates(AggState *aggstate, AggHashEntry *entries)
{
	int			transno;
	int			setno = 0;
	int			numGroupingSets = Max(aggstate->phase->numsets, 1);
	int			numTrans = aggstate->numtrans;

	for (transno = 0; transno < numTrans; transno++)
	{
		AggStatePerTrans pertrans = &aggstate->pertrans[transno];
		ExprState  *filter = pertrans->aggfilter;
		int			numTransInputs = pertrans->numTransInputs;
		int			i;
		TupleTableSlot *slot;

		/* Skip anything FILTERed out */
		if (filter)
		{
			Datum		res;
			bool		isnull;

			res = ExecEvalExprSwitchContext(filter, aggstate->tmpcontext,
											&isnull, NULL);
			if (isnull || !DatumGetBool(res))
				continue;
		}

		/* Evaluate the current input expressions for this aggregate */
		slot = ExecProject(pertrans->evalproj, NULL);

		if (pertrans->numSortCols > 0)
		{
			/* DISTINCT and/or ORDER BY case */
			Assert(slot->tts_nvalid == pertrans->numInputs);

			/*
			 * If the transfn is strict, we want to check for nullity before
			 * storing the row in the sorter, to save space if there are a lot
			 * of nulls. Note that we must only check numTransInputs columns,
			 * not numInputs, since nullity in columns used only for sorting
			 * is not relevant here.
			 */
			if (pertrans->transfn.fn_strict)
			{
				for (i = 0; i < numTransInputs; i++)
				{
					if (slot->tts_isnull[i])
						break;
				}
				if (i < numTransInputs)
					continue;
			}

			for (setno = 0; setno < numGroupingSets; setno++)
			{
				/* OK, put the tuple into the tuplesort object */
				if (pertrans->numInputs == 1)
					tuplesort_putdatum(pertrans->sortstates[setno],
									   slot->tts_values[0],
									   slot->tts_isnull[0]);
				else
					tuplesort_puttupleslot(pertrans->sortstates[setno], slot);
			}
		}
		else
		{
			/* We can apply the transition function immediately */
			FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;

			/*
			 * Load values into fcinfo. Inputs start at position 2: arg[0]
			 * carries the batch of hash entries and arg[1] the per-group
			 * state offset.
			 */
			Assert(slot->tts_nvalid >= numTransInputs);
			for (i = 0; i < numTransInputs; i++)
			{
				fcinfo->arg[i + 2] = slot->tts_values[i];
				fcinfo->argnull[i + 2] = slot->tts_isnull[i];
			}

			for (setno = 0; setno < numGroupingSets; setno++)
			{
				/*
				 * Byte offset of pergroup[transno + setno * numTrans] within
				 * a hash entry. The stride must be the size of the per-group
				 * struct itself; deriving it from the size of a pointer only
				 * works by coincidence.
				 */
				int			groupOffset = offsetof(AggHashEntryData, pergroup) +
					sizeof(AggStatePerGroupData) * (transno + (setno * numTrans));

				aggstate->current_set = setno;

				fcinfo->arg[1] = Int32GetDatum(groupOffset);
				fcinfo->argnull[1] = false;

				Vadvance_transition_function(aggstate, pertrans, entries);
			}
		}
	}
}
/*
 * Vadvance_transition_function
 *
 * Given new input value(s), advance the transition function of one aggregate
 * state within one grouping set only (already set in aggstate->current_set),
 * for a whole batch of hash entries.
 *
 * The caller (Vadvance_aggregates) has preloaded pertrans->transfn_fcinfo:
 * argument position 1 holds the byte offset of the per-group state inside a
 * hash entry, and positions 2 and up hold the new input values and their
 * null flags, so that we needn't copy them again. This function fills in
 * position 0 with the batch of hash entries. We also expect that the static
 * fields of the fcinfo are already initialized.
 *
 * The result of the transition function is not examined here; the function
 * is handed the hash entries and the state offset instead.
 *
 * It doesn't matter which memory context this is called in.
 */
static void
Vadvance_transition_function(AggState *aggstate,
							 AggStatePerTrans pertrans,
							 AggHashEntry *entries)
{
	FunctionCallInfo fcinfo = &pertrans->transfn_fcinfo;
	MemoryContext oldContext;

	if (pertrans->transfn.fn_strict)
	{
		/*
		 * For a strict transfn, nothing happens when there's a NULL input; we
		 * just keep the prior transValue.
		 *
		 * The inputs live at positions 2 .. numTransInputs + 1. Position 1
		 * is the state offset and is never null, so it must not be counted
		 * as an input here.
		 */
		int			numTransInputs = pertrans->numTransInputs;
		int			i;

		for (i = 0; i < numTransInputs; i++)
		{
			if (fcinfo->argnull[i + 2])
				return;
		}
	}

	/* We run the transition functions in per-input-tuple memory context */
	oldContext = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);

	/* set up aggstate->curpertrans for AggGetAggref() */
	aggstate->curpertrans = pertrans;

	/*
	 * OK to call the transition function
	 */
	fcinfo->arg[0] = PointerGetDatum(entries);
	fcinfo->argnull[0] = false;
	fcinfo->isnull = false;		/* just in case transfn doesn't set it */

	FunctionCallInvoke(fcinfo);

	aggstate->curpertrans = NULL;

	MemoryContextSwitchTo(oldContext);
}
/*
 * MakeCustomScanForAgg
 *
 * Build an empty CustomScan plan node for the vectorized aggregate. Only
 * the methods pointer is filled in (with the "vectoragg" method table);
 * everything else is left as makeNode() produced it.
 */
CustomScan *
MakeCustomScanForAgg(void)
{
	CustomScan *plan = makeNode(CustomScan);

	plan->methods = &vectoragg_scan_methods;

	return plan;
}
/*
 * InitVectorAgg
 *
 * One-time setup for the vectoragg CustomScan node: registers the
 * "vectoragg" custom scan method table.
 */
void
InitVectorAgg(void)
{
	/* Make the vectoragg custom scan methods known by registering them */
	RegisterCustomScanMethods(&vectoragg_scan_methods);
}
/*---------------------- End of vectorized part of nodeAgg ---------------------------*/
/*
 * initialize_phase
 *
 * Make "newphase" the current phase. newphase must be either 0 (a reset) or
 * exactly one more than the current phase. The input and output tuplesorts
 * are rearranged to match:
 *
 *	- any input tuplesort still open is closed;
 *	- on a reset, a leftover output tuplesort is closed as well; otherwise
 *	  the output tuplesort of the previous phase is sorted and becomes the
 *	  input of the new phase;
 *	- unless newphase is the last phase, a new output tuplesort is started
 *	  using the sort keys of the following phase.
 */
static void
initialize_phase(AggState *aggstate, int newphase)
{
	bool		resetting = (newphase == 0);

	Assert(resetting || newphase == aggstate->current_phase + 1);

	/* Whatever input tuplesort was in use before, we are finished with it */
	if (aggstate->sort_in != NULL)
	{
		tuplesort_end(aggstate->sort_in);
		aggstate->sort_in = NULL;
	}

	if (!resetting)
	{
		/*
		 * Promote the previous output tuplesort to be our input; now is the
		 * moment to actually perform the sort.
		 */
		aggstate->sort_in = aggstate->sort_out;
		aggstate->sort_out = NULL;
		Assert(aggstate->sort_in);
		tuplesort_performsort(aggstate->sort_in);
	}
	else if (aggstate->sort_out != NULL)
	{
		/* Resetting: drop any output tuplesort that is still around */
		tuplesort_end(aggstate->sort_out);
		aggstate->sort_out = NULL;
	}

	/*
	 * When more phases follow, collect the input again, keyed to the sort
	 * order that the next phase wants.
	 */
	if (newphase < aggstate->numphases - 1)
	{
		Sort	   *nextsort = aggstate->phases[newphase + 1].sortnode;
		TupleDesc	inputdesc = ExecGetResultType(outerPlanState(aggstate));

		aggstate->sort_out = tuplesort_begin_heap(inputdesc,
												  nextsort->numCols,
												  nextsort->sortColIdx,
												  nextsort->sortOperators,
												  nextsort->collations,
												  nextsort->nullsFirst,
												  work_mem,
												  false);
	}

	aggstate->current_phase = newphase;
	aggstate->phase = &aggstate->phases[newphase];
}
/*
 * Return the next input tuple for the current phase.
 *
 * When an input sort exists (sort_in, populated by the previous phase) the
 * tuple is read from it into sort_slot; otherwise it comes straight from the
 * outer plan.  If an output sort for the next phase is open, every non-empty
 * tuple is also copied into it before being returned.
 *
 * Returns NULL once the input sort is exhausted; when reading from the outer
 * plan, whatever ExecProcNode() returned is passed back unchanged.
 */
static TupleTableSlot *
fetch_input_tuple(AggState *aggstate)
{
	TupleTableSlot *result;

	if (!aggstate->sort_in)
		result = ExecProcNode(outerPlanState(aggstate));
	else if (tuplesort_gettupleslot(aggstate->sort_in, true,
									aggstate->sort_slot, NULL))
		result = aggstate->sort_slot;
	else
		return NULL;			/* input sort has no more tuples */

	/* Feed the next phase's sort, if there is one. */
	if (aggstate->sort_out && !TupIsNull(result))
		tuplesort_puttupleslot(aggstate->sort_out, result);

	return result;
}
/*
 * (Re)Initialize the state of a single aggregate.
 *
 * Only one grouping set is handled: the one already selected in
 * aggstate->current_set.
 *
 * The caller is expected to have CurrentMemoryContext set to the per-query
 * context.
 */
static void
initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
					 AggStatePerGroup pergroupstate)
{
	int			setno = aggstate->current_set;

	/*
	 * DISTINCT / ORDER BY aggregates collect their input in a sort, and each
	 * group needs a fresh one.
	 */
	if (pertrans->numSortCols > 0)
	{
		/*
		 * A sort left unfinished (for instance by a rescan) may still be
		 * attached to this grouping set; release it before starting over.
		 */
		if (pertrans->sortstates[setno])
			tuplesort_end(pertrans->sortstates[setno]);

		/*
		 * With more than one input column the whole tuple is sorted; a lone
		 * input column lets us use the cheaper plain Datum sorter.  (See
		 * comments for process_ordered_aggregate_single.)
		 */
		if (pertrans->numInputs != 1)
		{
			pertrans->sortstates[setno] =
				tuplesort_begin_heap(pertrans->evaldesc,
									 pertrans->numSortCols,
									 pertrans->sortColIdx,
									 pertrans->sortOperators,
									 pertrans->sortCollations,
									 pertrans->sortNullsFirst,
									 work_mem, false);
		}
		else
		{
			pertrans->sortstates[setno] =
				tuplesort_begin_datum(pertrans->evaldesc->attrs[0]->atttypid,
									  pertrans->sortOperators[0],
									  pertrans->sortCollations[0],
									  pertrans->sortNullsFirst[0],
									  work_mem, false);
		}
	}

	/*
	 * (Re)set transValue to the aggregate's initial value.
	 *
	 * A non-NULL initial value is copied into the current grouping set's
	 * aggcontext, because transValue will be pfree'd later and (for
	 * pass-by-reference types) must not alias pertrans->initValue.  A NULL
	 * initial value is simply assigned.
	 */
	if (!pertrans->initValueIsNull)
	{
		MemoryContext savedContext;

		savedContext = MemoryContextSwitchTo(
						 aggstate->aggcontexts[setno]->ecxt_per_tuple_memory);
		pergroupstate->transValue = datumCopy(pertrans->initValue,
											  pertrans->transtypeByVal,
											  pertrans->transtypeLen);
		MemoryContextSwitchTo(savedContext);
	}
	else
		pergroupstate->transValue = pertrans->initValue;

	/*
	 * When pg_aggregate supplies no initial value, the first non-NULL input
	 * from the outer node becomes the transition value (this is how max()
	 * and min() work).  noTransValue records that this substitution is still
	 * pending; transValueIsNull mirrors the NULL-ness of the initial value.
	 */
	pergroupstate->noTransValue = pertrans->initValueIsNull;
	pergroupstate->transValueIsNull = pertrans->initValueIsNull;
}
/*
* Initialize all aggregate transition states for a new group of input values.
*
* If there are multiple grouping sets, we initialize only the first numReset
* of them (the grouping sets are ordered so that the most specific one, which
* is reset most often, is first). As a convenience, if numReset is < 1, we
* reinitialize all sets.
*