From baebd263b75f095d067fbf9f94563a52871af550 Mon Sep 17 00:00:00 2001 From: Krzysztof Janosz Date: Wed, 5 Jul 2017 17:29:30 +0200 Subject: [PATCH 1/4] Add support for raw sort functions --- agni/api/app/foxcomm/agni/api/Api.scala | 4 +- .../core/app/foxcomm/agni/SearchService.scala | 21 ++++++--- agni/core/app/foxcomm/agni/dsl/package.scala | 10 +++++ agni/core/app/foxcomm/agni/dsl/query.scala | 2 - agni/core/app/foxcomm/agni/dsl/sort.scala | 34 ++++++++++++++ .../agni/interpreter/QueryInterpreter.scala | 19 +++----- .../agni/interpreter/SortInterpreter.scala | 13 ++++++ .../interpreter/es/ESSortInterpreter.scala | 45 +++++++++++++++++++ .../foxcomm/agni/interpreter/es/package.scala | 9 ++++ agni/core/app/foxcomm/agni/package.scala | 16 ++++--- agni/core/app/foxcomm/agni/payload.scala | 4 +- 11 files changed, 146 insertions(+), 31 deletions(-) create mode 100644 agni/core/app/foxcomm/agni/dsl/sort.scala create mode 100644 agni/core/app/foxcomm/agni/interpreter/SortInterpreter.scala create mode 100644 agni/core/app/foxcomm/agni/interpreter/es/ESSortInterpreter.scala diff --git a/agni/api/app/foxcomm/agni/api/Api.scala b/agni/api/app/foxcomm/agni/api/Api.scala index 621f86b0dd..3588c2ff09 100644 --- a/agni/api/app/foxcomm/agni/api/Api.scala +++ b/agni/api/app/foxcomm/agni/api/Api.scala @@ -5,7 +5,7 @@ import com.twitter.finagle.http.Status import com.twitter.util.Await import foxcomm.agni._ import foxcomm.agni.dsl.query._ -import foxcomm.agni.interpreter.es.queryInterpreter +import foxcomm.agni.interpreter.es._ import foxcomm.utils.finch._ import io.circe.generic.extras.auto._ import io.finch._ @@ -44,7 +44,7 @@ object Api extends App { implicit val s: Scheduler = Scheduler.global val config = AppConfig.load() - val svc = SearchService.fromConfig(config, queryInterpreter) + val svc = SearchService.fromConfig(config, queryInterpreter, sortInterpreter) Await.result( Http.server diff --git a/agni/core/app/foxcomm/agni/SearchService.scala b/agni/core/app/foxcomm/agni/SearchService.scala index ad017d659b..991d69207b 100644 --- a/agni/core/app/foxcomm/agni/SearchService.scala +++ b/agni/core/app/foxcomm/agni/SearchService.scala @@ -15,7 +15,7 @@ import org.elasticsearch.index.query.QueryBuilder import org.elasticsearch.search.SearchHit @SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements")) -class SearchService private (client: Client, qi: ESQueryInterpreter) { +class SearchService private (client: Client, qi: ESQueryInterpreter, si: ESSortInterpreter) { import SearchService.ExtractJsonObject def translate(searchPayload: SearchPayload.fc): Task[Json] = { @@ -56,8 +56,15 @@ class SearchService private (client: Client, qi: ESQueryInterpreter) { def evalQuery(builder: SearchRequestBuilder): Coeval[SearchRequestBuilder] = searchPayload match { case SearchPayload.es(query, _) ⇒ Coeval.eval(builder.setQuery(Json.fromJsonObject(query).toBytes)) - case SearchPayload.fc(query, _) ⇒ - qi(query).map(builder.setQuery) + case SearchPayload.fc(query, sort, _) ⇒ + for { + query ← qi(query) + sorts ← si(sort) + } yield { + builder.setQuery(query) + sorts.foreach(builder.addSort) + builder + } } def setupBuilder: Task[SearchRequestBuilder] = (prepareBuilder flatMap evalQuery).task @@ -90,10 +97,10 @@ object SearchService { .flatMap(_.asObject) } - def apply(client: Client, qi: ESQueryInterpreter): SearchService = - new SearchService(client, qi) + def apply(client: Client, qi: ESQueryInterpreter, si: ESSortInterpreter): SearchService = + new SearchService(client, qi, si) - def fromConfig(config: AppConfig, qi: ESQueryInterpreter): SearchService = { + def fromConfig(config: AppConfig, qi: ESQueryInterpreter, si: ESSortInterpreter): SearchService = { val esConfig = config.elasticsearch val settings = Settings.settingsBuilder().put("cluster.name", esConfig.cluster).build() @@ -103,6 +110,6 @@ object SearchService { .build() .addTransportAddresses(esConfig.host.toList.map(new InetSocketTransportAddress(_)): _*) - apply(client, qi) + apply(client, qi, si) } } diff --git a/agni/core/app/foxcomm/agni/dsl/package.scala b/agni/core/app/foxcomm/agni/dsl/package.scala index 2aeb24ae85..228c626eeb 100644 --- a/agni/core/app/foxcomm/agni/dsl/package.scala +++ b/agni/core/app/foxcomm/agni/dsl/package.scala @@ -1,7 +1,17 @@ package foxcomm.agni +import io.circe.Decoder import io.circe.generic.extras.Configuration +import shapeless._ package object dsl { + val Discriminator: String = foxcomm.agni.Discriminator + implicit def configuration: Configuration = foxcomm.agni.configuration + + // decodes coproduct in a naive way assuming that json representations of each component are disjoint + implicit def decodeCoproduct[H: Decoder, T <: Coproduct: Decoder]: Decoder[H :+: T] = + Decoder[H].map(Inl(_)) or Decoder[T].map(Inr(_)) + + implicit val decodeCNil: Decoder[CNil] = Decoder.failedWithMessage("Cannot decode value") } diff --git a/agni/core/app/foxcomm/agni/dsl/query.scala b/agni/core/app/foxcomm/agni/dsl/query.scala index 4bc9dbea96..bc80925722 100644 --- a/agni/core/app/foxcomm/agni/dsl/query.scala +++ b/agni/core/app/foxcomm/agni/dsl/query.scala @@ -145,8 +145,6 @@ object query { sealed trait QueryFunction object QueryFunction { - val Discriminator = "type" - private def buildQueryFunctionDecoder[A <: QueryFunction](expectedTpe: String, decoder: Decoder[A])( onBoost: (Float, HCursor, Decoder[A]) ⇒ Decoder.Result[A]) = Decoder.instance { c ⇒ diff --git a/agni/core/app/foxcomm/agni/dsl/sort.scala b/agni/core/app/foxcomm/agni/dsl/sort.scala new file mode 100644 index 0000000000..96b6bc1977 --- /dev/null +++ b/agni/core/app/foxcomm/agni/dsl/sort.scala @@ -0,0 +1,34 @@ +package foxcomm.agni.dsl + +import cats.data.NonEmptyList +import io.circe._ +import io.circe.generic.extras.semiauto._ +import shapeless._ + +object sort { + sealed trait SortFunction + object SortFunction { + type RawSortValue = JsonObject :+: String :+: CNil + implicit val decodeRawSortValue: Decoder[RawSortValue] = decodeCoproduct[JsonObject, String :+: CNil] + .withErrorMessage("Raw sort value must be either a string or an object") + + final case class raw private (value: RawSortValue) extends SortFunction + object raw { + implicit val decodeRaw: Decoder[raw] = deriveDecoder[raw] + } + + implicit val decodeSortFunction: Decoder[SortFunction] = + Decoder[raw].map(identity[SortFunction](_)) + } + + final case class FCSort(sort: Option[NonEmptyList[SortFunction]]) + object FCSort { + implicit val decodeFCQuery: Decoder[FCSort] = { + Decoder + .decodeOption( + Decoder.decodeNonEmptyList[SortFunction] or + Decoder[SortFunction].map(NonEmptyList.of(_))) + .map(FCSort(_)) + } + } +} diff --git a/agni/core/app/foxcomm/agni/interpreter/QueryInterpreter.scala b/agni/core/app/foxcomm/agni/interpreter/QueryInterpreter.scala index 9af3d8445a..9f937f243d 100644 --- a/agni/core/app/foxcomm/agni/interpreter/QueryInterpreter.scala +++ b/agni/core/app/foxcomm/agni/interpreter/QueryInterpreter.scala @@ -4,13 +4,8 @@ import scala.language.higherKinds import cats.data.NonEmptyList import foxcomm.agni.dsl.query._ -sealed trait QueryError -object QueryError {} - trait QueryInterpreter[F[_], V] extends Interpreter[F, NonEmptyList[QueryFunction], V] { - type Result = V - - final def eval(qf: QueryFunction): F[Result] = qf match { + final def eval(qf: QueryFunction): F[V] = qf match { case qf: QueryFunction.matches ⇒ matchesF(qf) case qf: QueryFunction.equals ⇒ equalsF(qf) case qf: QueryFunction.exists ⇒ existsF(qf) @@ -19,17 +14,17 @@ trait QueryInterpreter[F[_], V] extends Interpreter[F, NonEmptyList[QueryFunctio case qf: QueryFunction.bool ⇒ boolF(qf) } - def matchesF(qf: QueryFunction.matches): F[Result] + def matchesF(qf: QueryFunction.matches): F[V] - def equalsF(qf: QueryFunction.equals): F[Result] + def equalsF(qf: QueryFunction.equals): F[V] - def existsF(qf: QueryFunction.exists): F[Result] + def existsF(qf: QueryFunction.exists): F[V] - def rangeF(qf: QueryFunction.range): F[Result] + def rangeF(qf: QueryFunction.range): F[V] - def rawF(qf: QueryFunction.raw): F[Result] + def rawF(qf: QueryFunction.raw): F[V] - def boolF(qf: QueryFunction.bool): F[Result] + def boolF(qf: QueryFunction.bool): F[V] } object QueryInterpreter { diff --git a/agni/core/app/foxcomm/agni/interpreter/SortInterpreter.scala b/agni/core/app/foxcomm/agni/interpreter/SortInterpreter.scala new file mode 100644 index 0000000000..40eb81a0bb --- /dev/null +++ b/agni/core/app/foxcomm/agni/interpreter/SortInterpreter.scala @@ -0,0 +1,13 @@ +package foxcomm.agni.interpreter + +import scala.language.higherKinds +import cats.data.NonEmptyList +import foxcomm.agni.dsl.sort.SortFunction + +trait SortInterpreter[F[_], V] extends Interpreter[F, NonEmptyList[SortFunction], V] { + final def eval(qs: SortFunction): F[V] = qs match { + case qs: SortFunction.raw ⇒ rawF(qs) + } + + def rawF(qf: SortFunction.raw): F[V] +} diff --git a/agni/core/app/foxcomm/agni/interpreter/es/ESSortInterpreter.scala b/agni/core/app/foxcomm/agni/interpreter/es/ESSortInterpreter.scala new file mode 100644 index 0000000000..7cd881a92f --- /dev/null +++ b/agni/core/app/foxcomm/agni/interpreter/es/ESSortInterpreter.scala @@ -0,0 +1,45 @@ +package foxcomm.agni.interpreter.es + +import cats.data._ +import foxcomm.agni._ +import foxcomm.agni.dsl.sort.SortFunction +import foxcomm.agni.dsl.sort.SortFunction.RawSortValue +import foxcomm.agni.interpreter.SortInterpreter +import org.elasticsearch.common.xcontent.{ToXContent, XContentBuilder} +import org.elasticsearch.search.sort.{SortBuilder, SortOrder} +import scala.annotation.compileTimeOnly +import scala.collection.mutable.ListBuffer + +@SuppressWarnings( + Array("org.wartremover.warts.MutableDataStructures", "org.wartremover.warts.NonUnitStatements")) +private[es] object ESSortInterpreter extends SortInterpreter[() ⇒ ?, NonEmptyList[SortBuilder]] { + type State = () ⇒ NonEmptyList[SortBuilder] + + private final case class RawSortBuilder(content: RawSortValue) extends SortBuilder { + @compileTimeOnly("forbidden method to call") + def missing(missing: scala.Any): SortBuilder = ??? + + @compileTimeOnly("forbidden method to call") + def order(order: SortOrder): SortBuilder = ??? + + def toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder = { + content.eliminate(_.toMap.foreach { + case (n, v) ⇒ + builder.rawField(n, v.toSmile) + }, _.eliminate(name ⇒ { + builder.startObject(name) + builder.endObject() + }, _.impossible)) + builder + } + } + + def apply(sfs: NonEmptyList[SortFunction]): State = () ⇒ { + NonEmptyList.fromListUnsafe( + sfs.foldLeft(ListBuffer.empty[SortBuilder])((acc, sf) ⇒ acc ++= eval(sf)().toList).toList) + } + + def rawF(qf: SortFunction.raw): State = () ⇒ { + NonEmptyList.of(RawSortBuilder(qf.value)) + } +} diff --git a/agni/core/app/foxcomm/agni/interpreter/es/package.scala b/agni/core/app/foxcomm/agni/interpreter/es/package.scala index 21ce4b006a..44999fb069 100644 --- a/agni/core/app/foxcomm/agni/interpreter/es/package.scala +++ b/agni/core/app/foxcomm/agni/interpreter/es/package.scala @@ -2,15 +2,24 @@ package foxcomm.agni.interpreter import cats.data._ import foxcomm.agni.dsl.query.{FCQuery, QueryFunction} +import foxcomm.agni.dsl.sort.{FCSort, SortFunction} import monix.eval.Coeval import org.elasticsearch.index.query.{BoolQueryBuilder, QueryBuilders} +import org.elasticsearch.search.sort.SortBuilder package object es { type ESQueryInterpreter = Kleisli[Coeval, FCQuery, BoolQueryBuilder] + type ESSortInterpreter = Kleisli[Coeval, FCSort, List[SortBuilder]] val queryInterpreter: ESQueryInterpreter = { val eval: Interpreter[Coeval, NonEmptyList[QueryFunction], BoolQueryBuilder] = ESQueryInterpreter andThen (f ⇒ Coeval.eval(f(QueryBuilders.boolQuery()))) Kleisli(_.query.fold(Coeval.eval(QueryBuilders.boolQuery().must(QueryBuilders.matchAllQuery())))(eval)) } + + val sortInterpreter: ESSortInterpreter = { + val eval: Interpreter[Coeval, NonEmptyList[SortFunction], List[SortBuilder]] = + ESSortInterpreter andThen (f ⇒ Coeval.eval(f().toList)) + Kleisli(_.sort.fold(Coeval.now(List.empty[SortBuilder]))(eval)) + } } diff --git a/agni/core/app/foxcomm/agni/package.scala b/agni/core/app/foxcomm/agni/package.scala index 61c553e078..b3652f6609 100644 --- a/agni/core/app/foxcomm/agni/package.scala +++ b/agni/core/app/foxcomm/agni/package.scala @@ -6,25 +6,27 @@ import io.circe.generic.extras.Configuration import io.circe.{Json, Printer} import java.io.ByteArrayOutputStream import monix.eval.Task +import monix.execution.Cancelable import org.elasticsearch.action.ActionListener -import scala.concurrent.Promise package object agni { private[this] val smileFactory = new SmileFactory() private[this] val jsonFactory = new JsonFactory() + val Discriminator: String = "type" + implicit val configuration: Configuration = - Configuration.default.withDefaults.withDiscriminator("type").withSnakeCaseKeys + Configuration.default.withDefaults.withDiscriminator(Discriminator).withSnakeCaseKeys @SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements")) - def async[A, B](action: ActionListener[A] ⇒ Any): Task[A] = Task.deferFuture { - val p = Promise[A]() + def async[A, B](action: ActionListener[A] ⇒ Any): Task[A] = Task.create { (_, cb) ⇒ action(new ActionListener[A] { - def onFailure(e: Throwable): Unit = p.tryFailure(e) + def onFailure(e: Throwable): Unit = cb.onError(e) - def onResponse(response: A): Unit = p.trySuccess(response) + def onResponse(response: A): Unit = cb.onSuccess(response) }) - p.future + + Cancelable.empty } @SuppressWarnings(Array("org.wartremover.warts.While")) diff --git a/agni/core/app/foxcomm/agni/payload.scala b/agni/core/app/foxcomm/agni/payload.scala index 38d0389df6..8b064047aa 100644 --- a/agni/core/app/foxcomm/agni/payload.scala +++ b/agni/core/app/foxcomm/agni/payload.scala @@ -2,6 +2,7 @@ package foxcomm.agni import cats.data.NonEmptyList import foxcomm.agni.dsl.query._ +import foxcomm.agni.dsl.sort.FCSort import io.circe.JsonObject sealed trait SearchPayload { @@ -9,5 +10,6 @@ sealed trait SearchPayload { } object SearchPayload { final case class es(query: JsonObject, fields: Option[NonEmptyList[String]]) extends SearchPayload - final case class fc(query: FCQuery, fields: Option[NonEmptyList[String]]) extends SearchPayload + final case class fc(query: FCQuery, sort: FCSort, fields: Option[NonEmptyList[String]]) + extends SearchPayload } From 9f0b465bbd934ab76692784f1edbf28fa9630826 Mon Sep 17 00:00:00 2001 From: Krzysztof Janosz Date: Thu, 6 Jul 2017 09:59:05 +0200 Subject: [PATCH 2/4] Make query dsl error messages friendlier Improve client error messages for the query dsl. Split query dsl into separate modules. --- agni/core/app/foxcomm/agni/dsl/package.scala | 5 +- agni/core/app/foxcomm/agni/dsl/query.scala | 290 ------------------ .../foxcomm/agni/dsl/query/QueryData.scala | 128 ++++++++ .../agni/dsl/query/QueryFunctions.scala | 195 ++++++++++++ .../app/foxcomm/agni/dsl/query/package.scala | 17 + agni/core/app/foxcomm/agni/dsl/sort.scala | 8 +- .../test/foxcomm/agni/dsl/QueryDslSpec.scala | 7 +- 7 files changed, 348 insertions(+), 302 deletions(-) delete mode 100644 agni/core/app/foxcomm/agni/dsl/query.scala create mode 100644 agni/core/app/foxcomm/agni/dsl/query/QueryData.scala create mode 100644 agni/core/app/foxcomm/agni/dsl/query/QueryFunctions.scala create mode 100644 agni/core/app/foxcomm/agni/dsl/query/package.scala diff --git a/agni/core/app/foxcomm/agni/dsl/package.scala b/agni/core/app/foxcomm/agni/dsl/package.scala index 228c626eeb..ee7f916b54 100644 --- a/agni/core/app/foxcomm/agni/dsl/package.scala +++ b/agni/core/app/foxcomm/agni/dsl/package.scala @@ -9,9 +9,10 @@ package object dsl { implicit def configuration: Configuration = foxcomm.agni.configuration - // decodes coproduct in a naive way assuming that json representations of each component are disjoint + /** Decodes coproduct assuming that json representations of each coproduct element are disjoint. */ implicit def decodeCoproduct[H: Decoder, T <: Coproduct: Decoder]: Decoder[H :+: T] = Decoder[H].map(Inl(_)) or Decoder[T].map(Inr(_)) - implicit val decodeCNil: Decoder[CNil] = Decoder.failedWithMessage("Cannot decode value") + implicit def decodeCoproductLeaf[L: Decoder]: Decoder[L :+: CNil] = + Decoder[L].map(Inl(_)) } diff --git a/agni/core/app/foxcomm/agni/dsl/query.scala b/agni/core/app/foxcomm/agni/dsl/query.scala deleted file mode 100644 index bc80925722..0000000000 --- a/agni/core/app/foxcomm/agni/dsl/query.scala +++ /dev/null @@ -1,290 +0,0 @@ -package foxcomm.agni.dsl - -import scala.language.higherKinds -import cats.data.{NonEmptyList, NonEmptyVector} -import cats.implicits._ -import io.circe._ -import io.circe.generic.extras.semiauto._ -import scala.util.Try -import shapeless._ - -object query { - type QueryValueF[F[_], T] = T :+: F[T] :+: CNil - type QueryValue[T] = QueryValueF[NonEmptyList, T] - type CompoundValue = QueryValue[JsonNumber] :+: QueryValue[String] :+: CNil - type Field = QueryValueF[NonEmptyVector, String] - type RangeValue = RangeBound[JsonNumber] :+: RangeBound[String] :+: CNil - - implicit class RichQueryValue[T](val qv: QueryValue[T]) extends AnyVal { - def toNEL: NonEmptyList[T] = qv.eliminate(NonEmptyList.of(_), _.eliminate(identity, _.impossible)) - - def toList: List[T] = toNEL.toList - } - - implicit class RichCompoundValue(val cv: CompoundValue) extends AnyVal { - def toNEL: NonEmptyList[AnyRef] = cv.eliminate(_.toNEL, _.eliminate(_.toNEL, _.impossible)) - - def toList: List[AnyRef] = toNEL.toList - } - - implicit def decodeQueryValueF[F[_], T](implicit fD: Decoder[F[T]], - tD: Decoder[T]): Decoder[QueryValueF[F, T]] = - tD.map(Coproduct[QueryValueF[F, T]](_)) or fD.map(Coproduct[QueryValueF[F, T]](_)) - - implicit val decodeCompoundValue: Decoder[CompoundValue] = - Decoder[QueryValue[JsonNumber]].map(Coproduct[CompoundValue](_)) or - Decoder[QueryValue[String]].map(Coproduct[CompoundValue](_)) - - implicit val decodeField: Decoder[Field] = Decoder.decodeString.map { s ⇒ - val xs = s.split("\\.") - if (xs.length > 1) Coproduct[Field](NonEmptyVector.of(xs.head, xs.tail: _*)) - else Coproduct[Field](s) - } - - implicit val decodeRange: Decoder[RangeValue] = - Decoder[RangeBound[JsonNumber]].map(Coproduct[RangeValue](_)) or - Decoder[RangeBound[String]].map(Coproduct[RangeValue](_)) - - object Boostable { - private[this] val boostableRegex = "^(\\w+)\\^([0-9]*\\.?[0-9]+)$".r - - def unapply(s: String): Option[(String, Float)] = s match { - case boostableRegex(f, b) ⇒ Try(f → b.toFloat).toOption - case _ ⇒ None - } - - def default: Float = 1.0f - } - - sealed trait QueryField { - def toNEL: NonEmptyList[Field] - - def toList: List[Field] = toNEL.toList - } - object QueryField { - final case class Value(field: String, boost: Option[Float]) - - final case class Single(field: Field) extends QueryField { - def toNEL: NonEmptyList[Field] = NonEmptyList.of(field) - } - object Single { - implicit val decodeSingle: Decoder[Single] = Decoder[Field].map(Single(_)) - } - - final case class Multiple(fields: NonEmptyList[Field]) extends QueryField { - def toNEL: NonEmptyList[Field] = fields - } - object Multiple { - implicit val decodeMultiple: Decoder[Multiple] = - Decoder.decodeNonEmptyList[Field].map(Multiple(_)) - } - - implicit val decodeQueryField: Decoder[QueryField] = - Decoder[Single].map(identity) or Decoder[Multiple].map(identity) - } - - sealed trait QueryContext - object QueryContext { - case object filter extends QueryContext - case object must extends QueryContext - case object should extends QueryContext - case object not extends QueryContext - - implicit val decodeQueryContext: Decoder[QueryContext] = deriveEnumerationDecoder[QueryContext] - } - - sealed trait RangeFunction - object RangeFunction { - sealed trait LowerBound extends RangeFunction { - def withBound: Boolean - } - case object Gt extends RangeFunction with LowerBound { - def withBound: Boolean = false - } - case object Gte extends RangeFunction with LowerBound { - def withBound: Boolean = true - } - - sealed trait UpperBound extends RangeFunction { - def withBound: Boolean - } - case object Lt extends RangeFunction with UpperBound { - def withBound: Boolean = false - } - case object Lte extends RangeFunction with UpperBound { - def withBound: Boolean = true - } - - implicit val decodeRangeFunction: KeyDecoder[RangeFunction] = KeyDecoder.instance { - case "lt" | "<" ⇒ Some(Lt) - case "lte" | "<=" ⇒ Some(Lte) - case "gt" | ">" ⇒ Some(Gt) - case "gte" | ">=" ⇒ Some(Gte) - } - } - - final case class RangeBound[T](lower: Option[(RangeFunction.LowerBound, T)], - upper: Option[(RangeFunction.UpperBound, T)]) - object RangeBound { - import RangeFunction._ - - implicit def decodeRangeBound[T: Decoder]: Decoder[RangeBound[T]] = - Decoder.decodeMapLike[Map, RangeFunction, T].emap { map ⇒ - val lbs = map.view.collect { - case (lb: LowerBound, v) ⇒ lb → v - }.toList - val ubs = map.view.collect { - case (ub: UpperBound, v) ⇒ ub → v - }.toList - - if (lbs.size > 1) Either.left("Only single lower bound can be specified") - else if (ubs.size > 1) Either.left("Only single upper bound can be specified") - else Either.right(RangeBound(lbs.headOption, ubs.headOption)) - } - } - - sealed trait QueryFunction - object QueryFunction { - private def buildQueryFunctionDecoder[A <: QueryFunction](expectedTpe: String, decoder: Decoder[A])( - onBoost: (Float, HCursor, Decoder[A]) ⇒ Decoder.Result[A]) = - Decoder.instance { c ⇒ - val tpe = c.downField(Discriminator).focus.flatMap(_.asString) - tpe match { - case Some(Boostable(`expectedTpe`, b)) ⇒ onBoost(b, c, decoder) - case Some(`expectedTpe`) ⇒ decoder(c) - case _ ⇒ Either.left(DecodingFailure("Unknown query function type", c.history)) - } - } - - private def buildBoostableDecoder[A <: QueryFunction](expectedTpe: String)(decoder: Decoder[A]) = - buildQueryFunctionDecoder[A](expectedTpe, decoder)((boost, cursor, decoder) ⇒ - decoder.tryDecode(cursor.withFocus(_.mapObject(_.add("boost", Json.fromFloatOrNull(boost)))))) - - private def buildDecoder[A <: QueryFunction](expectedTpe: String)(decoder: Decoder[A]): Decoder[A] = - buildQueryFunctionDecoder[A](expectedTpe, decoder)((_, cursor, _) ⇒ - Either.left(DecodingFailure(s"$expectedTpe query function is not boostable", cursor.history))) - - sealed trait WithField { this: QueryFunction ⇒ - def field: QueryField - def boost: Option[Float] - } - sealed trait WithContext { this: QueryFunction ⇒ - def ctx: QueryContext - } - - sealed trait TermLevel extends WithContext { this: QueryFunction ⇒ - def context: Option[QueryContext] - - final def ctx: QueryContext = context.getOrElse(QueryContext.filter) - } - sealed trait FullText extends WithContext with WithField { this: QueryFunction ⇒ - def context: Option[QueryContext] - def in: Option[QueryField] - - final def ctx: QueryContext = context.getOrElse(QueryContext.must) - final def field: QueryField = in.getOrElse(QueryField.Single(Coproduct("_all"))) - } - - final case class matches private (in: Option[QueryField], - value: QueryValue[String], - context: Option[QueryContext], - boost: Option[Float]) - extends QueryFunction - with FullText - object matches { - implicit val decodeMatches: Decoder[matches] = buildBoostableDecoder("matches")(deriveDecoder[matches]) - } - - final case class equals private (in: QueryField, - value: CompoundValue, - context: Option[QueryContext], - boost: Option[Float]) - extends QueryFunction - with TermLevel - with WithField { - def field: QueryField = in - } - object equals { - implicit val decodeEquals: Decoder[equals] = buildBoostableDecoder("equals")(deriveDecoder[equals]) - } - - final case class exists private (value: QueryField, context: Option[QueryContext]) - extends QueryFunction - with TermLevel - object exists { - implicit val decodeExists: Decoder[exists] = buildDecoder("exists")(deriveDecoder[exists]) - } - - final case class range private (in: QueryField.Single, - value: RangeValue, - context: Option[QueryContext], - boost: Option[Float]) - extends QueryFunction - with TermLevel - with WithField { - def field: QueryField.Single = in - } - object range { - implicit val decodeRange: Decoder[range] = buildBoostableDecoder("range")(deriveDecoder[range]) - } - - final case class raw private (value: JsonObject, context: QueryContext) - extends QueryFunction - with WithContext { - def ctx: QueryContext = context - } - object raw { - implicit val decodeRaw: Decoder[raw] = buildDecoder("raw")(deriveDecoder[raw]) - } - - final case class bool private (value: QueryValue[QueryFunction], context: QueryContext) - extends QueryFunction - with WithContext { - def ctx: QueryContext = context - } - object bool { - // TODO: make it configurable (?) - val MaxDepth = 25 - - implicit val decodeBool: Decoder[bool] = buildDecoder[bool]("bool") { - val decoder = deriveDecoder[bool] - val depthField = "_depth" - - Decoder.instance { c ⇒ - val depth = (for { - parent ← c.up.focus - parent ← parent.asObject - depth ← parent(depthField) - depth ← depth.as[Int].toOption - } yield depth).getOrElse(1) - - // we start counting from 0, - // which denotes implicit top-level bool query - if (depth >= MaxDepth) - Either.left(DecodingFailure(s"Max depth of $MaxDepth exceeded for a bool query", c.history)) - else - decoder.tryDecode(c.withFocus(_.mapObject(_.add(depthField, Json.fromInt(depth + 1))))) - } - } - } - - implicit val decodeQueryFunction: Decoder[QueryFunction] = - Decoder[matches].map(identity[QueryFunction](_)) or - Decoder[equals].map(identity[QueryFunction](_)) or - Decoder[exists].map(identity[QueryFunction](_)) or - Decoder[range].map(identity[QueryFunction](_)) or - Decoder[raw].map(identity[QueryFunction](_)) or - Decoder[bool].map(identity[QueryFunction](_)) - } - - final case class FCQuery(query: Option[NonEmptyList[QueryFunction]]) - object FCQuery { - implicit val decodeFCQuery: Decoder[FCQuery] = { - Decoder - .decodeOption( - Decoder.decodeNonEmptyList[QueryFunction] or - Decoder[QueryFunction].map(NonEmptyList.of(_))) - .map(FCQuery(_)) - } - } -} diff --git a/agni/core/app/foxcomm/agni/dsl/query/QueryData.scala b/agni/core/app/foxcomm/agni/dsl/query/QueryData.scala new file mode 100644 index 0000000000..a9d94ca7ac --- /dev/null +++ b/agni/core/app/foxcomm/agni/dsl/query/QueryData.scala @@ -0,0 +1,128 @@ +package foxcomm.agni.dsl.query + +import scala.language.higherKinds +import cats.data.{NonEmptyList, NonEmptyVector} +import cats.implicits._ +import foxcomm.agni.dsl._ +import io.circe._ +import io.circe.generic.extras.semiauto._ +import shapeless._ + +private[query] trait QueryData { + type QueryValueF[F[_], T] = T :+: F[T] :+: CNil + implicit def decodeQueryValueF[F[_], T](implicit tD: Decoder[T], + fD: Decoder[F[T]]): Decoder[QueryValueF[F, T]] = + decodeCoproduct[T, F[T] :+: CNil] + + type QueryValue[T] = QueryValueF[NonEmptyList, T] + + type CompoundValue = QueryValue[JsonNumber] :+: QueryValue[String] :+: CNil + implicit val decodeCompoundValue: Decoder[CompoundValue] = + decodeCoproduct[QueryValue[JsonNumber], QueryValue[String] :+: CNil] + .withErrorMessage("Value must be single or non-empty array of uniformly typed elements") + + type Field = QueryValueF[NonEmptyVector, String] + implicit val decodeField: Decoder[Field] = Decoder.decodeString.map { s ⇒ + val xs = s.split("\\.") + if (xs.length > 1) Coproduct[Field](NonEmptyVector.of(xs.head, xs.tail: _*)) + else Coproduct[Field](s) + } + + type RangeValue = RangeBound[JsonNumber] :+: RangeBound[String] :+: CNil + implicit val decodeRangeValue: Decoder[RangeValue] = + decodeCoproduct[RangeBound[JsonNumber], RangeBound[String] :+: CNil] + .withErrorMessage( + "Error in range function bounds. " + + "Please make sure that you supplied valid operators and that you have uniformly typed values.") + + sealed trait RangeFunction + object RangeFunction { + sealed trait LowerBound extends RangeFunction { + def withBound: Boolean + } + case object Gt extends RangeFunction with LowerBound { + def withBound: Boolean = false + } + case object Gte extends RangeFunction with LowerBound { + def withBound: Boolean = true + } + + sealed trait UpperBound extends RangeFunction { + def withBound: Boolean + } + case object Lt extends RangeFunction with UpperBound { + def withBound: Boolean = false + } + case object Lte extends RangeFunction with UpperBound { + def withBound: Boolean = true + } + + implicit val decodeRangeFunction: KeyDecoder[RangeFunction] = KeyDecoder.instance { + case "lt" | "<" ⇒ Some(Lt) + case "lte" | "<=" ⇒ Some(Lte) + case "gt" | ">" ⇒ Some(Gt) + case "gte" | ">=" ⇒ Some(Gte) + case _ ⇒ None + } + } + + sealed case class RangeBound[T](lower: Option[(RangeFunction.LowerBound, T)], + upper: Option[(RangeFunction.UpperBound, T)]) + object RangeBound { + implicit def decodeRangeBound[T: Decoder]: Decoder[RangeBound[T]] = + Decoder.decodeMapLike[Map, RangeFunction, T].emap { map ⇒ + val lbs = map.view.collect { + case (lb: RangeFunction.LowerBound, v) ⇒ lb → v + }.toList + val ubs = map.view.collect { + case (ub: RangeFunction.UpperBound, v) ⇒ ub → v + }.toList + + if (lbs.size > 1) Either.left("Only single lower bound can be specified") + else if (ubs.size > 1) Either.left("Only single upper bound can be specified") + else Either.right(RangeBound(lbs.headOption, ubs.headOption)) + } + } + + sealed trait QueryField { + def toNEL: NonEmptyList[Field] + + def toList: List[Field] = toNEL.toList + } + object QueryField { + sealed case class Single(field: Field) extends QueryField { + def toNEL: NonEmptyList[Field] = NonEmptyList.of(field) + } + object Single { + implicit val decodeSingle: Decoder[Single] = Decoder[Field] + .map(Single(_)) + .withErrorMessage("Query field must be a single string") + } + + sealed case class Multiple(fields: NonEmptyList[Field]) extends QueryField { + def toNEL: NonEmptyList[Field] = fields + } + object Multiple { + implicit val decodeMultiple: Decoder[Multiple] = + Decoder + .decodeNonEmptyList[Field] + .map(Multiple(_)) + .withErrorMessage("Query field must be a non-empty array of strings") + } + + implicit val decodeQueryField: Decoder[QueryField] = + (Decoder[Single].map(identity[QueryField]) or Decoder[Multiple].map(identity[QueryField])) + .withErrorMessage("Query field must be either a single string or a non-empty array of strings") + } + + sealed trait QueryContext + object QueryContext { + case object filter extends QueryContext + case object must extends QueryContext + case object should extends QueryContext + case object not extends QueryContext + + implicit val decodeQueryContext: Decoder[QueryContext] = deriveEnumerationDecoder[QueryContext] + .withErrorMessage("Unknown query function context") + } +} diff --git a/agni/core/app/foxcomm/agni/dsl/query/QueryFunctions.scala b/agni/core/app/foxcomm/agni/dsl/query/QueryFunctions.scala new file mode 100644 index 0000000000..5484805ada --- /dev/null +++ b/agni/core/app/foxcomm/agni/dsl/query/QueryFunctions.scala @@ -0,0 +1,195 @@ +package foxcomm.agni.dsl.query + +import cats.data.NonEmptyList +import cats.implicits._ +import foxcomm.agni.dsl._ +import io.circe._ +import io.circe.generic.extras.semiauto._ +import scala.util.Try +import shapeless._ + +@SuppressWarnings(Array("org.wartremover.warts.Equals", "org.wartremover.warts.Throw")) +private[query] trait QueryFunctions { this: QueryData ⇒ + object Boostable { + private[this] val boostableRegex = "^(\\w+)\\^([0-9]*\\.?[0-9]+)$".r + + def unapply(s: String): Option[(String, Float)] = s match { + case boostableRegex(f, b) ⇒ Try(f → b.toFloat).toOption + case _ ⇒ None + } + } + + sealed trait QueryFunction + object QueryFunction { + val BoostField: String = "boost" + val DepthField = "_depth" + + // TODO: make it configurable (?) + val MaxDepth = 25 + + sealed trait WithField { this: QueryFunction ⇒ + def field: QueryField + def boost: Option[Float] + } + sealed trait WithContext { this: QueryFunction ⇒ + def ctx: QueryContext + } + + sealed trait TermLevel extends WithContext { this: QueryFunction ⇒ + def context: Option[QueryContext] + + final def ctx: QueryContext = context.getOrElse(QueryContext.filter) + } + sealed trait FullText extends WithContext with WithField { this: QueryFunction ⇒ + def context: Option[QueryContext] + def in: Option[QueryField] + + final def ctx: QueryContext = context.getOrElse(QueryContext.must) + final def field: QueryField = in.getOrElse(QueryField.Single(Coproduct("_all"))) + } + + sealed case class matches private (in: Option[QueryField], + value: QueryValue[String], + context: Option[QueryContext], + boost: Option[Float]) + extends QueryFunction + with FullText + object matches { + implicit val decodeMatches: Decoder[matches] = deriveDecoder[matches] + } + + sealed case class equals private (in: QueryField, + value: CompoundValue, + context: Option[QueryContext], + boost: Option[Float]) + extends QueryFunction + with TermLevel + with WithField { + def field: QueryField = in + } + object equals { + implicit val decodeEquals: Decoder[equals] = deriveDecoder[equals] + } + + sealed case class exists private (value: QueryField, context: Option[QueryContext]) + extends QueryFunction + with TermLevel + object exists { + implicit val decodeExists: Decoder[exists] = deriveDecoder[exists] + } + + sealed case class range private (in: QueryField.Single, + value: RangeValue, + context: Option[QueryContext], + boost: Option[Float]) + extends QueryFunction + with TermLevel + with WithField { + def field: QueryField.Single = in + } + object range { + implicit val decodeRange: Decoder[range] = deriveDecoder[range] + } + + sealed case class raw private (value: JsonObject, context: QueryContext) + extends QueryFunction + with WithContext { + def ctx: QueryContext = context + } + object raw { + implicit val decodeRaw: Decoder[raw] = deriveDecoder[raw] + } + + sealed case class bool private (value: QueryValue[QueryFunction], context: QueryContext) + extends QueryFunction + with WithContext { + def ctx: QueryContext = context + } + object bool { + implicit lazy val decodeNestedQueryFunctions: Decoder[QueryValue[QueryFunction]] = { + val decodeQfs: Decoder[QueryValue[QueryFunction]] = + Decoder.decodeNonEmptyList[QueryFunction].map(qfs ⇒ Inr(Inl(qfs))) + val decodeQf: Decoder[QueryValue[QueryFunction]] = + Decoder[QueryFunction].map(qf ⇒ Inl(qf)) + + Decoder.instance[QueryValue[QueryFunction]] { c ⇒ + // explicit codec definition as we don't want circe to try other codec + // for the sake of better client error messages + if (c.value.isArray) decodeQfs(c) + else decodeQf(c) + } + } + + implicit val decodeBool: Decoder[bool] = { + val decoder = deriveDecoder[bool] + + Decoder.instance { c ⇒ + val depth = (for { + parent ← c.up.focus + parent ← parent.asObject + depth ← parent(DepthField) + depth ← depth.as[Int].toOption + } yield depth).getOrElse(1) + + // we start counting from 0, + // which denotes implicit top-level bool query + if (depth > MaxDepth) + Either.left(DecodingFailure(s"Max depth of $MaxDepth exceeded for a bool query", c.history)) + else + decoder.tryDecode(c.withFocus(_.mapObject(_.add(DepthField, Json.fromInt(depth + 1))))) + } + } + } + + implicit val decodeQueryFunction: Decoder[QueryFunction] = { + val boostable: Map[String, Decoder[_ <: QueryFunction with QueryFunction.WithField]] = Map( + "matches" → Decoder[matches], + "equals" → Decoder[equals], + "range" → Decoder[range] + ) + val all: Map[String, Decoder[_ <: QueryFunction]] = boostable ++ Map( + "exists" → Decoder[exists], + "raw" → Decoder[raw], + "bool" → Decoder[bool] + ) + + Decoder.instance { hc ⇒ + val c = hc.downField(Discriminator) + val tpe = c.focus.flatMap(_.asString) + tpe match { + case Some(Boostable(t, b)) ⇒ + if (boostable.contains(t)) + boostable(t).tryDecode(hc.withFocus(_.mapObject(_.add(BoostField, Json.fromFloatOrNull(b))))) + else + Either.left(DecodingFailure(s"Query function is not boostable", c.history)) + case Some(t) if all.contains(t) ⇒ + all(t).tryDecode(hc) + case _ ⇒ + Either.left(DecodingFailure("Unknown query function type", c.history)) + } + } + } + } + + sealed case class FCQuery(query: Option[NonEmptyList[QueryFunction]]) + object FCQuery { + implicit val decodeFCQuery: Decoder[FCQuery] = { + val qfsDecode = Decoder.decodeNonEmptyList[QueryFunction] + val qfDecode = Decoder[QueryFunction].map(NonEmptyList.of(_)) + + Decoder.instance { c ⇒ + if (c.value.isNull) Either.right(FCQuery(None)) + else if (c.value.isArray) qfsDecode.map(qfs ⇒ FCQuery(Some(qfs)))(c) + else if (c.value.isObject) qfDecode.map(qfs ⇒ FCQuery(Some(qfs)))(c) + else + Either.left( + DecodingFailure( + "Query DSL must be either empty or " + + "a non-empty array of query functions or " + + "a single query function object", + c.history + )) + } + } + } +} diff --git a/agni/core/app/foxcomm/agni/dsl/query/package.scala b/agni/core/app/foxcomm/agni/dsl/query/package.scala new file mode 100644 index 0000000000..294aed5bd9 --- /dev/null +++ b/agni/core/app/foxcomm/agni/dsl/query/package.scala @@ -0,0 +1,17 @@ +package foxcomm.agni.dsl + +import cats.data.NonEmptyList + +package object query extends QueryData with QueryFunctions { + implicit class RichQueryValue[T](val qv: QueryValue[T]) extends AnyVal { + def toNEL: NonEmptyList[T] = qv.eliminate(NonEmptyList.of(_), _.eliminate(identity, _.impossible)) + + def toList: List[T] = toNEL.toList + } + + implicit class RichCompoundValue(val cv: CompoundValue) extends AnyVal { + def toNEL: NonEmptyList[AnyRef] = cv.eliminate(_.toNEL, _.eliminate(_.toNEL, _.impossible)) + + def toList: List[AnyRef] = toNEL.toList + } +} diff --git a/agni/core/app/foxcomm/agni/dsl/sort.scala b/agni/core/app/foxcomm/agni/dsl/sort.scala index 96b6bc1977..d3c7f40c23 100644 --- a/agni/core/app/foxcomm/agni/dsl/sort.scala +++ b/agni/core/app/foxcomm/agni/dsl/sort.scala @@ -2,7 +2,7 @@ package foxcomm.agni.dsl import cats.data.NonEmptyList import io.circe._ -import io.circe.generic.extras.semiauto._ +import io.circe.generic.extras.auto._ import shapeless._ object sort { @@ -13,12 +13,6 @@ object sort { .withErrorMessage("Raw sort value must be either a string or an object") final case class raw private (value: RawSortValue) extends SortFunction - object raw { - implicit val decodeRaw: Decoder[raw] = deriveDecoder[raw] - } - - implicit val decodeSortFunction: Decoder[SortFunction] = - Decoder[raw].map(identity[SortFunction](_)) } final case class FCSort(sort: Option[NonEmptyList[SortFunction]]) diff --git a/agni/core/test/foxcomm/agni/dsl/QueryDslSpec.scala b/agni/core/test/foxcomm/agni/dsl/QueryDslSpec.scala index 342807a3fb..ea70c96eb5 100644 --- a/agni/core/test/foxcomm/agni/dsl/QueryDslSpec.scala +++ b/agni/core/test/foxcomm/agni/dsl/QueryDslSpec.scala @@ -2,8 +2,8 @@ package foxcomm.agni.dsl import cats.data.NonEmptyVector import foxcomm.agni.dsl.query._ -import io.circe.{Json, JsonObject} import io.circe.parser._ +import io.circe.{Json, JsonObject} import org.scalatest.EitherValues._ import org.scalatest.OptionValues._ import org.scalatest.{Assertion, FlatSpec, Matchers} @@ -28,6 +28,7 @@ class QueryDslSpec extends FlatSpec with Matchers { Source .fromInputStream(getClass.getResourceAsStream("/query/multiple.json")) .mkString).right.value + val queries = json.as[FCQuery].right.value.query.map(_.toList).getOrElse(Nil) assertQueryFunction[QueryFunction.equals](queries.head) { equals ⇒ @@ -100,7 +101,7 @@ class QueryDslSpec extends FlatSpec with Matchers { ))) else Json.fromJsonObject(embed) - deepBool(boolDepth = QueryFunction.bool.MaxDepth - 1, embed = leaf).as[FCQuery].isLeft should === (false) - deepBool(boolDepth = QueryFunction.bool.MaxDepth, embed = leaf).as[FCQuery].isLeft should === (true) + deepBool(boolDepth = QueryFunction.MaxDepth, embed = leaf).as[FCQuery].isLeft should === (false) + deepBool(boolDepth = QueryFunction.MaxDepth + 1, embed = leaf).as[FCQuery].isLeft should === (true) } } From 08d3cbd56d3a5e0c5ecb8d55d6e06f5a94b55cd9 Mon Sep 17 00:00:00 2001 From: Krzysztof Janosz Date: Thu, 6 Jul 2017 13:55:34 +0200 Subject: [PATCH 3/4] Add aggregations dsl skeleton Add support for raw aggregations. Add some more test cases. --- agni/api/app/foxcomm/agni/api/Api.scala | 2 +- .../core/app/foxcomm/agni/SearchService.scala | 23 ++++++-- .../app/foxcomm/agni/dsl/aggregations.scala | 57 ++++++++++++++++++ .../agni/dsl/query/QueryFunctions.scala | 24 ++------ agni/core/app/foxcomm/agni/dsl/sort.scala | 12 ++-- .../interpreter/AggregationInterpreter.scala | 13 +++++ .../agni/interpreter/QueryInterpreter.scala | 2 +- .../interpreter/es/ESAggInterpreter.scala | 58 +++++++++++++++++++ .../interpreter/es/ESSortInterpreter.scala | 17 +++--- .../foxcomm/agni/interpreter/es/package.scala | 15 ++++- agni/core/app/foxcomm/agni/payload.scala | 6 +- .../dsl/{QueryDslSpec.scala => DslSpec.scala} | 56 ++++++++++++++++-- agni/core/test/resources/aggs.json | 18 ++++++ agni/core/test/resources/queries.json | 56 ++++++++++++++++++ agni/core/test/resources/query/multiple.json | 53 ----------------- agni/core/test/resources/sorts.json | 13 +++++ 16 files changed, 323 insertions(+), 102 deletions(-) create mode 100644 agni/core/app/foxcomm/agni/dsl/aggregations.scala create mode 100644 agni/core/app/foxcomm/agni/interpreter/AggregationInterpreter.scala create mode 100644 agni/core/app/foxcomm/agni/interpreter/es/ESAggInterpreter.scala rename agni/core/test/foxcomm/agni/dsl/{QueryDslSpec.scala => DslSpec.scala} (66%) create mode 100644 agni/core/test/resources/aggs.json create mode 100644 agni/core/test/resources/queries.json delete mode 100644 agni/core/test/resources/query/multiple.json create mode 100644 agni/core/test/resources/sorts.json diff --git a/agni/api/app/foxcomm/agni/api/Api.scala b/agni/api/app/foxcomm/agni/api/Api.scala index 3588c2ff09..4baff402a6 100644 --- a/agni/api/app/foxcomm/agni/api/Api.scala +++ b/agni/api/app/foxcomm/agni/api/Api.scala @@ -44,7 +44,7 @@ object Api extends App { implicit val s: Scheduler = Scheduler.global val config = AppConfig.load() - val svc = SearchService.fromConfig(config, queryInterpreter, sortInterpreter) + val svc = SearchService.fromConfig(config, aggregationInterpreter, queryInterpreter, sortInterpreter) Await.result( Http.server diff --git a/agni/core/app/foxcomm/agni/SearchService.scala b/agni/core/app/foxcomm/agni/SearchService.scala index 991d69207b..d3c09058e0 100644 --- a/agni/core/app/foxcomm/agni/SearchService.scala +++ b/agni/core/app/foxcomm/agni/SearchService.scala @@ -15,7 +15,10 @@ import org.elasticsearch.index.query.QueryBuilder import org.elasticsearch.search.SearchHit @SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements")) -class SearchService private (client: Client, qi: ESQueryInterpreter, si: ESSortInterpreter) { +class SearchService private (client: Client, + ai: ESAggregationInterpreter, + qi: ESQueryInterpreter, + si: ESSortInterpreter) { import SearchService.ExtractJsonObject def translate(searchPayload: SearchPayload.fc): Task[Json] = { @@ -56,11 +59,13 @@ class SearchService private (client: Client, qi: ESQueryInterpreter, si: ESSortI def evalQuery(builder: SearchRequestBuilder): Coeval[SearchRequestBuilder] = searchPayload match { case SearchPayload.es(query, _) ⇒ Coeval.eval(builder.setQuery(Json.fromJsonObject(query).toBytes)) - case SearchPayload.fc(query, sort, _) ⇒ + case SearchPayload.fc(aggs, query, sort, _) ⇒ for { + aggs ← ai(aggs) query ← qi(query) sorts ← si(sort) } yield { + aggs.foreach(builder.addAggregation) builder.setQuery(query) sorts.foreach(builder.addSort) builder @@ -97,10 +102,16 @@ object SearchService { .flatMap(_.asObject) } - def apply(client: Client, qi: ESQueryInterpreter, si: ESSortInterpreter): SearchService = - new SearchService(client, qi, si) + def apply(client: Client, + ai: ESAggregationInterpreter, + qi: ESQueryInterpreter, + si: ESSortInterpreter): SearchService = + new SearchService(client, ai, qi, si) - def fromConfig(config: AppConfig, qi: ESQueryInterpreter, si: ESSortInterpreter): SearchService = { + def fromConfig(config: AppConfig, + ai: ESAggregationInterpreter, + qi: ESQueryInterpreter, + si: ESSortInterpreter): SearchService = { val esConfig = config.elasticsearch val settings = Settings.settingsBuilder().put("cluster.name", esConfig.cluster).build() @@ -110,6 +121,6 @@ object SearchService { .build() .addTransportAddresses(esConfig.host.toList.map(new InetSocketTransportAddress(_)): _*) - apply(client, qi, si) + apply(client, ai, qi, si) } } diff --git a/agni/core/app/foxcomm/agni/dsl/aggregations.scala b/agni/core/app/foxcomm/agni/dsl/aggregations.scala new file mode 100644 index 0000000000..914c5e3780 --- /dev/null +++ b/agni/core/app/foxcomm/agni/dsl/aggregations.scala @@ -0,0 +1,57 @@ +package foxcomm.agni.dsl + +import cats.data.NonEmptyList +import cats.implicits._ +import io.circe._ +import io.circe.generic.extras.auto._ + +object aggregations { + sealed trait AggregationFunction { + def name: String + + def `type`: String + + final def tpe: String = `type` + } + object AggregationFunction { + sealed trait WithMetadata { this: AggregationFunction ⇒ + def meta: Option[JsonObject] + } + + final case class raw private (name: String, `type`: String, meta: Option[JsonObject], value: JsonObject) + extends AggregationFunction + with WithMetadata + + implicit val decodeAggFunction: Decoder[AggregationFunction] = { + val all: Map[String, Decoder[_ <: AggregationFunction]] = Map.empty.withDefaultValue(Decoder[raw]) + + Decoder.instance { hc ⇒ + val c = hc.downField(Discriminator) + val tpe = c.focus.flatMap(_.asString) + tpe match { + case Some(t) ⇒ all(t).tryDecode(hc) + case None ⇒ Either.left(DecodingFailure("Unknown aggregation function type", c.history)) + } + } + } + } + + final case class FCAggregation(aggs: Option[NonEmptyList[AggregationFunction]]) + object FCAggregation { + implicit val decodeFCAggregation: Decoder[FCAggregation] = { + Decoder + .decodeOption(Decoder.decodeNonEmptyList[AggregationFunction].emap { aggs ⇒ + aggs + .foldM[Either[String, ?], Set[String]](Set.empty[String])( + (defined, agg) ⇒ + if (defined.contains(agg.name)) + Either.left( + s"Cannot have multiple aggregations with the same name: ${agg.name} is defined twice") + else Either.right(defined)) + .right + .map(_ ⇒ aggs) + } or Decoder[AggregationFunction].map(NonEmptyList.of(_))) + .map(FCAggregation(_)) + } + } +} diff --git a/agni/core/app/foxcomm/agni/dsl/query/QueryFunctions.scala b/agni/core/app/foxcomm/agni/dsl/query/QueryFunctions.scala index 5484805ada..8cd9a73e65 100644 --- a/agni/core/app/foxcomm/agni/dsl/query/QueryFunctions.scala +++ b/agni/core/app/foxcomm/agni/dsl/query/QueryFunctions.scala @@ -173,23 +173,11 @@ private[query] trait QueryFunctions { this: QueryData ⇒ sealed case class FCQuery(query: Option[NonEmptyList[QueryFunction]]) object FCQuery { - implicit val decodeFCQuery: Decoder[FCQuery] = { - val qfsDecode = Decoder.decodeNonEmptyList[QueryFunction] - val qfDecode = Decoder[QueryFunction].map(NonEmptyList.of(_)) - - Decoder.instance { c ⇒ - if (c.value.isNull) Either.right(FCQuery(None)) - else if (c.value.isArray) qfsDecode.map(qfs ⇒ FCQuery(Some(qfs)))(c) - else if (c.value.isObject) qfDecode.map(qfs ⇒ FCQuery(Some(qfs)))(c) - else - Either.left( - DecodingFailure( - "Query DSL must be either empty or " + - "a non-empty array of query functions or " + - "a single query function object", - c.history - )) - } - } + implicit val decodeFCQuery: Decoder[FCQuery] = + Decoder + .decodeOption( + Decoder.decodeNonEmptyList[QueryFunction] or + Decoder[QueryFunction].map(NonEmptyList.of(_))) + .map(FCQuery(_)) } } diff --git a/agni/core/app/foxcomm/agni/dsl/sort.scala b/agni/core/app/foxcomm/agni/dsl/sort.scala index d3c7f40c23..0c9866eb51 100644 --- a/agni/core/app/foxcomm/agni/dsl/sort.scala +++ b/agni/core/app/foxcomm/agni/dsl/sort.scala @@ -6,18 +6,18 @@ import io.circe.generic.extras.auto._ import shapeless._ object sort { + type RawSortValue = JsonObject :+: String :+: CNil + implicit val decodeRawSortValue: Decoder[RawSortValue] = decodeCoproduct[JsonObject, String :+: CNil] + .withErrorMessage("Raw sort value must be either a string or an object") + sealed trait SortFunction object SortFunction { - type RawSortValue = JsonObject :+: String :+: CNil - implicit val decodeRawSortValue: Decoder[RawSortValue] = decodeCoproduct[JsonObject, String :+: CNil] - .withErrorMessage("Raw sort value must be either a string or an object") - final case class raw private (value: RawSortValue) extends SortFunction } - final case class FCSort(sort: Option[NonEmptyList[SortFunction]]) + final case class FCSort(sorts: Option[NonEmptyList[SortFunction]]) object FCSort { - implicit val decodeFCQuery: Decoder[FCSort] = { + implicit val decodeFCSort: Decoder[FCSort] = { Decoder .decodeOption( Decoder.decodeNonEmptyList[SortFunction] or diff --git a/agni/core/app/foxcomm/agni/interpreter/AggregationInterpreter.scala b/agni/core/app/foxcomm/agni/interpreter/AggregationInterpreter.scala new file mode 100644 index 0000000000..d078540d20 --- /dev/null +++ b/agni/core/app/foxcomm/agni/interpreter/AggregationInterpreter.scala @@ -0,0 +1,13 @@ +package foxcomm.agni.interpreter + +import scala.language.higherKinds +import cats.data.NonEmptyList +import foxcomm.agni.dsl.aggregations.AggregationFunction + +trait AggregationInterpreter[F[_], V] extends Interpreter[F, NonEmptyList[AggregationFunction], V] { + final def eval(qa: AggregationFunction): F[V] = qa match { + case qa: AggregationFunction.raw ⇒ rawF(qa) + } + + def rawF(qf: AggregationFunction.raw): F[V] +} diff --git a/agni/core/app/foxcomm/agni/interpreter/QueryInterpreter.scala b/agni/core/app/foxcomm/agni/interpreter/QueryInterpreter.scala index 9f937f243d..3ffc1bed7a 100644 --- a/agni/core/app/foxcomm/agni/interpreter/QueryInterpreter.scala +++ b/agni/core/app/foxcomm/agni/interpreter/QueryInterpreter.scala @@ -2,7 +2,7 @@ package foxcomm.agni.interpreter import scala.language.higherKinds import cats.data.NonEmptyList -import foxcomm.agni.dsl.query._ +import foxcomm.agni.dsl.query.QueryFunction trait QueryInterpreter[F[_], V] extends Interpreter[F, NonEmptyList[QueryFunction], V] { final def eval(qf: QueryFunction): F[V] = qf match { diff --git a/agni/core/app/foxcomm/agni/interpreter/es/ESAggInterpreter.scala b/agni/core/app/foxcomm/agni/interpreter/es/ESAggInterpreter.scala new file mode 100644 index 0000000000..68748e8a4a --- /dev/null +++ b/agni/core/app/foxcomm/agni/interpreter/es/ESAggInterpreter.scala @@ -0,0 +1,58 @@ +package foxcomm.agni.interpreter.es + +import cats.data._ +import foxcomm.agni._ +import foxcomm.agni.dsl.aggregations._ +import foxcomm.agni.interpreter.AggregationInterpreter +import io.circe.JsonObject +import org.elasticsearch.common.xcontent.{ToXContent, XContentBuilder} +import org.elasticsearch.search.aggregations.AbstractAggregationBuilder +import scala.collection.mutable.ListBuffer + +@SuppressWarnings( + Array("org.wartremover.warts.MutableDataStructures", "org.wartremover.warts.NonUnitStatements")) +private[es] object ESAggInterpreter + extends AggregationInterpreter[() ⇒ ?, NonEmptyList[AbstractAggregationBuilder]] { + type State = () ⇒ NonEmptyList[AbstractAggregationBuilder] + object State { + def single(agg: ⇒ AbstractAggregationBuilder): State = () ⇒ NonEmptyList.of(agg) + + def apply(aggs: ⇒ List[AbstractAggregationBuilder]): State = () ⇒ NonEmptyList.fromListUnsafe(aggs) + } + + private final case class RawAggBuilder(name: String, + tpe: String, + meta: Option[JsonObject], + content: JsonObject) + extends AbstractAggregationBuilder(name, tpe) { + def toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder = { + builder.startObject(name) + + meta.foreach { m ⇒ + builder.startObject("meta") + m.toMap.foreach { + case (n, v) ⇒ + builder.rawField(n, v.toSmile) + } + builder.endObject() + } + + builder.startObject(tpe) + content.toMap.foreach { + case (n, v) ⇒ + builder.rawField(n, v.toSmile) + } + builder.endObject() + + builder.endObject() + } + } + + def apply(afs: NonEmptyList[AggregationFunction]): State = State { + afs.foldLeft(ListBuffer.empty[AbstractAggregationBuilder])((acc, sf) ⇒ acc ++= eval(sf)().toList).toList + } + + def rawF(af: AggregationFunction.raw): State = State.single { + RawAggBuilder(name = af.name, tpe = af.tpe, meta = af.meta, content = af.value) + } +} diff --git a/agni/core/app/foxcomm/agni/interpreter/es/ESSortInterpreter.scala b/agni/core/app/foxcomm/agni/interpreter/es/ESSortInterpreter.scala index 7cd881a92f..392c1927ba 100644 --- a/agni/core/app/foxcomm/agni/interpreter/es/ESSortInterpreter.scala +++ b/agni/core/app/foxcomm/agni/interpreter/es/ESSortInterpreter.scala @@ -2,8 +2,7 @@ package foxcomm.agni.interpreter.es import cats.data._ import foxcomm.agni._ -import foxcomm.agni.dsl.sort.SortFunction -import foxcomm.agni.dsl.sort.SortFunction.RawSortValue +import foxcomm.agni.dsl.sort._ import foxcomm.agni.interpreter.SortInterpreter import org.elasticsearch.common.xcontent.{ToXContent, XContentBuilder} import org.elasticsearch.search.sort.{SortBuilder, SortOrder} @@ -14,6 +13,11 @@ import scala.collection.mutable.ListBuffer Array("org.wartremover.warts.MutableDataStructures", "org.wartremover.warts.NonUnitStatements")) private[es] object ESSortInterpreter extends SortInterpreter[() ⇒ ?, NonEmptyList[SortBuilder]] { type State = () ⇒ NonEmptyList[SortBuilder] + object State { + def single(sort: ⇒ SortBuilder): State = () ⇒ NonEmptyList.of(sort) + + def apply(sorts: ⇒ List[SortBuilder]): State = () ⇒ NonEmptyList.fromListUnsafe(sorts) + } private final case class RawSortBuilder(content: RawSortValue) extends SortBuilder { @compileTimeOnly("forbidden method to call") @@ -34,12 +38,11 @@ private[es] object ESSortInterpreter extends SortInterpreter[() ⇒ ?, NonEmptyL } } - def apply(sfs: NonEmptyList[SortFunction]): State = () ⇒ { - NonEmptyList.fromListUnsafe( - sfs.foldLeft(ListBuffer.empty[SortBuilder])((acc, sf) ⇒ acc ++= eval(sf)().toList).toList) + def apply(sfs: NonEmptyList[SortFunction]): State = State { + sfs.foldLeft(ListBuffer.empty[SortBuilder])((acc, sf) ⇒ acc ++= eval(sf)().toList).toList } - def rawF(qf: SortFunction.raw): State = () ⇒ { - NonEmptyList.of(RawSortBuilder(qf.value)) + def rawF(sf: SortFunction.raw): State = State.single { + RawSortBuilder(sf.value) } } diff --git a/agni/core/app/foxcomm/agni/interpreter/es/package.scala b/agni/core/app/foxcomm/agni/interpreter/es/package.scala index 44999fb069..9b0c101078 100644 --- a/agni/core/app/foxcomm/agni/interpreter/es/package.scala +++ b/agni/core/app/foxcomm/agni/interpreter/es/package.scala @@ -1,15 +1,24 @@ package foxcomm.agni.interpreter import cats.data._ +import foxcomm.agni.dsl.aggregations.{AggregationFunction, FCAggregation} import foxcomm.agni.dsl.query.{FCQuery, QueryFunction} import foxcomm.agni.dsl.sort.{FCSort, SortFunction} import monix.eval.Coeval import org.elasticsearch.index.query.{BoolQueryBuilder, QueryBuilders} +import org.elasticsearch.search.aggregations.AbstractAggregationBuilder import org.elasticsearch.search.sort.SortBuilder package object es { - type ESQueryInterpreter = Kleisli[Coeval, FCQuery, BoolQueryBuilder] - type ESSortInterpreter = Kleisli[Coeval, FCSort, List[SortBuilder]] + type ESAggregationInterpreter = Kleisli[Coeval, FCAggregation, List[AbstractAggregationBuilder]] + type ESQueryInterpreter = Kleisli[Coeval, FCQuery, BoolQueryBuilder] + type ESSortInterpreter = Kleisli[Coeval, FCSort, List[SortBuilder]] + + val aggregationInterpreter: ESAggregationInterpreter = { + val eval: Interpreter[Coeval, NonEmptyList[AggregationFunction], List[AbstractAggregationBuilder]] = + ESAggInterpreter andThen (f ⇒ Coeval.eval(f().toList)) + Kleisli(_.aggs.fold(Coeval.now(List.empty[AbstractAggregationBuilder]))(eval)) + } val queryInterpreter: ESQueryInterpreter = { val eval: Interpreter[Coeval, NonEmptyList[QueryFunction], BoolQueryBuilder] = @@ -20,6 +29,6 @@ package object es { val sortInterpreter: ESSortInterpreter = { val eval: Interpreter[Coeval, NonEmptyList[SortFunction], List[SortBuilder]] = ESSortInterpreter andThen (f ⇒ Coeval.eval(f().toList)) - Kleisli(_.sort.fold(Coeval.now(List.empty[SortBuilder]))(eval)) + Kleisli(_.sorts.fold(Coeval.now(List.empty[SortBuilder]))(eval)) } } diff --git a/agni/core/app/foxcomm/agni/payload.scala b/agni/core/app/foxcomm/agni/payload.scala index 8b064047aa..e66bfe9139 100644 --- a/agni/core/app/foxcomm/agni/payload.scala +++ b/agni/core/app/foxcomm/agni/payload.scala @@ -1,6 +1,7 @@ package foxcomm.agni import cats.data.NonEmptyList +import foxcomm.agni.dsl.aggregations.FCAggregation import foxcomm.agni.dsl.query._ import foxcomm.agni.dsl.sort.FCSort import io.circe.JsonObject @@ -10,6 +11,9 @@ sealed trait SearchPayload { } object SearchPayload { final case class es(query: JsonObject, fields: Option[NonEmptyList[String]]) extends SearchPayload - final case class fc(query: FCQuery, sort: FCSort, fields: Option[NonEmptyList[String]]) + final case class fc(aggregations: FCAggregation, + query: FCQuery, + sort: FCSort, + fields: Option[NonEmptyList[String]]) extends SearchPayload } diff --git a/agni/core/test/foxcomm/agni/dsl/QueryDslSpec.scala b/agni/core/test/foxcomm/agni/dsl/DslSpec.scala similarity index 66% rename from agni/core/test/foxcomm/agni/dsl/QueryDslSpec.scala rename to agni/core/test/foxcomm/agni/dsl/DslSpec.scala index ea70c96eb5..b3fafb2cb2 100644 --- a/agni/core/test/foxcomm/agni/dsl/QueryDslSpec.scala +++ b/agni/core/test/foxcomm/agni/dsl/DslSpec.scala @@ -1,9 +1,13 @@ package foxcomm.agni.dsl import cats.data.NonEmptyVector +import foxcomm.agni.SearchPayload +import foxcomm.agni.dsl.aggregations.AggregationFunction import foxcomm.agni.dsl.query._ +import foxcomm.agni.dsl.sort.{RawSortValue, SortFunction} import io.circe.parser._ import io.circe.{Json, JsonObject} +import io.circe.generic.extras.auto._ import org.scalatest.EitherValues._ import org.scalatest.OptionValues._ import org.scalatest.{Assertion, FlatSpec, Matchers} @@ -12,7 +16,7 @@ import scala.io.Source import shapeless._ import shapeless.syntax.typeable._ -class QueryDslSpec extends FlatSpec with Matchers { +class DslSpec extends FlatSpec with Matchers { implicit class RichRangeBound[A](val rb: RangeBound[A]) { implicit def toMap: Map[RangeFunction, A] = Map.empty ++ rb.lower ++ rb.upper } @@ -23,14 +27,16 @@ class QueryDslSpec extends FlatSpec with Matchers { .fold(fail(s"Cannot cast query function ${qf.getClass.getName} to ${Typeable[T].describe}"))(assertion) "DSL" should "parse multiple queries" in { - val json = + val payload = parse( Source - .fromInputStream(getClass.getResourceAsStream("/query/multiple.json")) - .mkString).right.value + .fromInputStream(getClass.getResourceAsStream("/queries.json")) + .mkString).right.value.as[SearchPayload.fc].right.value - val queries = - json.as[FCQuery].right.value.query.map(_.toList).getOrElse(Nil) + payload.aggregations.aggs.isEmpty should === (true) + payload.sort.sorts.isEmpty should === (true) + + val queries = payload.query.query.map(_.toList).getOrElse(Nil) assertQueryFunction[QueryFunction.equals](queries.head) { equals ⇒ equals.field.toList should === (List(Coproduct[Field]("slug"))) equals.ctx should === (QueryContext.must) @@ -82,6 +88,44 @@ class QueryDslSpec extends FlatSpec with Matchers { } } + it should "parse raw aggregation definitions" in { + val payload = + parse( + Source + .fromInputStream(getClass.getResourceAsStream("/aggs.json")) + .mkString).right.value.as[SearchPayload.fc].right.value + + payload.query.query.isEmpty should === (true) + payload.sort.sorts.isEmpty should === (true) + + val aggs = payload.aggregations.aggs.map(_.toList).getOrElse(Nil) + aggs.head should === ( + AggregationFunction + .raw("some_name", "some_type", meta = Some(JsonObject.empty), value = JsonObject.empty)) + aggs(1) should === ( + AggregationFunction.raw("some_other_name", + "some_type", + meta = None, + value = JsonObject.singleton("some_key", Json.fromString("some_value")))) + } + + it should "parse raw sort definitions" in { + val payload = + parse( + Source + .fromInputStream(getClass.getResourceAsStream("/sorts.json")) + .mkString).right.value.as[SearchPayload.fc].right.value + + payload.aggregations.aggs.isEmpty should === (true) + payload.query.query.isEmpty should === (true) + + val sorts = payload.sort.sorts.map(_.toList).getOrElse(Nil) + sorts.head should === (SortFunction.raw(Coproduct[RawSortValue]("price"))) + sorts(1) should === ( + SortFunction.raw( + Coproduct[RawSortValue](JsonObject.singleton("date", Json.obj("order" → Json.fromString("desc")))))) + } + it should "limit max depth for bool query" in { val leaf = JsonObject.fromMap( Map( diff --git a/agni/core/test/resources/aggs.json b/agni/core/test/resources/aggs.json new file mode 100644 index 0000000000..e60ea1a187 --- /dev/null +++ b/agni/core/test/resources/aggs.json @@ -0,0 +1,18 @@ +{ + "type": "fc", + "aggregations": [ + { + "type": "some_type", + "name": "some_name", + "meta": {}, + "value": {} + }, + { + "type": "some_type", + "name": "some_other_name", + "value": { + "some_key": "some_value" + } + } + ] +} diff --git a/agni/core/test/resources/queries.json b/agni/core/test/resources/queries.json new file mode 100644 index 0000000000..86fd4645e4 --- /dev/null +++ b/agni/core/test/resources/queries.json @@ -0,0 +1,56 @@ +{ + "type": "fc", + "query": [ + { + "type": "equals", + "context": "must", + "in": "slug", + "value": [ "awesome", "whatever" ] + }, + { + "type": "matches^.5", + "context": "should", + "in": [ "title", "description", "skus.code" ], + "value": [ "food", "drink" ] + }, + { + "type": "range", + "in": "price", + "value": { + "<": 5000, + "gte": 1000 + } + }, + { + "type": "exists", + "context": "not", + "value": "archivedAt" + }, + { + "type": "raw", + "context": "filter", + "value": { + "match_all": {} + } + }, + { + "type": "bool", + "context": "should", + "value": [ + { + "type": "equals", + "in": "context", + "value": "default" + }, + { + "type": "bool", + "context": "not", + "value": { + "type": "exists", + "value": "context" + } + } + ] + } + ] +} diff --git a/agni/core/test/resources/query/multiple.json b/agni/core/test/resources/query/multiple.json deleted file mode 100644 index 20a4543aa3..0000000000 --- a/agni/core/test/resources/query/multiple.json +++ /dev/null @@ -1,53 +0,0 @@ -[ - { - "type": "equals", - "context": "must", - "in": "slug", - "value": [ "awesome", "whatever" ] - }, - { - "type": "matches^.5", - "context": "should", - "in": [ "title", "description", "skus.code" ], - "value": [ "food", "drink" ] - }, - { - "type": "range", - "in": "price", - "value": { - "<": 5000, - "gte": 1000 - } - }, - { - "type": "exists", - "context": "not", - "value": "archivedAt" - }, - { - "type": "raw", - "context": "filter", - "value": { - "match_all": {} - } - }, - { - "type": "bool", - "context": "should", - "value": [ - { - "type": "equals", - "in": "context", - "value": "default" - }, - { - "type": "bool", - "context": "not", - "value": { - "type": "exists", - "value": "context" - } - } - ] - } -] diff --git a/agni/core/test/resources/sorts.json b/agni/core/test/resources/sorts.json new file mode 100644 index 0000000000..779185b5ec --- /dev/null +++ b/agni/core/test/resources/sorts.json @@ -0,0 +1,13 @@ +{ + "type": "fc", + "sort": [ + { + "type": "raw", + "value": "price" + }, + { + "type": "raw", + "value": { "date": { "order": "desc" }} + } + ] +} From 58742131bed6f7c62c4e7ccb03c02b29e28bfda4 Mon Sep 17 00:00:00 2001 From: Krzysztof Janosz Date: Fri, 7 Jul 2017 12:19:44 +0200 Subject: [PATCH 4/4] Get rid of raw payload --- agni/api/app/foxcomm/agni/api/Api.scala | 4 +- .../core/app/foxcomm/agni/SearchService.scala | 65 ++++--------------- .../agni/dsl/query/QueryFunctions.scala | 11 ---- .../app/foxcomm/agni/dsl/query/package.scala | 11 ++++ agni/core/app/foxcomm/agni/dsl/sort.scala | 14 ++++ .../foxcomm/agni/interpreter/es/package.scala | 26 +++++++- agni/core/app/foxcomm/agni/payload.scala | 18 ++--- agni/core/test/foxcomm/agni/dsl/DslSpec.scala | 6 +- 8 files changed, 72 insertions(+), 83 deletions(-) diff --git a/agni/api/app/foxcomm/agni/api/Api.scala b/agni/api/app/foxcomm/agni/api/Api.scala index 4baff402a6..6df6905709 100644 --- a/agni/api/app/foxcomm/agni/api/Api.scala +++ b/agni/api/app/foxcomm/agni/api/Api.scala @@ -15,7 +15,7 @@ import org.elasticsearch.common.ValidationException object Api extends App { def endpoints(searchService: SearchService)(implicit s: Scheduler) = - post("api" :: "search" :: "translate" :: jsonBody[SearchPayload.fc]) { (searchPayload: SearchPayload.fc) ⇒ + post("api" :: "search" :: "translate" :: jsonBody[SearchPayload]) { (searchPayload: SearchPayload) ⇒ searchService .translate(searchPayload = searchPayload) .map(Ok) @@ -44,7 +44,7 @@ object Api extends App { implicit val s: Scheduler = Scheduler.global val config = AppConfig.load() - val svc = SearchService.fromConfig(config, aggregationInterpreter, queryInterpreter, sortInterpreter) + val svc = SearchService.fromConfig(config, dslInterpreter) Await.result( Http.server diff --git a/agni/core/app/foxcomm/agni/SearchService.scala b/agni/core/app/foxcomm/agni/SearchService.scala index d3c09058e0..80d96f7d51 100644 --- a/agni/core/app/foxcomm/agni/SearchService.scala +++ b/agni/core/app/foxcomm/agni/SearchService.scala @@ -5,38 +5,25 @@ import foxcomm.agni.interpreter.es._ import io.circe._ import io.circe.jawn.parseByteBuffer import monix.eval.{Coeval, Task} -import org.elasticsearch.action.search.{SearchAction, SearchRequestBuilder, SearchResponse} +import org.elasticsearch.action.search.{SearchAction, SearchRequest, SearchRequestBuilder, SearchResponse} import org.elasticsearch.client.Client import org.elasticsearch.client.transport.TransportClient import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.transport.InetSocketTransportAddress -import org.elasticsearch.common.xcontent.{ToXContent, XContentFactory} -import org.elasticsearch.index.query.QueryBuilder import org.elasticsearch.search.SearchHit @SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements")) -class SearchService private (client: Client, - ai: ESAggregationInterpreter, - qi: ESQueryInterpreter, - si: ESSortInterpreter) { +class SearchService private (client: Client, interpreter: ESSearchInterpreter) { import SearchService.ExtractJsonObject - def translate(searchPayload: SearchPayload.fc): Task[Json] = { - def buildJson(qb: QueryBuilder): Coeval[Json] = - Coeval.eval { - val builder = XContentFactory.jsonBuilder() - builder.prettyPrint() - builder.startObject() - builder.field("query") - qb.toXContent(builder, ToXContent.EMPTY_PARAMS) - builder.endObject() - parseByteBuffer(builder.bytes().toChannelBuffer.toByteBuffer) - .fold(Coeval.raiseError(_), Coeval.eval(_)) - }.flatten + def translate(searchPayload: SearchPayload): Task[Json] = { + def buildJson(req: SearchRequest): Coeval[Json] = + parseByteBuffer(req.source().toChannelBuffer.toByteBuffer) + .fold(Coeval.raiseError(_), Coeval.eval(_)) for { - builder ← qi(searchPayload.query).task - json ← buildJson(builder).task + req ← interpreter(searchPayload → new SearchRequestBuilder(client, SearchAction.INSTANCE)).task + json ← buildJson(req).task } yield json } @@ -52,31 +39,13 @@ class SearchService private (client: Client, .setTypes(searchType) .setSize(searchSize) searchFrom.foreach(builder.setFrom) - searchPayload.fields.foreach(fs ⇒ builder.setFetchSource(fs.toList.toArray, Array.empty[String])) builder } - def evalQuery(builder: SearchRequestBuilder): Coeval[SearchRequestBuilder] = searchPayload match { - case SearchPayload.es(query, _) ⇒ - Coeval.eval(builder.setQuery(Json.fromJsonObject(query).toBytes)) - case SearchPayload.fc(aggs, query, sort, _) ⇒ - for { - aggs ← ai(aggs) - query ← qi(query) - sorts ← si(sort) - } yield { - aggs.foreach(builder.addAggregation) - builder.setQuery(query) - sorts.foreach(builder.addSort) - builder - } - } - - def setupBuilder: Task[SearchRequestBuilder] = (prepareBuilder flatMap evalQuery).task + def searchRequest: Task[SearchRequest] = prepareBuilder.flatMap(b ⇒ interpreter(searchPayload → b)).task for { - builder ← setupBuilder - request = builder.request() + request ← searchRequest response ← async[SearchResponse, SearchResult](client.search(request, _)) } yield { val hits = response.getHits @@ -102,16 +71,10 @@ object SearchService { .flatMap(_.asObject) } - def apply(client: Client, - ai: ESAggregationInterpreter, - qi: ESQueryInterpreter, - si: ESSortInterpreter): SearchService = - new SearchService(client, ai, qi, si) + def apply(client: Client, interpreter: ESSearchInterpreter): SearchService = + new SearchService(client, interpreter) - def fromConfig(config: AppConfig, - ai: ESAggregationInterpreter, - qi: ESQueryInterpreter, - si: ESSortInterpreter): SearchService = { + def fromConfig(config: AppConfig, interpreter: ESSearchInterpreter): SearchService = { val esConfig = config.elasticsearch val settings = Settings.settingsBuilder().put("cluster.name", esConfig.cluster).build() @@ -121,6 +84,6 @@ object SearchService { .build() .addTransportAddresses(esConfig.host.toList.map(new InetSocketTransportAddress(_)): _*) - apply(client, ai, qi, si) + apply(client, interpreter) } } diff --git a/agni/core/app/foxcomm/agni/dsl/query/QueryFunctions.scala b/agni/core/app/foxcomm/agni/dsl/query/QueryFunctions.scala index 8cd9a73e65..5a4f1c41bc 100644 --- a/agni/core/app/foxcomm/agni/dsl/query/QueryFunctions.scala +++ b/agni/core/app/foxcomm/agni/dsl/query/QueryFunctions.scala @@ -1,6 +1,5 @@ package foxcomm.agni.dsl.query -import cats.data.NonEmptyList import cats.implicits._ import foxcomm.agni.dsl._ import io.circe._ @@ -170,14 +169,4 @@ private[query] trait QueryFunctions { this: QueryData ⇒ } } } - - sealed case class FCQuery(query: Option[NonEmptyList[QueryFunction]]) - object FCQuery { - implicit val decodeFCQuery: Decoder[FCQuery] = - Decoder - .decodeOption( - Decoder.decodeNonEmptyList[QueryFunction] or - Decoder[QueryFunction].map(NonEmptyList.of(_))) - .map(FCQuery(_)) - } } diff --git a/agni/core/app/foxcomm/agni/dsl/query/package.scala b/agni/core/app/foxcomm/agni/dsl/query/package.scala index 294aed5bd9..bf44d0573e 100644 --- a/agni/core/app/foxcomm/agni/dsl/query/package.scala +++ b/agni/core/app/foxcomm/agni/dsl/query/package.scala @@ -1,8 +1,19 @@ package foxcomm.agni.dsl import cats.data.NonEmptyList +import io.circe.Decoder package object query extends QueryData with QueryFunctions { + final case class FCQuery(query: Option[NonEmptyList[QueryFunction]]) + object FCQuery { + implicit val decodeFCQuery: Decoder[FCQuery] = + Decoder + .decodeOption( + Decoder.decodeNonEmptyList[QueryFunction] or + Decoder[QueryFunction].map(NonEmptyList.of(_))) + .map(FCQuery(_)) + } + implicit class RichQueryValue[T](val qv: QueryValue[T]) extends AnyVal { def toNEL: NonEmptyList[T] = qv.eliminate(NonEmptyList.of(_), _.eliminate(identity, _.impossible)) diff --git a/agni/core/app/foxcomm/agni/dsl/sort.scala b/agni/core/app/foxcomm/agni/dsl/sort.scala index 0c9866eb51..2b91d03be7 100644 --- a/agni/core/app/foxcomm/agni/dsl/sort.scala +++ b/agni/core/app/foxcomm/agni/dsl/sort.scala @@ -1,6 +1,7 @@ package foxcomm.agni.dsl import cats.data.NonEmptyList +import cats.implicits._ import io.circe._ import io.circe.generic.extras.auto._ import shapeless._ @@ -13,6 +14,19 @@ object sort { sealed trait SortFunction object SortFunction { final case class raw private (value: RawSortValue) extends SortFunction + + implicit val decodeSortFunction: Decoder[SortFunction] = { + val all: Map[String, Decoder[_ <: SortFunction]] = Map.empty.withDefaultValue(Decoder[raw]) + + Decoder.instance { hc ⇒ + val c = hc.downField(Discriminator) + val tpe = c.focus.flatMap(_.asString) + tpe match { + case Some(t) ⇒ all(t).tryDecode(hc) + case None ⇒ Either.left(DecodingFailure("Unknown sort function type", c.history)) + } + } + } } final case class FCSort(sorts: Option[NonEmptyList[SortFunction]]) diff --git a/agni/core/app/foxcomm/agni/interpreter/es/package.scala b/agni/core/app/foxcomm/agni/interpreter/es/package.scala index 9b0c101078..4df42a8b15 100644 --- a/agni/core/app/foxcomm/agni/interpreter/es/package.scala +++ b/agni/core/app/foxcomm/agni/interpreter/es/package.scala @@ -1,10 +1,12 @@ package foxcomm.agni.interpreter import cats.data._ -import foxcomm.agni.dsl.aggregations.{AggregationFunction, FCAggregation} -import foxcomm.agni.dsl.query.{FCQuery, QueryFunction} -import foxcomm.agni.dsl.sort.{FCSort, SortFunction} +import foxcomm.agni.SearchPayload +import foxcomm.agni.dsl.aggregations._ +import foxcomm.agni.dsl.query._ +import foxcomm.agni.dsl.sort._ import monix.eval.Coeval +import org.elasticsearch.action.search.{SearchRequest, SearchRequestBuilder} import org.elasticsearch.index.query.{BoolQueryBuilder, QueryBuilders} import org.elasticsearch.search.aggregations.AbstractAggregationBuilder import org.elasticsearch.search.sort.SortBuilder @@ -13,6 +15,7 @@ package object es { type ESAggregationInterpreter = Kleisli[Coeval, FCAggregation, List[AbstractAggregationBuilder]] type ESQueryInterpreter = Kleisli[Coeval, FCQuery, BoolQueryBuilder] type ESSortInterpreter = Kleisli[Coeval, FCSort, List[SortBuilder]] + type ESSearchInterpreter = Kleisli[Coeval, (SearchPayload, SearchRequestBuilder), SearchRequest] val aggregationInterpreter: ESAggregationInterpreter = { val eval: Interpreter[Coeval, NonEmptyList[AggregationFunction], List[AbstractAggregationBuilder]] = @@ -31,4 +34,21 @@ package object es { ESSortInterpreter andThen (f ⇒ Coeval.eval(f().toList)) Kleisli(_.sorts.fold(Coeval.now(List.empty[SortBuilder]))(eval)) } + + @SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements")) + val dslInterpreter: ESSearchInterpreter = Kleisli { + case (payload, builder) ⇒ + val req = for { + aggs ← aggregationInterpreter(payload.aggregations) + query ← queryInterpreter(payload.query) + sorts ← sortInterpreter(payload.sort) + } yield { + aggs.foreach(builder.addAggregation) + builder.setQuery(query) + sorts.foreach(builder.addSort) + payload.fields.foreach(fs ⇒ builder.setFetchSource(fs.toList.toArray, Array.empty[String])) + builder.request() + } + req + } } diff --git a/agni/core/app/foxcomm/agni/payload.scala b/agni/core/app/foxcomm/agni/payload.scala index e66bfe9139..ef6fde9824 100644 --- a/agni/core/app/foxcomm/agni/payload.scala +++ b/agni/core/app/foxcomm/agni/payload.scala @@ -2,18 +2,10 @@ package foxcomm.agni import cats.data.NonEmptyList import foxcomm.agni.dsl.aggregations.FCAggregation -import foxcomm.agni.dsl.query._ +import foxcomm.agni.dsl.query.FCQuery import foxcomm.agni.dsl.sort.FCSort -import io.circe.JsonObject -sealed trait SearchPayload { - def fields: Option[NonEmptyList[String]] -} -object SearchPayload { - final case class es(query: JsonObject, fields: Option[NonEmptyList[String]]) extends SearchPayload - final case class fc(aggregations: FCAggregation, - query: FCQuery, - sort: FCSort, - fields: Option[NonEmptyList[String]]) - extends SearchPayload -} +final case class SearchPayload(aggregations: FCAggregation, + query: FCQuery, + sort: FCSort, + fields: Option[NonEmptyList[String]]) diff --git a/agni/core/test/foxcomm/agni/dsl/DslSpec.scala b/agni/core/test/foxcomm/agni/dsl/DslSpec.scala index b3fafb2cb2..c32a9d82bc 100644 --- a/agni/core/test/foxcomm/agni/dsl/DslSpec.scala +++ b/agni/core/test/foxcomm/agni/dsl/DslSpec.scala @@ -31,7 +31,7 @@ class DslSpec extends FlatSpec with Matchers { parse( Source .fromInputStream(getClass.getResourceAsStream("/queries.json")) - .mkString).right.value.as[SearchPayload.fc].right.value + .mkString).right.value.as[SearchPayload].right.value payload.aggregations.aggs.isEmpty should === (true) payload.sort.sorts.isEmpty should === (true) @@ -93,7 +93,7 @@ class DslSpec extends FlatSpec with Matchers { parse( Source .fromInputStream(getClass.getResourceAsStream("/aggs.json")) - .mkString).right.value.as[SearchPayload.fc].right.value + .mkString).right.value.as[SearchPayload].right.value payload.query.query.isEmpty should === (true) payload.sort.sorts.isEmpty should === (true) @@ -114,7 +114,7 @@ class DslSpec extends FlatSpec with Matchers { parse( Source .fromInputStream(getClass.getResourceAsStream("/sorts.json")) - .mkString).right.value.as[SearchPayload.fc].right.value + .mkString).right.value.as[SearchPayload].right.value payload.aggregations.aggs.isEmpty should === (true) payload.query.query.isEmpty should === (true)