Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ require (
github.com/spf13/pflag v1.0.6
github.com/spf13/viper v1.20.1
github.com/stretchr/testify v1.10.0
github.com/superfly/fly-go v0.1.49
github.com/superfly/fly-go v0.1.50-0.20250729070814-609171ef635a
github.com/superfly/graphql v0.2.6
github.com/superfly/lfsc-go v0.1.1
github.com/superfly/macaroon v0.3.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,8 @@ github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/superfly/fly-go v0.1.49 h1:nC+apgggqVRdf42N+ucd97JS8QF8dQfr4o1LLSDZDlU=
github.com/superfly/fly-go v0.1.49/go.mod h1:YcVyr0bQm6cdydoMsqIHDPZDQqbmErwCSIuJI4pNwZM=
github.com/superfly/fly-go v0.1.50-0.20250729070814-609171ef635a h1:TjHR2Stuk+q8VBRLKjpbc52px7VE6Ua4JnrqBed6O1Q=
github.com/superfly/fly-go v0.1.50-0.20250729070814-609171ef635a/go.mod h1:YcVyr0bQm6cdydoMsqIHDPZDQqbmErwCSIuJI4pNwZM=
github.com/superfly/graphql v0.2.6 h1:zppbodNerWecoXEdjkhrqaNaSjGqobhXNlViHFuZzb4=
github.com/superfly/graphql v0.2.6/go.mod h1:CVfDl31srm8HnJ9udwLu6hFNUW/P6GUM2dKcG1YQ8jc=
github.com/superfly/lfsc-go v0.1.1 h1:dGjLgt81D09cG+aR9lJZIdmonjZSR5zYCi7s54+ZU2Q=
Expand Down
2 changes: 1 addition & 1 deletion internal/command/deploy/machines_releasecommand.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (md *machineDeployment) runReleaseCommand(ctx context.Context, commandType
stream, err = logs.NewNatsStream(ctx, md.apiClient, logOpts)
if err != nil {
// Silently fallback to app logs polling if NATS streaming client is unavailable.
stream = logs.NewPollingStream(md.apiClient)
stream = logs.NewPollingStream(md.flapsClient)
}
return nil
})
Expand Down
34 changes: 22 additions & 12 deletions internal/command/logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ import (
"github.com/superfly/flyctl/iostreams"
"github.com/superfly/flyctl/logs"

"github.com/superfly/fly-go"
"github.com/superfly/fly-go/flaps"
"github.com/superfly/flyctl/internal/appconfig"
"github.com/superfly/flyctl/internal/command"
"github.com/superfly/flyctl/internal/config"
"github.com/superfly/flyctl/internal/flag"
"github.com/superfly/flyctl/internal/flapsutil"
"github.com/superfly/flyctl/internal/flyutil"
"github.com/superfly/flyctl/internal/logger"
"github.com/superfly/flyctl/internal/render"
Expand Down Expand Up @@ -77,15 +80,16 @@ func run(ctx context.Context) error {
var eg *errgroup.Group
eg, ctx = errgroup.WithContext(ctx)

var streams []<-chan logs.LogEntry
var streams []<-chan fly.LogEntry
if opts.NoTail {
streams = []<-chan logs.LogEntry{
poll(ctx, eg, client, opts),
streams = []<-chan fly.LogEntry{
poll(ctx, eg, opts),
}
} else {
opts.NoTail = true
pollingCtx, cancelPolling := context.WithCancel(ctx)
streams = []<-chan logs.LogEntry{
poll(pollingCtx, eg, client, opts),
streams = []<-chan fly.LogEntry{
poll(pollingCtx, eg, opts),
nats(ctx, eg, client, opts, cancelPolling),
}
}
Expand All @@ -97,13 +101,19 @@ func run(ctx context.Context) error {
return eg.Wait()
}

func poll(ctx context.Context, eg *errgroup.Group, client flyutil.Client, opts *logs.LogOptions) <-chan logs.LogEntry {
c := make(chan logs.LogEntry)
func poll(ctx context.Context, eg *errgroup.Group, opts *logs.LogOptions) <-chan fly.LogEntry {
c := make(chan fly.LogEntry)

eg.Go(func() (err error) {
defer close(c)

if err = logs.Poll(ctx, c, client, opts); errors.Is(err, context.Canceled) {
flapsClient, err := flapsutil.NewClientWithOptions(ctx, flaps.NewClientOpts{
AppName: opts.AppName,
})
if err != nil {
return
}
if err = logs.Poll(ctx, c, flapsClient, opts); errors.Is(err, context.Canceled) {
// if the parent context is cancelled then the errorgroup will return
// context.Canceled because nats and/or printStreams will return it.
err = nil
Expand All @@ -115,8 +125,8 @@ func poll(ctx context.Context, eg *errgroup.Group, client flyutil.Client, opts *
return c
}

func nats(ctx context.Context, eg *errgroup.Group, client flyutil.Client, opts *logs.LogOptions, cancelPolling context.CancelFunc) <-chan logs.LogEntry {
c := make(chan logs.LogEntry)
func nats(ctx context.Context, eg *errgroup.Group, client flyutil.Client, opts *logs.LogOptions, cancelPolling context.CancelFunc) <-chan fly.LogEntry {
c := make(chan fly.LogEntry)

eg.Go(func() error {
defer close(c)
Expand Down Expand Up @@ -146,7 +156,7 @@ func nats(ctx context.Context, eg *errgroup.Group, client flyutil.Client, opts *
return c
}

func printStreams(ctx context.Context, streams ...<-chan logs.LogEntry) error {
func printStreams(ctx context.Context, streams ...<-chan fly.LogEntry) error {
var eg *errgroup.Group
eg, ctx = errgroup.WithContext(ctx)

Expand All @@ -163,7 +173,7 @@ func printStreams(ctx context.Context, streams ...<-chan logs.LogEntry) error {
return eg.Wait()
}

func printStream(ctx context.Context, w io.Writer, stream <-chan logs.LogEntry, json bool) error {
func printStream(ctx context.Context, w io.Writer, stream <-chan fly.LogEntry, json bool) error {
for {
select {
case <-ctx.Done():
Expand Down
1 change: 1 addition & 0 deletions internal/flapsutil/flaps_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type FlapsClient interface {
GenerateSecretKey(ctx context.Context, name string, typ string) (*fly.SetSecretKeyResp, error)
Get(ctx context.Context, machineID string) (*fly.Machine, error)
GetAllVolumes(ctx context.Context) ([]fly.Volume, error)
GetLogs(ctx context.Context, machineID string) ([]fly.AppLogEntry, error)
GetMany(ctx context.Context, machineIDs []string) ([]*fly.Machine, error)
GetMetadata(ctx context.Context, machineID string) (map[string]string, error)
GetProcesses(ctx context.Context, machineID string) (fly.MachinePsResponse, error)
Expand Down
4 changes: 2 additions & 2 deletions internal/render/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

"github.com/logrusorgru/aurora"

"github.com/superfly/flyctl/logs"
"github.com/superfly/fly-go"

"github.com/superfly/flyctl/internal/format"
)
Expand Down Expand Up @@ -43,7 +43,7 @@ func HideAllocID() LogOption {
}
}

func LogEntry(w io.Writer, entry logs.LogEntry, opts ...LogOption) (err error) {
func LogEntry(w io.Writer, entry fly.LogEntry, opts ...LogOption) (err error) {
options := &LogOptions{}
for _, opt := range opts {
opt(options)
Expand Down
54 changes: 0 additions & 54 deletions logs/entry.go

This file was deleted.

2 changes: 1 addition & 1 deletion logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ func (opts *LogOptions) toNatsSubject() (subject string) {

type LogStream interface {
Err() error
Stream(ctx context.Context, opts *LogOptions) <-chan LogEntry
Stream(ctx context.Context, opts *LogOptions) <-chan fly.LogEntry
}
22 changes: 6 additions & 16 deletions logs/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/nats-io/nats.go"

"github.com/superfly/fly-go"
"github.com/superfly/flyctl/agent"
"github.com/superfly/flyctl/internal/config"
)
Expand Down Expand Up @@ -46,8 +47,8 @@ func NewNatsStream(ctx context.Context, apiClient WebClient, opts *LogOptions) (
}

// natsLogStream implements LogStream
func (s *natsLogStream) Stream(ctx context.Context, opts *LogOptions) <-chan LogEntry {
out := make(chan LogEntry)
func (s *natsLogStream) Stream(ctx context.Context, opts *LogOptions) <-chan fly.LogEntry {
out := make(chan fly.LogEntry)

go func() {
defer close(out)
Expand Down Expand Up @@ -96,14 +97,14 @@ func (d *natsDialer) Dial(network, address string) (net.Conn, error) {
return d.Dialer.DialContext(d.ctx, network, address)
}

func fromNats(ctx context.Context, out chan<- LogEntry, nc *nats.Conn, opts *LogOptions) (err error) {
func fromNats(ctx context.Context, out chan<- fly.LogEntry, nc *nats.Conn, opts *LogOptions) (err error) {
var sub *nats.Subscription
if sub, err = nc.SubscribeSync(opts.toNatsSubject()); err != nil {
return
}
defer sub.Unsubscribe()

var log natsLog
var log fly.AppLogEntry
for {
var msg *nats.Msg
if msg, err = sub.NextMsgWithContext(ctx); err != nil {
Expand All @@ -116,18 +117,7 @@ func fromNats(ctx context.Context, out chan<- LogEntry, nc *nats.Conn, opts *Log
break
}

out <- LogEntry{
Instance: log.Fly.App.Instance,
Level: log.Log.Level,
Message: log.Message,
Region: log.Fly.Region,
Timestamp: log.Timestamp,
Meta: Meta{
Instance: log.Fly.App.Instance,
Region: log.Fly.Region,
Event: struct{ Provider string }{log.Event.Provider},
},
}
out <- log.LogEntry()
}

return
Expand Down
29 changes: 9 additions & 20 deletions logs/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,21 @@ import (
"github.com/azazeal/pause"
"github.com/pkg/errors"

fly "github.com/superfly/fly-go"
"github.com/superfly/fly-go"
"github.com/superfly/flyctl/internal/flapsutil"
)

type pollingStream struct {
err error
apiClient WebClient
apiClient flapsutil.FlapsClient
}

func NewPollingStream(client WebClient) LogStream {
func NewPollingStream(client flapsutil.FlapsClient) LogStream {
return &pollingStream{apiClient: client}
}

func (s *pollingStream) Stream(ctx context.Context, opts *LogOptions) <-chan LogEntry {
out := make(chan LogEntry)
func (s *pollingStream) Stream(ctx context.Context, opts *LogOptions) <-chan fly.LogEntry {
out := make(chan fly.LogEntry)

go func() {
defer close(out)
Expand All @@ -35,15 +36,14 @@ func (s *pollingStream) Err() error {
return s.err
}

func Poll(ctx context.Context, out chan<- LogEntry, client WebClient, opts *LogOptions) error {
func Poll(ctx context.Context, out chan<- fly.LogEntry, client flapsutil.FlapsClient, opts *LogOptions) error {
const (
minWait = time.Millisecond << 6
maxWait = minWait << 6
)

var (
errorCount int
nextToken string
waitFor = minWait
)

Expand All @@ -52,7 +52,7 @@ func Poll(ctx context.Context, out chan<- LogEntry, client WebClient, opts *LogO
pause.For(ctx, waitFor)
}

entries, token, err := client.GetAppLogs(ctx, opts.AppName, nextToken, opts.RegionCode, opts.VMID)
entries, err := client.GetLogs(ctx, opts.VMID)
if err != nil {
switch errorCount++; {
default:
Expand All @@ -77,19 +77,8 @@ func Poll(ctx context.Context, out chan<- LogEntry, client WebClient, opts *LogO

waitFor = 0

if token != "" {
nextToken = token
}

for _, entry := range entries {
out <- LogEntry{
Instance: entry.Instance,
Level: entry.Level,
Message: entry.Message,
Region: entry.Region,
Timestamp: entry.Timestamp,
Meta: entry.Meta,
}
out <- entry.LogEntry()
}

if opts.NoTail {
Expand Down
Loading