Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions agents/grpc/src/command_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ CommandStream::~CommandStream() {
nsuv::ns_mutex::scoped_lock lock(lock_);
// try cancel and wait until OnDone is called
if (!write_state_.done) {
cancelling_for_destruction_ = true;
context_.TryCancel();
do {
uv_cond_wait(&on_done_cond_, lock_.base());
Expand All @@ -53,12 +54,11 @@ void CommandStream::OnDone(const Status& s) {
nsuv::ns_mutex::scoped_lock lock(lock_);
write_state_.done = true;
uv_cond_signal(&on_done_cond_);
if (!obs) {
if (!obs || cancelling_for_destruction_) {
return;
}
}

// Don't notify the observer if the stream was cancelled (destroyed)
obs->on_command_stream_done(s);
}

Expand Down
1 change: 1 addition & 0 deletions agents/grpc/src/command_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class CommandStream:
TSQueue<grpcagent::CommandResponse> response_q_;
nsuv::ns_mutex lock_;
uv_cond_t on_done_cond_;
bool cancelling_for_destruction_ = false;
};

} // namespace grpc
Expand Down
55 changes: 55 additions & 0 deletions test/agents/test-grpc-basic.mjs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
// Flags: --expose-internals
import { mustCall, mustSucceed } from '../common/index.mjs';
import fixtures from '../common/fixtures.js';
Expand Down Expand Up @@ -126,6 +126,61 @@
},
});

tests.push({
name: 'should reconnect after initial invalid NSOLID_GRPC',
test: async (getEnv, isSecure) => {
return new Promise((resolve) => {
const grpcServer = new GRPCServer({ tls: isSecure });
grpcServer.start(mustSucceed(async (port) => {
grpcServer.on('exit', mustCall((data) => {
checkExitData(data.msg, data.metadata, agentId, { code: 0, error: null, profile: '' });
grpcServer.close();
resolve();
}));

const correctEnv = getEnv(port, isSecure);
const env = {
NODE_DEBUG_NATIVE: 'nsolid_grpc_agent',
NSOLID_GRPC: '127.0.0.1:1',
};
if (correctEnv.NSOLID_GRPC_INSECURE) {
env.NSOLID_GRPC_INSECURE = correctEnv.NSOLID_GRPC_INSECURE;
}
if (correctEnv.NSOLID_GRPC_CERTS) {
env.NSOLID_GRPC_CERTS = correctEnv.NSOLID_GRPC_CERTS;
}

const opts = {
stdio: ['inherit', 'inherit', 'inherit', 'ipc'],
env,
};
const child = new TestClient([], opts);
const agentId = await child.id();

const config = {};
if (correctEnv.NSOLID_GRPC) {
config.grpc = correctEnv.NSOLID_GRPC;
}
if (correctEnv.NSOLID_SAAS) {
config.saas = correctEnv.NSOLID_SAAS;
}

grpcServer.on('command', async ({ agentId }) => {
// Verify the CommandStream is working by sending a command from server to client
const infoResult = await grpcServer.info(agentId);
assert.ok(infoResult);
const exit = await child.shutdown(0);
assert.ok(exit);
assert.strictEqual(exit.code, 0);
assert.strictEqual(exit.signal, null);
});

await child.config(config);
}));
});
},
});

const testConfigs = [
{
getEnv: (port, isSecure) => {
Expand Down
Loading