Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/main/java/teetime/framework/AbstractPipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,20 @@ public abstract class AbstractPipe<T> implements IPipe<T> {
private final InputPort<T> targetPort;

// FIXME each scheduler should set a default pipe scheduler by its own
private PipeScheduler scheduler = new PipeScheduler() {
private PipeScheduler<T> scheduler = new PipeScheduler<T>() {

@Override
public void onElementAdded(final AbstractSynchedPipe<?> pipe) {
public void onElementAdded(final AbstractSynchedPipe<T> pipe) {
// do nothing
}

@Override
public void onElementAdded(final AbstractUnsynchedPipe<?> pipe) {
public void onElementAdded(final AbstractUnsynchedPipe<T> pipe) {
// do nothing
}

@Override
public void onElementNotAdded(final AbstractSynchedPipe<?> pipe) {
public void onElementNotAdded(final AbstractSynchedPipe<T> pipe) {
// do nothing
}
};
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/teetime/framework/AbstractPort.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*/
public class AbstractPort<T> {

protected static final Object TERMINATE_ELEMENT = new Object();
protected final T TERMINATE_ELEMENT = null;

protected IPipe<T> pipe;
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ public void close() {
}

@Override
public boolean addNonBlocking(final Object element) {
public boolean addNonBlocking(final P element) {
return outputPipe.addNonBlocking(element);
}

@Override
public void add(final Object element) {
public void add(final P element) {
outputPipe.add(element);
}

Expand All @@ -131,7 +131,7 @@ public int size() {
}

@Override
public Object removeLast() {
public P removeLast() {
return outputPipe.removeLast();
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/teetime/framework/InstantiationPipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public int size() {
}

@Override
public Object removeLast() {
public T removeLast() {
throw new IllegalStateException(ERROR_MESSAGE);
}

Expand Down
6 changes: 3 additions & 3 deletions src/main/java/teetime/framework/pipe/DummyPipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public boolean addNonBlocking(final Object element) {
}

@Override
public Object removeLast() {
public T removeLast() {
return null;
}

Expand All @@ -61,12 +61,12 @@ public int size() {
}

@Override
public OutputPort<? extends Object> getSourcePort() {
public OutputPort<? extends T> getSourcePort() {
return null;
}

@Override
public InputPort<Object> getTargetPort() {
public InputPort<T> getTargetPort() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,25 @@
*/
class BoundedMpMcSynchedPipe<T> extends AbstractSynchedPipe<T> implements IMonitorablePipe {

private final MpmcArrayQueue<Object> queue;
private final MpmcArrayQueue<T> queue;

private transient long lastProducerIndex, lastConsumerIndex;

public BoundedMpMcSynchedPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int requestedCapacity) {
super(sourcePort, targetPort);
this.queue = new MpmcArrayQueue<Object>(requestedCapacity);
this.queue = new MpmcArrayQueue<T>(requestedCapacity);
}

@Override
public void add(final Object element) {
public void add(final T element) {
while (!this.queue.offer(element)) {
getScheduler().onElementNotAdded(this);
}
getScheduler().onElementAdded(this);
}

@Override
public boolean addNonBlocking(final Object element) {
public boolean addNonBlocking(final T element) {
return this.queue.offer(element);
}

Expand All @@ -66,7 +66,7 @@ public int size() {
}

@Override
public Object removeLast() {
public T removeLast() {
return this.queue.poll();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
* @since 3.0
*
*/
public class GlobalTaskPoolScheduling implements TeeTimeScheduler, PipeScheduler, UncaughtExceptionHandler {
public class GlobalTaskPoolScheduling<T> implements TeeTimeScheduler, PipeScheduler<T>, UncaughtExceptionHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTaskPoolScheduling.class);
private static final StageFacade STAGE_FACADE = StageFacade.INSTANCE;
Expand Down Expand Up @@ -320,13 +320,13 @@ private void setScheduler(final AbstractStage stage) {
}

@Override
public void onElementAdded(final AbstractUnsynchedPipe<?> pipe) {
public void onElementAdded(final AbstractUnsynchedPipe<T> pipe) {
String message = String.format("This scheduler does not allow unsynched pipes: %s", pipe);
throw new IllegalStateException(message);
}

@Override
public void onElementAdded(final AbstractSynchedPipe<?> pipe) {
public void onElementAdded(final AbstractSynchedPipe<T> pipe) {
BoundedMpMcSynchedPipe<?> castedPipe = (BoundedMpMcSynchedPipe<?>) pipe;
long numPushes = castedPipe.getNumPushesSinceAppStart();
long lastNumPushes = castedPipe.getLastProducerIndex();
Expand All @@ -345,7 +345,7 @@ public void onElementAdded(final AbstractSynchedPipe<?> pipe) {
}

@Override
public void onElementNotAdded(final AbstractSynchedPipe<?> pipe) {
public void onElementNotAdded(final AbstractSynchedPipe<T> pipe) {
if (!taskPool.scheduleStage(pipe.getCachedTargetStage())) {
throw new IllegalStateException(String.format("onElementNotAdded: scheduling target stage failed for %s", pipe.getCachedTargetStage()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
/**
* Automatically instantiates the correct pipes
*/
class A3PipeInstantiation implements ITraverserVisitor, PipeScheduler {
class A3PipeInstantiation<T> implements ITraverserVisitor, PipeScheduler<T> {

private static final Logger LOGGER = LoggerFactory.getLogger(A3PipeInstantiation.class);

Expand Down Expand Up @@ -89,17 +89,17 @@ private <T> void instantiatePipe(final IPipe<T> pipe) {
}

@Override
public void onElementAdded(final AbstractUnsynchedPipe<?> pipe) {
public void onElementAdded(final AbstractUnsynchedPipe<T> pipe) {
pipe.getCachedTargetStage().executeByFramework();
}

@Override
public void onElementAdded(final AbstractSynchedPipe<?> pipe) {
public void onElementAdded(final AbstractSynchedPipe<T> pipe) {
// do nothing
}

@Override
public void onElementNotAdded(final AbstractSynchedPipe<?> pipe) {
public void onElementNotAdded(final AbstractSynchedPipe<T> pipe) {
// do nothing
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public boolean isEmpty() {
}

@Override
public Object removeLast() {
public T removeLast() {
return null;
}

Expand Down