Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions core/controlcommands/commandqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (m *CommandQueue) Start() {
}
if err == nil && response == nil {
log.WithField("partition", entry.cmd.GetEnvironmentId().String()).
Error("nil response")
Errorf("did not receive neither response nor error for %s", entry.cmd.GetName())
}

entry.callback <- response
Expand Down Expand Up @@ -198,6 +198,11 @@ func (m *CommandQueue) commit(command MesosCommand) (response MesosCommandRespon
// Wait for goroutines to finish
for i := 0; i < len(command.targets()); i++ {
respSemaphore := <-semaphore
// for the sake of better error propagation, we treat a lack of response as a response with error,
// even though it's not technically the same. it can be surely done better, but it would require a larger refactoring.
if respSemaphore.err != nil && respSemaphore.response == nil {
respSemaphore.response = NewMesosCommandResponse(command, respSemaphore.err)
}
responses[respSemaphore.receiver] = respSemaphore.response
if respSemaphore.err != nil {
sendErrorList = append(sendErrorList, respSemaphore.err)
Expand All @@ -215,12 +220,11 @@ func (m *CommandQueue) commit(command MesosCommand) (response MesosCommandRespon
}
return
}(), "\n"))
return
}
response = consolidateResponses(command, responses)

log.WithField("partition", command.GetEnvironmentId().String()).
Debug("responses consolidated, CommandQueue commit done")

return response, nil
return response, err
}
2 changes: 1 addition & 1 deletion core/controlcommands/mesoscommandservent.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (s *Servent) RunCommand(cmd MesosCommand, receiver MesosCommandTarget) (Mes
// By the time we get here, ProcessResponse should have already added a Response to the
// pending call, and removed it from servent.pending.
case <-time.After(cmd.GetResponseTimeout()):
call.Error = fmt.Errorf("MesosCommand %s timed out for task %s", cmd.GetName(), receiver.TaskId.Value)
call.Error = fmt.Errorf("%s timed out for task %s", cmd.GetName(), receiver.TaskId.Value)

log.WithPrefix("servent").
WithField("partition", cmd.GetEnvironmentId().String()).
Expand Down
11 changes: 2 additions & 9 deletions core/task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ func (m *Manager) configureTasks(envId uid.ID, tasks Tasks) error {
close(notify)

if response == nil {
return errors.New("nil response")
return fmt.Errorf("no response from Mesos to CONFIGURE transition request within %ds timeout", int(cmd.ResponseTimeout.Seconds()))
}

if response.IsMultiResponse() {
Expand All @@ -765,14 +765,7 @@ func (m *Manager) configureTasks(envId uid.ID, tasks Tasks) error {
task := m.GetTask(k.TaskId.Value)
var taskDescription string
if task != nil {
tci := task.GetTaskCommandInfo()
tciValue := "unknown command"
if tci.Value != nil {
tciValue = *tci.Value
}

taskDescription = fmt.Sprintf("task '%s' on %s (id %s, name %s) failed with error: %s", tciValue, task.GetHostname(), task.GetTaskId(), task.GetName(), v.Error())

taskDescription = fmt.Sprintf("task '%s' on %s (id %s) failed with error: %s", task.GetParent().GetName(), task.GetHostname(), task.GetTaskId(), v.Error())
} else {
taskDescription = fmt.Sprintf("unknown task (id %s) failed with error: %s", k.TaskId.Value, v.Error())
}
Expand Down
1 change: 1 addition & 0 deletions core/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type parentRole interface {
ConsolidatedVarStack() (varStack map[string]string, err error)
CollectInboundChannels() []channel.Inbound
SendEvent(event.Event)
GetName() string
}

type Traits struct {
Expand Down