Skip to content

Commit 3fa6a64

Browse files
authored
Merge pull request #1054 from knutwalker/wcc-logging
Make sure to log progress if it is incremented by values other than 1
2 parents f95217f + 8621183 commit 3fa6a64

File tree

2 files changed

+109
-7
lines changed

2 files changed

+109
-7
lines changed

core/src/main/java/org/neo4j/graphalgo/core/utils/BatchingProgressLogger.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.commons.lang3.mutable.MutableLong;
2323
import org.neo4j.logging.Log;
2424

25-
import java.util.concurrent.atomic.AtomicLong;
2625
import java.util.concurrent.atomic.LongAdder;
2726
import java.util.function.Supplier;
2827

@@ -65,16 +64,16 @@ public BatchingProgressLogger(Log log, long taskVolume, long batchSize, String t
6564
@Override
6665
public void logProgress(Supplier<String> msgFactory) {
6766
progressCounter.increment();
68-
long localProgress = callCounter.get().incrementAndGet();
69-
70-
if ((localProgress & (batchSize - 1)) == 0) {
67+
var localProgress = callCounter.get();
68+
if (localProgress.longValue() < batchSize && (localProgress.incrementAndGet() >= batchSize)) {
7169
String message = msgFactory != ProgressLogger.NO_MESSAGE ? msgFactory.get() : null;
7270
int percent = (int) ((progressCounter.sum() / (double) taskVolume) * 100);
7371
if (message == null || message.isEmpty()) {
7472
log.info("[%s] %s %d%%", Thread.currentThread().getName(), task, percent);
7573
} else {
7674
log.info("[%s] %s %d%% %s", Thread.currentThread().getName(), task, percent, message);
7775
}
76+
localProgress.setValue(0L);
7877
}
7978
}
8079

@@ -84,11 +83,11 @@ public void logProgress(long progress, Supplier<String> msgFactory) {
8483
return;
8584
}
8685
progressCounter.add(progress);
87-
long localProgress = callCounter.get().incrementAndGet();
88-
89-
if ((localProgress & (batchSize -1)) == 0) {
86+
var localProgress = callCounter.get();
87+
if (localProgress.longValue() < batchSize && (localProgress.addAndGet(progress) >= batchSize)) {
9088
int percent = (int) ((progressCounter.sum() / (double) taskVolume) * 100);
9189
log.info("[%s] %s %d%%", Thread.currentThread().getName(), task, percent);
90+
localProgress.setValue(localProgress.longValue() & (batchSize - 1));
9291
}
9392
}
9493

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright (c) 2017-2020 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Neo4j is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*/
20+
package org.neo4j.graphalgo.core.utils;
21+
22+
import org.eclipse.collections.impl.utility.ListIterate;
23+
import org.junit.jupiter.api.Test;
24+
import org.neo4j.graphalgo.TestLog;
25+
26+
import java.util.List;
27+
import java.util.stream.Collectors;
28+
import java.util.stream.IntStream;
29+
30+
import static org.junit.jupiter.api.Assertions.assertEquals;
31+
import static org.neo4j.graphalgo.utils.StringFormatting.formatWithLocale;
32+
33+
class BatchingProgressLoggerTest {
34+
35+
@Test
36+
void mustLogProgressOnlyAfterBatchSizeInvocations() {
37+
var log = new TestLog();
38+
var taskVolume = 42;
39+
var batchSize = 8;
40+
var logger = new BatchingProgressLogger(
41+
log,
42+
taskVolume,
43+
batchSize,
44+
"foo",
45+
/* concurrency */0
46+
);
47+
48+
for (int i = 0; i < taskVolume; i++) {
49+
int currentProgress = i;
50+
logger.logProgress(() -> String.valueOf(currentProgress));
51+
}
52+
53+
var threadName = Thread.currentThread().getName();
54+
var messageTemplate = "[%s] foo %d%% %d";
55+
var expectedMessages = List.of(
56+
formatWithLocale(messageTemplate, threadName, 1 * batchSize * 100 / taskVolume, 1 * batchSize - 1),
57+
formatWithLocale(messageTemplate, threadName, 2 * batchSize * 100 / taskVolume, 2 * batchSize - 1),
58+
formatWithLocale(messageTemplate, threadName, 3 * batchSize * 100 / taskVolume, 3 * batchSize - 1),
59+
formatWithLocale(messageTemplate, threadName, 4 * batchSize * 100 / taskVolume, 4 * batchSize - 1),
60+
formatWithLocale(messageTemplate, threadName, 5 * batchSize * 100 / taskVolume, 5 * batchSize - 1)
61+
);
62+
63+
var messages = log.getMessages("info");
64+
assertEquals(expectedMessages, messages);
65+
}
66+
67+
@Test
68+
void mustLogProgressOnlyAfterHittingOrExceedingBatchSize() {
69+
var log = new TestLog();
70+
var taskVolume = 1337;
71+
var progressStep = 5;
72+
var batchSize = 16;
73+
var logger = new BatchingProgressLogger(
74+
log,
75+
taskVolume,
76+
batchSize,
77+
"foo",
78+
/* concurrency */1
79+
);
80+
81+
for (int i = 0; i < taskVolume; i += progressStep) {
82+
int currentProgress = i;
83+
logger.logProgress(progressStep, () -> String.valueOf(currentProgress));
84+
}
85+
86+
var threadName = Thread.currentThread().getName();
87+
var messageTemplate = "[%s] foo %d%%";
88+
89+
var progressSteps = IntStream
90+
.iterate(0, i -> i < taskVolume, i -> i + progressStep)
91+
.boxed()
92+
.collect(Collectors.toList());
93+
var loggedProgressSteps = ListIterate.distinctBy(progressSteps, i -> i / batchSize);
94+
95+
var expectedMessages = loggedProgressSteps.stream()
96+
.skip(1)
97+
.map(i -> formatWithLocale(messageTemplate, threadName, i * 100 / taskVolume))
98+
.collect(Collectors.toList());
99+
100+
var messages = log.getMessages("info");
101+
assertEquals(expectedMessages, messages);
102+
}
103+
}

0 commit comments

Comments
 (0)