diff --git a/bundles/org.openhab.core.automation/src/main/java/org/openhab/core/automation/internal/TriggerHandlerCallbackImpl.java b/bundles/org.openhab.core.automation/src/main/java/org/openhab/core/automation/internal/TriggerHandlerCallbackImpl.java index cbd4eb18531..08733bec081 100644 --- a/bundles/org.openhab.core.automation/src/main/java/org/openhab/core/automation/internal/TriggerHandlerCallbackImpl.java +++ b/bundles/org.openhab.core.automation/src/main/java/org/openhab/core/automation/internal/TriggerHandlerCallbackImpl.java @@ -13,7 +13,6 @@ package org.openhab.core.automation.internal; import java.util.Map; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -23,7 +22,7 @@ import org.openhab.core.automation.RuleStatusInfo; import org.openhab.core.automation.Trigger; import org.openhab.core.automation.handler.TriggerHandlerCallback; -import org.openhab.core.common.NamedThreadFactory; +import org.openhab.core.common.ThreadPoolManager; /** * This class is implementation of {@link TriggerHandlerCallback} used by the {@link Trigger}s to notify rule engine @@ -48,7 +47,7 @@ public class TriggerHandlerCallbackImpl implements TriggerHandlerCallback { protected TriggerHandlerCallbackImpl(RuleEngineImpl re, String ruleUID) { this.re = re; this.ruleUID = ruleUID; - executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("rule-" + ruleUID)); + this.executor = ThreadPoolManager.newSequentialScheduledExecutorService("rules-pool", "rule-" + ruleUID); } @Override diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/common/SequentialScheduledExecutorService.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/common/SequentialScheduledExecutorService.java new file mode 100644 index 00000000000..bf3b656af90 --- /dev/null +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/common/SequentialScheduledExecutorService.java @@ -0,0 +1,526 @@ +/** + * Copyright (c) 2010-2023 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.common; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.BiFunction; + +/** + * A ScheduledExecutorService that will sequentially perform the tasks like a + * {@link Executors#newSingleThreadScheduledExecutor} backed by a thread pool. + * This is a drop in replacement to a ScheduledExecutorService with one thread to avoid a lot of threads created, idling + * most of the time and wasting memory on low-end devices. + * + * The mechanism to block the ScheduledExecutorService to run tasks concurrently is based on a chain of + * {@link CompletableFuture}s. + * Each instance has a reference to the last CompletableFuture and will call handleAsync to add a new task. + * + * @author Jörg Sautter - Initial contribution + */ +class SequentialScheduledExecutorService implements ScheduledExecutorService { + + private final WorkQueueEntry empty; + private final ScheduledThreadPoolExecutor pool; + private final List> scheduled; + private WorkQueueEntry tail; + + public SequentialScheduledExecutorService(ScheduledThreadPoolExecutor pool) { + if (pool.getMaximumPoolSize() != Integer.MAX_VALUE) { + throw new IllegalArgumentException("the pool must scale unlimited to avoid potential dead locks!"); + } + + this.pool = pool; + + // prepare the WorkQueueEntry we are using when no tasks are pending + RunnableCompletableFuture future = new RunnableCompletableFuture(); + future.complete(null); + empty = new WorkQueueEntry(null, null, future); + + // tracks scheduled tasks alive + this.scheduled = new ArrayList<>(); + + tail = empty; + // clean up to ensure we do keep references to old tasks + this.scheduleAtFixedRate(() -> { + synchronized (this) { + scheduled.removeIf((sf) -> sf.isCancelled()); + + WorkQueueEntry entry = tail; + + while (entry.prev != null) { + if (entry.prev.future.isDone()) { + entry.prev = null; + break; + } + entry = entry.prev; + } + + if (tail != empty && tail.future.isDone()) { + // replace the tail with empty to ensure we do not prevent GC + tail = empty; + } + } + }, 2, 4, TimeUnit.SECONDS); + } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + synchronized (this) { + if (tail == null) { + throw new RejectedExecutionException("this scheduled executor has been shutdown before"); + } + + CompletableFuture> origin = new CompletableFuture<>(); + ScheduledFuture future = pool.schedule(() -> { + // we block the thread here, in worst case new threads are spawned + submit0(origin.join(), command).join(); + }, delay, unit); + + scheduled.add((RunnableFuture) future); + origin.complete((RunnableFuture) future); + + return future; + } + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + synchronized (this) { + if (tail == null) { + throw new RejectedExecutionException("this scheduled executor has been shutdown before"); + } + + CompletableFuture> origin = new CompletableFuture<>(); + ScheduledFuture future = pool.schedule(() -> { + // we block the thread here, in worst case new threads are spawned + return submit0(origin.join(), callable).join(); + }, delay, unit); + + scheduled.add((RunnableFuture) future); + origin.complete((RunnableFuture) future); + + return future; + } + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + synchronized (this) { + if (tail == null) { + throw new RejectedExecutionException("this scheduled executor has been shutdown before"); + } + + CompletableFuture> origin = new CompletableFuture<>(); + ScheduledFuture future = pool.scheduleAtFixedRate(() -> { + CompletableFuture submitted; + + try { + // we block the thread here, in worst case new threads are spawned + submitted = submit0(origin.join(), command); + } catch (RejectedExecutionException ex) { + // the pool has been shutdown, scheduled tasks should cancel + return; + } + + submitted.join(); + }, initialDelay, period, unit); + + scheduled.add((RunnableFuture) future); + origin.complete((RunnableFuture) future); + + return future; + } + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + synchronized (this) { + if (tail == null) { + throw new RejectedExecutionException("this scheduled executor has been shutdown before"); + } + + CompletableFuture> origin = new CompletableFuture<>(); + ScheduledFuture future = pool.scheduleWithFixedDelay(() -> { + CompletableFuture submitted; + + try { + // we block the thread here, in worst case new threads are spawned + submitted = submit0(origin.join(), command); + } catch (RejectedExecutionException ex) { + // the pool has been shutdown, scheduled tasks should cancel + return; + } + + submitted.join(); + }, initialDelay, delay, unit); + + scheduled.add((RunnableFuture) future); + origin.complete((RunnableFuture) future); + + return future; + } + } + + @Override + public void shutdown() { + synchronized (this) { + scheduled.removeIf((sf) -> { + sf.cancel(false); + return true; + }); + tail = null; + } + } + + @Override + public List shutdownNow() { + synchronized (this) { + if (tail == null) { + return List.of(); + } + + Set runnables = Collections.newSetFromMap(new IdentityHashMap<>()); + WorkQueueEntry entry = tail; + scheduled.removeIf((sf) -> { + if (sf.cancel(false)) { + runnables.add(sf); + } + return true; + }); + tail = null; + + while (entry != null) { + if (!entry.future.cancel(false)) { + break; + } + + if (entry.origin != null) { + // entry has been submitted by a .schedule call + runnables.add(entry.origin); + } else { + // entry has been submitted by a .submit call + runnables.add(entry.future); + } + entry = entry.prev; + } + + return List.copyOf(runnables); + } + } + + @Override + public boolean isShutdown() { + synchronized (this) { + return pool == null; + } + } + + @Override + public boolean isTerminated() { + synchronized (this) { + return pool == null && tail.future.isDone(); + } + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + long timeoutAt = System.currentTimeMillis() + unit.toMillis(timeout); + + while (!isTerminated()) { + if (System.currentTimeMillis() > timeoutAt) { + return false; + } + + Thread.onSpinWait(); + } + + return true; + } + + @Override + public Future submit(Callable task) { + return submit0(null, task); + } + + private CompletableFuture submit0(RunnableFuture origin, Runnable task) { + Callable callable = () -> { + task.run(); + + return null; + }; + + return submit0(origin, callable); + } + + private CompletableFuture submit0(RunnableFuture origin, Callable task) { + BiFunction action = (result, error) -> { + // ignore result & error, they are from the previous task + try { + return task.call(); + } catch (RuntimeException ex) { + throw ex; + } catch (Exception ex) { + // a small hack to throw the Exception unchecked + throw SequentialScheduledExecutorService.unchecked(ex); + } + }; + + synchronized (this) { + if (tail == null) { + throw new RejectedExecutionException("this scheduled executor has been shutdown before"); + } + + RunnableCompletableFuture cf = tail.future.handleAsync(action, pool); + + cf.setCallable(task); + + tail = new WorkQueueEntry(tail, origin, cf); + + return cf; + } + } + + private static E unchecked(Exception ex) throws E { + throw (E) ex; + } + + @Override + public Future submit(Runnable task, T result) { + return submit0(null, () -> { + task.run(); + + return result; + }); + } + + @Override + public Future submit(Runnable task) { + return submit(task, (Void) null); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + List> futures = new ArrayList<>(); + + for (Callable task : tasks) { + futures.add(submit(task)); + } + + // wait for all futures to complete + for (Future future : futures) { + try { + future.get(); + } catch (ExecutionException e) { + // ignore, we are just waiting here for the futures to complete + } + } + + return futures; + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + List> futures = new ArrayList<>(); + + for (Callable task : tasks) { + futures.add(submit0(null, task).orTimeout(timeout, unit)); + } + + // wait for all futures to complete + for (Future future : futures) { + try { + future.get(); + } catch (ExecutionException e) { + // ignore, we are just waiting here for the futures to complete + } + } + + return futures; + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + List> futures = new ArrayList<>(); + + for (Callable task : tasks) { + futures.add(submit0(null, task)); + } + + // wait for any future to complete + while (true) { + boolean allDone = true; + + for (CompletableFuture future : futures) { + if (future.isDone()) { + if (!future.isCompletedExceptionally()) { + // stop the others + for (CompletableFuture toLate : futures) { + if (toLate != future) { + toLate.cancel(true); + } + } + + return future.join(); + } + } else { + allDone = false; + } + } + + if (allDone) { + ExecutionException exe = new ExecutionException("all tasks failed", null); + + for (CompletableFuture future : futures) { + try { + future.get(); + throw new AssertionError("all tasks should be failed"); + } catch (ExecutionException ex) { + exe.addSuppressed(ex); + } + } + + throw exe; + } + + Thread.onSpinWait(); + } + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + long timeoutAt = System.currentTimeMillis() + unit.toMillis(timeout); + List> futures = new ArrayList<>(); + + for (Callable task : tasks) { + futures.add(submit0(null, task)); + } + + // wait for any future to complete + while (timeoutAt >= System.currentTimeMillis()) { + boolean allDone = true; + + for (CompletableFuture future : futures) { + if (future.isDone()) { + if (!future.isCompletedExceptionally()) { + // stop the others + for (CompletableFuture toLate : futures) { + if (toLate != future) { + toLate.cancel(true); + } + } + + return future.join(); + } + } else { + allDone = false; + } + } + + if (allDone) { + ExecutionException exe = new ExecutionException("all tasks failed", null); + + for (CompletableFuture future : futures) { + try { + future.get(); + throw new AssertionError("all tasks should be failed"); + } catch (ExecutionException ex) { + exe.addSuppressed(ex); + } + } + + throw exe; + } + + Thread.onSpinWait(); + } + + for (CompletableFuture toLate : futures) { + toLate.cancel(true); + } + + throw new TimeoutException("none of the tasks did complete in time"); + } + + @Override + public void execute(Runnable command) { + submit(command); + } + + static class WorkQueueEntry { + private WorkQueueEntry prev; + private RunnableFuture origin; + private final RunnableCompletableFuture future; + + public WorkQueueEntry(WorkQueueEntry prev, RunnableFuture origin, RunnableCompletableFuture future) { + this.prev = prev; + this.origin = origin; + this.future = future; + } + } + + static class RunnableCompletableFuture extends CompletableFuture implements RunnableFuture { + private Callable callable; + + public RunnableCompletableFuture() { + } + + public void setCallable(Callable callable) { + this.callable = callable; + } + + @Override + public RunnableCompletableFuture newIncompleteFuture() { + return new RunnableCompletableFuture<>(); + } + + @Override + public RunnableCompletableFuture handleAsync(BiFunction fn, + Executor executor) { + return (RunnableCompletableFuture) super.handleAsync(fn, executor); + } + + @Override + public void run() { + if (this.isDone()) { + // a FutureTask does also return here without exception + return; + } + + try { + this.complete(callable.call()); + } catch (Throwable t) { + this.completeExceptionally(t); + } + } + } +} diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/common/ThreadPoolManager.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/common/ThreadPoolManager.java index 69c51d2989d..98732888272 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/common/ThreadPoolManager.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/common/ThreadPoolManager.java @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -115,6 +116,26 @@ protected void modified(Map properties) { } } + /** + * Returns an instance of a scheduled service, which will sequentially execute submitted tasks. If a task is + * currently running the task is queued until the previous one is completed, this also applies for scheduled tasks. + * The service might execute submitted task might in different threads, but still one after the other. + * If it is the first request for the given pool name and a pool is used, the instance is newly created. + * + * @param poolName a short name used to identify the pool, if a thread pool is used e.g. "discovery" + * @param threadName a short name used to identify the thread if no thread pool is used, e.g. "bluetooth" + * @return an instance to use + */ + public static ScheduledExecutorService newSequentialScheduledExecutorService(String poolName, String threadName) { + if (Boolean.getBoolean("org.openhab.core.common.SequentialScheduledExecutorService")) { + ScheduledThreadPoolExecutor pool = getScheduledPoolUnwrapped(poolName); + + return new SequentialScheduledExecutorService((ScheduledThreadPoolExecutor) pool); + } + + return Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(threadName)); + } + /** * Returns an instance of a scheduled thread pool service. If it is the first request for the given pool name, the * instance is newly created. diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/internal/events/EventHandler.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/internal/events/EventHandler.java index 3560b1fb086..d95633e27a8 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/internal/events/EventHandler.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/internal/events/EventHandler.java @@ -29,6 +29,7 @@ import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; import org.openhab.core.common.NamedThreadFactory; +import org.openhab.core.common.ThreadPoolManager; import org.openhab.core.events.Event; import org.openhab.core.events.EventFactory; import org.openhab.core.events.EventFilter; @@ -69,9 +70,8 @@ public EventHandler(final Map> typedEventSubscriber } private synchronized ExecutorRecord createExecutorRecord(Class subscriber) { - return new ExecutorRecord( - Executors.newSingleThreadExecutor(new NamedThreadFactory("eventexecutor-" + executors.size())), - new AtomicInteger()); + return new ExecutorRecord(ThreadPoolManager.newSequentialScheduledExecutorService("events-pool", + "eventexecutor-" + executors.size()), new AtomicInteger()); } @Override diff --git a/bundles/org.openhab.core/src/test/java/org/openhab/core/common/ThreadPoolManagerTest.java b/bundles/org.openhab.core/src/test/java/org/openhab/core/common/ThreadPoolManagerTest.java index 9b993a32edc..921d6e6c0d2 100644 --- a/bundles/org.openhab.core/src/test/java/org/openhab/core/common/ThreadPoolManagerTest.java +++ b/bundles/org.openhab.core/src/test/java/org/openhab/core/common/ThreadPoolManagerTest.java @@ -17,11 +17,16 @@ import static org.junit.jupiter.api.Assertions.*; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RunnableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jdt.annotation.NonNullByDefault; import org.junit.jupiter.api.Test; @@ -34,6 +39,129 @@ */ @NonNullByDefault public class ThreadPoolManagerTest { + @Test + public void testSequentialExecutorServiceAssumptions() { + Callable callable = () -> true; + Runnable runnable = () -> { + }; + + ScheduledExecutorService service = Executors.newScheduledThreadPool(1); + + assertTrue(service.submit(runnable) instanceof RunnableFuture); + assertTrue(service.submit(callable) instanceof RunnableFuture); + + assertTrue(service.schedule(runnable, 1, TimeUnit.SECONDS) instanceof RunnableFuture); + assertTrue(service.schedule(callable, 1, TimeUnit.SECONDS) instanceof RunnableFuture); + + var fixedRate = service.scheduleAtFixedRate(runnable, 1, 1, TimeUnit.SECONDS); + + try { + assertTrue(fixedRate instanceof RunnableFuture); + } finally { + fixedRate.cancel(false); + } + + var fixedDelay = service.scheduleWithFixedDelay(runnable, 1, 1, TimeUnit.SECONDS); + + try { + assertTrue(fixedDelay instanceof RunnableFuture); + } finally { + fixedDelay.cancel(false); + } + } + + @Test + public void testExecutionIsSequentialInSequentialExecutorService() { + ScheduledExecutorService service = ThreadPoolManager.newSequentialScheduledExecutorService("unit-test", + "thread-1"); + + AtomicBoolean done = new AtomicBoolean(false); + + service.submit(() -> { + Thread.sleep(1000); + + done.set(true); + + return null; + }); + + assertTrue(((CompletableFuture) service.submit(() -> done.get())).join()); + } + + @Test + public void testCancelDoesNotStopProcessingInSequentialExecutorService() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + ScheduledExecutorService service = ThreadPoolManager.newSequentialScheduledExecutorService("unit-test", + "thread-2"); + + service.submit(() -> { + Thread.sleep(1000); + + return null; + }); + + service.submit(() -> null).cancel(false); + + service.submit(() -> { + latch.countDown(); + }); + + assertTrue(latch.await(2, TimeUnit.SECONDS)); + } + + @Test + public void testInstancesDoNotBlockEachOtherInSequentialExecutorService() throws InterruptedException { + ScheduledExecutorService serviceA = ThreadPoolManager.newSequentialScheduledExecutorService("unit-test", + "thread-3"); + ScheduledExecutorService serviceB = ThreadPoolManager.newSequentialScheduledExecutorService("unit-test", + "thread-4"); + + serviceA.submit(() -> { + Thread.sleep(1000); + + return null; + }); + + CountDownLatch latch = new CountDownLatch(1); + + serviceB.submit(() -> { + latch.countDown(); + }); + + assertTrue(latch.await(800, TimeUnit.MILLISECONDS)); + } + + @Test + public void testSchedulingWorksInSequentialExecutorService() throws InterruptedException { + ScheduledExecutorService service = ThreadPoolManager.newSequentialScheduledExecutorService("unit-test", + "thread-5"); + CountDownLatch latch = new CountDownLatch(1); + + service.schedule(() -> { + latch.countDown(); + }, 400, TimeUnit.MILLISECONDS); + + assertTrue(latch.await(1, TimeUnit.SECONDS)); + } + + @Test + public void testSchedulingGetsBlockedByRegularTaskInSequentialExecutorService() throws InterruptedException { + ScheduledExecutorService service = ThreadPoolManager.newSequentialScheduledExecutorService("unit-test", + "thread-6"); + CountDownLatch latch = new CountDownLatch(1); + + service.submit(() -> { + Thread.sleep(2000); + + return null; + }); + + service.schedule(() -> { + latch.countDown(); + }, 20, TimeUnit.MILLISECONDS); + + assertFalse(latch.await(1, TimeUnit.SECONDS)); + } @Test public void testGetScheduledPool() { diff --git a/tools/static-code-analysis/pmd/suppressions.properties b/tools/static-code-analysis/pmd/suppressions.properties index 6ee882d6add..cd17bd9129c 100644 --- a/tools/static-code-analysis/pmd/suppressions.properties +++ b/tools/static-code-analysis/pmd/suppressions.properties @@ -15,3 +15,4 @@ org.openhab.core.automation.internal.RuleEngineImpl=AvoidCatchingThrowable org.openhab.core.automation.internal.RuleRegistryImpl=CompareObjectsWithEquals org.openhab.core.automation.internal.provider.AutomationResourceBundlesEventQueue=AvoidCatchingThrowable org.openhab.core.io.console.karaf.internal.InstallServiceCommand=SystemPrintln +org.openhab.core.common.SequentialScheduledExecutorService=CompareObjectsWithEquals