Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions examples/scheduling/mt-scheduler/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
plugins {
id 'application'
}

dependencies {
implementation project(':flashlib.core.scheduling')
}

mainClassName = 'example.Main'
83 changes: 83 additions & 0 deletions examples/scheduling/mt-scheduler/src/main/java/example/Main.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package example;

import com.flash3388.flashlib.global.GlobalDependencies;
import com.flash3388.flashlib.scheduling.SchedulerMode;
import com.flash3388.flashlib.scheduling.actions.Action;
import com.flash3388.flashlib.scheduling.actions.ActionBase;
import com.flash3388.flashlib.scheduling.threading.MtExecutorServiceWorkers;
import com.flash3388.flashlib.scheduling.threading.MultiThreadedScheduler;
import com.flash3388.flashlib.time.Clock;
import com.flash3388.flashlib.time.SystemNanoClock;
import com.flash3388.flashlib.time.Time;
import com.flash3388.flashlib.util.logging.LogLevel;
import com.flash3388.flashlib.util.logging.LoggerBuilder;
import org.slf4j.Logger;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {

public static void main(String[] args) throws InterruptedException {
final int WORKER_COUNT = 2;
final SchedulerMode RUN_MODE = new SchedulerMode() {
@Override
public boolean isDisabled() {
return false;
}
};

ExecutorService executorService = Executors.newFixedThreadPool(WORKER_COUNT);
try {
Clock clock = new SystemNanoClock();
Logger logger = new LoggerBuilder("example")
.enableConsoleLogging(true)
.setLogLevel(LogLevel.DEBUG)
.build();

try (MultiThreadedScheduler scheduler = new MultiThreadedScheduler(
new MtExecutorServiceWorkers(executorService, WORKER_COUNT),
clock,
logger)) {
GlobalDependencies.setClockInstance(clock);
GlobalDependencies.setSchedulerInstance(scheduler);

Action action = new ActionBase() {
@Override
public void execute() {

}
};
action.configure()
.setName("Basic Action")
.setTimeout(Time.milliseconds(500))
.save();
action.start();

Action action1 = new ActionBase() {
@Override
public void execute() {

}
};
action1.configure()
.setName("Action to Cancel")
.save();
action1.start();

Time startTime = clock.currentTime();
Time runTime = Time.seconds(10);
while (clock.currentTime().sub(startTime).lessThan(runTime)) {
scheduler.run(RUN_MODE);
Thread.sleep(100);

if (action1.isRunning()) {
action1.cancel();
}
}
}
} finally {
executorService.shutdownNow();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ public ActionContext(Action action, Clock clock) {
mIsInitialized = false;
}

public boolean isCanceled() {
return mActionState.isCanceled();
}

public void prepareForRun() {
mActionState.markStarted();

Expand Down Expand Up @@ -92,4 +96,13 @@ boolean wasTimeoutReached(){

return getRunTime().after(mTimeout);
}

public Action getAction() {
return mAction;
}

@Override
public String toString() {
return mAction.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package com.flash3388.flashlib.scheduling.threading;

import com.flash3388.flashlib.scheduling.actions.Action;
import com.flash3388.flashlib.scheduling.actions.ActionContext;
import com.flash3388.flashlib.time.Clock;
import com.flash3388.flashlib.time.Time;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;

public class MtActionsControl {

private final MtRequirementsControl mRequirementsControl;
private final Clock mClock;
private final Logger mLogger;

private final Map<Action, ActionContext> mRunningActions;
private final Queue<ActionContext> mRunningActionsContexts;
private final Queue<ActionContext> mFinishedActionsContexts;

MtActionsControl(MtRequirementsControl requirementsControl, Clock clock, Logger logger,
Map<Action, ActionContext> runningActions,
Queue<ActionContext> runningActionsContexts,
Queue<ActionContext> finishedActionsContexts) {
mRequirementsControl = requirementsControl;
mClock = clock;
mLogger = logger;

mRunningActions = runningActions;
mRunningActionsContexts = runningActionsContexts;
mFinishedActionsContexts = finishedActionsContexts;
}

public MtActionsControl(MtRequirementsControl requirementsControl, Clock clock, Logger logger) {
this(requirementsControl, clock, logger,
new HashMap<>(5),
new LinkedBlockingQueue<>(),
new LinkedBlockingQueue<>());
}

public void startActions(Collection<Action> actions) {
for (Action action : actions) {
onActionStart(action);
}
}

public void cancelAction(Action action) {
ActionContext context = mRunningActions.get(action);
if (context != null) {
context.markCanceled();
mRunningActionsContexts.remove(context);
mFinishedActionsContexts.add(context);
} else {
throw new IllegalStateException("action not running");
}
}

public void cancelActionsIf(Predicate<? super Action> predicate) {
List<Action> toRemove = new ArrayList<>();
for (Action action : mRunningActions.keySet()) {
if (predicate.test(action)) {
toRemove.add(action);
}
}

toRemove.forEach(this::cancelAction);
}

public void cancelAllActions() {
List<Action> toRemove = new ArrayList<>(mRunningActions.keySet());
toRemove.forEach(this::cancelAction);
}

public boolean isActionRunning(Action action) {
return mRunningActions.containsKey(action);
}

public Time getActionRunTime(Action action) {
ActionContext context = mRunningActions.get(action);
if (context != null) {
return context.getRunTime();
} else {
throw new IllegalStateException("action not running");
}
}

public ActionContext pollRunningAction() {
return mRunningActionsContexts.poll();
}

public void pushRunningAction(ActionContext context) {
mRunningActionsContexts.add(context);
}

public void pushFinishedAction(ActionContext context) {
mFinishedActionsContexts.add(context);
}

public void processFinishedActions() {
while (!mFinishedActionsContexts.isEmpty()) {
ActionContext context = mFinishedActionsContexts.poll();
if (context == null) {
break;
}

if (mRunningActions.remove(context.getAction()) != null) {
onActionFinished(context, true);
}
}
}

private void onActionStart(Action action) {
if (mRunningActions.containsKey(action)) {
mLogger.debug("Attempted to start action {} when already running", action);
return;
}

Set<Action> conflictingActions = mRequirementsControl.updateRequirementsWithNewRunningAction(action);
conflictingActions.forEach((conflictingAction)-> {
ActionContext context = mRunningActions.remove(conflictingAction);
if (context != null) {
context.markCanceled();
onActionFinished(context, false);
}
});

ActionContext context = new ActionContext(action, mClock);
context.prepareForRun();

mRunningActions.put(action, context);
mRunningActionsContexts.add(context);

mLogger.debug("Started action {}", action);
}

private void onActionFinished(ActionContext context, boolean updateRequirements) {
Action action = context.getAction();

context.runFinished();
if (updateRequirements) {
mRequirementsControl.updateRequirementsNoCurrentAction(action);
}

mLogger.debug("Finished action {}", action);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.flash3388.flashlib.scheduling.threading;

import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;

public class MtDaemonThreadWorkers implements MtWorkers {

private final Function<Runnable, Thread> mWorkerFactory;
private final int mWorkerCount;
private final Set<Thread> mThreads;

public MtDaemonThreadWorkers(Function<Runnable, Thread> workerFactory, int workerCount) {
mWorkerFactory = workerFactory;
mWorkerCount = workerCount;
mThreads = new HashSet<>();
}

public MtDaemonThreadWorkers(ThreadGroup group, String threadName, int workerCount) {
this((task)-> new Thread(group, task, threadName), workerCount);
}

public MtDaemonThreadWorkers(int workerCount) {
this(new ThreadGroup("mt-scheduler"), "mt-scheduler-worker", workerCount);
}

@Override
public void runWorkers(Supplier<Runnable> taskSupplier) {
for (int i = 0; i < mWorkerCount; i++) {
Runnable task = taskSupplier.get();

Thread thread = mWorkerFactory.apply(task);
thread.setDaemon(true);
mThreads.add(thread);

thread.start();
}
}

@Override
public void stopWorkers() {
for (Thread thread : mThreads) {
thread.interrupt();
}

mThreads.clear();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.flash3388.flashlib.scheduling.threading;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Supplier;

public class MtExecutorServiceWorkers implements MtWorkers {

private final ExecutorService mExecutorService;
private final int mWorkerCount;
private final Set<Future<?>> mFutures;

public MtExecutorServiceWorkers(ExecutorService executorService, int workerCount) {
mExecutorService = executorService;
mWorkerCount = workerCount;

mFutures = new HashSet<>();
}

@Override
public void runWorkers(Supplier<Runnable> taskSupplier) {
for (int i = 0; i < mWorkerCount; i++) {
Runnable runnable = taskSupplier.get();
Future<?> future = mExecutorService.submit(runnable);
mFutures.add(future);
}
}

@Override
public void stopWorkers() {
for (Future<?> future : mFutures) {
future.cancel(true);
}

mFutures.clear();
}
}
Loading