diff --git a/examples/scheduling/mt-scheduler/build.gradle b/examples/scheduling/mt-scheduler/build.gradle new file mode 100644 index 000000000..cf5ee7213 --- /dev/null +++ b/examples/scheduling/mt-scheduler/build.gradle @@ -0,0 +1,9 @@ +plugins { + id 'application' +} + +dependencies { + implementation project(':flashlib.core.scheduling') +} + +mainClassName = 'example.Main' \ No newline at end of file diff --git a/examples/scheduling/mt-scheduler/src/main/java/example/Main.java b/examples/scheduling/mt-scheduler/src/main/java/example/Main.java new file mode 100644 index 000000000..cff9ba13d --- /dev/null +++ b/examples/scheduling/mt-scheduler/src/main/java/example/Main.java @@ -0,0 +1,83 @@ +package example; + +import com.flash3388.flashlib.global.GlobalDependencies; +import com.flash3388.flashlib.scheduling.SchedulerMode; +import com.flash3388.flashlib.scheduling.actions.Action; +import com.flash3388.flashlib.scheduling.actions.ActionBase; +import com.flash3388.flashlib.scheduling.threading.MtExecutorServiceWorkers; +import com.flash3388.flashlib.scheduling.threading.MultiThreadedScheduler; +import com.flash3388.flashlib.time.Clock; +import com.flash3388.flashlib.time.SystemNanoClock; +import com.flash3388.flashlib.time.Time; +import com.flash3388.flashlib.util.logging.LogLevel; +import com.flash3388.flashlib.util.logging.LoggerBuilder; +import org.slf4j.Logger; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class Main { + + public static void main(String[] args) throws InterruptedException { + final int WORKER_COUNT = 2; + final SchedulerMode RUN_MODE = new SchedulerMode() { + @Override + public boolean isDisabled() { + return false; + } + }; + + ExecutorService executorService = Executors.newFixedThreadPool(WORKER_COUNT); + try { + Clock clock = new SystemNanoClock(); + Logger logger = new LoggerBuilder("example") + .enableConsoleLogging(true) + .setLogLevel(LogLevel.DEBUG) + .build(); + + try (MultiThreadedScheduler scheduler = new MultiThreadedScheduler( + new MtExecutorServiceWorkers(executorService, WORKER_COUNT), + clock, + logger)) { + GlobalDependencies.setClockInstance(clock); + GlobalDependencies.setSchedulerInstance(scheduler); + + Action action = new ActionBase() { + @Override + public void execute() { + + } + }; + action.configure() + .setName("Basic Action") + .setTimeout(Time.milliseconds(500)) + .save(); + action.start(); + + Action action1 = new ActionBase() { + @Override + public void execute() { + + } + }; + action1.configure() + .setName("Action to Cancel") + .save(); + action1.start(); + + Time startTime = clock.currentTime(); + Time runTime = Time.seconds(10); + while (clock.currentTime().sub(startTime).lessThan(runTime)) { + scheduler.run(RUN_MODE); + Thread.sleep(100); + + if (action1.isRunning()) { + action1.cancel(); + } + } + } + } finally { + executorService.shutdownNow(); + } + } +} diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/actions/ActionContext.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/actions/ActionContext.java index 72156c156..d6116fd03 100644 --- a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/actions/ActionContext.java +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/actions/ActionContext.java @@ -25,6 +25,10 @@ public ActionContext(Action action, Clock clock) { mIsInitialized = false; } + public boolean isCanceled() { + return mActionState.isCanceled(); + } + public void prepareForRun() { mActionState.markStarted(); @@ -92,4 +96,13 @@ boolean wasTimeoutReached(){ return getRunTime().after(mTimeout); } + + public Action getAction() { + return mAction; + } + + @Override + public String toString() { + return mAction.toString(); + } } diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtActionsControl.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtActionsControl.java new file mode 100644 index 000000000..ca56aa7ef --- /dev/null +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtActionsControl.java @@ -0,0 +1,154 @@ +package com.flash3388.flashlib.scheduling.threading; + +import com.flash3388.flashlib.scheduling.actions.Action; +import com.flash3388.flashlib.scheduling.actions.ActionContext; +import com.flash3388.flashlib.time.Clock; +import com.flash3388.flashlib.time.Time; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Predicate; + +public class MtActionsControl { + + private final MtRequirementsControl mRequirementsControl; + private final Clock mClock; + private final Logger mLogger; + + private final Map mRunningActions; + private final Queue mRunningActionsContexts; + private final Queue mFinishedActionsContexts; + + MtActionsControl(MtRequirementsControl requirementsControl, Clock clock, Logger logger, + Map runningActions, + Queue runningActionsContexts, + Queue finishedActionsContexts) { + mRequirementsControl = requirementsControl; + mClock = clock; + mLogger = logger; + + mRunningActions = runningActions; + mRunningActionsContexts = runningActionsContexts; + mFinishedActionsContexts = finishedActionsContexts; + } + + public MtActionsControl(MtRequirementsControl requirementsControl, Clock clock, Logger logger) { + this(requirementsControl, clock, logger, + new HashMap<>(5), + new LinkedBlockingQueue<>(), + new LinkedBlockingQueue<>()); + } + + public void startActions(Collection actions) { + for (Action action : actions) { + onActionStart(action); + } + } + + public void cancelAction(Action action) { + ActionContext context = mRunningActions.get(action); + if (context != null) { + context.markCanceled(); + mRunningActionsContexts.remove(context); + mFinishedActionsContexts.add(context); + } else { + throw new IllegalStateException("action not running"); + } + } + + public void cancelActionsIf(Predicate predicate) { + List toRemove = new ArrayList<>(); + for (Action action : mRunningActions.keySet()) { + if (predicate.test(action)) { + toRemove.add(action); + } + } + + toRemove.forEach(this::cancelAction); + } + + public void cancelAllActions() { + List toRemove = new ArrayList<>(mRunningActions.keySet()); + toRemove.forEach(this::cancelAction); + } + + public boolean isActionRunning(Action action) { + return mRunningActions.containsKey(action); + } + + public Time getActionRunTime(Action action) { + ActionContext context = mRunningActions.get(action); + if (context != null) { + return context.getRunTime(); + } else { + throw new IllegalStateException("action not running"); + } + } + + public ActionContext pollRunningAction() { + return mRunningActionsContexts.poll(); + } + + public void pushRunningAction(ActionContext context) { + mRunningActionsContexts.add(context); + } + + public void pushFinishedAction(ActionContext context) { + mFinishedActionsContexts.add(context); + } + + public void processFinishedActions() { + while (!mFinishedActionsContexts.isEmpty()) { + ActionContext context = mFinishedActionsContexts.poll(); + if (context == null) { + break; + } + + if (mRunningActions.remove(context.getAction()) != null) { + onActionFinished(context, true); + } + } + } + + private void onActionStart(Action action) { + if (mRunningActions.containsKey(action)) { + mLogger.debug("Attempted to start action {} when already running", action); + return; + } + + Set conflictingActions = mRequirementsControl.updateRequirementsWithNewRunningAction(action); + conflictingActions.forEach((conflictingAction)-> { + ActionContext context = mRunningActions.remove(conflictingAction); + if (context != null) { + context.markCanceled(); + onActionFinished(context, false); + } + }); + + ActionContext context = new ActionContext(action, mClock); + context.prepareForRun(); + + mRunningActions.put(action, context); + mRunningActionsContexts.add(context); + + mLogger.debug("Started action {}", action); + } + + private void onActionFinished(ActionContext context, boolean updateRequirements) { + Action action = context.getAction(); + + context.runFinished(); + if (updateRequirements) { + mRequirementsControl.updateRequirementsNoCurrentAction(action); + } + + mLogger.debug("Finished action {}", action); + } +} diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtDaemonThreadWorkers.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtDaemonThreadWorkers.java new file mode 100644 index 000000000..f5595e880 --- /dev/null +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtDaemonThreadWorkers.java @@ -0,0 +1,49 @@ +package com.flash3388.flashlib.scheduling.threading; + +import java.util.HashSet; +import java.util.Set; +import java.util.function.Function; +import java.util.function.Supplier; + +public class MtDaemonThreadWorkers implements MtWorkers { + + private final Function mWorkerFactory; + private final int mWorkerCount; + private final Set mThreads; + + public MtDaemonThreadWorkers(Function workerFactory, int workerCount) { + mWorkerFactory = workerFactory; + mWorkerCount = workerCount; + mThreads = new HashSet<>(); + } + + public MtDaemonThreadWorkers(ThreadGroup group, String threadName, int workerCount) { + this((task)-> new Thread(group, task, threadName), workerCount); + } + + public MtDaemonThreadWorkers(int workerCount) { + this(new ThreadGroup("mt-scheduler"), "mt-scheduler-worker", workerCount); + } + + @Override + public void runWorkers(Supplier taskSupplier) { + for (int i = 0; i < mWorkerCount; i++) { + Runnable task = taskSupplier.get(); + + Thread thread = mWorkerFactory.apply(task); + thread.setDaemon(true); + mThreads.add(thread); + + thread.start(); + } + } + + @Override + public void stopWorkers() { + for (Thread thread : mThreads) { + thread.interrupt(); + } + + mThreads.clear(); + } +} diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtExecutorServiceWorkers.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtExecutorServiceWorkers.java new file mode 100644 index 000000000..e7e8dbd4a --- /dev/null +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtExecutorServiceWorkers.java @@ -0,0 +1,39 @@ +package com.flash3388.flashlib.scheduling.threading; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.function.Supplier; + +public class MtExecutorServiceWorkers implements MtWorkers { + + private final ExecutorService mExecutorService; + private final int mWorkerCount; + private final Set> mFutures; + + public MtExecutorServiceWorkers(ExecutorService executorService, int workerCount) { + mExecutorService = executorService; + mWorkerCount = workerCount; + + mFutures = new HashSet<>(); + } + + @Override + public void runWorkers(Supplier taskSupplier) { + for (int i = 0; i < mWorkerCount; i++) { + Runnable runnable = taskSupplier.get(); + Future future = mExecutorService.submit(runnable); + mFutures.add(future); + } + } + + @Override + public void stopWorkers() { + for (Future future : mFutures) { + future.cancel(true); + } + + mFutures.clear(); + } +} diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtRequirementsControl.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtRequirementsControl.java new file mode 100644 index 000000000..2533e3a27 --- /dev/null +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtRequirementsControl.java @@ -0,0 +1,83 @@ +package com.flash3388.flashlib.scheduling.threading; + +import com.flash3388.flashlib.scheduling.Requirement; +import com.flash3388.flashlib.scheduling.Subsystem; +import com.flash3388.flashlib.scheduling.actions.Action; +import org.slf4j.Logger; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class MtRequirementsControl { + + private final Logger mLogger; + + private final Map mActionsOnRequirement; + private final Map mDefaultActionsOnSubsystems; + + MtRequirementsControl(Logger logger, Map actionsOnRequirement, Map defaultActionsOnSubsystems) { + mLogger = logger; + mActionsOnRequirement = actionsOnRequirement; + mDefaultActionsOnSubsystems = defaultActionsOnSubsystems; + } + + public MtRequirementsControl(Logger logger) { + this(logger, new HashMap<>(5), new HashMap<>(5)); + } + + public void updateRequirementsNoCurrentAction(Action action) { + for (Requirement requirement : action.getConfiguration().getRequirements()) { + mActionsOnRequirement.remove(requirement); + } + } + + public Set updateRequirementsWithNewRunningAction(Action action) { + Set conflictingActions = new HashSet<>(); + + for (Requirement requirement : action.getConfiguration().getRequirements()) { + if (mActionsOnRequirement.containsKey(requirement)) { + Action currentAction = mActionsOnRequirement.get(requirement); + conflictingActions.add(currentAction); + + mLogger.warn("Requirements conflict in Scheduler between {} and new action {} over requirement {}", + currentAction.toString(), action.toString(), requirement.toString()); + } + + mActionsOnRequirement.put(requirement, action); + } + + return conflictingActions; + } + + public Optional getActionOnRequirement(Requirement requirement) { + return Optional.ofNullable(mActionsOnRequirement.get(requirement)); + } + + public void setDefaultActionOnSubsystem(Subsystem subsystem, Action action) { + if (!action.getConfiguration().getRequirements().contains(subsystem)) { + throw new IllegalArgumentException("Action should require the default subsystem"); + } + + Action old = mDefaultActionsOnSubsystems.put(subsystem, action); + if (old != null && old.isRunning()) { + old.cancel(); + } + } + + public Map getDefaultActionsToStart() { + Map actionsToStart = new HashMap<>(); + + for (Map.Entry entry : mDefaultActionsOnSubsystems.entrySet()) { + if (mActionsOnRequirement.containsKey(entry.getKey())) { + continue; + } + + actionsToStart.put(entry.getKey(), entry.getValue()); + } + + return actionsToStart; + } +} diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtSchedulerWorker.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtSchedulerWorker.java new file mode 100644 index 000000000..18a0ab960 --- /dev/null +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtSchedulerWorker.java @@ -0,0 +1,55 @@ +package com.flash3388.flashlib.scheduling.threading; + +import com.beans.Property; +import com.flash3388.flashlib.scheduling.SchedulerMode; +import com.flash3388.flashlib.scheduling.actions.ActionContext; +import org.slf4j.Logger; + +public class MtSchedulerWorker implements Runnable { + + private final MtActionsControl mControl; + private final Property mCurrentMode; + private final Logger mLogger; + + public MtSchedulerWorker(MtActionsControl control, Property currentMode, + Logger logger) { + mControl = control; + mCurrentMode = currentMode; + mLogger = logger; + } + + @Override + public void run() { + while (!Thread.interrupted()) { + SchedulerMode mode = mCurrentMode.get(); + if (mode == null) { + Thread.yield(); + continue; + } + + ActionContext context = mControl.pollRunningAction(); + if (context == null || context.isCanceled()) { + continue; + } + + try { + if (mode.isDisabled() && !context.runWhenDisabled()) { + context.markCanceled(); + mControl.pushFinishedAction(context); + mLogger.debug("Action {} running in disabled. Canceling", context); + continue; + } + + if (!context.run()) { + mControl.pushFinishedAction(context); + } else { + mControl.pushRunningAction(context); + } + } catch (Throwable t) { + mLogger.error("Error while running an action", t); + context.markCanceled(); + mControl.pushFinishedAction(context); + } + } + } +} diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtWorkers.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtWorkers.java new file mode 100644 index 000000000..4ed87db95 --- /dev/null +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MtWorkers.java @@ -0,0 +1,9 @@ +package com.flash3388.flashlib.scheduling.threading; + +import java.util.function.Supplier; + +public interface MtWorkers { + + void runWorkers(Supplier taskSupplier); + void stopWorkers(); +} diff --git a/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MultiThreadedScheduler.java b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MultiThreadedScheduler.java new file mode 100644 index 000000000..bcad9a665 --- /dev/null +++ b/flashlib.core.scheduling/src/main/java/com/flash3388/flashlib/scheduling/threading/MultiThreadedScheduler.java @@ -0,0 +1,163 @@ +package com.flash3388.flashlib.scheduling.threading; + +import com.beans.Property; +import com.beans.properties.atomic.AtomicProperty; +import com.flash3388.flashlib.scheduling.Requirement; +import com.flash3388.flashlib.scheduling.Scheduler; +import com.flash3388.flashlib.scheduling.SchedulerMode; +import com.flash3388.flashlib.scheduling.Subsystem; +import com.flash3388.flashlib.scheduling.actions.Action; +import com.flash3388.flashlib.time.Clock; +import com.flash3388.flashlib.time.Time; +import com.flash3388.flashlib.util.logging.Logging; +import org.slf4j.Logger; + +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Predicate; + +public class MultiThreadedScheduler implements Scheduler, AutoCloseable { + + private static final int DEFAULT_WORKER_COUNT = 3; + + private final MtWorkers mWorkersControl; + private final Logger mLogger; + + private final MtRequirementsControl mRequirementsControl; + private final MtActionsControl mActionsControl; + private final Property mCurrentMode; + private final Set mActionsToStart; + + MultiThreadedScheduler(MtWorkers workersControl, Logger logger, + MtRequirementsControl requirementsControl, MtActionsControl actionsControl, + Property currentMode, Set actionsToStart) { + mWorkersControl = workersControl; + mLogger = logger; + mRequirementsControl = requirementsControl; + mActionsControl = actionsControl; + mCurrentMode = currentMode; + mActionsToStart = actionsToStart; + + mWorkersControl.runWorkers(()-> new MtSchedulerWorker(mActionsControl, mCurrentMode, mLogger)); + } + + public MultiThreadedScheduler(MtWorkers workersControl, Clock clock, Logger logger) { + mWorkersControl = workersControl; + mLogger = logger; + mRequirementsControl = new MtRequirementsControl(logger); + mActionsControl = new MtActionsControl(mRequirementsControl, clock, logger); + mCurrentMode = new AtomicProperty<>(); + mActionsToStart = new HashSet<>(); + + mWorkersControl.runWorkers(()-> new MtSchedulerWorker(mActionsControl, mCurrentMode, mLogger)); + } + + public MultiThreadedScheduler(Clock clock, Logger logger) { + this(new MtDaemonThreadWorkers(DEFAULT_WORKER_COUNT), clock, logger); + } + + public MultiThreadedScheduler(Clock clock) { + this(clock, Logging.stub()); + } + + @Override + public void start(Action action) { + Objects.requireNonNull(action, "action is null"); + + mActionsToStart.add(action); + } + + @Override + public void cancel(Action action) { + Objects.requireNonNull(action, "action is null"); + + if(!mActionsToStart.remove(action)) { + mActionsControl.cancelAction(action); + } + } + + @Override + public boolean isRunning(Action action) { + Objects.requireNonNull(action, "action is null"); + + return mActionsToStart.contains(action) || + mActionsControl.isActionRunning(action); + } + + @Override + public Time getActionRunTime(Action action) { + Objects.requireNonNull(action, "action is null"); + + if (mActionsToStart.contains(action)) { + return Time.milliseconds(0); + } + + return mActionsControl.getActionRunTime(action); + } + + @Override + public void cancelActionsIf(Predicate predicate) { + Objects.requireNonNull(predicate, "predicate is null"); + + mActionsToStart.removeIf(predicate); + mActionsControl.cancelActionsIf(predicate); + } + + @Override + public void cancelAllActions() { + mActionsToStart.clear(); + mActionsControl.cancelAllActions(); + } + + @Override + public void setDefaultAction(Subsystem subsystem, Action action) { + Objects.requireNonNull(subsystem, "subsystem is null"); + Objects.requireNonNull(action, "action is null"); + + mRequirementsControl.setDefaultActionOnSubsystem(subsystem, action); + } + + @Override + public Optional getActionRunningOnRequirement(Requirement requirement) { + Objects.requireNonNull(requirement, "requirement is null"); + return mRequirementsControl.getActionOnRequirement(requirement); + } + + @Override + public void run(SchedulerMode mode) { + mCurrentMode.set(mode); + + mActionsControl.processFinishedActions(); + + mActionsControl.startActions(mActionsToStart); + mActionsToStart.clear(); + + startDefaultActions(mode); + } + + @Override + public void close() { + mWorkersControl.stopWorkers(); + } + + private void startDefaultActions(SchedulerMode mode) { + for (Map.Entry entry : mRequirementsControl.getDefaultActionsToStart().entrySet()) { + try { + Action action = entry.getValue(); + + if (mode.isDisabled() && + !action.getConfiguration().shouldRunWhenDisabled()) { + continue; + } + + mLogger.debug("Starting default action for {}", entry.getKey()); + action.start(); + } catch (Throwable t) { + mLogger.error("Error when starting default action", t); + } + } + } +} diff --git a/settings.gradle b/settings.gradle index 8017396ef..85ed3b591 100644 --- a/settings.gradle +++ b/settings.gradle @@ -19,6 +19,8 @@ include 'examples:robot:basic-robot' include 'examples:robot:robot-manual-mode-control' include 'examples:robot:basic-trigger' +include 'examples:scheduling:mt-scheduler' + include 'examples:vision:basic-vision' include 'examples:vision:cv-2020-vision' include 'examples:vision:cv-colorfilter'