Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions instrumentation/opensearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,10 @@
| System property | Type | Default | Description |
| -------------------------------------------------------------- | ------- | ------- | --------------------------------------------------- |
| `otel.instrumentation.opensearch.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes. |

## Settings for the [OpenSearch Java Client](https://docs.opensearch.org/latest/clients/java/) instrumentation

| System property | Type | Default | Description |
|-----------------------------------------------------------------|---------| ------- |------------------------------------------------------|
| `otel.instrumentation.opensearch.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes. |
| `otel.instrumentation.opensearch.capture-search-query` | Boolean | `false` | Enable the capture of sanitized search query bodies. |
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,21 @@ dependencies {
library("org.opensearch.client:opensearch-java:3.0.0")
compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")
implementation("com.fasterxml.jackson.core:jackson-databind")

testImplementation("org.opensearch.client:opensearch-rest-client:3.0.0")
testImplementation(project(":instrumentation:opensearch:opensearch-rest-common:testing"))
testInstrumentation(project(":instrumentation:apache-httpclient:apache-httpclient-5.0:javaagent"))

// For testing AwsSdk2Transport
// AwsSdk2Transport supports awssdk version 2.26.0
testInstrumentation(project(":instrumentation:apache-httpclient:apache-httpclient-4.0:javaagent"))
testInstrumentation(project(":instrumentation:netty:netty-4.1:javaagent"))
testImplementation("software.amazon.awssdk:auth:2.22.0")
testImplementation("software.amazon.awssdk:identity-spi:2.22.0")
testImplementation("software.amazon.awssdk:apache-client:2.22.0")
testImplementation("software.amazon.awssdk:netty-nio-client:2.22.0")
testImplementation("software.amazon.awssdk:regions:2.22.0")
testImplementation("software.amazon.awssdk:auth:2.26.0")
testImplementation("software.amazon.awssdk:identity-spi:2.26.0")
testImplementation("software.amazon.awssdk:apache-client:2.26.0")
testImplementation("software.amazon.awssdk:netty-nio-client:2.26.0")
testImplementation("software.amazon.awssdk:netty-nio-client:2.26.0")
testImplementation("software.amazon.awssdk:regions:2.26.0")
}

tasks {
Expand All @@ -39,14 +41,47 @@ tasks {
systemProperty("collectMetadata", findProperty("collectMetadata")?.toString() ?: "false")
}

test {
filter {
excludeTestsMatching("OpenSearchCaptureSearchQueryTest")
}
}

val testCaptureSearchQuery by registering(Test::class) {
testClassesDirs = sourceSets.test.get().output.classesDirs
classpath = sourceSets.test.get().runtimeClasspath

filter {
includeTestsMatching("OpenSearchCaptureSearchQueryTest")
}
jvmArgs("-Dotel.instrumentation.opensearch.capture-search-query=true")
}

val testStableSemconv by registering(Test::class) {
testClassesDirs = sourceSets.test.get().output.classesDirs
classpath = sourceSets.test.get().runtimeClasspath

filter {
excludeTestsMatching("OpenSearchCaptureSearchQueryTest")
}
jvmArgs("-Dotel.semconv-stability.opt-in=database")
systemProperty("metadataConfig", "otel.semconv-stability.opt-in=database")
}

val testCaptureSearchQueryStableSemconv by registering(Test::class) {
testClassesDirs = sourceSets.test.get().output.classesDirs
classpath = sourceSets.test.get().runtimeClasspath

filter {
includeTestsMatching("OpenSearchCaptureSearchQueryTest")
}
jvmArgs("-Dotel.instrumentation.opensearch.capture-search-query=true")
jvmArgs("-Dotel.semconv-stability.opt-in=database")
}

check {
dependsOn(testCaptureSearchQuery)
dependsOn(testStableSemconv)
dependsOn(testCaptureSearchQueryStableSemconv)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ public String getDbNamespace(OpenSearchRequest request) {
@Override
@Nullable
public String getDbQueryText(OpenSearchRequest request) {
return request.getMethod() + " " + request.getOperation();
// keep the previous logic in case of failure to extract the query body
if (request.getBody() == null) {
return request.getMethod() + " " + request.getOperation();
} else {
return request.getBody();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.opensearch.v3_0;

import static java.util.logging.Level.FINE;

import jakarta.json.stream.JsonGenerator;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.apache.hc.core5.http.ContentType;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.json.NdJsonpSerializable;
import org.opensearch.client.transport.GenericSerializable;

public class OpenSearchBodyExtractor {

private static final Logger logger = Logger.getLogger(OpenSearchBodyExtractor.class.getName());

@Nullable
public static String extract(JsonpMapper mapper, Object request) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();

if (request instanceof NdJsonpSerializable) {
writeNdJson(mapper, (NdJsonpSerializable) request, baos);
} else if (request instanceof GenericSerializable) {
ContentType.parse(((GenericSerializable) request).serialize(baos));
} else {
JsonGenerator generator = mapper.jsonProvider().createGenerator(baos);
mapper.serialize(request, generator);
generator.close();
}

String body = baos.toString(StandardCharsets.UTF_8);
return body.isEmpty() ? null : body;
} catch (RuntimeException e) {
logger.log(FINE, "Failure extracting body", e);
return null;
}
}

private static void writeNdJson(
JsonpMapper mapper, NdJsonpSerializable value, ByteArrayOutputStream baos) {
try {
Iterator<?> values = value._serializables();
while (values.hasNext()) {
Object item = values.next();
if (item instanceof NdJsonpSerializable && item != value) {
// do not recurse on the item itself
writeNdJson(mapper, (NdJsonpSerializable) item, baos);
} else {
JsonGenerator generator = mapper.jsonProvider().createGenerator(baos);
mapper.serialize(item, generator);
generator.close();
baos.write('\n');
}
}
} catch (RuntimeException e) {
logger.log(FINE, "Failure serializing NdJson", e);
}
}

private OpenSearchBodyExtractor() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.opensearch.v3_0;

import static java.util.logging.Level.FINE;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

public class OpenSearchBodySanitizer {

private static final Logger logger = Logger.getLogger(OpenSearchBodySanitizer.class.getName());

private static final ObjectMapper DEFAULT_OBJECT_MAPPER = new ObjectMapper();
private static final String MASKED_VALUE = "?";
private static final OpenSearchBodySanitizer DEFAULT_INSTANCE =
new OpenSearchBodySanitizer(DEFAULT_OBJECT_MAPPER);

private final ObjectMapper objectMapper;

private OpenSearchBodySanitizer(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}

public static OpenSearchBodySanitizer create() {
return new OpenSearchBodySanitizer(DEFAULT_OBJECT_MAPPER);
}

public static OpenSearchBodySanitizer create(ObjectMapper objectMapper) {
return new OpenSearchBodySanitizer(objectMapper);
}

public static OpenSearchBodySanitizer getDefault() {
return DEFAULT_INSTANCE;
}

public static String sanitize(String jsonString) {
return DEFAULT_INSTANCE.sanitizeInstance(jsonString);
}

public String sanitizeInstance(String jsonString) {
if (jsonString == null) {
return null;
}

List<String> queries = QuerySplitter.splitQueries(jsonString);
if (queries.isEmpty()) {
return null;
}

List<String> sanitizedQueries = new ArrayList<>();
for (String query : queries) {
String sanitized = sanitizeSingleQuery(query);
sanitizedQueries.add(sanitized);
}

return QuerySplitter.joinQueries(sanitizedQueries);
}

private String sanitizeSingleQuery(String query) {
try {
JsonNode rootNode = objectMapper.readTree(query);
JsonNode sanitizedNode = sanitizeNode(rootNode);
return objectMapper.writeValueAsString(sanitizedNode);
} catch (Exception e) {
logger.log(FINE, "Failure sanitizing single query", e);
return query;
}
}

private JsonNode sanitizeNode(JsonNode node) {
if (node == null || node.isNull()) {
return node;
}

if (node.isTextual()) {
return new TextNode(MASKED_VALUE);
}

if (node.isNumber() || node.isBoolean()) {
return new TextNode(MASKED_VALUE);
}

if (node.isArray()) {
ArrayNode arrayNode = objectMapper.createArrayNode();
for (JsonNode element : node) {
arrayNode.add(sanitizeNode(element));
}
return arrayNode;
}

if (node.isObject()) {
ObjectNode objectNode = objectMapper.createObjectNode();

for (Map.Entry<String, JsonNode> field : node.properties()) {
String key = field.getKey();
JsonNode value = field.getValue();

objectNode.set(key, sanitizeNode(value));
}
return objectNode;
}

return node;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@
package io.opentelemetry.javaagent.instrumentation.opensearch.v3_0;

import com.google.auto.value.AutoValue;
import javax.annotation.Nullable;

@AutoValue
public abstract class OpenSearchRequest {

public static OpenSearchRequest create(String method, String endpoint) {
return new AutoValue_OpenSearchRequest(method, endpoint);
public static OpenSearchRequest create(String method, String endpoint, @Nullable String body) {
return new AutoValue_OpenSearchRequest(method, endpoint, body);
}

public abstract String getMethod();

public abstract String getOperation();

@Nullable
public abstract String getBody();
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@
import io.opentelemetry.instrumentation.api.incubator.semconv.db.DbClientSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.javaagent.bootstrap.internal.AgentInstrumentationConfig;

public final class OpenSearchSingletons {
private static final Instrumenter<OpenSearchRequest, Void> INSTRUMENTER = createInstrumenter();

public static final boolean CAPTURE_SEARCH_QUERY =
AgentInstrumentationConfig.get()
.getBoolean("otel.instrumentation.opensearch.capture-search-query", false);

public static Instrumenter<OpenSearchRequest, Void> instrumenter() {
return INSTRUMENTER;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.opensearch.core.MsearchRequest;
import org.opensearch.client.opensearch.core.SearchRequest;
import org.opensearch.client.transport.Endpoint;
import org.opensearch.client.transport.OpenSearchTransport;

public class OpenSearchTransportInstrumentation implements TypeInstrumentation {
@Override
Expand Down Expand Up @@ -60,10 +64,22 @@ private AdviceScope(OpenSearchRequest otelRequest, Context context, Scope scope)
}

@Nullable
public static AdviceScope start(Object request, Endpoint<Object, Object, Object> endpoint) {
public static AdviceScope start(
Object request, Endpoint<Object, Object, Object> endpoint, JsonpMapper jsonpMapper) {
Context parentContext = Context.current();

String queryBody = null;

if (OpenSearchSingletons.CAPTURE_SEARCH_QUERY
&& (request instanceof SearchRequest || request instanceof MsearchRequest)) {
String rawBody = OpenSearchBodyExtractor.extract(jsonpMapper, request);
queryBody = OpenSearchBodySanitizer.sanitize(rawBody);
}

OpenSearchRequest otelRequest =
OpenSearchRequest.create(endpoint.method(request), endpoint.requestUrl(request));
OpenSearchRequest.create(
endpoint.method(request), endpoint.requestUrl(request), queryBody);

if (!instrumenter().shouldStart(parentContext, otelRequest)) {
return null;
}
Expand Down Expand Up @@ -94,9 +110,10 @@ public static class PerformRequestAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static AdviceScope onEnter(
@Advice.This OpenSearchTransport openSearchTransport,
@Advice.Argument(0) Object request,
@Advice.Argument(1) Endpoint<Object, Object, Object> endpoint) {
return AdviceScope.start(request, endpoint);
return AdviceScope.start(request, endpoint, openSearchTransport.jsonpMapper());
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand All @@ -114,9 +131,11 @@ public static class PerformRequestAsyncAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static Object[] onEnter(
@Advice.This OpenSearchTransport openSearchTransport,
@Advice.Argument(0) Object request,
@Advice.Argument(1) Endpoint<Object, Object, Object> endpoint) {
AdviceScope adviceScope = AdviceScope.start(request, endpoint);
AdviceScope adviceScope =
AdviceScope.start(request, endpoint, openSearchTransport.jsonpMapper());
return new Object[] {adviceScope};
}

Expand Down
Loading
Loading