Skip to content
This repository was archived by the owner on Feb 24, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.1.6.RELEASE</version>
<version>1.2.1.RELEASE</version>
<relativePath/>
</parent>

Expand Down Expand Up @@ -82,12 +82,12 @@

<!-- Reactor -->
<dependency>
<groupId>org.projectreactor</groupId>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>${reactor.version}</version>
</dependency>
<dependency>
<groupId>org.projectreactor.spring</groupId>
<groupId>io.projectreactor.spring</groupId>
<artifactId>reactor-spring-context</artifactId>
<version>${reactor.version}</version>
</dependency>
Expand All @@ -106,7 +106,7 @@
<java.version>1.8</java.version>
<spring-boot-ratpack.version>1.0.0.BUILD-SNAPSHOT</spring-boot-ratpack.version>
<reactor.version>2.0.0.BUILD-SNAPSHOT</reactor.version>
<ratpack.version>0.9.9-SNAPSHOT</ratpack.version>
<ratpack.version>0.9.11-SNAPSHOT</ratpack.version>
</properties>

<build>
Expand Down
113 changes: 58 additions & 55 deletions src/main/java/demo/ProcessorApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
import ratpack.render.Renderer;
import ratpack.render.RendererSupport;
import ratpack.spring.annotation.EnableRatpack;
import reactor.core.Environment;
import reactor.rx.Stream;
import reactor.Environment;
import reactor.rx.Streams;
import reactor.rx.stream.HotStream;
import reactor.rx.broadcast.Broadcaster;
import reactor.spring.context.config.EnableReactor;

import static ratpack.jackson.Jackson.fromJson;
Expand All @@ -36,66 +35,70 @@
@EnableReactor
public class ProcessorApplication {

@Bean
public HotStream<Location> locationEventStream(Environment env) {
return Streams.defer(env);
}
static {
Environment.initialize();
}

@Bean
public Action<Chain> handlers(LocationService locations,
ObjectMapper mapper,
ModelMapper beanMapper) {
return (chain) -> {
// Create new Location
chain.post("location", ctx -> {
Location loc = ctx.parse(fromJson(Location.class));
@Bean
public Broadcaster<Location> locationEventStream(Environment env) {
return Broadcaster.create();
}

ctx.promise(f -> locations.update(loc)
.consume(f::success))
.then(ctx::render);
});
@Bean
public Action<Chain> handlers(LocationService locations,
ObjectMapper mapper,
ModelMapper beanMapper) {
return (chain) -> {
// Create new Location
chain.post("location", ctx -> {
Location loc = ctx.parse(fromJson(Location.class));

// Update existing Location
chain.put("location/:id", ctx -> {
String id = ctx.getPathTokens().get("id");
ctx.promise(f -> locations.update(loc)
.consume(f::success))
.then(ctx::render);
});

ctx.promise(f -> locations.findOne(id)
.observe(l -> beanMapper.map(l, ctx.parse(fromJson(Location.class))))
.flatMap(locations::update)
.consume(f::success))
.then(ctx::render);
});
// Update existing Location
chain.put("location/:id", ctx -> {
String id = ctx.getPathTokens().get("id");

// Watch for updates of Locations near us
chain.handler("location/:id/nearby", ctx -> {
String id = ctx.getPathTokens().get("id");
int distance = Integer.valueOf(ctx.getRequest()
.getQueryParams()
.get("distance"));
ctx.promise(f -> locations.findOne(id)
.observe(l -> beanMapper.map(l, ctx.parse(fromJson(Location.class))))
.flatMap(locations::update)
.consume(f::success))
.then(ctx::render);
});

websocketBroadcast(ctx, locations.nearby(id, distance)
.map(l -> l.toJson(mapper)));
});
};
}
// Watch for updates of Locations near us
chain.handler("location/:id/nearby", ctx -> {
String id = ctx.getPathTokens().get("id");
int distance = Integer.valueOf(ctx.getRequest()
.getQueryParams()
.get("distance"));

@Bean
public ModelMapper beanMapper() {
return new ModelMapper();
}
websocketBroadcast(ctx, locations.nearby(id, distance)
.map(l -> l.toJson(mapper)));
});
};
}

@Bean
public Renderer<Location> locationRenderer() {
return new RendererSupport<Location>() {
@Override
public void render(Context ctx, Location loc) throws Exception {
ctx.render(json(loc));
}
};
}
@Bean
public ModelMapper beanMapper() {
return new ModelMapper();
}

public static void main(String[] args) {
SpringApplication.run(ProcessorApplication.class, args);
}
@Bean
public Renderer<Location> locationRenderer() {
return new RendererSupport<Location>() {
@Override
public void render(Context ctx, Location loc) throws Exception {
ctx.render(json(loc));
}
};
}

public static void main(String[] args) {
SpringApplication.run(ProcessorApplication.class, args);
}

}
2 changes: 1 addition & 1 deletion src/main/java/demo/domain/Location.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import org.springframework.data.geo.Point;
import org.springframework.data.mongodb.core.index.GeoSpatialIndexed;
import org.springframework.data.mongodb.core.mapping.Document;
import reactor.util.ObjectUtils;
import reactor.core.support.ObjectUtils;

/**
* @author Jon Brisbin
Expand Down
120 changes: 60 additions & 60 deletions src/main/java/demo/domain/LocationService.java
Original file line number Diff line number Diff line change
@@ -1,81 +1,81 @@
package demo.domain;

import demo.geo.GeoNearPredicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.geo.Distance;
import org.springframework.stereotype.Service;
import reactor.core.Environment;
import reactor.Environment;
import reactor.fn.Consumer;
import reactor.fn.Supplier;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.stream.HotStream;
import reactor.rx.action.terminal.ObservableAction;
import reactor.rx.broadcast.Broadcaster;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static reactor.util.ObjectUtils.nullSafeEquals;
import static reactor.core.support.ObjectUtils.nullSafeEquals;


/**
* @author Jon Brisbin
*/
@Service
public class LocationService {

private final Logger log = LoggerFactory.getLogger(getClass());
private final ConcurrentHashMap<String, Stream<Location>> nearbyStreams = new ConcurrentHashMap<>();

private final Environment env;
private final LocationRepository locations;
private final HotStream<Location> locationSaveEvents;

@Autowired
public LocationService(Environment env,
LocationRepository locations,
HotStream<Location> locationSaveEvents) {
this.env = env;
this.locations = locations;
this.locationSaveEvents = locationSaveEvents;

locations.deleteAll();
}

public Map<String, Stream<Location>> registry() {
return this.nearbyStreams;
}

public Stream<Location> findOne(String id) {
return Streams.just(id)
.dispatchOn(env, env.getDefaultDispatcherFactory().get())
.<Location>map(locations::findOne);
}

public Stream<Location> update(Location loc) {
return Streams.just(loc)
.dispatchOn(env, env.getDefaultDispatcherFactory().get())

// persist incoming to MongoDB
.map(locations::save)

// broadcast this update to others
.observe(locationSaveEvents::broadcastNext);
}

public Stream<Location> nearby(String myLocId, int distance) {
Stream<Location> s = findOne(myLocId);

return s.flatMap(myLoc ->
// merge historical and live data
Streams.merge(locationSaveEvents,
Streams.defer(locations.findAll()))
.dispatchOn(env, s.getDispatcher())

// not us
.filter(l -> !nullSafeEquals(l.getId(), myLocId))

// only Locations within given Distance
.filter(new GeoNearPredicate(myLoc.toPoint(), new Distance(distance)))
);
}
private final ConcurrentHashMap<String, Stream<Location>> nearbyStreams = new ConcurrentHashMap<>();

private final LocationRepository locations;
private final Broadcaster<Location> locationSaveEvents;

@Autowired
public LocationService(
LocationRepository locations,
Broadcaster<Location> locationSaveEvents) {
this.locations = locations;
this.locationSaveEvents = locationSaveEvents;

locations.deleteAll();
}

public Map<String, Stream<Location>> registry() {
return this.nearbyStreams;
}

public Stream<Location> findOne(String id) {
return Streams.just(id)
.dispatchOn(Environment.get())
.<Location>map(locations::findOne);
}

public Stream<Location> update(Location loc) {
return Streams.just(loc)
.dispatchOn(Environment.get())

// persist incoming to MongoDB
.map(locations::save)

// broadcast this update to others
.observe(locationSaveEvents::onNext);
}

public Stream<Location> nearby(String myLocId, int distance) {
Stream<Location> s = findOne(myLocId);

return s.flatMap(myLoc ->
// merge historical and live data
Streams.merge(locationSaveEvents,
Streams.from(locations.findAll())
.dispatchOn(Environment.get())

// not us
.filter(l -> !nullSafeEquals(l.getId(), myLocId))

// only Locations within given Distance
.filter(new GeoNearPredicate(myLoc.toPoint(), new Distance(distance)))
));
}

}
6 changes: 3 additions & 3 deletions src/main/java/demo/geo/GeoNearPredicate.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
import demo.domain.Location;
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.Point;
import reactor.function.Predicate;
import reactor.fn.Predicate;

/**
* This {@link reactor.function.Predicate} implementation calculates the distance between a "home" {@link
* This {@link reactor.fn.Predicate} implementation calculates the distance between a "home" {@link
* org.springframework.data.geo.Point} set at instantiation time to the {@link demo.domain.Location Locations} passing
* through a {@link reactor.rx.Stream} using a Haversine formula [1]. If the distance is within the specified amount,
* the {@link reactor.function.Predicate} allows the {@link demo.domain.Location} to pass through the {@link
* the {@link reactor.fn.Predicate} allows the {@link demo.domain.Location} to pass through the {@link
* reactor.rx.Stream}j to be processed. <p> [1] - http://rosettacode.org/wiki/Haversine_formula#Java </p>
*
* @author Jon Brisbin
Expand Down