s = findOne(myLocId);
+
+ return s.flatMap(myLoc ->
+ // merge historical and live data
+ Streams.merge(locationSaveEvents,
+ Streams.from(locations.findAll())
+ .dispatchOn(Environment.get())
+
+ // not us
+ .filter(l -> !nullSafeEquals(l.getId(), myLocId))
+
+ // only Locations within given Distance
+ .filter(new GeoNearPredicate(myLoc.toPoint(), new Distance(distance)))
+ ));
+ }
}
diff --git a/src/main/java/demo/geo/GeoNearPredicate.java b/src/main/java/demo/geo/GeoNearPredicate.java
index 3d3ad9c..e0f69d1 100644
--- a/src/main/java/demo/geo/GeoNearPredicate.java
+++ b/src/main/java/demo/geo/GeoNearPredicate.java
@@ -3,13 +3,13 @@
import demo.domain.Location;
import org.springframework.data.geo.Distance;
import org.springframework.data.geo.Point;
-import reactor.function.Predicate;
+import reactor.fn.Predicate;
/**
- * This {@link reactor.function.Predicate} implementation calculates the distance between a "home" {@link
+ * This {@link reactor.fn.Predicate} implementation calculates the distance between a "home" {@link
* org.springframework.data.geo.Point} set at instantiation time to the {@link demo.domain.Location Locations} passing
* through a {@link reactor.rx.Stream} using a Haversine formula [1]. If the distance is within the specified amount,
- * the {@link reactor.function.Predicate} allows the {@link demo.domain.Location} to pass through the {@link
+ * the {@link reactor.fn.Predicate} allows the {@link demo.domain.Location} to pass through the {@link
* reactor.rx.Stream}j to be processed. [1] - http://rosettacode.org/wiki/Haversine_formula#Java
*
* @author Jon Brisbin