diff --git a/README.md b/README.md index 2d238af..df73cbe 100644 --- a/README.md +++ b/README.md @@ -243,6 +243,74 @@ Expected output (SPARQL Results JSON format): } ``` +### Streaming Queries (NDJSON) + +For large result sets, you can use streaming queries to receive results as they are produced rather than waiting for the entire result set. This reduces memory pressure and enables progressive rendering in UIs. + +Streaming is enabled by setting the `Accept` header to `application/x-ndjson` (Newline-Delimited JSON): + +```bash +curl -X POST http://localhost:8090/fluree/query \ + -H "Content-Type: application/json" \ + -H "Accept: application/x-ndjson" \ + -d '{ + "from": "example/ledger", + "@context": { + "schema": "http://schema.org/", + "ex": "http://example.org/" + }, + "select": ["?name", "?email"], + "where": { + "@id": "?person", + "@type": "schema:Person", + "schema:name": "?name", + "schema:email": "?email" + } + }' +``` + +Expected output (NDJSON format - one JSON value per line): +``` +["Alice Johnson","alice@example.com"] +["Bob Smith","bob@example.com"] +``` + +**Key differences from buffered queries:** +- Results are streamed as individual items, not wrapped in an outer array +- Each result is on its own line (newline-delimited) +- Server returns HTTP 206 (Partial Content) instead of 200 +- Stream closes when all results are emitted; the last line is a `{"_fluree-meta": {...}}` object reporting the final status (omitted from the sample output above) + +**Enable query tracking** to get fuel, time, and policy statistics in the metadata: + +```bash +curl -X POST http://localhost:8090/fluree/query \ + -H "Content-Type: application/json" \ + -H "Accept: application/x-ndjson" \ + -H "fluree-track-fuel: true" \ + -H "fluree-track-time: true" \ + -d '{ + "from": "example/ledger", + "@context": { + "schema": "http://schema.org/", + "ex": "http://example.org/" + }, + "select": ["?name"], + "where": { + "@id": "?person", + "@type": "schema:Person", + "schema:name": "?name" + } + }' +``` + +Expected output with tracking: +``` +["Alice 
Johnson"] +["Bob Smith"] +{"_fluree-meta":{"status":200,"fuel":245,"time":"3ms"}} +``` + ### Insert More Data The `/insert` endpoint adds new data to the ledger. If the subject already exists, the operation will merge the new properties with existing ones: diff --git a/deps.edn b/deps.edn index 8129df7..1275650 100644 --- a/deps.edn +++ b/deps.edn @@ -1,7 +1,7 @@ {:deps {org.clojure/clojure {:mvn/version "1.11.3"} org.clojure/core.async {:mvn/version "1.6.681"} com.fluree/db {:git/url "https://github.com/fluree/db.git" - :git/sha "f2bcf88ddf90c44ef50369da4edcb82f91f023e1"} + :git/sha "74fcc49d1154146cc0e66126322f3ad5b0047969"} com.fluree/json-ld {:git/url "https://github.com/fluree/json-ld.git" :git/sha "74083536c84d77f8cdd4b686b5661714010baad3"} diff --git a/src/fluree/server/consensus/events.clj b/src/fluree/server/consensus/events.clj index 8532b82..3860713 100644 --- a/src/fluree/server/consensus/events.clj +++ b/src/fluree/server/consensus/events.clj @@ -3,8 +3,8 @@ protocols" (:require [clojure.string :as str] [fluree.db.connection :as connection] - [fluree.db.ledger :as ledger] [fluree.db.track :as-alias track] + [fluree.db.transact :as transact] [fluree.db.util.async :refer [ event :opts :raw-txn)] - (let [{:keys [address]} ( event (assoc-in [:opts :raw-txn-address] address) (update :opts dissoc :raw-txn))) @@ -122,7 +122,7 @@ (go-try (if-let [txn (:txn event)] (let [{:keys [ledger-id]} event - {:keys [address]} ( event (assoc :txn-address address) (dissoc :txn))] diff --git a/src/fluree/server/consensus/raft/handlers/new_commit.clj b/src/fluree/server/consensus/raft/handlers/new_commit.clj index 217bb54..ef51aff 100644 --- a/src/fluree/server/consensus/raft/handlers/new_commit.clj +++ b/src/fluree/server/consensus/raft/handlers/new_commit.clj @@ -1,8 +1,8 @@ (ns fluree.server.consensus.raft.handlers.new-commit (:require [clojure.core.async :as async] - [fluree.db.ledger :as ledger] [fluree.db.storage :as storage] [fluree.db.storage.file :as file-storage] + 
[fluree.db.transact :as transact] [fluree.db.util.async :refer [ (json/parse json false) ;; address is not yet written into the commit file, add it (assoc "address" address))] - (ledger/publish-commit conn commit-json))) + (transact/publish-commit conn commit-json))) (defn store-ledger-files "Persist both the data-file and commit-file to disk only if redundant diff --git a/src/fluree/server/handlers/ledger.clj b/src/fluree/server/handlers/ledger.clj index c6150e1..095e2c9 100644 --- a/src/fluree/server/handlers/ledger.clj +++ b/src/fluree/server/handlers/ledger.clj @@ -1,17 +1,86 @@ (ns fluree.server.handlers.ledger - (:require [fluree.db.api :as fluree] + (:require [clojure.core.async :as async] + [clojure.java.io :as io] + [fluree.db.api :as fluree] + [fluree.db.util :as util] + [fluree.db.util.json :as json] [fluree.db.util.log :as log] [fluree.server.handler :as-alias handler] - [fluree.server.handlers.shared :refer [defhandler deref!] :as shared])) + [fluree.server.handlers.shared :refer [defhandler deref!] :as shared]) + (:import (ring.core.protocols StreamableResponseBody))) + +(defn ndjson-streaming-body + "Converts a core.async channel of results into a Ring StreamableResponseBody + that writes NDJSON (newline-delimited JSON) to the output stream. + + Handles: + - Normal results: written as JSON objects + - Exceptions: written as error objects with :error and :status keys + - Metadata: {:_fluree-meta {:status 200, :fuel ..., :time ..., :policy ...}} + written as final line when query tracking is enabled + + Each item is written as a JSON line followed by newline. + The stream closes when the channel closes." 
+ [result-ch] + (reify StreamableResponseBody + (write-body-to-stream [_ _ output-stream] + (with-open [writer (io/writer output-stream)] + (loop [] + (when-some [result (async/ query-res :body str/split-lines) + results (map #(json/parse % false) lines) + ;; Last line is metadata, rest are data + data-results (butlast results) + meta-result (last results)] + ;; Should get 3 data results + 1 metadata result + (is (= 4 (count results))) + ;; Data results are vectors (tuples) in streaming mode: [name age] + (is (every? vector? data-results)) + ;; Metadata result contains :_fluree-meta key + (is (map? meta-result)) + (is (contains? meta-result "_fluree-meta")) + (is (= 200 (get-in meta-result ["_fluree-meta" "status"]))) + ;; Verify all three people are in results (first element of each tuple is name) + (is (= #{"Alice" "Bob" "Charlie"} + (set (map first data-results))))))) + + (testing "Query without Accept header returns buffered JSON array" + (let [ledger-name (create-rand-ledger "buffered-query-test") + txn-req {:body + (json/stringify + {"ledger" ledger-name + "@context" test-system/default-context + "insert" {"id" "ex:test" + "type" "schema:Test" + "ex:name" "test"}}) + :headers json-headers} + txn-res (api-post :transact txn-req) + _ (assert (= 200 (:status txn-res))) + + query-req {:body + (json/stringify + {"@context" test-system/default-context + "from" ledger-name + "select" ["?name"] + "where" {"id" "?person" + "ex:name" "?name"}}) + :headers json-headers} + query-res (api-post :query query-req)] + + (is (= 200 (:status query-res))) + (is (str/includes? (get-in query-res [:headers "content-type"]) "application/json")) + + (let [results (json/parse (:body query-res) false)] + (is (vector? 
results)) + (is (= 1 (count results)))))) + + (testing "Streaming query with metadata tracking" + (let [ledger-name (create-rand-ledger "streaming-meta-test") + txn-req {:body + (json/stringify + {"ledger" ledger-name + "@context" test-system/default-context + "insert" {"id" "ex:test" + "type" "schema:Test" + "ex:name" "test"}}) + :headers json-headers} + txn-res (api-post :transact txn-req) + _ (assert (= 200 (:status txn-res))) + + query-req {:body + (json/stringify + {"@context" test-system/default-context + "from" ledger-name + "select" ["?name"] + "where" {"id" "?person" + "ex:name" "?name"}}) + :headers (assoc json-headers + "accept" "application/x-ndjson" + "fluree-track-fuel" "true" + "fluree-track-time" "true")} + query-res (api-post :query query-req)] + + (is (= 206 (:status query-res))) + + (let [lines (-> query-res :body str/split-lines) + results (map #(json/parse % false) lines) + data-results (butlast results) + meta-result (last results)] + ;; Last line should be metadata with _fluree-meta key + (is (contains? meta-result "_fluree-meta")) + (is (contains? (get meta-result "_fluree-meta") "fuel")) + (is (contains? (get meta-result "_fluree-meta") "time")) + ;; All other lines should be vectors (tuples) + (is (every? vector? 
data-results))))) + + (testing "Streaming query with LIMIT" + (let [ledger-name (create-rand-ledger "streaming-limit-test") + txn-req {:body + (json/stringify + {"ledger" ledger-name + "@context" test-system/default-context + "insert" (for [i (range 10)] + {"id" (str "ex:person" i) + "type" "schema:Person" + "ex:name" (str "Person " i)})}) + :headers json-headers} + txn-res (api-post :transact txn-req) + _ (assert (= 200 (:status txn-res))) + + query-req {:body + (json/stringify + {"@context" test-system/default-context + "from" ledger-name + "select" ["?name"] + "where" {"id" "?person" + "type" "schema:Person" + "ex:name" "?name"} + "limit" 3}) + :headers (assoc json-headers "accept" "application/x-ndjson")} + query-res (api-post :query query-req)] + + (is (= 206 (:status query-res))) + + (let [lines (-> query-res :body str/split-lines) + results (map #(json/parse % false) lines) + data-results (butlast results) + meta-result (last results)] + ;; Should get 3 data results + 1 metadata result (LIMIT 3) + (is (= 4 (count results))) + (is (= 3 (count data-results))) + (is (contains? meta-result "_fluree-meta")))))) diff --git a/test/fluree/server/task_test.clj b/test/fluree/server/task_test.clj index f8fa72f..ee809da 100644 --- a/test/fluree/server/task_test.clj +++ b/test/fluree/server/task_test.clj @@ -25,7 +25,7 @@ {"@id" (str "ex:item-" i) "@type" "ex:Item" "ex:value" i}))}] - @(fluree/transact! conn txn))) + @(fluree/update! conn txn))) (deftest reindex-task (test-system/set-server-ports)