Skip to content

Commit

Permalink
Adjust CopyAggregation aggregators to have the option of not freeing buffers on flush (chapel-lang#26681)
Browse files Browse the repository at this point in the history

Before this PR, CopyAggregation aggregators always freed their buffers on
a `flush`, because that function was written primarily for the case of
freeing the aggregator, in which case freeing the buffers immediately
avoids the need to do it later.

However, in some application use cases, it's useful to keep aggregators
around for longer to avoid the overhead of creating and destroying
aggregators. It might be necessary to call `flush` on a SrcAggregator so
that the appropriate data is loaded, but that aggregator might be used
again, so it's not helpful to free the buffers.

This PR adjusts `flush` to accept a `freeBuffers` argument indicating
whether the buffers should be freed. In my opinion, it would make more
sense for `flush` to default to not freeing the buffers. However, this PR
does not change that, to avoid creating performance noise for any existing
uses of this function (as in Arkouda).

This change was suggested by @ronawho.

Reviewed by @stonea - thanks!

- [x] full comm=none testing
- [x] full comm=gasnet oversubscribed testing
  • Loading branch information
mppf authored Feb 11, 2025
2 parents c202f8b + 345b345 commit d615b0e
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 15 deletions.
20 changes: 10 additions & 10 deletions modules/packages/CopyAggregation.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ module CopyAggregation {
if aggregate then agg.copy(dst, srcVal);
else dst = srcVal;
}
inline proc ref flush() {
if aggregate then agg.flush();
inline proc ref flush(freeBuffers=false) {
if aggregate then agg.flush(freeBuffers=freeBuffers);
}
}

Expand All @@ -134,8 +134,8 @@ module CopyAggregation {
if aggregate then agg.copy(dst, src);
else dst = src;
}
inline proc ref flush() {
if aggregate then agg.flush();
inline proc ref flush(freeBuffers=false) {
if aggregate then agg.flush(freeBuffers=freeBuffers);
}
}

Expand All @@ -162,18 +162,18 @@ module CopyAggregation {
}

proc ref deinit() {
flush();
flush(freeBuffers=true);
for loc in myLocaleSpace {
deallocate(lBuffers[loc]);
}
deallocate(lBuffers);
deallocate(bufferIdxs);
}

proc ref flush() {
proc ref flush(freeBuffers: bool) {
for offsetLoc in myLocaleSpace + lastLocale {
const loc = offsetLoc % numLocales;
_flushBuffer(loc, bufferIdxs[loc], freeData=true);
_flushBuffer(loc, bufferIdxs[loc], freeData=freeBuffers);
}
}

Expand Down Expand Up @@ -263,7 +263,7 @@ module CopyAggregation {
}

proc ref deinit() {
flush();
flush(freeBuffers=true);
for loc in myLocaleSpace {
deallocate(dstAddrs[loc]);
deallocate(lSrcAddrs[loc]);
Expand All @@ -273,10 +273,10 @@ module CopyAggregation {
deallocate(bufferIdxs);
}

proc ref flush() {
proc ref flush(freeBuffers: bool) {
for offsetLoc in myLocaleSpace + lastLocale {
const loc = offsetLoc % numLocales;
_flushBuffer(loc, bufferIdxs[loc], freeData=true);
_flushBuffer(loc, bufferIdxs[loc], freeData=freeBuffers);
}
}

Expand Down
27 changes: 22 additions & 5 deletions test/library/packages/CopyAggregation/test-copy-agg.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ use CopyAggregation;
use BlockDist;
use RangeChunk;

config const freeBuffers = true;
config const n = 1_000_000;

proc main() {
assert(n > 2); // this test assumes this

const nTasksPerLocale =
if dataParTasksPerLocale > 0
then dataParTasksPerLocale
else here.maxTaskPar;

// store numbers in forward order in Fwd
var Fwd = blockDist.createArray(0..<n, int);
Fwd = 0..<n;
Expand Down Expand Up @@ -43,11 +45,19 @@ proc main() {
const locRegion = locDom.dim(0)[0..<n];
coforall chunk in chunks(locRegion, nTasksPerLocale) {
var agg = new DstAggregator(int);
for i in chunk {
const halfway = chunk.low + chunk.size / 2;
const firstchunk = chunk.low..<halfway;
const secondchunk = halfway..chunk.high;
for i in firstchunk {
const elt = A[i];
agg.copy(B[elt], elt);
}
agg.flush(freeBuffers=freeBuffers);
// check that the aggregator still works after flush
for i in secondchunk {
const elt = A[i];
agg.copy(B[elt], elt);
}
agg.flush();
}
}
}
Expand All @@ -69,10 +79,17 @@ proc main() {
const locRegion = locDom.dim(0)[0..<n];
coforall chunk in chunks(locRegion, nTasksPerLocale) {
var agg = new SrcAggregator(int);
for i in chunk {
const halfway = chunk.low + chunk.size / 2;
const firstchunk = chunk.low..<halfway;
const secondchunk = halfway..chunk.high;
for i in firstchunk {
agg.copy(B[i], A[n-1-i]);
}
agg.flush(freeBuffers=freeBuffers);
// check that the aggregator still works after flush
for i in secondchunk {
agg.copy(B[i], A[n-1-i]);
}
agg.flush();
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions test/library/packages/CopyAggregation/test-copy-agg.execopts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
--freeBuffers=true
--freeBuffers=false
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
4

0 comments on commit d615b0e

Please sign in to comment.