Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@
/.lsp/
/dev/data/*
/data*/**
.calva/repl.calva-repl
5 changes: 4 additions & 1 deletion deps.edn
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{:deps {org.clojure/clojure {:mvn/version "1.11.3"}
org.clojure/core.async {:mvn/version "1.6.681"}
com.fluree/db {:git/url "https://github.com/fluree/db.git"
:git/sha "dca1e35afb8b84f557d84b27f0dd387064626012"}
:git/sha "5684c80721398c50886bccc37ba232722d9a406f"}
com.fluree/json-ld {:git/url "https://github.com/fluree/json-ld.git"
:git/sha "73a990a4b803d0b4cfbbbe4dc16275b39a3add4e"}

Expand All @@ -24,6 +24,9 @@
ch.qos.logback/logback-classic {:mvn/version "1.5.6"}
org.slf4j/slf4j-api {:mvn/version "2.0.13"}

;; distributed tracing
com.github.steffan-westcott/clj-otel-api {:mvn/version "0.2.7"}

;; http
;; ring-jetty9-adapter 0.30.x+ uses Jetty 12 & requires JDK 17+
;; so we have to stay on 0.22.x b/c our minimum JDK version is 11
Expand Down
39 changes: 31 additions & 8 deletions src/fluree/server/consensus.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,46 @@
"To allow for pluggable consensus, we have a Transactor protocol. In order to allow
for a new consensus type, we need to create a record with all of the following
methods. Currently, we support Raft and Standalone."
(:require [fluree.db.util.log :as log]
[fluree.server.consensus.events :as events]))
(:require [clojure.string :as string]
[fluree.db.util.log :as log]
[fluree.server.consensus.events :as events]
[steffan-westcott.clj-otel.api.trace.span :as span]
[steffan-westcott.clj-otel.context :as otel-context]))

(set! *warn-on-reflection* true)

(defprotocol Transactor
(-queue-new-ledger [transactor new-ledger-params])
(-queue-new-transaction [transactor new-tx-params]))

(defn with-trace-context
[event]
(assoc event ::trace-context (otel-context/->headers)))

;; need to lowercase map keys before calling otel-context/headers->merged-context
;; https://github.com/steffan-westcott/clj-otel/issues/26
(defn ^:private lowercase-keys [m]
(into {} (map (fn [[k v]] [(string/lower-case (name k)) v])) m))

(defn get-trace-context [event]
(-> event ::trace-context lowercase-keys otel-context/headers->merged-context))

(defn queue-new-ledger
[transactor ledger-id tx-id txn opts]
(log/trace "queue-new-ledger:" ledger-id tx-id txn)
(let [event-params (events/create-ledger ledger-id tx-id txn opts)]
(-queue-new-ledger transactor event-params)))
(log/with-mdc {:tx.id tx-id}
(log/trace "queue-new-ledger:" ledger-id tx-id txn)
(span/with-span! {:name "fluree.server.consensus/queue-new-ledger"
:span-kind :producer
:attributes {:tx.id tx-id}}
(let [event-params (events/create-ledger ledger-id tx-id txn opts)]
(-queue-new-ledger transactor (with-trace-context event-params))))))

(defn queue-new-transaction
[transactor ledger-id tx-id txn opts]
(log/trace "queue-new-transaction:" txn)
(let [event-params (events/commit-transaction ledger-id tx-id txn opts)]
(-queue-new-transaction transactor event-params)))
(log/with-mdc {:tx.id tx-id}
(log/trace "queue-new-transaction:" txn)
(span/with-span! {:name "fluree.server.consensus/queue-new-transaction"
:span-kind :producer
:attributes {:tx.id tx-id}}
(let [event-params (events/commit-transaction ledger-id tx-id txn opts)]
(-queue-new-transaction transactor (with-trace-context event-params))))))
51 changes: 29 additions & 22 deletions src/fluree/server/consensus/standalone.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
[fluree.server.consensus :as consensus]
[fluree.server.consensus.events :as events]
[fluree.server.consensus.response :as response]
[fluree.server.handlers.shared :refer [deref!]]))
[fluree.server.handlers.shared :refer [deref!]]
[fluree.server.watcher :as watcher]

Check warning on line 10 in src/fluree/server/consensus/standalone.clj

View workflow job for this annotation

GitHub Actions / clj-kondo lint

namespace fluree.server.watcher is required but never used
[fluree.server.handlers.shared :refer [deref!]]

Check warning on line 11 in src/fluree/server/consensus/standalone.clj

View workflow job for this annotation

GitHub Actions / clj-kondo lint

duplicate require of fluree.server.handlers.shared
[steffan-westcott.clj-otel.api.trace.span :as span]))

(set! *warn-on-reflection* true)

Expand Down Expand Up @@ -36,27 +39,31 @@

(defn process-event
[conn watcher broadcaster event]
(go
(try
(let [event* (if (events/resolve-txn? event)
(<? (events/resolve-txns conn event))
event)
event-type (events/event-type event*)]
(cond
(= :ledger-create event-type)
(<? (create-ledger! conn watcher broadcaster event*))

(= :tx-queue event-type)
(<? (transact! conn watcher broadcaster event*))

:else
(throw (ex-info (str "Unexpected event message: event type '" event-type "' not"
" one of (':ledger-create', ':tx-queue')")
{:status 500, :error :consensus/unexpected-event}))))
(catch Exception e
(let [{:keys [ledger-id tx-id]} event]
(log/warn e "Error processing consensus event")
(response/announce-error watcher broadcaster ledger-id tx-id e))))))
(span/with-span! {:name "fluree.server.consensus.standalone/process-event"
;; should be :consumer but only :server supported by xray https://github.com/aws-observability/aws-otel-collector/issues/1773
:span-kind :server
:parent (consensus/get-trace-context event)}
(go
(try
(let [event* (if (events/resolve-txn? event)
(<? (events/resolve-txns conn event))
event)
event-type (events/event-type event*)]
(cond
(= :ledger-create event-type)
(<? (create-ledger! conn watcher broadcaster event*))

(= :tx-queue event-type)
(<? (transact! conn watcher broadcaster event*))

:else
(throw (ex-info (str "Unexpected event message: event type '" event-type "' not"
" one of (':ledger-create', ':tx-queue')")
{:status 500, :error :consensus/unexpected-event}))))
(catch Exception e
(let [{:keys [ledger-id tx-id]} event]
(log/warn e "Error processing consensus event")
(response/announce-error watcher broadcaster ledger-id tx-id e)))))))

(defn error?
[result]
Expand Down
5 changes: 4 additions & 1 deletion src/fluree/server/handler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
[reitit.swagger :as swagger]
[reitit.swagger-ui :as swagger-ui]
[ring.adapter.jetty9 :as http]
[ring.middleware.cors :as rmc])
[ring.middleware.cors :as rmc]
[steffan-westcott.clj-otel.api.trace.http :as trace-http])
(:import (java.io InputStream)))

(set! *warn-on-reflection* true)
Expand Down Expand Up @@ -446,6 +447,8 @@
[10 wrap-cors]
[10 (partial wrap-assoc-system connection consensus
watcher subscriptions)]
[20 (partial trace-http/wrap-reitit-route)]
[25 (partial trace-http/wrap-server-span)]
[50 unwrap-credential]
[100 wrap-set-fuel-header]
[200 coercion/coerce-exceptions-middleware]
Expand Down
13 changes: 9 additions & 4 deletions src/fluree/server/handlers/create.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
[fluree.server.handlers.transact :refer [derive-tx-id extract-ledger-id
monitor-consensus-persistence
monitor-commit]]
[fluree.server.watcher :as watcher]))
[fluree.server.watcher :as watcher]
[steffan-westcott.clj-otel.api.trace.span :as span]))

(set! *warn-on-reflection* true)

Expand All @@ -22,8 +23,10 @@
(let [p (promise)
tx-id (derive-tx-id txn)
result-ch (watcher/create-watch watcher tx-id)]
(queue-consensus consensus watcher ledger-id tx-id txn opts)
(monitor-commit p ledger-id tx-id result-ch)
(log/with-mdc {:tx.id tx-id}
(span/add-span-data! {:tx.id tx-id})
(queue-consensus consensus watcher ledger-id tx-id txn opts)
(monitor-commit p ledger-id tx-id result-ch))
p))

(defhandler default
Expand All @@ -33,5 +36,7 @@
(let [txn (fluree/format-txn body opts)
ledger-id (or (:ledger opts)
(extract-ledger-id txn))
resp-p (create-ledger consensus watcher ledger-id txn {})]
resp-p (log/with-mdc {:ledger.id ledger-id}
(do (span/add-span-data! {:attributes (org.slf4j.MDC/getCopyOfContextMap)})
(create-ledger consensus watcher ledger-id txn {})))]
{:status 201, :body (deref! resp-p)}))
12 changes: 7 additions & 5 deletions src/fluree/server/handlers/ledger.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
[fluree.db.api :as fluree]
[fluree.db.util.log :as log]
[fluree.server.handler :as-alias handler]
[fluree.server.handlers.shared :refer [defhandler deref!]]))
[fluree.server.handlers.shared :refer [defhandler deref!]]
[steffan-westcott.clj-otel.api.trace.span :as span]))

(defhandler query
[{:keys [fluree/conn fluree/opts] {:keys [body]} :parameters :as _req}]
Expand All @@ -14,9 +15,10 @@

(defhandler history
[{:keys [fluree/conn fluree/opts] {{ledger :from :as query} :body} :parameters :as _req}]

(let [ledger* (->> ledger (fluree/load conn) deref!)
query* (dissoc query :from)]
(log/debug "history handler received query:" query opts)
{:status 200
:body (deref! (fluree/history ledger* query* opts))}))
(log/with-mdc {:ledger.id ledger}
(span/add-span-data! {:attributes (org.slf4j.MDC/getCopyOfContextMap)})
(log/debug "history handler received query:" query opts)
{:status 200
:body (deref! (fluree/history ledger* query* opts))})))
15 changes: 10 additions & 5 deletions src/fluree/server/handlers/remote_resource.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
[fluree.db.connection :as connection]
[fluree.db.util.async :refer [<??]]
[fluree.db.util.log :as log]
[fluree.server.handlers.shared :refer [defhandler]]))
[fluree.server.handlers.shared :refer [defhandler]]
[steffan-westcott.clj-otel.api.trace.span :as span]))

(defhandler latest-commit
[{:keys [fluree/conn]
{{ledger-address :resource :as body} :body} :parameters}]
;; todo: mdc and span properties as needed
(log/debug "Latest commit lookup request:" body)
(let [result (<?? (connection/read-publisher-commit conn ledger-address))]
{:status 200
Expand All @@ -16,6 +18,7 @@
(defhandler read-resource-address
[{:keys [fluree/conn]
{{resource-address :resource :as body} :body} :parameters}]
;; todo: mdc and span properties as needed
(log/debug "Remote resource read request:" body)
(let [result (<?? (connection/read-file-address conn resource-address))]
{:status 200
Expand All @@ -24,7 +27,9 @@
(defhandler published-ledger-addresses
[{:keys [fluree/conn]
{{:keys [ledger] :as body} :body} :parameters}]
(log/debug "Retrieve ledger address request:" body)
(let [result (<?? (connection/published-addresses conn ledger))]
{:status 200
:body {:addresses result}}))
(log/with-mdc {:ledger.id ledger}
(span/add-span-data! {:attributes (org.slf4j.MDC/getCopyOfContextMap)})
(log/debug "Retrieve ledger address request:" body)
(let [result (<?? (connection/published-addresses conn ledger))]
{:status 200
:body {:addresses result}})))
7 changes: 5 additions & 2 deletions src/fluree/server/handlers/transact.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
[fluree.server.consensus :as consensus]
[fluree.server.consensus.events :as events]
[fluree.server.handlers.shared :refer [defhandler deref!]]
[fluree.server.watcher :as watcher]))
[fluree.server.watcher :as watcher]
[steffan-westcott.clj-otel.api.trace.span :as span]))

(set! *warn-on-reflection* true)

Expand Down Expand Up @@ -109,5 +110,7 @@
opts* (cond-> (assoc opts :format :fql)
raw-txn (assoc :raw-txn raw-txn)
did (assoc :did did))
resp-p (transact! consensus watcher ledger-id txn opts*)]
resp-p (log/with-mdc {:ledger.id ledger-id}
(do (span/add-span-data! {:attributes (org.slf4j.MDC/getCopyOfContextMap)})
(transact! consensus watcher ledger-id txn opts*)))]
{:status 200, :body (deref! resp-p)}))
Loading