From 88d0169f73fffa800019fced37ea88322d7e4b8e Mon Sep 17 00:00:00 2001 From: bplatz Date: Thu, 9 Oct 2025 22:47:15 -0400 Subject: [PATCH 1/2] Add NDJSON streaming support for queries and update related tests - Introduced a new `ndjson-streaming-body` function to handle streaming responses in NDJSON format. - Updated the query handler to support streaming queries based on the `Accept` header. - Modified transaction handling to use the `transact` namespace instead of `ledger`. - Added integration tests for streaming queries, including metadata tracking and limit functionality. - Updated dependencies and fixed related test cases. --- README.md | 68 +++++++ deps.edn | 2 +- src/fluree/server/consensus/events.clj | 6 +- .../consensus/raft/handlers/new_commit.clj | 4 +- src/fluree/server/handlers/ledger.clj | 85 ++++++++- .../integration/streaming_query_test.clj | 173 ++++++++++++++++++ test/fluree/server/task_test.clj | 2 +- 7 files changed, 325 insertions(+), 15 deletions(-) create mode 100644 test/fluree/server/integration/streaming_query_test.clj diff --git a/README.md b/README.md index 2d238af7..4bb4afac 100644 --- a/README.md +++ b/README.md @@ -243,6 +243,74 @@ Expected output (SPARQL Results JSON format): } ``` +### Streaming Queries (NDJSON) + +For large result sets, you can use streaming queries to receive results as they are produced rather than waiting for the entire result set. This reduces memory pressure and enables progressive rendering in UIs. + +Streaming is enabled by setting the `Accept` header to `application/x-ndjson` (Newline-Delimited JSON): + +```bash +curl -X POST http://localhost:8090/fluree/query \ + -H "Content-Type: application/json" \ + -H "Accept: application/x-ndjson" \ + -d '{ + "from": "example/ledger", + "@context": { + "schema": "http://schema.org/", + "ex": "http://example.org/" + }, + "select": ["?name", "?email"], + "where": { + "@id": "?person", + "@type": "schema:Person", + "schema:name": "?name", + "schema:email": "?email" + } + }' +``` + +Expected output (NDJSON format - one JSON array per line): +``` +["Alice Johnson","alice@example.com"] +["Bob Smith","bob@example.com"] +``` + +**Key differences from buffered queries:** +- Results are returned as tuples (arrays) instead of maps for efficiency +- Each result is on its own line (newline-delimited) +- Server returns HTTP 206 (Partial Content) instead of 200 +- Stream closes when all results are emitted (no explicit completion marker by default) + +**Enable query tracking** to get fuel, time, and policy statistics in the metadata: + +```bash +curl -X POST http://localhost:8090/fluree/query \ + -H "Content-Type: application/json" \ + -H "Accept: application/x-ndjson" \ + -H "fluree-track-fuel: true" \ + -H "fluree-track-time: true" \ + -d '{ + "from": "example/ledger", + "@context": { + "schema": "http://schema.org/", + "ex": "http://example.org/" + }, + "select": ["?name"], + "where": { + "@id": "?person", + "@type": "schema:Person", + "schema:name": "?name" + } + }' +``` + +Expected output with tracking: +``` +["Alice Johnson"] +["Bob Smith"] +{"_fluree-meta":{"status":200,"fuel":245,"time":"3ms"}} +``` + ### Insert More Data The `/insert` endpoint adds new data to the ledger. If the subject already exists, the operation will merge the new properties with existing ones: diff --git a/deps.edn b/deps.edn index 8129df76..1275650f 100644 --- a/deps.edn +++ b/deps.edn @@ -1,7 +1,7 @@ {:deps {org.clojure/clojure {:mvn/version "1.11.3"} org.clojure/core.async {:mvn/version "1.6.681"} com.fluree/db {:git/url "https://github.com/fluree/db.git" - :git/sha "f2bcf88ddf90c44ef50369da4edcb82f91f023e1"} + :git/sha "74fcc49d1154146cc0e66126322f3ad5b0047969"} com.fluree/json-ld {:git/url "https://github.com/fluree/json-ld.git" :git/sha "74083536c84d77f8cdd4b686b5661714010baad3"} diff --git a/src/fluree/server/consensus/events.clj b/src/fluree/server/consensus/events.clj index 8532b820..38607139 100644 --- a/src/fluree/server/consensus/events.clj +++ b/src/fluree/server/consensus/events.clj @@ -3,8 +3,8 @@ protocols" (:require [clojure.string :as str] [fluree.db.connection :as connection] - [fluree.db.ledger :as ledger] [fluree.db.track :as-alias track] + [fluree.db.transact :as transact] [fluree.db.util.async :refer [ event :opts :raw-txn)] - (let [{:keys [address]} ( event (assoc-in [:opts :raw-txn-address] address) (update :opts dissoc :raw-txn))) @@ -122,7 +122,7 @@ (go-try (if-let [txn (:txn event)] (let [{:keys [ledger-id]} event - {:keys [address]} ( event (assoc :txn-address address) (dissoc :txn))] diff --git a/src/fluree/server/consensus/raft/handlers/new_commit.clj b/src/fluree/server/consensus/raft/handlers/new_commit.clj index 217bb543..ef51aff0 100644 --- a/src/fluree/server/consensus/raft/handlers/new_commit.clj +++ b/src/fluree/server/consensus/raft/handlers/new_commit.clj @@ -1,8 +1,8 @@ (ns fluree.server.consensus.raft.handlers.new-commit (:require [clojure.core.async :as async] - [fluree.db.ledger :as ledger] [fluree.db.storage :as storage] [fluree.db.storage.file :as file-storage] + [fluree.db.transact :as transact] [fluree.db.util.async :refer [ (json/parse json false) ;; address is not yet written into the commit file, add it (assoc "address" address))] - (ledger/publish-commit conn commit-json))) + (transact/publish-commit conn commit-json))) (defn store-ledger-files "Persist both the data-file and commit-file to disk only if redundant diff --git a/src/fluree/server/handlers/ledger.clj b/src/fluree/server/handlers/ledger.clj index c6150e1a..095e2c98 100644 --- a/src/fluree/server/handlers/ledger.clj +++ b/src/fluree/server/handlers/ledger.clj @@ -1,17 +1,86 @@ (ns fluree.server.handlers.ledger - (:require [fluree.db.api :as fluree] + (:require [clojure.core.async :as async] + [clojure.java.io :as io] + [fluree.db.api :as fluree] + [fluree.db.util :as util] + [fluree.db.util.json :as json] [fluree.db.util.log :as log] [fluree.server.handler :as-alias handler] - [fluree.server.handlers.shared :refer [defhandler deref!] :as shared])) + [fluree.server.handlers.shared :refer [defhandler deref!] :as shared]) + (:import (ring.core.protocols StreamableResponseBody))) + +(defn ndjson-streaming-body + "Converts a core.async channel of results into a Ring StreamableResponseBody + that writes NDJSON (newline-delimited JSON) to the output stream. + + Handles: + - Normal results: written as JSON objects + - Exceptions: written as error objects with :error and :status keys + - Metadata: {:_fluree-meta {:status 200, :fuel ..., :time ..., :policy ...}} + written as final line when query tracking is enabled + + Each item is written as a JSON line followed by newline. + The stream closes when the channel closes." + [result-ch] + (reify StreamableResponseBody + (write-body-to-stream [_ _ output-stream] + (with-open [writer (io/writer output-stream)] + (loop [] + (when-some [result (async/ query-res :body str/split-lines) + results (map #(json/parse % false) lines) + ;; Last line is metadata, rest are data + data-results (butlast results) + meta-result (last results)] + ;; Should get 3 data results + 1 metadata result + (is (= 4 (count results))) + ;; Data results are vectors (tuples) in streaming mode: [name age] + (is (every? vector? data-results)) + ;; Metadata result contains :_fluree-meta key + (is (map? meta-result)) + (is (contains? meta-result "_fluree-meta")) + (is (= 200 (get-in meta-result ["_fluree-meta" "status"]))) + ;; Verify all three people are in results (first element of each tuple is name) + (is (= #{"Alice" "Bob" "Charlie"} + (set (map first data-results))))))) + + (testing "Query without Accept header returns buffered JSON array" + (let [ledger-name (create-rand-ledger "buffered-query-test") + txn-req {:body + (json/stringify + {"ledger" ledger-name + "@context" test-system/default-context + "insert" {"id" "ex:test" + "type" "schema:Test" + "ex:name" "test"}}) + :headers json-headers} + txn-res (api-post :transact txn-req) + _ (assert (= 200 (:status txn-res))) + + query-req {:body + (json/stringify + {"@context" test-system/default-context + "from" ledger-name + "select" ["?name"] + "where" {"id" "?person" + "ex:name" "?name"}}) + :headers json-headers} + query-res (api-post :query query-req)] + + (is (= 200 (:status query-res))) + (is (str/includes? (get-in query-res [:headers "content-type"]) "application/json")) + + (let [results (json/parse (:body query-res) false)] + (is (vector? results)) + (is (= 1 (count results)))))) + + (testing "Streaming query with metadata tracking" + (let [ledger-name (create-rand-ledger "streaming-meta-test") + txn-req {:body + (json/stringify + {"ledger" ledger-name + "@context" test-system/default-context + "insert" {"id" "ex:test" + "type" "schema:Test" + "ex:name" "test"}}) + :headers json-headers} + txn-res (api-post :transact txn-req) + _ (assert (= 200 (:status txn-res))) + + query-req {:body + (json/stringify + {"@context" test-system/default-context + "from" ledger-name + "select" ["?name"] + "where" {"id" "?person" + "ex:name" "?name"}}) + :headers (assoc json-headers + "accept" "application/x-ndjson" + "fluree-track-fuel" "true" + "fluree-track-time" "true")} + query-res (api-post :query query-req)] + + (is (= 206 (:status query-res))) + + (let [lines (-> query-res :body str/split-lines) + results (map #(json/parse % false) lines) + data-results (butlast results) + meta-result (last results)] + ;; Last line should be metadata with _fluree-meta key + (is (contains? meta-result "_fluree-meta")) + (is (contains? (get meta-result "_fluree-meta") "fuel")) + (is (contains? (get meta-result "_fluree-meta") "time")) + ;; All other lines should be vectors (tuples) + (is (every? vector? data-results))))) + + (testing "Streaming query with LIMIT" + (let [ledger-name (create-rand-ledger "streaming-limit-test") + txn-req {:body + (json/stringify + {"ledger" ledger-name + "@context" test-system/default-context + "insert" (for [i (range 10)] + {"id" (str "ex:person" i) + "type" "schema:Person" + "ex:name" (str "Person " i)})}) + :headers json-headers} + txn-res (api-post :transact txn-req) + _ (assert (= 200 (:status txn-res))) + + query-req {:body + (json/stringify + {"@context" test-system/default-context + "from" ledger-name + "select" ["?name"] + "where" {"id" "?person" + "type" "schema:Person" + "ex:name" "?name"} + "limit" 3}) + :headers (assoc json-headers "accept" "application/x-ndjson")} + query-res (api-post :query query-req)] + + (is (= 206 (:status query-res))) + + (let [lines (-> query-res :body str/split-lines) + results (map #(json/parse % false) lines) + data-results (butlast results) + meta-result (last results)] + ;; Should get 3 data results + 1 metadata result (LIMIT 3) + (is (= 4 (count results))) + (is (= 3 (count data-results))) + (is (contains? meta-result "_fluree-meta")))))) diff --git a/test/fluree/server/task_test.clj b/test/fluree/server/task_test.clj index f8fa72f9..ee809dad 100644 --- a/test/fluree/server/task_test.clj +++ b/test/fluree/server/task_test.clj @@ -25,7 +25,7 @@ {"@id" (str "ex:item-" i) "@type" "ex:Item" "ex:value" i}))}] - @(fluree/transact! conn txn))) + @(fluree/update! conn txn))) (deftest reindex-task (test-system/set-server-ports) From b198fc3a66da2b65422fece67ac660e393d9a2a3 Mon Sep 17 00:00:00 2001 From: bplatz Date: Thu, 9 Oct 2025 22:53:58 -0400 Subject: [PATCH 2/2] Update README to clarify NDJSON output format for query results --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 4bb4afac..df73cbe1 100644 --- a/README.md +++ b/README.md @@ -269,14 +269,14 @@ curl -X POST http://localhost:8090/fluree/query \ }' ``` -Expected output (NDJSON format - one JSON array per line): +Expected output (NDJSON format - one JSON value per line): ``` ["Alice Johnson","alice@example.com"] ["Bob Smith","bob@example.com"] ``` **Key differences from buffered queries:** -- Results are returned as tuples (arrays) instead of maps for efficiency +- Results are streamed as individual items, not wrapped in an outer array - Each result is on its own line (newline-delimited) - Server returns HTTP 206 (Partial Content) instead of 200 - Stream closes when all results are emitted (no explicit completion marker by default)