From bb1b85cf3339fb91b4e41104053957cc9dc0e515 Mon Sep 17 00:00:00 2001 From: Tom Tzook Date: Sun, 13 Jun 2021 18:37:29 +0300 Subject: [PATCH 1/5] added multithreaded scheduler --- .../scheduling/actions/ActionContext.java | 13 ++ .../scheduling/mt/MtActionsControl.java | 150 ++++++++++++++++ .../scheduling/mt/MtDaemonThreadWorkers.java | 49 ++++++ .../mt/MtExecutorServiceWorkers.java | 39 +++++ .../scheduling/mt/MtRequirementsControl.java | 83 +++++++++ .../scheduling/mt/MtSchedulerWorker.java | 55 ++++++ .../flashlib/scheduling/mt/MtWorkers.java | 9 + .../scheduling/mt/MultiThreadedScheduler.java | 163 ++++++++++++++++++ 8 files changed, 561 insertions(+) create mode 100644 flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtActionsControl.java create mode 100644 flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtDaemonThreadWorkers.java create mode 100644 flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtExecutorServiceWorkers.java create mode 100644 flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtRequirementsControl.java create mode 100644 flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtSchedulerWorker.java create mode 100644 flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtWorkers.java create mode 100644 flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MultiThreadedScheduler.java diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/actions/ActionContext.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/actions/ActionContext.java index 72156c156..d6116fd03 100644 --- a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/actions/ActionContext.java +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/actions/ActionContext.java @@ -25,6 +25,10 @@ public ActionContext(Action action, Clock clock) { mIsInitialized = false; } + public boolean isCanceled() { + return mActionState.isCanceled(); + } + public void prepareForRun() { mActionState.markStarted(); @@ -92,4 +96,13 @@ boolean wasTimeoutReached(){ return getRunTime().after(mTimeout); } + + public Action getAction() { + return mAction; + } + + @Override + public String toString() { + return mAction.toString(); + } } diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtActionsControl.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtActionsControl.java new file mode 100644 index 000000000..56d3297d7 --- /dev/null +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtActionsControl.java @@ -0,0 +1,150 @@ +package com.flash3388.flashlib.scheduling.mt; + +import com.flash3388.flashlib.scheduling.actions.Action; +import com.flash3388.flashlib.scheduling.actions.ActionContext; +import com.flash3388.flashlib.time.Clock; +import com.flash3388.flashlib.time.Time; +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Predicate; + +public class MtActionsControl { + + private final MtRequirementsControl mRequirementsControl; + private final Clock mClock; + private final Logger mLogger; + + private final Map mRunningActions; + private final Queue mRunningActionsContexts; + private final Queue mFinishedActionsContexts; + + MtActionsControl(MtRequirementsControl requirementsControl, Clock clock, Logger logger, + Map runningActions, + Queue runningActionsContexts, + Queue finishedActionsContexts) { + mRequirementsControl = requirementsControl; + mClock = clock; + mLogger = logger; + + mRunningActions = runningActions; + mRunningActionsContexts = runningActionsContexts; + mFinishedActionsContexts = finishedActionsContexts; + } + + public MtActionsControl(MtRequirementsControl requirementsControl, Clock clock, Logger logger) { + this(requirementsControl, clock, logger, + new HashMap<>(5), + new LinkedBlockingQueue<>(5), + new LinkedBlockingQueue<>(5)); + } + + public void startActions(Collection actions) { + for (Action action : actions) { + onActionStart(action); + } + } + + public void cancelAction(Action action) { + ActionContext context = mRunningActions.get(action); + if (context != null) { + context.markCanceled(); + mRunningActionsContexts.remove(context); + mFinishedActionsContexts.add(context); + } else { + throw new IllegalStateException("action not running"); + } + } + + public void cancelActionsIf(Predicate predicate) { + for (Action action : mRunningActions.keySet()) { + if (predicate.test(action)) { + cancelAction(action); + } + } + } + + public void cancelAllActions() { + for (Action action : mRunningActions.keySet()) { + cancelAction(action); + } + } + + public boolean isActionRunning(Action action) { + return mRunningActions.containsKey(action); + } + + public Time getActionRunTime(Action action) { + ActionContext context = mRunningActions.get(action); + if (context != null) { + return context.getRunTime(); + } else { + throw new IllegalStateException("action not running"); + } + } + + public ActionContext pollRunningAction() { + return mRunningActionsContexts.poll(); + } + + public void pushRunningAction(ActionContext context) { + mRunningActionsContexts.add(context); + } + + public void pushFinishedAction(ActionContext context) { + mFinishedActionsContexts.add(context); + } + + public void processFinishedActions() { + while (!mFinishedActionsContexts.isEmpty()) { + ActionContext context = mFinishedActionsContexts.poll(); + if (context == null) { + break; + } + + if (mRunningActions.remove(context.getAction()) != null) { + onActionFinished(context, true); + } + } + } + + private void onActionStart(Action action) { + if (mRunningActions.containsKey(action)) { + mLogger.debug("Attempted to start action {} when already running", action); + return; + } + + Set conflictingActions = mRequirementsControl.updateRequirementsWithNewRunningAction(action); + conflictingActions.forEach((conflictingAction)-> { + ActionContext context = mRunningActions.remove(conflictingAction); + if (context != null) { + context.markCanceled(); + onActionFinished(context, false); + } + }); + + ActionContext context = new ActionContext(action, mClock); + context.prepareForRun(); + + mRunningActions.put(action, context); + mRunningActionsContexts.add(context); + + mLogger.debug("Started action {}", action); + } + + private void onActionFinished(ActionContext context, boolean updateRequirements) { + Action action = context.getAction(); + + context.runFinished(); + if (updateRequirements) { + mRequirementsControl.updateRequirementsNoCurrentAction(action); + } + + mLogger.debug("Finished action {}", action); + } +} diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtDaemonThreadWorkers.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtDaemonThreadWorkers.java new file mode 100644 index 000000000..cf6165f01 --- /dev/null +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtDaemonThreadWorkers.java @@ -0,0 +1,49 @@ +package com.flash3388.flashlib.scheduling.mt; + +import java.util.HashSet; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Supplier; + +public class MtDaemonThreadWorkers implements MtWorkers { + + private final Function mWorkerFactory; + private final int mWorkerCount; + private final Set mThreads; + + public MtDaemonThreadWorkers(Function workerFactory, int workerCount) { + mWorkerFactory = workerFactory; + mWorkerCount = workerCount; + mThreads = new HashSet<>(); + } + + public MtDaemonThreadWorkers(ThreadGroup group, String threadName, int workerCount) { + this((task)-> new Thread(group, task, threadName), workerCount); + } + + public MtDaemonThreadWorkers(int workerCount) { + this(new ThreadGroup("mt-scheduler"), "mt-scheduler-worker", workerCount); + } + + @Override + public void runWorkers(Supplier taskSupplier) { + for (int i = 0; i < mWorkerCount; i++) { + Runnable task = taskSupplier.get(); + + Thread thread = mWorkerFactory.apply(task); + thread.setDaemon(true); + mThreads.add(thread); + + thread.start(); + } + } + + @Override + public void stopWorkers() { + for (Thread thread : mThreads) { + thread.interrupt(); + } + + mThreads.clear(); + } +} diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtExecutorServiceWorkers.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtExecutorServiceWorkers.java new file mode 100644 index 000000000..acfb6557e --- /dev/null +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtExecutorServiceWorkers.java @@ -0,0 +1,39 @@ +package com.flash3388.flashlib.scheduling.mt; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.function.Supplier; + +public class MtExecutorServiceWorkers implements MtWorkers { + + private final ExecutorService mExecutorService; + private final int mWorkerCount; + private final Set> mFutures; + + public MtExecutorServiceWorkers(ExecutorService executorService, int workerCount) { + mExecutorService = executorService; + mWorkerCount = workerCount; + + mFutures = new HashSet<>(); + } + + @Override + public void runWorkers(Supplier taskSupplier) { + for (int i = 0; i < mWorkerCount; i++) { + Runnable runnable = taskSupplier.get(); + Future future = mExecutorService.submit(runnable); + mFutures.add(future); + } + } + + @Override + public void stopWorkers() { + for (Future future : mFutures) { + future.cancel(true); + } + + mFutures.clear(); + } +} diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtRequirementsControl.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtRequirementsControl.java new file mode 100644 index 000000000..9b8b878c9 --- /dev/null +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtRequirementsControl.java @@ -0,0 +1,83 @@ +package com.flash3388.flashlib.scheduling.mt; + +import com.flash3388.flashlib.scheduling.Requirement; +import com.flash3388.flashlib.scheduling.Subsystem; +import com.flash3388.flashlib.scheduling.actions.Action; +import org.slf4j.Logger; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class MtRequirementsControl { + + private final Logger mLogger; + + private final Map mActionsOnRequirement; + private final Map mDefaultActionsOnSubsystems; + + MtRequirementsControl(Logger logger, Map actionsOnRequirement, Map defaultActionsOnSubsystems) { + mLogger = logger; + mActionsOnRequirement = actionsOnRequirement; + mDefaultActionsOnSubsystems = defaultActionsOnSubsystems; + } + + public MtRequirementsControl(Logger logger) { + this(logger, new HashMap<>(5), new HashMap<>(5)); + } + + public void updateRequirementsNoCurrentAction(Action action) { + for (Requirement requirement : action.getConfiguration().getRequirements()) { + mActionsOnRequirement.remove(requirement); + } + } + + public Set updateRequirementsWithNewRunningAction(Action action) { + Set conflictingActions = new HashSet<>(); + + for (Requirement requirement : action.getConfiguration().getRequirements()) { + if (mActionsOnRequirement.containsKey(requirement)) { + Action currentAction = mActionsOnRequirement.get(requirement); + conflictingActions.add(currentAction); + + mLogger.warn("Requirements conflict in Scheduler between {} and new action {} over requirement {}", + currentAction.toString(), action.toString(), requirement.toString()); + } + + mActionsOnRequirement.put(requirement, action); + } + + return conflictingActions; + } + + public Optional getActionOnRequirement(Requirement requirement) { + return Optional.ofNullable(mActionsOnRequirement.get(requirement)); + } + + public void setDefaultActionOnSubsystem(Subsystem subsystem, Action action) { + if (!action.getConfiguration().getRequirements().contains(subsystem)) { + throw new IllegalArgumentException("Action should require the default subsystem"); + } + + Action old = mDefaultActionsOnSubsystems.put(subsystem, action); + if (old != null && old.isRunning()) { + old.cancel(); + } + } + + public Map getDefaultActionsToStart() { + Map actionsToStart = new HashMap<>(); + + for (Map.Entry entry : mDefaultActionsOnSubsystems.entrySet()) { + if (mActionsOnRequirement.containsKey(entry.getKey())) { + continue; + } + + actionsToStart.put(entry.getKey(), entry.getValue()); + } + + return actionsToStart; + } +} diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtSchedulerWorker.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtSchedulerWorker.java new file mode 100644 index 000000000..c0d41640b --- /dev/null +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtSchedulerWorker.java @@ -0,0 +1,55 @@ +package com.flash3388.flashlib.scheduling.mt; + +import com.beans.Property; +import com.flash3388.flashlib.scheduling.SchedulerMode; +import com.flash3388.flashlib.scheduling.actions.ActionContext; +import org.slf4j.Logger; + +public class MtSchedulerWorker implements Runnable { + + private final MtActionsControl mControl; + private final Property mCurrentMode; + private final Logger mLogger; + + public MtSchedulerWorker(MtActionsControl control, Property currentMode, + Logger logger) { + mControl = control; + mCurrentMode = currentMode; + mLogger = logger; + } + + @Override + public void run() { + while (!Thread.interrupted()) { + SchedulerMode mode = mCurrentMode.get(); + if (mode == null) { + Thread.yield(); + continue; + } + + ActionContext context = mControl.pollRunningAction(); + if (context == null || context.isCanceled()) { + continue; + } + + try { + if (mode.isDisabled() && !context.runWhenDisabled()) { + context.markCanceled(); + mControl.pushFinishedAction(context); + mLogger.debug("Action {} running in disabled. Canceling", context); + continue; + } + + if (!context.run()) { + mControl.pushFinishedAction(context); + } else { + mControl.pushRunningAction(context); + } + } catch (Throwable t) { + mLogger.error("Error while running an action", t); + context.markCanceled(); + mControl.pushFinishedAction(context); + } + } + } +} diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtWorkers.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtWorkers.java new file mode 100644 index 000000000..8bbd14748 --- /dev/null +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtWorkers.java @@ -0,0 +1,9 @@ +package com.flash3388.flashlib.scheduling.mt; + +import java.util.function.Supplier; + +public interface MtWorkers { + + void runWorkers(Supplier taskSupplier); + void stopWorkers(); +} diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MultiThreadedScheduler.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MultiThreadedScheduler.java new file mode 100644 index 000000000..68111a079 --- /dev/null +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MultiThreadedScheduler.java @@ -0,0 +1,163 @@ +package com.flash3388.flashlib.scheduling.mt; + +import com.beans.Property; +import com.beans.properties.atomic.AtomicProperty; +import com.flash3388.flashlib.scheduling.Requirement; +import com.flash3388.flashlib.scheduling.Scheduler; +import com.flash3388.flashlib.scheduling.SchedulerMode; +import com.flash3388.flashlib.scheduling.Subsystem; +import com.flash3388.flashlib.scheduling.actions.Action; +import com.flash3388.flashlib.time.Clock; +import com.flash3388.flashlib.time.Time; +import com.flash3388.flashlib.util.logging.Logging; +import org.slf4j.Logger; + +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Predicate; + +public class MultiThreadedScheduler implements Scheduler, AutoCloseable { + + private static final int DEFAULT_WORKER_COUNT = 3; + + private final MtWorkers mWorkersControl; + private final Logger mLogger; + + private final MtRequirementsControl mRequirementsControl; + private final MtActionsControl mActionsControl; + private final Property mCurrentMode; + private final Set mActionsToStart; + + MultiThreadedScheduler(MtWorkers workersControl, Logger logger, + MtRequirementsControl requirementsControl, MtActionsControl actionsControl, + Property currentMode, Set actionsToStart) { + mWorkersControl = workersControl; + mLogger = logger; + mRequirementsControl = requirementsControl; + mActionsControl = actionsControl; + mCurrentMode = currentMode; + mActionsToStart = actionsToStart; + + mWorkersControl.runWorkers(()-> new MtSchedulerWorker(mActionsControl, mCurrentMode, mLogger)); + } + + public MultiThreadedScheduler(MtWorkers workersControl, Clock clock, Logger logger) { + mWorkersControl = workersControl; + mLogger = logger; + mRequirementsControl = new MtRequirementsControl(logger); + mActionsControl = new MtActionsControl(mRequirementsControl, clock, logger); + mCurrentMode = new AtomicProperty<>(); + mActionsToStart = new HashSet<>(); + + mWorkersControl.runWorkers(()-> new MtSchedulerWorker(mActionsControl, mCurrentMode, mLogger)); + } + + public MultiThreadedScheduler(Clock clock, Logger logger) { + this(new MtDaemonThreadWorkers(DEFAULT_WORKER_COUNT), clock, logger); + } + + public MultiThreadedScheduler(Clock clock) { + this(clock, Logging.stub()); + } + + @Override + public void start(Action action) { + Objects.requireNonNull(action, "action is null"); + + mActionsToStart.add(action); + } + + @Override + public void cancel(Action action) { + Objects.requireNonNull(action, "action is null"); + + if(!mActionsToStart.remove(action)) { + mActionsControl.cancelAction(action); + } + } + + @Override + public boolean isRunning(Action action) { + Objects.requireNonNull(action, "action is null"); + + return mActionsToStart.contains(action) || + mActionsControl.isActionRunning(action); + } + + @Override + public Time getActionRunTime(Action action) { + Objects.requireNonNull(action, "action is null"); + + if (mActionsToStart.contains(action)) { + return Time.milliseconds(0); + } + + return mActionsControl.getActionRunTime(action); + } + + @Override + public void cancelActionsIf(Predicate predicate) { + Objects.requireNonNull(predicate, "predicate is null"); + + mActionsToStart.removeIf(predicate); + mActionsControl.cancelActionsIf(predicate); + } + + @Override + public void cancelAllActions() { + mActionsToStart.clear(); + mActionsControl.cancelAllActions(); + } + + @Override + public void setDefaultAction(Subsystem subsystem, Action action) { + Objects.requireNonNull(subsystem, "subsystem is null"); + Objects.requireNonNull(action, "action is null"); + + mRequirementsControl.setDefaultActionOnSubsystem(subsystem, action); + } + + @Override + public Optional getActionRunningOnRequirement(Requirement requirement) { + Objects.requireNonNull(requirement, "requirement is null"); + return mRequirementsControl.getActionOnRequirement(requirement); + } + + @Override + public void run(SchedulerMode mode) { + mCurrentMode.set(mode); + + mActionsControl.processFinishedActions(); + + mActionsControl.startActions(mActionsToStart); + mActionsToStart.clear(); + + startDefaultActions(mode); + } + + @Override + public void close() { + mWorkersControl.stopWorkers(); + } + + private void startDefaultActions(SchedulerMode mode) { + for (Map.Entry entry : mRequirementsControl.getDefaultActionsToStart().entrySet()) { + try { + Action action = entry.getValue(); + + if (mode.isDisabled() && + !action.getConfiguration().shouldRunWhenDisabled()) { + continue; + } + + mLogger.debug("Starting default action for {}", entry.getKey()); + action.start(); + } catch (Throwable t) { + mLogger.error("Error when starting default action", t); + } + } + } +} From e78416edaa545f158ce24bf22c2121b72f4db0ac Mon Sep 17 00:00:00 2001 From: Tom Tzook Date: Sun, 13 Jun 2021 18:37:49 +0300 Subject: [PATCH 2/5] example running actions with the scheduler --- examples/scheduling/mt-scheduler/build.gradle | 9 ++ .../src/main/java/example/Main.java | 83 +++++++++++++++++++ settings.gradle | 2 + 3 files changed, 94 insertions(+) create mode 100644 examples/scheduling/mt-scheduler/build.gradle create mode 100644 examples/scheduling/mt-scheduler/src/main/java/example/Main.java diff --git a/examples/scheduling/mt-scheduler/build.gradle b/examples/scheduling/mt-scheduler/build.gradle new file mode 100644 index 000000000..cf5ee7213 --- /dev/null +++ b/examples/scheduling/mt-scheduler/build.gradle @@ -0,0 +1,9 @@ +plugins { + id 'application' +} + +dependencies { + implementation project(':flashlib.core.scheduling') +} + +mainClassName = 'example.Main' \ No newline at end of file diff --git a/examples/scheduling/mt-scheduler/src/main/java/example/Main.java b/examples/scheduling/mt-scheduler/src/main/java/example/Main.java new file mode 100644 index 000000000..5c3a43d3b --- /dev/null +++ b/examples/scheduling/mt-scheduler/src/main/java/example/Main.java @@ -0,0 +1,83 @@ +package example; + +import com.flash3388.flashlib.global.GlobalDependencies; +import com.flash3388.flashlib.scheduling.SchedulerMode; +import com.flash3388.flashlib.scheduling.actions.Action; +import com.flash3388.flashlib.scheduling.actions.ActionBase; +import com.flash3388.flashlib.scheduling.mt.MtExecutorServiceWorkers; +import com.flash3388.flashlib.scheduling.mt.MultiThreadedScheduler; +import com.flash3388.flashlib.time.Clock; +import com.flash3388.flashlib.time.SystemNanoClock; +import com.flash3388.flashlib.time.Time; +import com.flash3388.flashlib.util.logging.LogLevel; +import com.flash3388.flashlib.util.logging.LoggerBuilder; +import org.slf4j.Logger; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class Main { + + public static void main(String[] args) throws InterruptedException { + final int WORKER_COUNT = 2; + final SchedulerMode RUN_MODE = new SchedulerMode() { + @Override + public boolean isDisabled() { + return false; + } + }; + + ExecutorService executorService = Executors.newFixedThreadPool(WORKER_COUNT); + try { + Clock clock = new SystemNanoClock(); + Logger logger = new LoggerBuilder("example") + .enableConsoleLogging(true) + .setLogLevel(LogLevel.DEBUG) + .build(); + + try (MultiThreadedScheduler scheduler = new MultiThreadedScheduler( + new MtExecutorServiceWorkers(executorService, WORKER_COUNT), + clock, + logger)) { + GlobalDependencies.setClockInstance(clock); + GlobalDependencies.setSchedulerInstance(scheduler); + + Action action = new ActionBase() { + @Override + public void execute() { + + } + }; + action.configure() + .setName("Basic Action") + .setTimeout(Time.milliseconds(500)) + .save(); + action.start(); + + Action action1 = new ActionBase() { + @Override + public void execute() { + + } + }; + action1.configure() + .setName("Action to Cancel") + .save(); + action1.start(); + + Time startTime = clock.currentTime(); + Time runTime = Time.seconds(10); + while (clock.currentTime().sub(startTime).lessThan(runTime)) { + scheduler.run(RUN_MODE); + Thread.sleep(100); + + if (action1.isRunning()) { + action1.cancel(); + } + } + } + } finally { + executorService.shutdownNow(); + } + } +} diff --git a/settings.gradle b/settings.gradle index 8017396ef..85ed3b591 100644 --- a/settings.gradle +++ b/settings.gradle @@ -19,6 +19,8 @@ include 'examples:robot:basic-robot' include 'examples:robot:robot-manual-mode-control' include 'examples:robot:basic-trigger' +include 'examples:scheduling:mt-scheduler' + include 'examples:vision:basic-vision' include 'examples:vision:cv-2020-vision' include 'examples:vision:cv-colorfilter' From 69aab41bfca0e40e2b3449799e6c4ea558689ac6 Mon Sep 17 00:00:00 2001 From: Tom Tzook Date: Sun, 13 Jun 2021 19:22:40 +0300 Subject: [PATCH 3/5] fixed canceling if iteration' --- .../flashlib/scheduling/mt/MtActionsControl.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtActionsControl.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtActionsControl.java index 56d3297d7..2e22799cb 100644 --- a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtActionsControl.java +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtActionsControl.java @@ -6,8 +6,10 @@ import com.flash3388.flashlib.time.Time; import org.slf4j.Logger; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; @@ -62,17 +64,19 @@ public void cancelAction(Action action) { } public void cancelActionsIf(Predicate predicate) { + List toRemove = new ArrayList<>(); for (Action action : mRunningActions.keySet()) { if (predicate.test(action)) { - cancelAction(action); + toRemove.add(action); } } + + toRemove.forEach(this::cancelAction); } public void cancelAllActions() { - for (Action action : mRunningActions.keySet()) { - cancelAction(action); - } + List toRemove = new ArrayList<>(mRunningActions.keySet()); + toRemove.forEach(this::cancelAction); } public boolean isActionRunning(Action action) { From 25c4f3d480144d5ef32e0f4c4530211b6b837df1 Mon Sep 17 00:00:00 2001 From: Tom Tzook Date: Tue, 24 Aug 2021 09:52:28 +0300 Subject: [PATCH 4/5] changed package for mt scheduler code --- .../scheduling/mt-scheduler/src/main/java/example/Main.java | 4 ++-- .../scheduling/{mt => threading}/MtActionsControl.java | 2 +- .../scheduling/{mt => threading}/MtDaemonThreadWorkers.java | 2 +- .../{mt => threading}/MtExecutorServiceWorkers.java | 2 +- .../scheduling/{mt => threading}/MtRequirementsControl.java | 2 +- .../scheduling/{mt => threading}/MtSchedulerWorker.java | 2 +- .../flashlib/scheduling/{mt => threading}/MtWorkers.java | 2 +- .../scheduling/{mt => threading}/MultiThreadedScheduler.java | 2 +- 8 files changed, 9 insertions(+), 9 deletions(-) rename flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/{mt => threading}/MtActionsControl.java (98%) rename flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/{mt => threading}/MtDaemonThreadWorkers.java (96%) rename flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/{mt => threading}/MtExecutorServiceWorkers.java (95%) rename flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/{mt => threading}/MtRequirementsControl.java (98%) rename flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/{mt => threading}/MtSchedulerWorker.java (97%) rename flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/{mt => threading}/MtWorkers.java (73%) rename flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/{mt => threading}/MultiThreadedScheduler.java (99%) diff --git a/examples/scheduling/mt-scheduler/src/main/java/example/Main.java b/examples/scheduling/mt-scheduler/src/main/java/example/Main.java index 5c3a43d3b..cff9ba13d 100644 --- a/examples/scheduling/mt-scheduler/src/main/java/example/Main.java +++ b/examples/scheduling/mt-scheduler/src/main/java/example/Main.java @@ -4,8 +4,8 @@ import com.flash3388.flashlib.scheduling.SchedulerMode; import com.flash3388.flashlib.scheduling.actions.Action; import com.flash3388.flashlib.scheduling.actions.ActionBase; -import com.flash3388.flashlib.scheduling.mt.MtExecutorServiceWorkers; -import com.flash3388.flashlib.scheduling.mt.MultiThreadedScheduler; +import com.flash3388.flashlib.scheduling.threading.MtExecutorServiceWorkers; +import com.flash3388.flashlib.scheduling.threading.MultiThreadedScheduler; import com.flash3388.flashlib.time.Clock; import com.flash3388.flashlib.time.SystemNanoClock; import com.flash3388.flashlib.time.Time; diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtActionsControl.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtActionsControl.java similarity index 98% rename from flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtActionsControl.java rename to flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtActionsControl.java index 2e22799cb..206cc2f94 100644 --- a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtActionsControl.java +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtActionsControl.java @@ -1,4 +1,4 @@ -package com.flash3388.flashlib.scheduling.mt; +package com.flash3388.flashlib.scheduling.threading; import com.flash3388.flashlib.scheduling.actions.Action; import com.flash3388.flashlib.scheduling.actions.ActionContext; diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtDaemonThreadWorkers.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtDaemonThreadWorkers.java similarity index 96% rename from flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtDaemonThreadWorkers.java rename to flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtDaemonThreadWorkers.java index cf6165f01..f5595e880 100644 --- a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtDaemonThreadWorkers.java +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtDaemonThreadWorkers.java @@ -1,4 +1,4 @@ -package com.flash3388.flashlib.scheduling.mt; +package com.flash3388.flashlib.scheduling.threading; import java.util.HashSet; import java.util.Set; diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtExecutorServiceWorkers.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtExecutorServiceWorkers.java similarity index 95% rename from flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtExecutorServiceWorkers.java rename to flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtExecutorServiceWorkers.java index acfb6557e..e7e8dbd4a 100644 --- a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtExecutorServiceWorkers.java +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtExecutorServiceWorkers.java @@ -1,4 +1,4 @@ -package com.flash3388.flashlib.scheduling.mt; +package com.flash3388.flashlib.scheduling.threading; import java.util.HashSet; import java.util.Set; diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtRequirementsControl.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtRequirementsControl.java similarity index 98% rename from flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtRequirementsControl.java rename to flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtRequirementsControl.java index 9b8b878c9..2533e3a27 100644 --- a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtRequirementsControl.java +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtRequirementsControl.java @@ -1,4 +1,4 @@ -package com.flash3388.flashlib.scheduling.mt; +package com.flash3388.flashlib.scheduling.threading; import com.flash3388.flashlib.scheduling.Requirement; import com.flash3388.flashlib.scheduling.Subsystem; diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtSchedulerWorker.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtSchedulerWorker.java similarity index 97% rename from flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtSchedulerWorker.java rename to flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtSchedulerWorker.java index c0d41640b..18a0ab960 100644 --- a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtSchedulerWorker.java +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtSchedulerWorker.java @@ -1,4 +1,4 @@ -package com.flash3388.flashlib.scheduling.mt; +package com.flash3388.flashlib.scheduling.threading; import com.beans.Property; import com.flash3388.flashlib.scheduling.SchedulerMode; diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtWorkers.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtWorkers.java similarity index 73% rename from flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtWorkers.java rename to flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtWorkers.java index 8bbd14748..4ed87db95 100644 --- a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MtWorkers.java +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtWorkers.java @@ -1,4 +1,4 @@ -package com.flash3388.flashlib.scheduling.mt; +package com.flash3388.flashlib.scheduling.threading; import java.util.function.Supplier; diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MultiThreadedScheduler.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MultiThreadedScheduler.java similarity index 99% rename from flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MultiThreadedScheduler.java rename to flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MultiThreadedScheduler.java index 68111a079..bcad9a665 100644 --- a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/mt/MultiThreadedScheduler.java +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MultiThreadedScheduler.java @@ -1,4 +1,4 @@ -package com.flash3388.flashlib.scheduling.mt; +package com.flash3388.flashlib.scheduling.threading; import com.beans.Property; import com.beans.properties.atomic.AtomicProperty; From f9d6d7484aa7c5d114e6e817e051d8b105c7a105 Mon Sep 17 00:00:00 2001 From: Tom Tzook Date: Thu, 23 Sep 2021 20:05:42 +0300 Subject: [PATCH 5/5] removed cap on queus --- .../flashlib/scheduling/threading/MtActionsControl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtActionsControl.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtActionsControl.java index 206cc2f94..ca56aa7ef 100644 --- a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtActionsControl.java +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtActionsControl.java @@ -42,8 +42,8 @@ public class MtActionsControl { public MtActionsControl(MtRequirementsControl requirementsControl, Clock clock, Logger logger) { this(requirementsControl, clock, logger, new HashMap<>(5), - new LinkedBlockingQueue<>(5), - new LinkedBlockingQueue<>(5)); + new LinkedBlockingQueue<>(), + new LinkedBlockingQueue<>()); } public void startActions(Collection actions) {