Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions deps.edn
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{:deps {org.clojure/clojure {:mvn/version "1.11.3"}
org.clojure/core.async {:mvn/version "1.6.681"}
com.fluree/db {:git/url "https://github.com/fluree/db.git"
:git/sha "f5f76fb2373b712aeee96574e1bbc2cd0277fce1"}
:git/sha "2843030084a3a9378b312ffaaa21f23fb45f5a26"}
com.fluree/json-ld {:git/url "https://github.com/fluree/json-ld.git"
:git/sha "74083536c84d77f8cdd4b686b5661714010baad3"}

Expand All @@ -23,8 +23,10 @@
camel-snake-kebab/camel-snake-kebab {:mvn/version "0.4.3"}

;; logging
ch.qos.logback/logback-classic {:mvn/version "1.5.6"}
org.slf4j/slf4j-api {:mvn/version "2.0.13"}
org.slf4j/slf4j-api {:mvn/version "2.0.13"}
ch.qos.logback/logback-classic {:mvn/version "1.5.6"}
net.logstash.logback/logstash-logback-encoder {:mvn/version "9.0"}
com.github.steffan-westcott/clj-otel-api {:mvn/version "0.2.10"}

;; http
;; ring-jetty9-adapter 0.30.x+ uses Jetty 12 & requires JDK 17+
Expand Down Expand Up @@ -53,6 +55,16 @@
integrant/repl {:mvn/version "0.3.3"}}
:jvm-opts ["-Djdk.attach.allowAttachSelf"]}

:otel
{:jvm-opts [ ;;; download jar from https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases
;; "-javaagent:/path/to/opentelemetry-javaagent.jar"
"-Dotel.exporter.otlp.endpoint=http://localhost:4318"
"-Dotel.exporter.otlp.protocol=http/protobuf"
"-Dotel.logs.exporter=none"
"-Dotel.metrics.exporter=none"
"-Dotel.service.name=fluree-server"
"-Dotel.instrumentation.http.capture.headers.server.request=x-amzn-trace-id"]}

:run-dev
{:main-opts ["-m" "fluree.server" "--profile" "dev"]}

Expand Down
75 changes: 60 additions & 15 deletions resources/logback.xml
Original file line number Diff line number Diff line change
@@ -1,19 +1,64 @@
<configuration>
<property scope="context" name="LOG_LEVEL" value="${LOG_LEVEL:-INFO}"/>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
</pattern>
</encoder>
</appender>

<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>
%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n
</pattern>
</encoder>
</appender>

<logger name="fluree" level="info" additivity="false">
<appender-ref ref="CONSOLE"/>
</logger>

<root level="error">
<appender-ref ref="CONSOLE"/>
</root>
<appender name="JSON_CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<!-- Timestamp in ISO8601 -->
<timestamp>
<fieldName>timestamp</fieldName>
<pattern>yyyy-MM-dd'T'HH:mm:ss.SSSZ</pattern>
</timestamp>

<!-- Log level -->
<logLevel>
<fieldName>level</fieldName>
</logLevel>

<!-- Thread name -->
<threadName>
<fieldName>thread</fieldName>
</threadName>

<!-- Logger name -->
<loggerName>
<fieldName>logger</fieldName>
</loggerName>

<!-- Message -->
<message>
<fieldName>message</fieldName>
</message>

<!-- Mapped diagnostic context (MDC) -->
<mdc>
<fieldName>mdc</fieldName>
</mdc>

<!-- Throwable / stacktrace -->
<throwable>
<fieldName>exception</fieldName>
<throwableConverter class="net.logstash.logback.stacktrace.ShortenedThrowableConverter">
<maxDepthPerThrowable>30</maxDepthPerThrowable>
<maxLength>2048</maxLength>
<shortenedClassNameLength>20</shortenedClassNameLength>
</throwableConverter>
</throwable>
</providers>
</encoder>
</appender>


<root level="${LOG_LEVEL}">
<!-- Change ref to JSON_CONSOLE for structured logs -->
<appender-ref ref="CONSOLE"/>
</root>

</configuration>
21 changes: 14 additions & 7 deletions src/fluree/server/consensus/events.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
[fluree.db.ledger :as ledger]
[fluree.db.track :as-alias track]
[fluree.db.util.async :refer [<? go-try]]
[fluree.db.util.log :as log]))
[fluree.db.util.log :as log]
[fluree.db.util.trace :as trace]))

(defn event-type
[event]
Expand All @@ -28,6 +29,10 @@
[x]
(string? x))

(defn with-otel-ctx
[evt]
(assoc evt :otel/context (trace/get-context)))

(defn with-txn
[evt txn]
(cond (nil? txn)
Expand Down Expand Up @@ -65,14 +70,16 @@
:ledger-id ledger-id
:opts opts
:instant (System/currentTimeMillis)}]
(with-txn evt txn)))
(-> evt
(with-txn txn)
(with-otel-ctx))))

(defn drop-ledger
"Create a new event message to drop an existing ledger."
[ledger-id]
{:type :ledger-drop
:ledger-id ledger-id
:instant (System/currentTimeMillis)})
(with-otel-ctx {:type :ledger-drop
:ledger-id ledger-id
:instant (System/currentTimeMillis)}))

(defn commit-transaction
"Create a new event message to commit a new transaction. The `txn` argument may
Expand All @@ -85,8 +92,8 @@
:opts opts
:instant (System/currentTimeMillis)}]
(if (= :turtle (:format opts))
(with-turtle-txn evt txn)
(with-txn evt txn))))
(with-otel-ctx (with-turtle-txn evt txn))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could benefit from some threading with ->

(with-otel-ctx (with-txn evt txn)))))

(defn get-txn
"Gets the transaction value, either a transaction document or the storage
Expand Down
78 changes: 42 additions & 36 deletions src/fluree/server/consensus/standalone.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
[fluree.db.api :as fluree]
[fluree.db.util.async :refer [<? go-try]]
[fluree.db.util.log :as log]
[fluree.db.util.trace :as trace]
[fluree.server.consensus :as consensus]
[fluree.server.consensus.events :as events]
[fluree.server.consensus.response :as response]
Expand All @@ -12,55 +13,60 @@
(set! *warn-on-reflection* true)

(defn create-ledger!
[conn watcher broadcaster {:keys [ledger-id tx-id txn opts] :as _params}]
[conn watcher broadcaster {:keys [ledger-id tx-id txn opts otel/context] :as _params}]
(go-try
(let [commit-result (if txn
(deref! (fluree/create-with-txn conn txn opts))
(let [db (deref! (fluree/create conn ledger-id opts))]
(shared-create/genesis-result db)))]
(let [commit-result
(trace/form ::create-ledger! {:parent context}
(if txn
(deref! (fluree/create-with-txn conn txn opts))
(let [db (deref! (fluree/create conn ledger-id opts))]
(shared-create/genesis-result db))))]
(response/announce-new-ledger watcher broadcaster ledger-id tx-id commit-result))))

(defn drop-ledger!
[conn watcher broadcaster {:keys [ledger-id] :as _params}]
[conn watcher broadcaster {:keys [ledger-id otel/context] :as _params}]
(go-try
(let [drop-result (deref! (fluree/drop conn ledger-id))]
(let [drop-result (trace/form ::drop-ledger! {:parent context}
(deref! (fluree/drop conn ledger-id)))]
(response/announce-dropped-ledger watcher broadcaster ledger-id drop-result))))

(defn transact!
[conn watcher broadcaster {:keys [ledger-id tx-id txn opts]}]
[conn watcher broadcaster {:keys [ledger-id tx-id txn opts otel/context]}]
(go-try
(let [commit-result (case (:op opts)
:update (deref! (fluree/update! conn ledger-id txn opts))
:upsert (deref! (fluree/upsert! conn ledger-id txn opts))
:insert (deref! (fluree/insert! conn ledger-id txn opts)))]
(let [commit-result (trace/form ::transact! {:parent context}
(case (:op opts)
:update (deref! (fluree/update! conn ledger-id txn opts))
:upsert (deref! (fluree/upsert! conn ledger-id txn opts))
:insert (deref! (fluree/insert! conn ledger-id txn opts))))]
(response/announce-commit watcher broadcaster ledger-id tx-id commit-result))))

(defn process-event
[conn watcher broadcaster event]
(go
(try
(let [event* (if (events/resolve-txn? event)
(<? (events/resolve-txns conn event))
event)
event-type (events/event-type event*)]
(cond
(= :ledger-create event-type)
(<? (create-ledger! conn watcher broadcaster event*))

(= :ledger-drop event-type)
(<? (drop-ledger! conn watcher broadcaster event*))

(= :tx-queue event-type)
(<? (transact! conn watcher broadcaster event*))

:else
(throw (ex-info (str "Unexpected event message: event type '" event-type "' not"
" one of (':ledger-create', ':ledger-drop', ':tx-queue')")
{:status 500, :error :consensus/unexpected-event}))))
(catch Exception e
(let [{:keys [ledger-id tx-id]} event]
(log/warn e "Error processing consensus event")
(response/announce-error watcher broadcaster ledger-id tx-id e))))))
(trace/with-parent-context ::process-event (:otel/context event)
(go
(try
(let [event* (if (events/resolve-txn? event)
(<? (events/resolve-txns conn event))
event)
event-type (events/event-type event*)]
(cond
(= :ledger-create event-type)
(<? (create-ledger! conn watcher broadcaster event*))

(= :ledger-drop event-type)
(<? (drop-ledger! conn watcher broadcaster event*))

(= :tx-queue event-type)
(<? (transact! conn watcher broadcaster event*))

:else
(throw (ex-info (str "Unexpected event message: event type '" event-type "' not"
" one of (':ledger-create', ':ledger-drop', ':tx-queue')")
{:status 500, :error :consensus/unexpected-event}))))
(catch Exception e
(let [{:keys [ledger-id tx-id]} event]
(log/warn e "Error processing consensus event")
(response/announce-error watcher broadcaster ledger-id tx-id e)))))))

(defn error?
[result]
Expand Down
6 changes: 4 additions & 2 deletions src/fluree/server/handler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
[reitit.swagger :as swagger]
[reitit.swagger-ui :as swagger-ui]
[ring.adapter.jetty9 :as http]
[ring.middleware.cors :as rmc])
[ring.middleware.cors :as rmc]
[steffan-westcott.clj-otel.api.trace.http :as trace-http])
(:import (java.io InputStream)))

(set! *warn-on-reflection* true)
Expand Down Expand Up @@ -656,4 +657,5 @@
app-routes (cond-> ["" {:middleware app-middleware} default-fluree-routes]
(seq custom-routes) (conj custom-routes))
router (app-router app-routes)]
(ring/ring-handler router fallback-handler))))
(trace-http/wrap-server-span
(ring/ring-handler router fallback-handler)))))
4 changes: 3 additions & 1 deletion src/fluree/server/handlers/create.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns fluree.server.handlers.create
(:require [fluree.db.api :as fluree]
[fluree.db.util.trace :as trace]
[fluree.server.consensus :as consensus]
[fluree.server.handlers.shared :as shared :refer [deref! defhandler]]
[fluree.server.handlers.transact :as srv-tx]
Expand Down Expand Up @@ -50,7 +51,8 @@
(throw (ex-info "Ledger ID must be provided"
{:status 400 :error :db/invalid-ledger-id})))
opts* (prepare-create-options opts)
commit-event (deref! (create-ledger! consensus watcher ledger-id txn opts*))
commit-event (trace/form ::create-handler {}
(deref! (create-ledger! consensus watcher ledger-id txn opts*)))
response-body (srv-tx/commit-event->response-body commit-event)]
(shared/with-tracking-headers {:status 201, :body response-body}
commit-event)))
4 changes: 3 additions & 1 deletion src/fluree/server/handlers/drop.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns fluree.server.handlers.drop
(:require [fluree.db.util.log :as log]
[fluree.db.util.trace :as trace]
[fluree.server.consensus :as consensus]
[fluree.server.handlers.shared :as shared :refer [deref! defhandler]]
[fluree.server.watcher :as watcher]))
Expand All @@ -24,5 +25,6 @@
{:keys [body]} :parameters}]
(log/debug "drop body:" body)
(let [ledger-id (:ledger body)
resp-p (drop-ledger consensus watcher ledger-id)]
resp-p (trace/form ::drop-handler {}
(drop-ledger consensus watcher ledger-id))]
{:status 200, :body (deref! resp-p)}))
7 changes: 5 additions & 2 deletions src/fluree/server/handlers/ledger.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(ns fluree.server.handlers.ledger
(:require [fluree.db.api :as fluree]
[fluree.db.util.log :as log]
[fluree.db.util.trace :as trace]
[fluree.server.handler :as-alias handler]
[fluree.server.handlers.shared :refer [defhandler deref!] :as shared]))

Expand All @@ -10,15 +11,17 @@
;; supply ledger-alias from path params if not overridden by a header
opts* (update opts :ledger #(or % (:ledger-alias path)))
{:keys [status result] :as query-response}
(deref! (fluree/query-connection conn query opts*))]
(trace/form ::query-handler {}
(deref! (fluree/query-connection conn query opts*)))]
(log/debug "query handler received query:" query opts*)
(shared/with-tracking-headers {:status status, :body result}
query-response)))

(defhandler history
[{:keys [fluree/conn fluree/opts] {{ledger :from :as query} :body} :parameters :as _req}]
(let [query* (dissoc query :from)
result (deref! (fluree/history conn ledger query* opts))]
result (trace/form ::history-handler {}
(deref! (fluree/history conn ledger query* opts)))]
(log/debug "history handler received query:" query opts "result:" result)
;; fluree/history may return either raw result or wrapped in {:status :result}
(if (and (map? result) (:status result) (:result result))
Expand Down
10 changes: 7 additions & 3 deletions src/fluree/server/handlers/transact.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
[fluree.crypto :as crypto]
[fluree.db.api :as fluree]
[fluree.db.query.fql.parse :as parse]
[fluree.db.util.trace :as trace]
[fluree.json-ld :as json-ld]
[fluree.server.consensus :as consensus]
[fluree.server.handlers.shared :as shared :refer [defhandler deref!]]
Expand Down Expand Up @@ -63,7 +64,8 @@
raw-txn (assoc :raw-txn raw-txn)
did (assoc :did did))
{:keys [status] :as commit-event}
(deref! (transact! consensus watcher ledger-id txn-with-ledger opts*))
(trace/form ::update-handler {}
(deref! (transact! consensus watcher ledger-id txn-with-ledger opts*)))

body (commit-event->response-body commit-event)]
(shared/with-tracking-headers {:status status, :body body}
Expand All @@ -77,7 +79,8 @@
raw-txn (assoc :raw-txn raw-txn)
did (assoc :identity did))
{:keys [status] :as commit-event}
(deref! (transact! consensus watcher ledger-id insert-txn opts*))
(trace/form ::insert-handler {}
(deref! (transact! consensus watcher ledger-id insert-txn opts*)))

body (commit-event->response-body commit-event)]
(shared/with-tracking-headers {:status status, :body body}
Expand All @@ -91,7 +94,8 @@
raw-txn (assoc :raw-txn raw-txn)
did (assoc :identity did))
{:keys [status] :as commit-event}
(deref! (transact! consensus watcher ledger-id upsert-txn opts*))
(trace/form ::upsert-handler {}
(deref! (transact! consensus watcher ledger-id upsert-txn opts*)))

body (commit-event->response-body commit-event)]
(shared/with-tracking-headers {:status status, :body body}
Expand Down
Loading