From 0e8c5810835467200bd60cf85a08bcfa688ef0e4 Mon Sep 17 00:00:00 2001 From: Tiare Balbi <0484.gerente@olearys.com.br> Date: Thu, 19 Feb 2015 10:39:12 -0200 Subject: [PATCH 1/2] - Updated and organized the imports; - Refactored the methods in LocationService; - Updated Spring Boot version to 1.2.1.RELEASE; - Adjusted some config methods. --- pom.xml | 6 +- src/main/java/demo/ProcessorApplication.java | 114 +++++++++-------- src/main/java/demo/domain/Location.java | 2 +- .../java/demo/domain/LocationService.java | 120 +++++++++--------- src/main/java/demo/geo/GeoNearPredicate.java | 6 +- 5 files changed, 125 insertions(+), 123 deletions(-) diff --git a/pom.xml b/pom.xml index 07b74f7..462c424 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ org.springframework.boot spring-boot-starter-parent - 1.1.6.RELEASE + 1.2.1.RELEASE @@ -82,12 +82,12 @@ - org.projectreactor + io.projectreactor reactor-core ${reactor.version} - org.projectreactor.spring + io.projectreactor.spring reactor-spring-context ${reactor.version} diff --git a/src/main/java/demo/ProcessorApplication.java b/src/main/java/demo/ProcessorApplication.java index 9c43802..1e23658 100644 --- a/src/main/java/demo/ProcessorApplication.java +++ b/src/main/java/demo/ProcessorApplication.java @@ -17,10 +17,8 @@ import ratpack.render.Renderer; import ratpack.render.RendererSupport; import ratpack.spring.annotation.EnableRatpack; -import reactor.core.Environment; -import reactor.rx.Stream; -import reactor.rx.Streams; -import reactor.rx.stream.HotStream; +import reactor.Environment; +import reactor.rx.broadcast.Broadcaster; import reactor.spring.context.config.EnableReactor; import static ratpack.jackson.Jackson.fromJson; @@ -36,66 +34,70 @@ @EnableReactor public class ProcessorApplication { - @Bean - public HotStream locationEventStream(Environment env) { - return Streams.defer(env); - } + static { + Environment.initialize(); + } - @Bean - public Action handlers(LocationService locations, - ObjectMapper mapper, - ModelMapper beanMapper) { - return (chain) -> { - // Create new Location - chain.post("location", ctx -> { - Location loc = ctx.parse(fromJson(Location.class)); + @Bean + public Broadcaster locationEventStream() { + return Broadcaster.create(); + } - ctx.promise(f -> locations.update(loc) - .consume(f::success)) - .then(ctx::render); - }); + @Bean + public Action handlers(LocationService locations, + ObjectMapper mapper, + ModelMapper beanMapper) { + return (chain) -> { + // Create new Location + chain.post("location", ctx -> { + Location loc = ctx.parse(fromJson(Location.class)); - // Update existing Location - chain.put("location/:id", ctx -> { - String id = ctx.getPathTokens().get("id"); + ctx.promise(f -> locations.update(loc) + .consume(f::success)) + .then(ctx::render); + }); - ctx.promise(f -> locations.findOne(id) - .observe(l -> beanMapper.map(l, ctx.parse(fromJson(Location.class)))) - .flatMap(locations::update) - .consume(f::success)) - .then(ctx::render); - }); + // Update existing Location + chain.put("location/:id", ctx -> { + String id = ctx.getPathTokens().get("id"); - // Watch for updates of Locations near us - chain.handler("location/:id/nearby", ctx -> { - String id = ctx.getPathTokens().get("id"); - int distance = Integer.valueOf(ctx.getRequest() - .getQueryParams() - .get("distance")); + ctx.promise(f -> locations.findOne(id) + .observe(l -> beanMapper.map(l, ctx.parse(fromJson(Location.class)))) + .flatMap(locations::update) + .consume(f::success)) + .then(ctx::render); + }); - websocketBroadcast(ctx, locations.nearby(id, distance) - .map(l -> l.toJson(mapper))); - }); - }; - } + // Watch for updates of Locations near us + chain.handler("location/:id/nearby", ctx -> { + String id = ctx.getPathTokens().get("id"); + int distance = Integer.valueOf(ctx.getRequest() + .getQueryParams() + .get("distance")); - @Bean - public ModelMapper beanMapper() { - return new ModelMapper(); - } + websocketBroadcast(ctx, locations.nearby(id, distance) + .map(l -> l.toJson(mapper))); + }); + }; + } - @Bean - public Renderer locationRenderer() { - return new RendererSupport() { - @Override - public void render(Context ctx, Location loc) throws Exception { - ctx.render(json(loc)); - } - }; - } + @Bean + public ModelMapper beanMapper() { + return new ModelMapper(); + } - public static void main(String[] args) { - SpringApplication.run(ProcessorApplication.class, args); - } + @Bean + public Renderer locationRenderer() { + return new RendererSupport() { + @Override + public void render(Context ctx, Location loc) throws Exception { + ctx.render(json(loc)); + } + }; + } + + public static void main(String[] args) { + SpringApplication.run(ProcessorApplication.class, args); + } } diff --git a/src/main/java/demo/domain/Location.java b/src/main/java/demo/domain/Location.java index 42c986a..d0d914e 100644 --- a/src/main/java/demo/domain/Location.java +++ b/src/main/java/demo/domain/Location.java @@ -6,7 +6,7 @@ import org.springframework.data.geo.Point; import org.springframework.data.mongodb.core.index.GeoSpatialIndexed; import org.springframework.data.mongodb.core.mapping.Document; -import reactor.util.ObjectUtils; +import reactor.core.support.ObjectUtils; /** * @author Jon Brisbin diff --git a/src/main/java/demo/domain/LocationService.java b/src/main/java/demo/domain/LocationService.java index 1b81408..eae110b 100644 --- a/src/main/java/demo/domain/LocationService.java +++ b/src/main/java/demo/domain/LocationService.java @@ -1,20 +1,23 @@ package demo.domain; import demo.geo.GeoNearPredicate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.reactivestreams.Publisher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.geo.Distance; import org.springframework.stereotype.Service; -import reactor.core.Environment; +import reactor.Environment; +import reactor.fn.Consumer; +import reactor.fn.Supplier; import reactor.rx.Stream; import reactor.rx.Streams; -import reactor.rx.stream.HotStream; +import reactor.rx.action.terminal.ObservableAction; +import reactor.rx.broadcast.Broadcaster; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import static reactor.util.ObjectUtils.nullSafeEquals; +import static reactor.core.support.ObjectUtils.nullSafeEquals; + /** * @author Jon Brisbin @@ -22,60 +25,57 @@ @Service public class LocationService { - private final Logger log = LoggerFactory.getLogger(getClass()); - private final ConcurrentHashMap> nearbyStreams = new ConcurrentHashMap<>(); - - private final Environment env; - private final LocationRepository locations; - private final HotStream locationSaveEvents; - - @Autowired - public LocationService(Environment env, - LocationRepository locations, - HotStream locationSaveEvents) { - this.env = env; - this.locations = locations; - this.locationSaveEvents = locationSaveEvents; - - locations.deleteAll(); - } - - public Map> registry() { - return this.nearbyStreams; - } - - public Stream findOne(String id) { - return Streams.just(id) - .dispatchOn(env, env.getDefaultDispatcherFactory().get()) - .map(locations::findOne); - } - - public Stream update(Location loc) { - return Streams.just(loc) - .dispatchOn(env, env.getDefaultDispatcherFactory().get()) - - // persist incoming to MongoDB - .map(locations::save) - - // broadcast this update to others - .observe(locationSaveEvents::broadcastNext); - } - - public Stream nearby(String myLocId, int distance) { - Stream s = findOne(myLocId); - - return s.flatMap(myLoc -> - // merge historical and live data - Streams.merge(locationSaveEvents, - Streams.defer(locations.findAll())) - .dispatchOn(env, s.getDispatcher()) - - // not us - .filter(l -> !nullSafeEquals(l.getId(), myLocId)) - - // only Locations within given Distance - .filter(new GeoNearPredicate(myLoc.toPoint(), new Distance(distance))) - ); - } + private final ConcurrentHashMap> nearbyStreams = new ConcurrentHashMap<>(); + + private final LocationRepository locations; + private final Broadcaster locationSaveEvents; + + @Autowired + public LocationService( + LocationRepository locations, + Broadcaster locationSaveEvents) { + this.locations = locations; + this.locationSaveEvents = locationSaveEvents; + + locations.deleteAll(); + } + + public Map> registry() { + return this.nearbyStreams; + } + + public Stream findOne(String id) { + return Streams.just(id) + .dispatchOn(Environment.cachedDispatcher()) + .map(locations::findOne); + } + + public Stream update(Location loc) { + return Streams.just(loc) + .dispatchOn(Environment.cachedDispatcher()) + + // persist incoming to MongoDB + .map(locations::save) + + // broadcast this update to others + .observe(locationSaveEvents::onNext); + } + + public Stream nearby(String myLocId, int distance) { + Stream s = findOne(myLocId); + + return s.flatMap(myLoc -> + // merge historical and live data + Streams.merge(locationSaveEvents, + Streams.from(locations.findAll()) + .dispatchOn(Environment.cachedDispatcher()) + + // not us + .filter(l -> !nullSafeEquals(l.getId(), myLocId)) + + // only Locations within given Distance + .filter(new GeoNearPredicate(myLoc.toPoint(), new Distance(distance))) + )); + } } diff --git a/src/main/java/demo/geo/GeoNearPredicate.java b/src/main/java/demo/geo/GeoNearPredicate.java index 3d3ad9c..e0f69d1 100644 --- a/src/main/java/demo/geo/GeoNearPredicate.java +++ b/src/main/java/demo/geo/GeoNearPredicate.java @@ -3,13 +3,13 @@ import demo.domain.Location; import org.springframework.data.geo.Distance; import org.springframework.data.geo.Point; -import reactor.function.Predicate; +import reactor.fn.Predicate; /** - * This {@link reactor.function.Predicate} implementation calculates the distance between a "home" {@link + * This {@link reactor.fn.Predicate} implementation calculates the distance between a "home" {@link * org.springframework.data.geo.Point} set at instantiation time to the {@link demo.domain.Location Locations} passing * through a {@link reactor.rx.Stream} using a Haversine formula [1]. If the distance is within the specified amount, - * the {@link reactor.function.Predicate} allows the {@link demo.domain.Location} to pass through the {@link + * the {@link reactor.fn.Predicate} allows the {@link demo.domain.Location} to pass through the {@link * reactor.rx.Stream}j to be processed.

[1] - http://rosettacode.org/wiki/Haversine_formula#Java

* * @author Jon Brisbin From 736c942fbf3ec655d81353de49c2ed9058168206 Mon Sep 17 00:00:00 2001 From: Tiare Balbi <0484.gerente@olearys.com.br> Date: Fri, 20 Feb 2015 14:54:26 -0200 Subject: [PATCH 2/2] ratpack updated and updated the usage of Environment --- pom.xml | 2 +- src/main/java/demo/ProcessorApplication.java | 3 ++- src/main/java/demo/domain/LocationService.java | 6 +++--- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pom.xml b/pom.xml index 462c424..94478c9 100644 --- a/pom.xml +++ b/pom.xml @@ -106,7 +106,7 @@ 1.8 1.0.0.BUILD-SNAPSHOT 2.0.0.BUILD-SNAPSHOT - 0.9.9-SNAPSHOT + 0.9.11-SNAPSHOT diff --git a/src/main/java/demo/ProcessorApplication.java b/src/main/java/demo/ProcessorApplication.java index 1e23658..99cc5b5 100644 --- a/src/main/java/demo/ProcessorApplication.java +++ b/src/main/java/demo/ProcessorApplication.java @@ -18,6 +18,7 @@ import ratpack.render.RendererSupport; import ratpack.spring.annotation.EnableRatpack; import reactor.Environment; +import reactor.rx.Streams; import reactor.rx.broadcast.Broadcaster; import reactor.spring.context.config.EnableReactor; @@ -39,7 +40,7 @@ public class ProcessorApplication { } @Bean - public Broadcaster locationEventStream() { + public Broadcaster locationEventStream(Environment env) { return Broadcaster.create(); } diff --git a/src/main/java/demo/domain/LocationService.java b/src/main/java/demo/domain/LocationService.java index eae110b..8016852 100644 --- a/src/main/java/demo/domain/LocationService.java +++ b/src/main/java/demo/domain/LocationService.java @@ -46,13 +46,13 @@ public Map> registry() { public Stream findOne(String id) { return Streams.just(id) - .dispatchOn(Environment.cachedDispatcher()) + .dispatchOn(Environment.get()) .map(locations::findOne); } public Stream update(Location loc) { return Streams.just(loc) - .dispatchOn(Environment.cachedDispatcher()) + .dispatchOn(Environment.get()) // persist incoming to MongoDB .map(locations::save) @@ -68,7 +68,7 @@ public Stream nearby(String myLocId, int distance) { // merge historical and live data Streams.merge(locationSaveEvents, Streams.from(locations.findAll()) - .dispatchOn(Environment.cachedDispatcher()) + .dispatchOn(Environment.get()) // not us .filter(l -> !nullSafeEquals(l.getId(), myLocId))