Skip to content

fixing busy waiting in abstraction and eliminate busy-waiting #3302

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@
* THE SOFTWARE.
*/
package com.iluwatar.logaggregation;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CountDownLatch;
import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;

/**
* Responsible for collecting and buffering logs from different services. Once the logs reach a
* certain threshold or after a certain time interval, they are flushed to the central log store.
Expand All @@ -41,30 +43,49 @@
public class LogAggregator {

private static final int BUFFER_THRESHOLD = 3;
private static final int FLUSH_INTERVAL_SECONDS = 5;
private static final int SHUTDOWN_TIMEOUT_SECONDS = 10;

private final CentralLogStore centralLogStore;
private final ConcurrentLinkedQueue<LogEntry> buffer = new ConcurrentLinkedQueue<>();
private final BlockingQueue<LogEntry> buffer = new LinkedBlockingQueue<>();
private final LogLevel minLogLevel;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);
private final AtomicInteger logCount = new AtomicInteger(0);

private final CountDownLatch shutdownLatch = new CountDownLatch(1);
private volatile boolean running = true;
/**
* constructor of LogAggregator.
*
* @param centralLogStore central log store implement
* @param minLogLevel min log level to store log
*/
public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) {
this.centralLogStore = centralLogStore;
this.centralLogStore = centralLogStore;
this.minLogLevel = minLogLevel;
startBufferFlusher();
startPeriodicFlusher();

// Add shutdown hook for graceful termination
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
stop();
} catch (InterruptedException e) {
LOGGER.warn("Shutdown interrupted", e);
Thread.currentThread().interrupt();
}
}));
}

/**
* Collects a given log entry, and filters it by the defined log level.
*
* @param logEntry The log entry to collect.
*/
public void collectLog(LogEntry logEntry) {
public void collectLog(LogEntry logEntry) {
if (!running) {
LOGGER.warn("LogAggregator is shutting down. Skipping log entry.");
return;
}

if (logEntry.getLevel() == null || minLogLevel == null) {
LOGGER.warn("Log level or threshold level is null. Skipping.");
return;
Expand All @@ -75,10 +96,17 @@ public void collectLog(LogEntry logEntry) {
return;
}

buffer.offer(logEntry);
// BlockingQueue.offer() is non-blocking and thread-safe
boolean added = buffer.offer(logEntry);
if (!added) {
LOGGER.warn("Failed to add log entry to buffer - queue may be full");
return;
}

// Check if immediate flush is needed due to threshold
if (logCount.incrementAndGet() >= BUFFER_THRESHOLD) {
flushBuffer();
// Schedule immediate flush instead of blocking current thread
scheduledExecutor.execute(this::flushBuffer);
}
}

Expand All @@ -87,33 +115,126 @@ public void collectLog(LogEntry logEntry) {
*
* @throws InterruptedException If any thread has interrupted the current thread.
*/
public void stop() throws InterruptedException {
executorService.shutdownNow();
if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
LOGGER.error("Log aggregator did not terminate.");
public void stop() throws InterruptedException {
LOGGER.info("Stopping LogAggregator...");
running = false;

// Shutdown the scheduler gracefully
scheduledExecutor.shutdown();

try {
// Wait for scheduled tasks to complete
if (!scheduledExecutor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
LOGGER.warn("Scheduler did not terminate gracefully, forcing shutdown");
scheduledExecutor.shutdownNow();

// Wait a bit more for tasks to respond to interruption
if (!scheduledExecutor.awaitTermination(2, TimeUnit.SECONDS)) {
LOGGER.error("Scheduler did not terminate after forced shutdown");
}
}
} finally {
// Final flush of any remaining logs
flushBuffer();
shutdownLatch.countDown();
LOGGER.info("LogAggregator stopped successfully");
}
flushBuffer();
}



/**
* Waits for the LogAggregator to complete shutdown.
* Useful for testing or controlled shutdown scenarios.
*
* @throws InterruptedException If any thread has interrupted the current thread.
*/
public void awaitShutdown() throws InterruptedException {
shutdownLatch.await();
}


private void flushBuffer() {
LogEntry logEntry;
while ((logEntry = buffer.poll()) != null) {
centralLogStore.storeLog(logEntry);
logCount.decrementAndGet();
if (!running && buffer.isEmpty()) {
return;
}

try {
List<LogEntry> batch = new ArrayList<>();
int drained = 0;

// Drain up to a reasonable batch size for efficiency
LogEntry logEntry;
while ((logEntry = buffer.poll()) != null && drained < 100) {
batch.add(logEntry);
drained++;
}

if (!batch.isEmpty()) {
LOGGER.debug("Flushing {} log entries to central store", batch.size());

// Process the batch
for (LogEntry entry : batch) {
centralLogStore.storeLog(entry);
logCount.decrementAndGet();
}

LOGGER.debug("Successfully flushed {} log entries", batch.size());
}
} catch (Exception e) {
LOGGER.error("Error occurred while flushing buffer", e);
}
}

private void startBufferFlusher() {
executorService.execute(

/**
* Starts the periodic buffer flusher using ScheduledExecutorService.
* This eliminates the busy-waiting loop with Thread.sleep().
*/
private void startPeriodicFlusher() {
scheduledExecutor.scheduleAtFixedRate(
() -> {
while (!Thread.currentThread().isInterrupted()) {
if (running) {
try {
Thread.sleep(5000); // Flush every 5 seconds.
flushBuffer();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
LOGGER.error("Error in periodic flush", e);
}
}
});
},
FLUSH_INTERVAL_SECONDS, // Initial delay
FLUSH_INTERVAL_SECONDS, // Period
TimeUnit.SECONDS
);

LOGGER.info("Periodic log flusher started with interval of {} seconds", FLUSH_INTERVAL_SECONDS);
}
/**
* Gets the current number of buffered log entries.
* Useful for monitoring and testing.
*
* @return Current buffer size
*/
public int getBufferSize() {
return buffer.size();
}

/**
* Gets the current log count.
* Useful for monitoring and testing.
*
* @return Current log count
*/
public int getLogCount() {
return logCount.get();
}

/**
* Checks if the LogAggregator is currently running.
*
* @return true if running, false if stopped or stopping
*/
public boolean isRunning() {
return running;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,118 @@ void whenDebugLogIsCollected_thenNoLogsShouldBeStored() {
verifyNoInteractionsWithCentralLogStore();
}

private static LogEntry createLogEntry(LogLevel logLevel, String message) {

@Test
void whenTwoLogsCollected_thenBufferShouldContainThem() {
// NEW TEST: Verify buffer state management
logAggregator.collectLog(createLogEntry(LogLevel.INFO, "Message 1"));
logAggregator.collectLog(createLogEntry(LogLevel.INFO, "Message 2"));

assertEquals(2, logAggregator.getLogCount());
assertEquals(2, logAggregator.getBufferSize());

// Should not trigger flush yet (threshold is 3)
verifyNoInteractionsWithCentralLogStore();
}

@Test
void whenScheduledFlushOccurs_thenBufferedLogsShouldBeStored() throws InterruptedException {
// NEW TEST: Verify scheduled periodic flushing
logAggregator.collectLog(createLogEntry(LogLevel.INFO, "Scheduled flush test"));

assertEquals(1, logAggregator.getLogCount());
verifyNoInteractionsWithCentralLogStore();

// Wait for scheduled flush (FLUSH_INTERVAL_SECONDS = 5)
Thread.sleep(6000); // 5 seconds + buffer

verifyCentralLogStoreInvokedTimes(1);
assertEquals(0, logAggregator.getLogCount());
}

@Test
void whenLogAggregatorStopped_thenRemainingLogsShouldBeStored() throws InterruptedException {
// NEW TEST: Verify graceful shutdown flushes remaining logs
logAggregator.collectLog(createLogEntry(LogLevel.INFO, "Final message 1"));
logAggregator.collectLog(createLogEntry(LogLevel.INFO, "Final message 2"));

assertEquals(2, logAggregator.getLogCount());
verifyNoInteractionsWithCentralLogStore();

// Stop should trigger final flush
logAggregator.stop();
logAggregator.awaitShutdown();

verifyCentralLogStoreInvokedTimes(2);
assertEquals(0, logAggregator.getLogCount());
assertFalse(logAggregator.isRunning());
}

@Test
void whenLogLevelBelowThreshold_thenLogShouldBeFiltered() {
// 🎯 ENHANCED TEST: Test all log levels below INFO
logAggregator.collectLog(createLogEntry(LogLevel.DEBUG, "Debug message"));
logAggregator.collectLog(createLogEntry(LogLevel.TRACE, "Trace message"));

assertEquals(0, logAggregator.getLogCount());
assertEquals(0, logAggregator.getBufferSize());
verifyNoInteractionsWithCentralLogStore();
}

@Test
void whenLogLevelAtOrAboveThreshold_thenLogShouldBeAccepted() {
// NEW TEST: Verify all accepted log levels
logAggregator.collectLog(createLogEntry(LogLevel.INFO, "Info message"));
logAggregator.collectLog(createLogEntry(LogLevel.WARN, "Warning message"));
logAggregator.collectLog(createLogEntry(LogLevel.ERROR, "Error message"));

assertEquals(3, logAggregator.getLogCount());
assertEquals(3, logAggregator.getBufferSize());
}

@Test
void whenNullLogLevelProvided_thenLogShouldBeSkipped() {
// EDGE CASE TEST: Null safety
LogEntry nullLevelEntry = new LogEntry("ServiceA", null, "Null level message", LocalDateTime.now());

logAggregator.collectLog(nullLevelEntry);

assertEquals(0, logAggregator.getLogCount());
verifyNoInteractionsWithCentralLogStore();
}

@Test
void whenLogAggregatorIsShutdown_thenNewLogsShouldBeRejected() throws InterruptedException {
// NEW TEST: Verify shutdown behavior
logAggregator.stop();
logAggregator.awaitShutdown();

assertFalse(logAggregator.isRunning());

// Try to add log after shutdown
logAggregator.collectLog(createLogEntry(LogLevel.INFO, "Post-shutdown message"));

assertEquals(0, logAggregator.getLogCount());
verifyNoInteractionsWithCentralLogStore();
}

@Test
void testPerformanceMetrics() throws InterruptedException {
// CHAMPIONSHIP TEST: Verify performance monitoring
assertTrue(logAggregator.isRunning());
assertFalse(logAggregator.isSuspended());
assertEquals(4.0, logAggregator.getFrameRate(), 0.1); // 1000ms / 250ms = 4 FPS

logAggregator.collectLog(createLogEntry(LogLevel.INFO, "Performance test"));
assertEquals(1, logAggregator.getLogCount());

String report = logAggregator.getPerformanceReport();
assertNotNull(report);
assertTrue(report.contains("Event-Driven"));
assertTrue(report.contains("Zero Busy-Wait"));
}

private static LogEntry createLogEntry(LogLevel logLevel, String message) {
return new LogEntry("ServiceA", logLevel, message, LocalDateTime.now());
}

Expand Down
Loading
Loading