11/*
22 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
33 *
4- * Copyright 2025 Neil C Smith.
4+ * Copyright 2026 Neil C Smith.
55 *
66 * This code is free software; you can redistribute it and/or modify it
77 * under the terms of the GNU Lesser General Public License version 3 only, as
@@ -216,7 +216,8 @@ public static final <T> Pipe<T> with(Consumer<? super T> consumer) {
216216 public static abstract class Sink <T > implements Lookup .Provider {
217217
218218 private final Pipe <T > input ;
219- private final SinkPacket <T > packet ;
219+ private final SinkPacket <T > basePacket ;
220+ private final List <SinkPacket <T >> packets ;
220221
221222 private UnaryOperator <T > creator ;
222223 private UnaryOperator <T > clearer ;
@@ -227,7 +228,8 @@ public static abstract class Sink<T> implements Lookup.Provider {
227228 private long pass ;
228229
229230 public Sink () {
230- packet = new SinkPacket <>(this , null );
231+ basePacket = new SinkPacket <>(this , null );
232+ packets = new ArrayList <>();
231233 defaultFunctions ();
232234 input = new Pipe <T >() {
233235 @ Override
@@ -275,6 +277,23 @@ public void reset() {
275277 input .disconnectSources ();
276278 }
277279
280+ /**
281+ * Flush all cached data. Any data cached in packets in the pipeline
282+ * will be disposed, using the
283+ * {@link #onDispose(java.util.function.Consumer)} function if set.
284+ */
285+ public void flushCaches () {
286+ List <SinkPacket <T >> caches = new ArrayList <>(packets );
287+ packets .clear ();
288+ caches .forEach (p -> {
289+ try {
290+ p .dispose ();
291+ } catch (Exception ex ) {
292+ log (ex );
293+ }
294+ });
295+ }
296+
278297 /**
279298 * Get the input pipe for this sink. The input pipe only supports the
280299 * addition of sources - it cannot be used as a source.
@@ -295,18 +314,18 @@ public Pipe<T> input() {
295314 * @return data of type T (may or may not be the input data)
296315 */
297316 public T process (T data ) {
298- packet .data = Objects .requireNonNull (data );
317+ basePacket .data = Objects .requireNonNull (data );
299318 try {
300319 if (input .sources .size () == 1 ) {
301- input .processInPlace (packet , true , ++pass );
320+ input .processInPlace (basePacket , true , ++pass );
302321 } else {
303- input .processCached (packet , true , ++pass );
304- input .writeOutput (input .dataPackets , packet , 0 );
322+ input .processCached (basePacket , true , ++pass );
323+ input .writeOutput (input .dataPackets , basePacket , 0 );
305324 }
306325 } catch (Exception ex ) {
307326 log (ex );
308327 }
309- return packet .data ;
328+ return basePacket .data ;
310329 }
311330
312331 /**
@@ -472,17 +491,23 @@ public boolean isCompatible(Packet<T> packet) {
472491
473492 @ Override
474493 public Packet <T > createPacket () {
475- return new SinkPacket <>(this .sink , sink .creator .apply (data ));
494+ SinkPacket <T > packet = new SinkPacket <>(this .sink , sink .creator .apply (data ));
495+ sink .packets .add (packet );
496+ return packet ;
476497 }
477498
478499 @ Override
479500 public void dispose () {
480- try {
481- Objects .requireNonNull (data );
482- sink .disposer .accept (data );
483- data = null ;
484- } catch (Exception ex ) {
485- sink .log (ex );
501+ sink .packets .remove (this );
502+ if (data != null ) {
503+ try {
504+ Objects .requireNonNull (data );
505+ sink .disposer .accept (data );
506+ } catch (Exception ex ) {
507+ sink .log (ex );
508+ } finally {
509+ data = null ;
510+ }
486511 }
487512 }
488513
0 commit comments