Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;
import io.grpc.xds.XdsConfig.XdsClusterConfig;
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
// import io.grpc.xds.client.BackendMetricPropagation;]
import io.grpc.xds.client.Locality;
import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
Expand Down
47 changes: 23 additions & 24 deletions xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,8 @@ private abstract class XdsWatcherBase<T extends ResourceUpdate>

@Nullable
private StatusOr<T> data;
@Nullable
private Status ambientError; // To hold transient errors


private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
Expand All @@ -640,42 +642,39 @@ private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
}

@Override
public void onError(Status error) {
checkNotNull(error, "error");
public void onResourceChanged(StatusOr<T> update) {
if (cancelled) {
return;
}
// Don't update configuration on error, if we've already received configuration
if (!hasDataValue()) {
this.data = StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
String.format("Error retrieving %s: %s: %s",
toContextString(), error.getCode(), error.getDescription())));
maybePublishConfig();
}
}
ambientError = null;
if (update.hasValue()) {
data = update;
subscribeToChildren(update.getValue());
} else {
Status status = update.getStatus();
Status translatedStatus = Status.UNAVAILABLE.withDescription(
String.format("Error retrieving %s: %s. Details: %s%s",
toContextString(),
status.getCode(),
status.getDescription() != null ? status.getDescription() : "",
nodeInfo()));

@Override
public void onResourceDoesNotExist(String resourceName) {
if (cancelled) {
return;
data = StatusOr.fromStatus(translatedStatus);
}

checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
this.data = StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
toContextString() + " does not exist" + nodeInfo()));
maybePublishConfig();
}

@Override
public void onChanged(T update) {
checkNotNull(update, "update");
public void onAmbientError(Status error) {
if (cancelled) {
return;
}

this.data = StatusOr.fromValue(update);
subscribeToChildren(update);
maybePublishConfig();
ambientError = error.withDescription(
String.format("Ambient error for %s: %s. Details: %s%s",
toContextString(),
error.getCode(),
error.getDescription(),
nodeInfo()));
}

protected abstract void subscribeToChildren(T update);
Expand Down
153 changes: 78 additions & 75 deletions xds/src/main/java/io/grpc/xds/XdsServerWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusOr;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.GrpcUtil;
Expand Down Expand Up @@ -382,18 +383,38 @@ private DiscoveryState(String resourceName) {
}

@Override
public void onChanged(final LdsUpdate update) {
public void onResourceChanged(final StatusOr<LdsUpdate> update) {
if (stopped) {
return;
}
logger.log(Level.FINEST, "Received Lds update {0}", update);
if (update.listener() == null) {
onResourceDoesNotExist("Non-API");

if (!update.hasValue()) {
// This is a definitive resource error (e.g., NOT_FOUND).
// We must treat the resource as unavailable and tear down the server.
Status status = update.getStatus();
StatusException statusException = Status.UNAVAILABLE.withDescription(
String.format("Listener %s unavailable: %s", resourceName, status.getDescription()))
.withCause(status.asException())
.asException();
handleConfigNotFoundOrMismatch(statusException);
return;
}

String ldsAddress = update.listener().address();
if (ldsAddress == null || update.listener().protocol() != Protocol.TCP
// The original 'onChanged' logic starts here.
final LdsUpdate ldsUpdate = update.getValue();
logger.log(Level.FINEST, "Received Lds update {0}", ldsUpdate);
if (ldsUpdate.listener() == null) {
handleConfigNotFoundOrMismatch(
Status.NOT_FOUND.withDescription("Listener is null in LdsUpdate").asException());
return;
}

// This check is now covered by the '!update.hasValue()' block above.
// The original check was: if (update.listener() == null)

// The ipAddressesMatch function and its logic remain critical.
String ldsAddress = ldsUpdate.listener().address();
if (ldsAddress == null || ldsUpdate.listener().protocol() != Protocol.TCP
|| !ipAddressesMatch(ldsAddress)) {
handleConfigNotFoundOrMismatch(
Status.UNKNOWN.withDescription(
Expand All @@ -402,21 +423,19 @@ public void onChanged(final LdsUpdate update) {
listenerAddress, ldsAddress)).asException());
return;
}

// The rest of the logic is a direct copy from the original onChanged method.
if (!pendingRds.isEmpty()) {
// filter chain state has not yet been applied to filterChainSelectorManager and there
// are two sets of sslContextProviderSuppliers, so we release the old ones.
releaseSuppliersInFlight();
pendingRds.clear();
}

filterChains = update.listener().filterChains();
defaultFilterChain = update.listener().defaultFilterChain();
// Filters are loaded even if the server isn't serving yet.
filterChains = ldsUpdate.listener().filterChains();
defaultFilterChain = ldsUpdate.listener().defaultFilterChain();
updateActiveFilters();

List<FilterChain> allFilterChains = filterChains;
List<FilterChain> allFilterChains = new ArrayList<>(filterChains);
if (defaultFilterChain != null) {
allFilterChains = new ArrayList<>(filterChains);
allFilterChains.add(defaultFilterChain);
}

Expand Down Expand Up @@ -450,43 +469,36 @@ public void onChanged(final LdsUpdate update) {
}
}

private boolean ipAddressesMatch(String ldsAddress) {
HostAndPort ldsAddressHnP = HostAndPort.fromString(ldsAddress);
HostAndPort listenerAddressHnP = HostAndPort.fromString(listenerAddress);
if (!ldsAddressHnP.hasPort() || !listenerAddressHnP.hasPort()
|| ldsAddressHnP.getPort() != listenerAddressHnP.getPort()) {
return false;
}
InetAddress listenerIp = InetAddresses.forString(listenerAddressHnP.getHost());
InetAddress ldsIp = InetAddresses.forString(ldsAddressHnP.getHost());
return listenerIp.equals(ldsIp);
}

@Override
public void onResourceDoesNotExist(final String resourceName) {
if (stopped) {
return;
}
StatusException statusException = Status.UNAVAILABLE.withDescription(
String.format("Listener %s unavailable, xDS node ID: %s", resourceName,
xdsClient.getBootstrapInfo().node().getId())).asException();
handleConfigNotFoundOrMismatch(statusException);
}

@Override
public void onError(final Status error) {
public void onAmbientError(final Status error) {
if (stopped) {
return;
}
// This logic is preserved from the original 'onError' method.
String description = error.getDescription() == null ? "" : error.getDescription() + " ";
Status errorWithNodeId = error.withDescription(
description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
logger.log(Level.FINE, "Error from XdsClient", errorWithNodeId);

// If the server isn't serving yet, a transient error is a startup failure.
// If it is already serving, we ignore it to prevent an outage.
if (!isServing) {
listener.onNotServing(errorWithNodeId.asException());
}
}

private boolean ipAddressesMatch(String ldsAddress) {
HostAndPort ldsAddressHnP = HostAndPort.fromString(ldsAddress);
HostAndPort listenerAddressHnP = HostAndPort.fromString(listenerAddress);
if (!ldsAddressHnP.hasPort() || !listenerAddressHnP.hasPort()
|| ldsAddressHnP.getPort() != listenerAddressHnP.getPort()) {
return false;
}
InetAddress listenerIp = InetAddresses.forString(listenerAddressHnP.getHost());
InetAddress ldsIp = InetAddresses.forString(ldsAddressHnP.getHost());
return listenerIp.equals(ldsIp);
}

private void shutdown() {
stopped = true;
cleanUpRouteDiscoveryStates();
Expand Down Expand Up @@ -775,54 +787,45 @@ private RouteDiscoveryState(String resourceName) {
}

@Override
public void onChanged(final RdsUpdate update) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!routeDiscoveryStates.containsKey(resourceName)) {
return;
}
if (savedVirtualHosts == null && !isPending) {
logger.log(Level.WARNING, "Received valid Rds {0} configuration.", resourceName);
}
savedVirtualHosts = ImmutableList.copyOf(update.virtualHosts);
updateRdsRoutingConfig();
maybeUpdateSelector();
public void onResourceChanged(final StatusOr<RdsUpdate> update) {
syncContext.execute(() -> {
if (!routeDiscoveryStates.containsKey(resourceName)) {
return; // Watcher has been cancelled.
}
});
}

@Override
public void onResourceDoesNotExist(final String resourceName) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!routeDiscoveryStates.containsKey(resourceName)) {
return;
if (update.hasValue()) {
// This is a successful update, taken from the original onChanged.
if (savedVirtualHosts == null && !isPending) {
logger.log(Level.WARNING, "Received valid Rds {0} configuration.", resourceName);
}
logger.log(Level.WARNING, "Rds {0} unavailable", resourceName);
savedVirtualHosts = ImmutableList.copyOf(update.getValue().virtualHosts);
} else {
// This is a definitive resource error, taken from onResourceDoesNotExist.
logger.log(Level.WARNING, "Rds {0} unavailable: {1}",
new Object[]{resourceName, update.getStatus()});
savedVirtualHosts = null;
updateRdsRoutingConfig();
maybeUpdateSelector();
}
// In both cases, a change has occurred that requires a config update.
updateRdsRoutingConfig();
maybeUpdateSelector();
});
}

@Override
public void onError(final Status error) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (!routeDiscoveryStates.containsKey(resourceName)) {
return;
}
String description = error.getDescription() == null ? "" : error.getDescription() + " ";
Status errorWithNodeId = error.withDescription(
description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
logger.log(Level.WARNING, "Error loading RDS resource {0} from XdsClient: {1}.",
new Object[]{resourceName, errorWithNodeId});
maybeUpdateSelector();
public void onAmbientError(final Status error) {
syncContext.execute(() -> {
if (!routeDiscoveryStates.containsKey(resourceName)) {
return; // Watcher has been cancelled.
}
// This is a transient error, taken from the original onError.
String description = error.getDescription() == null ? "" : error.getDescription() + " ";
Status errorWithNodeId = error.withDescription(
description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
logger.log(Level.WARNING, "Error loading RDS resource {0} from XdsClient: {1}.",
new Object[]{resourceName, errorWithNodeId});

// Per gRFC A88, ambient errors should not trigger a configuration change.
// Therefore, we do NOT call maybeUpdateSelector() here.
});
}

Expand Down
6 changes: 4 additions & 2 deletions xds/src/main/java/io/grpc/xds/client/BootstrapperImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,15 @@ private List<ServerInfo> parseServerInfos(List<?> rawServerConfigs, XdsLogger lo
Object implSpecificConfig = getImplSpecificConfig(serverConfig, serverUri);

boolean resourceTimerIsTransientError = false;
boolean ignoreResourceDeletion = false;
boolean ignoreResourceDeletion = xdsDataErrorHandlingEnabled;
// "For forward compatibility reasons, the client will ignore any entry in the list that it
// does not understand, regardless of type."
List<?> serverFeatures = JsonUtil.getList(serverConfig, "server_features");
if (serverFeatures != null) {
logger.log(XdsLogLevel.INFO, "Server features: {0}", serverFeatures);
ignoreResourceDeletion = serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION);
if (serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION)) {
ignoreResourceDeletion = true;
}
resourceTimerIsTransientError = xdsDataErrorHandlingEnabled
&& serverFeatures.contains(SERVER_FEATURE_RESOURCE_TIMER_IS_TRANSIENT_ERROR);
}
Expand Down
42 changes: 14 additions & 28 deletions xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -453,44 +453,30 @@ private void handleRpcStreamClosed(Status status) {
stopwatch.reset();
}

Status newStatus = status;
if (responseReceived) {
// A closed ADS stream after a successful response is not considered an error. Servers may
// close streams for various reasons during normal operation, such as load balancing or
// underlying connection hitting its max connection age limit (see gRFC A9).
if (!status.isOk()) {
newStatus = Status.OK;
logger.log( XdsLogLevel.DEBUG, "ADS stream closed with error {0}: {1}. However, a "
+ "response was received, so this will not be treated as an error. Cause: {2}",
status.getCode(), status.getDescription(), status.getCause());
} else {
logger.log(XdsLogLevel.DEBUG,
"ADS stream closed by server after a response was received");
}
} else {
// If the ADS stream is closed without ever having received a response from the server, then
// the XdsClient should consider that a connectivity error (see gRFC A57).
Status statusToPropagate = status;
if (!responseReceived && status.isOk()) {
// If the ADS stream is closed with OK without ever having received a response,
// it is a connectivity error.
statusToPropagate = Status.UNAVAILABLE.withDescription(
"ADS stream closed with OK before receiving a response");
}
if (!statusToPropagate.isOk()) {
inError = true;
if (status.isOk()) {
newStatus = Status.UNAVAILABLE.withDescription(
"ADS stream closed with OK before receiving a response");
}
logger.log(
XdsLogLevel.ERROR, "ADS stream failed with status {0}: {1}. Cause: {2}",
newStatus.getCode(), newStatus.getDescription(), newStatus.getCause());
logger.log(XdsLogLevel.ERROR, "ADS stream failed with status {0}: {1}. Cause: {2}",
statusToPropagate.getCode(), statusToPropagate.getDescription(),
statusToPropagate.getCause());
}

close(newStatus.asException());

// FakeClock in tests isn't thread-safe. Schedule the retry timer before notifying callbacks
// to avoid TSAN races, since tests may wait until callbacks are called but then would run
// concurrently with the stopwatch and schedule.
long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed);
close(status.asException());
rpcRetryTimer =
syncContext.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);

xdsResponseHandler.handleStreamClosed(newStatus, !responseReceived);
// Notify the handler of the stream closure before cleaning up the stream state.
xdsResponseHandler.handleStreamClosed(statusToPropagate, !responseReceived);
Comment on lines +478 to +479

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The original code uses newStatus for logging the error, but the stream closure is notified with the original status. This could lead to confusion when debugging. It's better to use the statusToPropagate for both logging and notification.

        logger.log(
            XdsLogLevel.ERROR, "ADS stream failed with status {0}: {1}. Cause: {2}",
            statusToPropagate.getCode(), statusToPropagate.getDescription(), statusToPropagate.getCause());
      }

      close(status.asException());
      // Notify the handler of the stream closure before cleaning up the stream state.
      xdsResponseHandler.handleStreamClosed(statusToPropagate, !responseReceived);

Rules References

}

private void close(Exception error) {
Expand Down
Loading
Loading