Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion sup.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,13 @@ func (sup *Stackup) Run(network *Network, envVars EnvList, commands ...*Command)
wg.Add(1)
go func(c Client) {
defer wg.Done()
_, err := io.Copy(os.Stdout, prefixer.New(c.Stdout(), prefix))
var err error
if task.Output == nil {
_, err = io.Copy(os.Stdout, prefixer.New(c.Stdout(), prefix))
} else {
// for download task, remote task stdout piped into local stdin of tar process
_, err = io.Copy(task.Output, c.Stdout())
}
if err != nil && err != io.EOF {
// TODO: io.Copy() should not return io.EOF at all.
// Upstream bug? Or prefixer.WriteTo() bug?
Expand Down
27 changes: 18 additions & 9 deletions supfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,16 @@ func (n *Networks) Get(name string) (Network, bool) {

// Command represents command(s) to be run remotely.
type Command struct {
Name string `yaml:"-"` // Command name.
Desc string `yaml:"desc"` // Command description.
Local string `yaml:"local"` // Command(s) to be run locally.
Run string `yaml:"run"` // Command(s) to be run remotelly.
Script string `yaml:"script"` // Load command(s) from script and run it remotelly.
Upload []Upload `yaml:"upload"` // See Upload struct.
Stdin bool `yaml:"stdin"` // Attach localhost STDOUT to remote commands' STDIN?
Once bool `yaml:"once"` // The command should be run "once" (on one host only).
Serial int `yaml:"serial"` // Max number of clients processing a task in parallel.
Name string `yaml:"-"` // Command name.
Desc string `yaml:"desc"` // Command description.
Local string `yaml:"local"` // Command(s) to be run locally.
Run string `yaml:"run"` // Command(s) to be run remotelly.
Script string `yaml:"script"` // Load command(s) from script and run it remotelly.
Upload []Upload `yaml:"upload"` // See Upload struct.
Download []Download `yaml:"download"` // See Download struct.
Stdin bool `yaml:"stdin"` // Attach localhost STDOUT to remote commands' STDIN?
Once bool `yaml:"once"` // The command should be run "once" (on one host only).
Serial int `yaml:"serial"` // Max number of clients processing a task in parallel.

// API backward compatibility. Will be deprecated in v1.0.
RunOnce bool `yaml:"run_once"` // The command should be run once only.
Expand Down Expand Up @@ -151,6 +152,14 @@ type Upload struct {
Exc string `yaml:"exclude"`
}

// Download represents file copy operation from remote SrcFolder/Src to local Dst
type Download struct {
SrcFolder string `yaml:"src_folder"`
Src string `yaml:"src"`
Dst string `yaml:"dst"`
Exc string `yaml:"exclude"` // todo: to support Exclude setting
}

// EnvVar represents an environment variable
type EnvVar struct {
Key string
Expand Down
26 changes: 26 additions & 0 deletions tar.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (
)

// Copying dirs/files over SSH using TAR.
// upload:
// tar -C . -cvzf - $SRC | ssh $HOST "tar -C $DST -xvzf -"
// download:
// ssh $HOST "tar -C $SRC_DIR -czvf - $SRC_FILE" | tar -C $DST -xzvf -

// RemoteTarCommand returns command to be run on remote SSH host
// to properly receive the created TAR stream.
Expand Down Expand Up @@ -51,3 +54,26 @@ func NewTarStreamReader(cwd, path, exclude string) (io.Reader, error) {

return stdout, nil
}

// RemoteTarCreateCommand forms "tar -C $SRC_DIR -czvf - $SRC_FILE"
// which is the remote part of download task
func RemoteTarCreateCommand(dir, src string) string {
return fmt.Sprintf("tar -C \"%s\" -czvf - \"%s\"", dir, src)
}

// NewTarStreamWriter creates a tar stream writer to local path
// by calling tar -C $DST -xzvf -
// which is the local part of download task
func NewTarStreamWriter(dst string) (io.Writer, error) {
cmd := exec.Command("tar", "-C", dst, "-xzvf", "-")
stdin, err := cmd.StdinPipe()
if err != nil {
return nil, errors.Wrap(err, "tar: stdin pipe failed")
}

if err := cmd.Start(); err != nil {
return nil, errors.Wrap(err, "tar: starting cmd failed")
}

return stdin, nil
}
100 changes: 47 additions & 53 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
type Task struct {
Run string
Input io.Reader
Output io.Writer
Clients []Client
TTY bool
}
Expand Down Expand Up @@ -42,24 +43,28 @@ func (sup *Stackup) createTasks(cmd *Command, clients []Client, env string) ([]*
TTY: false,
}

if cmd.Once {
task.Clients = []Client{clients[0]}
tasks = append(tasks, &task)
} else if cmd.Serial > 0 {
// Each "serial" task client group is executed sequentially.
for i := 0; i < len(clients); i += cmd.Serial {
j := i + cmd.Serial
if j > len(clients) {
j = len(clients)
}
copy := task
copy.Clients = clients[i:j]
tasks = append(tasks, &copy)
}
} else {
task.Clients = clients
tasks = append(tasks, &task)
addTask(task, cmd, clients, &tasks)
}

// todo: impl download support
for _, download := range cmd.Download {
dst, err := ResolveLocalPath(cwd, download.Dst, env)
if err != nil {
return nil, errors.Wrap(err, "download: "+download.Dst)
}
tarWriter, err := NewTarStreamWriter(dst)
if err != nil {
return nil, errors.Wrap(err, "download: "+download.Dst)
}

// todo: support download.Exclude
task := Task{
Run: RemoteTarCreateCommand(download.SrcFolder, download.Src),
Output: tarWriter,
TTY: false,
}

addTask(task, cmd, clients, &tasks)
}

// Script. Read the file as a multiline input command.
Expand All @@ -83,24 +88,8 @@ func (sup *Stackup) createTasks(cmd *Command, clients []Client, env string) ([]*
if cmd.Stdin {
task.Input = os.Stdin
}
if cmd.Once {
task.Clients = []Client{clients[0]}
tasks = append(tasks, &task)
} else if cmd.Serial > 0 {
// Each "serial" task client group is executed sequentially.
for i := 0; i < len(clients); i += cmd.Serial {
j := i + cmd.Serial
if j > len(clients) {
j = len(clients)
}
copy := task
copy.Clients = clients[i:j]
tasks = append(tasks, &copy)
}
} else {
task.Clients = clients
tasks = append(tasks, &task)
}

addTask(task, cmd, clients, &tasks)
}

// Local command.
Expand Down Expand Up @@ -135,29 +124,34 @@ func (sup *Stackup) createTasks(cmd *Command, clients []Client, env string) ([]*
if cmd.Stdin {
task.Input = os.Stdin
}
if cmd.Once {
task.Clients = []Client{clients[0]}
tasks = append(tasks, &task)
} else if cmd.Serial > 0 {
// Each "serial" task client group is executed sequentially.
for i := 0; i < len(clients); i += cmd.Serial {
j := i + cmd.Serial
if j > len(clients) {
j = len(clients)
}
copy := task
copy.Clients = clients[i:j]
tasks = append(tasks, &copy)
}
} else {
task.Clients = clients
tasks = append(tasks, &task)
}

addTask(task, cmd, clients, &tasks)
}

return tasks, nil
}

func addTask(task Task, cmd *Command, clients []Client, tasks *[]*Task) {
if cmd.Once {
task.Clients = []Client{clients[0]}
*tasks = append(*tasks, &task)
} else if cmd.Serial > 0 {
// Each "serial" task client group is executed sequentially.
for i := 0; i < len(clients); i += cmd.Serial {
j := i + cmd.Serial
if j > len(clients) {
j = len(clients)
}
copy := task
copy.Clients = clients[i:j]
*tasks = append(*tasks, &copy)
}
} else {
task.Clients = clients
*tasks = append(*tasks, &task)
}
}

type ErrTask struct {
Task *Task
Reason string
Expand Down