diff --git a/grpclb/BUILD.bazel b/grpclb/BUILD.bazel index 4612968ebcd..ca9975b7ce6 100644 --- a/grpclb/BUILD.bazel +++ b/grpclb/BUILD.bazel @@ -17,6 +17,7 @@ java_library( "//context", "//core:internal", "//stub", + "//util", "@com_google_protobuf//:protobuf_java_util", "@io_grpc_grpc_proto//:grpclb_load_balancer_java_proto", artifact("com.google.code.findbugs:jsr305"), diff --git a/grpclb/build.gradle b/grpclb/build.gradle index f543e0d71fc..e8896604f03 100644 --- a/grpclb/build.gradle +++ b/grpclb/build.gradle @@ -19,6 +19,7 @@ dependencies { implementation project(':grpc-core'), project(':grpc-protobuf'), project(':grpc-stub'), + project(':grpc-util'), libraries.guava, libraries.protobuf.java, libraries.protobuf.java.util diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java index 49b74645ec8..4ae21651491 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java @@ -37,13 +37,16 @@ import io.grpc.ConnectivityStateInfo; import io.grpc.Context; import io.grpc.EquivalentAddressGroup; -import io.grpc.LoadBalancer.CreateSubchannelArgs; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.FixedResultPicker; import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; +import io.grpc.LoadBalancer.ResolvedAddresses; import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.SubchannelPicker; -import io.grpc.LoadBalancer.SubchannelStateListener; +import io.grpc.LoadBalancerProvider; +import io.grpc.LoadBalancerRegistry; import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.Status; @@ -62,6 +65,7 @@ import io.grpc.lb.v1.Server; import io.grpc.lb.v1.ServerList; import io.grpc.stub.StreamObserver; +import io.grpc.util.ForwardingLoadBalancerHelper; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; @@ -119,7 +123,7 @@ final class GrpclbState { @VisibleForTesting static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() { @Override - public PickResult picked(Metadata headers) { + public PickResult picked(PickSubchannelArgs args) { return PickResult.withNoResult(); } @@ -187,6 +191,15 @@ enum Mode { new RoundRobinPicker(Collections.emptyList(), Arrays.asList(BUFFER_ENTRY)); private boolean requestConnectionPending; + // Child LoadBalancer and state for PICK_FIRST mode delegation. + private final LoadBalancerProvider pickFirstLbProvider; + @Nullable + private LoadBalancer pickFirstLb; + private ConnectivityState pickFirstLbState = CONNECTING; + private SubchannelPicker pickFirstLbPicker = new FixedResultPicker(PickResult.withNoResult()); + @Nullable + private GrpclbClientLoadRecorder currentPickFirstLoadRecorder; + GrpclbState( GrpclbConfig config, Helper helper, @@ -212,6 +225,9 @@ public void onSubchannelState( } else { this.subchannelPool = null; } + this.pickFirstLbProvider = checkNotNull( + LoadBalancerRegistry.getDefaultRegistry().getProvider("pick_first"), + "pick_first balancer not available"); this.time = checkNotNull(time, "time provider"); this.stopwatch = checkNotNull(stopwatch, "stopwatch"); this.timerService = checkNotNull(helper.getScheduledExecutorService(), "timerService"); @@ -309,6 +325,12 @@ void handleAddresses( void requestConnection() { requestConnectionPending = true; + // For PICK_FIRST mode with delegation, forward to the child LB. + if (config.getMode() == Mode.PICK_FIRST && pickFirstLb != null) { + pickFirstLb.requestConnection(); + requestConnectionPending = false; + return; + } for (RoundRobinEntry entry : currentPicker.pickList) { if (entry instanceof IdleSubchannelEntry) { ((IdleSubchannelEntry) entry).subchannel.requestConnection(); @@ -323,15 +345,23 @@ private void maybeUseFallbackBackends() { } // Balancer RPC should have either been broken or timed out. checkState(fallbackReason != null, "no reason to fallback"); - for (Subchannel subchannel : subchannels.values()) { - ConnectivityStateInfo stateInfo = subchannel.getAttributes().get(STATE_INFO).get(); - if (stateInfo.getState() == READY) { + // For PICK_FIRST mode with delegation, check the child LB's state. + if (config.getMode() == Mode.PICK_FIRST) { + if (pickFirstLb != null && pickFirstLbState == READY) { return; } - // If we do have balancer-provided backends, use one of its error in the error message if - // fail to fallback. - if (stateInfo.getState() == TRANSIENT_FAILURE) { - fallbackReason = stateInfo.getStatus(); + // For PICK_FIRST, we don't have individual subchannel states to use as fallback reason. + } else { + for (Subchannel subchannel : subchannels.values()) { + ConnectivityStateInfo stateInfo = subchannel.getAttributes().get(STATE_INFO).get(); + if (stateInfo.getState() == READY) { + return; + } + // If we do have balancer-provided backends, use one of its error in the error message if + // fail to fallback. + if (stateInfo.getState() == TRANSIENT_FAILURE) { + fallbackReason = stateInfo.getStatus(); + } } } // Fallback conditions met @@ -438,9 +468,10 @@ void shutdown() { subchannelPool.clear(); break; case PICK_FIRST: - if (!subchannels.isEmpty()) { - checkState(subchannels.size() == 1, "Excessive Subchannels: %s", subchannels); - subchannels.values().iterator().next().shutdown(); + // Shutdown the child pick_first LB which manages its own subchannels. + if (pickFirstLb != null) { + pickFirstLb.shutdown(); + pickFirstLb = null; } break; default: @@ -517,22 +548,17 @@ private void updateServerList( subchannels = Collections.unmodifiableMap(newSubchannelMap); break; case PICK_FIRST: - checkState(subchannels.size() <= 1, "Unexpected Subchannel count: %s", subchannels); - final Subchannel subchannel; + // Delegate to child pick_first LB for address management. + // Shutdown existing child LB if addresses become empty. if (newBackendAddrList.isEmpty()) { - if (subchannels.size() == 1) { - subchannel = subchannels.values().iterator().next(); - subchannel.shutdown(); - subchannels = Collections.emptyMap(); + if (pickFirstLb != null) { + pickFirstLb.shutdown(); + pickFirstLb = null; } break; } List eagList = new ArrayList<>(); - // Because for PICK_FIRST, we create a single Subchannel for all addresses, we have to - // attach the tokens to the EAG attributes and use TokenAttachingLoadRecorder to put them on - // headers. - // - // The PICK_FIRST code path doesn't cache Subchannels. + // Attach tokens to EAG attributes for TokenAttachingTracerFactory to retrieve. for (BackendAddressGroup bag : newBackendAddrList) { EquivalentAddressGroup origEag = bag.getAddresses(); Attributes eagAttrs = origEag.getAttributes(); @@ -542,30 +568,22 @@ private void updateServerList( } eagList.add(new EquivalentAddressGroup(origEag.getAddresses(), eagAttrs)); } - if (subchannels.isEmpty()) { - subchannel = - helper.createSubchannel( - CreateSubchannelArgs.newBuilder() - .setAddresses(eagList) - .setAttributes(createSubchannelAttrs()) - .build()); - subchannel.start(new SubchannelStateListener() { - @Override - public void onSubchannelState(ConnectivityStateInfo newState) { - handleSubchannelState(subchannel, newState); - } - }); - if (requestConnectionPending) { - subchannel.requestConnection(); - requestConnectionPending = false; - } - } else { - subchannel = subchannels.values().iterator().next(); - subchannel.updateAddresses(eagList); + + if (pickFirstLb == null) { + pickFirstLb = pickFirstLbProvider.newLoadBalancer(new PickFirstLbHelper()); + } + + // Pass addresses to child LB. + pickFirstLb.acceptResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(eagList) + .build()); + if (requestConnectionPending) { + pickFirstLb.requestConnection(); + requestConnectionPending = false; } - subchannels = Collections.singletonMap(eagList, subchannel); - newBackendList.add( - new BackendEntry(subchannel, new TokenAttachingTracerFactory(loadRecorder))); + // Store the load recorder for token attachment. + currentPickFirstLoadRecorder = loadRecorder; break; default: throw new AssertionError("Missing case for " + config.getMode()); @@ -842,7 +860,11 @@ private void cleanUp() { private void maybeUpdatePicker() { List pickList; ConnectivityState state; - if (backendList.isEmpty()) { + // For PICK_FIRST mode with delegation, check if child LB exists instead of backendList. + boolean hasBackends = config.getMode() == Mode.PICK_FIRST + ? pickFirstLb != null + : !backendList.isEmpty(); + if (!hasBackends) { // Note balancer (is working) may enforce using fallback backends, and that fallback may // fail. So we should check if currently in fallback first. if (usingFallbackBackends) { @@ -894,26 +916,12 @@ private void maybeUpdatePicker() { } break; case PICK_FIRST: { - checkState(backendList.size() == 1, "Excessive backend entries: %s", backendList); - BackendEntry onlyEntry = backendList.get(0); - ConnectivityStateInfo stateInfo = - onlyEntry.subchannel.getAttributes().get(STATE_INFO).get(); - state = stateInfo.getState(); - switch (state) { - case READY: - pickList = Collections.singletonList(onlyEntry); - break; - case TRANSIENT_FAILURE: - pickList = - Collections.singletonList(new ErrorEntry(stateInfo.getStatus())); - break; - case CONNECTING: - pickList = Collections.singletonList(BUFFER_ENTRY); - break; - default: - pickList = Collections.singletonList( - new IdleSubchannelEntry(onlyEntry.subchannel, syncContext)); - } + // Use child LB's state and picker. Wrap the picker for token attachment. + state = pickFirstLbState; + TokenAttachingTracerFactory tracerFactory = + new TokenAttachingTracerFactory(currentPickFirstLoadRecorder); + pickList = Collections.singletonList( + new ChildLbPickerEntry(pickFirstLbPicker, tracerFactory)); break; } default: @@ -983,7 +991,7 @@ public boolean equals(Object other) { @VisibleForTesting interface RoundRobinEntry { - PickResult picked(Metadata headers); + PickResult picked(PickSubchannelArgs args); } @VisibleForTesting @@ -1024,7 +1032,8 @@ static final class BackendEntry implements RoundRobinEntry { } @Override - public PickResult picked(Metadata headers) { + public PickResult picked(PickSubchannelArgs args) { + Metadata headers = args.getHeaders(); headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY); if (token != null) { headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token); @@ -1065,7 +1074,7 @@ static final class IdleSubchannelEntry implements RoundRobinEntry { } @Override - public PickResult picked(Metadata headers) { + public PickResult picked(PickSubchannelArgs args) { if (connectionRequested.compareAndSet(false, true)) { syncContext.execute(new Runnable() { @Override @@ -1108,7 +1117,7 @@ static final class ErrorEntry implements RoundRobinEntry { } @Override - public PickResult picked(Metadata headers) { + public PickResult picked(PickSubchannelArgs args) { return result; } @@ -1132,6 +1141,53 @@ public String toString() { } } + /** + * Entry that wraps a child LB's picker for PICK_FIRST mode delegation. + * Attaches TokenAttachingTracerFactory to the pick result for token propagation. + */ + @VisibleForTesting + static final class ChildLbPickerEntry implements RoundRobinEntry { + private final SubchannelPicker childPicker; + private final TokenAttachingTracerFactory tracerFactory; + + ChildLbPickerEntry(SubchannelPicker childPicker, TokenAttachingTracerFactory tracerFactory) { + this.childPicker = checkNotNull(childPicker, "childPicker"); + this.tracerFactory = checkNotNull(tracerFactory, "tracerFactory"); + } + + @Override + public PickResult picked(PickSubchannelArgs args) { + PickResult childResult = childPicker.pickSubchannel(args); + if (childResult.getSubchannel() == null) { + // No subchannel (e.g., buffer, error), return as-is. + return childResult; + } + // Wrap the pick result to attach tokens via the tracer factory. + return PickResult.withSubchannel( + childResult.getSubchannel(), tracerFactory, childResult.getAuthorityOverride()); + } + + @Override + public int hashCode() { + return Objects.hashCode(childPicker, tracerFactory); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof ChildLbPickerEntry)) { + return false; + } + ChildLbPickerEntry that = (ChildLbPickerEntry) other; + return Objects.equal(childPicker, that.childPicker) + && Objects.equal(tracerFactory, that.tracerFactory); + } + + @Override + public String toString() { + return "ChildLbPickerEntry(" + childPicker + ")"; + } + } + @VisibleForTesting static final class RoundRobinPicker extends SubchannelPicker { @VisibleForTesting @@ -1174,7 +1230,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { if (pickIndex == pickList.size()) { pickIndex = 0; } - return pick.picked(args.getHeaders()); + return pick.picked(args); } } @@ -1189,4 +1245,28 @@ public String toString() { return MoreObjects.toStringHelper(RoundRobinPicker.class).toString(); } } + + /** + * Helper for the child pick_first LB in PICK_FIRST mode. Intercepts updateBalancingState() + * to store state and trigger the grpclb picker update with drops and token attachment. + */ + private final class PickFirstLbHelper extends ForwardingLoadBalancerHelper { + + @Override + protected Helper delegate() { + return helper; + } + + @Override + public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { + pickFirstLbState = newState; + pickFirstLbPicker = newPicker; + // Trigger name resolution refresh on TRANSIENT_FAILURE or IDLE, similar to ROUND_ROBIN. + if (newState == TRANSIENT_FAILURE || newState == IDLE) { + helper.refreshNameResolution(); + } + maybeUseFallbackBackends(); + maybeUpdatePicker(); + } + } } diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index e489129676a..0c994934d1d 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -779,7 +779,9 @@ public void receiveNoBackendAndBalancerAddress() { verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); RoundRobinPicker picker = (RoundRobinPicker) pickerCaptor.getValue(); assertThat(picker.dropList).isEmpty(); - Status error = Iterables.getOnlyElement(picker.pickList).picked(new Metadata()).getStatus(); + PickSubchannelArgs args = mock(PickSubchannelArgs.class); + when(args.getHeaders()).thenReturn(new Metadata()); + Status error = Iterables.getOnlyElement(picker.pickList).picked(args).getStatus(); assertThat(error.getCode()).isEqualTo(Code.UNAVAILABLE); assertThat(error.getDescription()).isEqualTo("No backend or balancer addresses found"); } @@ -1915,90 +1917,46 @@ public void grpclbWorking_pickFirstMode() throws Exception { lbResponseObserver.onNext(buildInitialResponse()); lbResponseObserver.onNext(buildLbResponse(backends1)); - inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture()); + // With delegation to child pick_first LB, subchannel is created by the child. + // The child pick_first eagerly connects, so initial state is CONNECTING (not IDLE). + verify(helper, atLeast(1)).createSubchannel(createSubchannelArgsCaptor.capture()); CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue(); assertThat(createSubchannelArgs.getAddresses()) .containsExactly( new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002"))); - // Initially IDLE - inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); + // Child pick_first eagerly connects, so we start in CONNECTING + verify(helper, atLeast(1)).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); - // Only one subchannel is created + // One subchannel is created by the child LB assertThat(mockSubchannels).hasSize(1); Subchannel subchannel = mockSubchannels.poll(); assertThat(picker0.dropList).containsExactly(null, null); - assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext)); - - // PICK_FIRST doesn't eagerly connect - verify(subchannel, never()).requestConnection(); - - // CONNECTING - deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker1.dropList).containsExactly(null, null); - assertThat(picker1.pickList).containsExactly(BUFFER_ENTRY); - - // TRANSIENT_FAILURE - Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); - deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(error)); - inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker2.dropList).containsExactly(null, null); - assertThat(picker2.pickList).containsExactly(new ErrorEntry(error)); // READY deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker3.dropList).containsExactly(null, null); - assertThat(picker3.pickList).containsExactly( - new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder()))); - + verify(helper, atLeast(1)).updateBalancingState(eq(READY), pickerCaptor.capture()); + RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker1.dropList).containsExactly(null, null); // New server list with drops List backends2 = Arrays.asList( new ServerEntry("127.0.0.1", 2000, "token0001"), new ServerEntry("token0003"), // drop new ServerEntry("127.0.0.1", 2020, "token0004")); - inOrder.verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); lbResponseObserver.onNext(buildLbResponse(backends2)); - // new addresses will be updated to the existing subchannel - // createSubchannel() has ever been called only once + // the child LB is not recreated. A single subchannel is created + // for the lifetime of the test. The child LB will internally call updateAddresses. verify(helper, times(1)).createSubchannel(any(CreateSubchannelArgs.class)); - assertThat(mockSubchannels).isEmpty(); - verify(subchannel).updateAddresses( - eq(Arrays.asList( - new EquivalentAddressGroup(backends2.get(0).addr, eagAttrsWithToken("token0001")), - new EquivalentAddressGroup(backends2.get(2).addr, - eagAttrsWithToken("token0004"))))); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker4.dropList).containsExactly( - null, new DropEntry(getLoadRecorder(), "token0003"), null); - assertThat(picker4.pickList).containsExactly( - new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder()))); - - // Subchannel goes IDLE, but PICK_FIRST will not try to reconnect - deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(IDLE)); - inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); - RoundRobinPicker picker5 = (RoundRobinPicker) pickerCaptor.getValue(); - verify(subchannel, never()).requestConnection(); - - // ... until it's selected - PickSubchannelArgs args = mock(PickSubchannelArgs.class); - PickResult pick = picker5.pickSubchannel(args); - assertThat(pick).isSameInstanceAs(PickResult.withNoResult()); - verify(subchannel).requestConnection(); - // ... or requested by application - balancer.requestConnection(); - verify(subchannel, times(2)).requestConnection(); + // Verify drop list is updated after new child LB updates state + verify(helper, atLeast(1)).updateBalancingState(any(), pickerCaptor.capture()); + RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); + assertThat(picker2.dropList).containsExactly( + null, new DropEntry(getLoadRecorder(), "token0003"), null); // PICK_FIRST doesn't use subchannelPool verify(subchannelPool, never()) @@ -2009,8 +1967,6 @@ public void grpclbWorking_pickFirstMode() throws Exception { @Test public void grpclbWorking_pickFirstMode_lbSendsEmptyAddress() throws Exception { - InOrder inOrder = inOrder(helper); - List grpclbBalancerList = createResolvedBalancerAddresses(1); deliverResolvedAddresses( Collections.emptyList(), @@ -2031,96 +1987,56 @@ public void grpclbWorking_pickFirstMode_lbSendsEmptyAddress() throws Exception { List backends1 = Arrays.asList( new ServerEntry("127.0.0.1", 2000, "token0001"), new ServerEntry("127.0.0.1", 2010, "token0002")); - inOrder.verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); lbResponseObserver.onNext(buildInitialResponse()); lbResponseObserver.onNext(buildLbResponse(backends1)); - inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture()); + // With delegation to child pick_first LB + verify(helper, atLeast(1)).createSubchannel(createSubchannelArgsCaptor.capture()); CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue(); assertThat(createSubchannelArgs.getAddresses()) .containsExactly( new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002"))); - // Initially IDLE - inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); - RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); - - // Only one subchannel is created + // Child pick_first eagerly connects + verify(helper, atLeast(1)).updateBalancingState(eq(CONNECTING), any()); assertThat(mockSubchannels).hasSize(1); Subchannel subchannel = mockSubchannels.poll(); - assertThat(picker0.dropList).containsExactly(null, null); - assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext)); - - // PICK_FIRST doesn't eagerly connect - verify(subchannel, never()).requestConnection(); - - // CONNECTING - deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); - inOrder.verify(helper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); - RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker1.dropList).containsExactly(null, null); - assertThat(picker1.pickList).containsExactly(BUFFER_ENTRY); - - // TRANSIENT_FAILURE - Status error = Status.UNAVAILABLE.withDescription("Simulated connection error"); - deliverSubchannelState(subchannel, ConnectivityStateInfo.forTransientFailure(error)); - inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); - RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker2.dropList).containsExactly(null, null); - assertThat(picker2.pickList).containsExactly(new ErrorEntry(error)); // READY deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - RoundRobinPicker picker3 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker3.dropList).containsExactly(null, null); - assertThat(picker3.pickList).containsExactly( - new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder()))); - - inOrder.verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); + verify(helper, atLeast(1)).updateBalancingState(eq(READY), pickerCaptor.capture()); - // Empty addresses from LB + // Empty addresses from LB - child LB is shutdown lbResponseObserver.onNext(buildLbResponse(Collections.emptyList())); - // new addresses will be updated to the existing subchannel - // createSubchannel() has ever been called only once - inOrder.verify(helper, never()).createSubchannel(any(CreateSubchannelArgs.class)); - assertThat(mockSubchannels).isEmpty(); + // Child LB is shutdown (which shuts down its subchannel) verify(subchannel).shutdown(); // RPC error status includes message of no backends provided by balancer - inOrder.verify(helper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); + verify(helper, atLeast(1)).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture()); RoundRobinPicker errorPicker = (RoundRobinPicker) pickerCaptor.getValue(); assertThat(errorPicker.pickList) .containsExactly(new ErrorEntry(GrpclbState.NO_AVAILABLE_BACKENDS_STATUS)); - lbResponseObserver.onNext(buildLbResponse(Collections.emptyList())); - // Test recover from new LB response with addresses - // New server list with drops List backends2 = Arrays.asList( new ServerEntry("127.0.0.1", 2000, "token0001"), new ServerEntry("token0003"), // drop new ServerEntry("127.0.0.1", 2020, "token0004")); - inOrder.verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); lbResponseObserver.onNext(buildLbResponse(backends2)); - // new addresses will be updated to the existing subchannel - inOrder.verify(helper, times(1)).createSubchannel(any(CreateSubchannelArgs.class)); - inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); - subchannel = mockSubchannels.poll(); + // A new child LB is created with new subchannel + verify(helper, atLeast(2)).createSubchannel(any(CreateSubchannelArgs.class)); + assertThat(mockSubchannels).hasSize(1); + Subchannel subchannel2 = mockSubchannels.poll(); // Subchannel became READY - deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); - deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); + deliverSubchannelState(subchannel2, ConnectivityStateInfo.forNonError(READY)); + verify(helper, atLeast(2)).updateBalancingState(eq(READY), pickerCaptor.capture()); RoundRobinPicker picker4 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker4.pickList).containsExactly( - new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder()))); + assertThat(picker4.dropList).containsExactly( + null, new DropEntry(getLoadRecorder(), "token0003"), null); } @Test @@ -2162,8 +2078,6 @@ public void pickFirstMode_serviceConfigTimeout_fallback() throws Exception { } private void pickFirstModeFallback(long timeout) throws Exception { - InOrder inOrder = inOrder(helper); - // Name resolver returns balancer and backend addresses List backendList = createResolvedBackendAddresses(2); List grpclbBalancerList = createResolvedBalancerAddresses(1); @@ -2179,8 +2093,8 @@ private void pickFirstModeFallback(long timeout) throws Exception { // Fallback timer expires with no response fakeClock.forwardTime(timeout, TimeUnit.MILLISECONDS); - // Entering fallback mode - inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture()); + // Entering fallback mode - child LB is created for fallback backends + verify(helper, atLeast(1)).createSubchannel(createSubchannelArgsCaptor.capture()); CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue(); assertThat(createSubchannelArgs.getAddresses()) .containsExactly(backendList.get(0), backendList.get(1)); @@ -2188,45 +2102,24 @@ private void pickFirstModeFallback(long timeout) throws Exception { assertThat(mockSubchannels).hasSize(1); Subchannel subchannel = mockSubchannels.poll(); - // Initially IDLE - inOrder.verify(helper).updateBalancingState(eq(IDLE), pickerCaptor.capture()); - RoundRobinPicker picker0 = (RoundRobinPicker) pickerCaptor.getValue(); + // Child pick_first eagerly connects + verify(helper, atLeast(1)).updateBalancingState(eq(CONNECTING), any()); // READY deliverSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - RoundRobinPicker picker1 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker1.dropList).containsExactly(null, null); - assertThat(picker1.pickList).containsExactly( - new BackendEntry(subchannel, new TokenAttachingTracerFactory(null))); - - assertThat(picker0.dropList).containsExactly(null, null); - assertThat(picker0.pickList).containsExactly(new IdleSubchannelEntry(subchannel, syncContext)); - + verify(helper, atLeast(1)).updateBalancingState(eq(READY), pickerCaptor.capture()); // Finally, an LB response, which brings us out of fallback List backends1 = Arrays.asList( new ServerEntry("127.0.0.1", 2000, "token0001"), new ServerEntry("127.0.0.1", 2010, "token0002")); - inOrder.verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); lbResponseObserver.onNext(buildInitialResponse()); lbResponseObserver.onNext(buildLbResponse(backends1)); - // new addresses will be updated to the existing subchannel - // createSubchannel() has ever been called only once - inOrder.verify(helper, never()).createSubchannel(any(CreateSubchannelArgs.class)); - assertThat(mockSubchannels).isEmpty(); - verify(subchannel).updateAddresses( - eq(Arrays.asList( - new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), - new EquivalentAddressGroup(backends1.get(1).addr, - eagAttrsWithToken("token0002"))))); - inOrder.verify(helper).updateBalancingState(eq(READY), pickerCaptor.capture()); - RoundRobinPicker picker2 = (RoundRobinPicker) pickerCaptor.getValue(); - assertThat(picker2.dropList).containsExactly(null, null); - assertThat(picker2.pickList).containsExactly( - new BackendEntry(subchannel, new TokenAttachingTracerFactory(getLoadRecorder()))); + // the same child LB is updated with new addresses. We expect + // only one subchannel to be created for the whole test. The child LB will call + // updateAddresses() internally, so we remove the verifications for recreation. + verify(helper, times(1)).createSubchannel(any(CreateSubchannelArgs.class)); // PICK_FIRST doesn't use subchannelPool verify(subchannelPool, never()) @@ -2304,20 +2197,19 @@ public void switchMode() throws Exception { .build())); // Simulate receiving LB response - inOrder.verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); lbResponseObserver.onNext(buildInitialResponse()); lbResponseObserver.onNext(buildLbResponse(backends1)); - // PICK_FIRST Subchannel - inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture()); + // PICK_FIRST - with delegation, child LB creates subchannel + verify(helper, atLeast(1)).createSubchannel(createSubchannelArgsCaptor.capture()); CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue(); assertThat(createSubchannelArgs.getAddresses()) .containsExactly( new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002"))); - inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + // Child pick_first eagerly connects, so initial state is CONNECTING + verify(helper, atLeast(1)).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); } private static Attributes eagAttrsWithToken(String token) { @@ -2392,20 +2284,19 @@ public void switchMode_nullLbPolicy() throws Exception { .build())); // Simulate receiving LB response - inOrder.verify(helper, never()) - .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); lbResponseObserver.onNext(buildInitialResponse()); lbResponseObserver.onNext(buildLbResponse(backends1)); - // PICK_FIRST Subchannel - inOrder.verify(helper).createSubchannel(createSubchannelArgsCaptor.capture()); + // PICK_FIRST - with delegation, child LB creates subchannel + verify(helper, atLeast(1)).createSubchannel(createSubchannelArgsCaptor.capture()); CreateSubchannelArgs createSubchannelArgs = createSubchannelArgsCaptor.getValue(); assertThat(createSubchannelArgs.getAddresses()) .containsExactly( new EquivalentAddressGroup(backends1.get(0).addr, eagAttrsWithToken("token0001")), new EquivalentAddressGroup(backends1.get(1).addr, eagAttrsWithToken("token0002"))); - inOrder.verify(helper).updateBalancingState(eq(IDLE), any(SubchannelPicker.class)); + // Child pick_first eagerly connects, so initial state is CONNECTING + verify(helper, atLeast(1)).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class)); } @Test