From ede1b8044e0fe220c02d291ad8d8a26cb26dac96 Mon Sep 17 00:00:00 2001 From: Will Jordan Date: Tue, 29 Jul 2025 00:10:18 -0700 Subject: [PATCH 1/2] use flaps client GetLogs API for fetching recent logs --- .../command/deploy/machines_releasecommand.go | 2 +- internal/command/logs/logs.go | 34 +++++++----- internal/flapsutil/flaps_client.go | 1 + internal/render/logs.go | 4 +- logs/entry.go | 54 ------------------- logs/logs.go | 2 +- logs/nats.go | 22 +++----- logs/polling.go | 29 ++++------ 8 files changed, 42 insertions(+), 106 deletions(-) delete mode 100644 logs/entry.go diff --git a/internal/command/deploy/machines_releasecommand.go b/internal/command/deploy/machines_releasecommand.go index 775044a1dd..73b5b8d78d 100644 --- a/internal/command/deploy/machines_releasecommand.go +++ b/internal/command/deploy/machines_releasecommand.go @@ -90,7 +90,7 @@ func (md *machineDeployment) runReleaseCommand(ctx context.Context, commandType stream, err = logs.NewNatsStream(ctx, md.apiClient, logOpts) if err != nil { // Silently fallback to app logs polling if NATS streaming client is unavailable. - stream = logs.NewPollingStream(md.apiClient) + stream = logs.NewPollingStream(md.flapsClient) } return nil }) diff --git a/internal/command/logs/logs.go b/internal/command/logs/logs.go index abfd09f46c..1b8a453cd4 100644 --- a/internal/command/logs/logs.go +++ b/internal/command/logs/logs.go @@ -14,10 +14,13 @@ import ( "github.com/superfly/flyctl/iostreams" "github.com/superfly/flyctl/logs" + "github.com/superfly/fly-go" + "github.com/superfly/fly-go/flaps" "github.com/superfly/flyctl/internal/appconfig" "github.com/superfly/flyctl/internal/command" "github.com/superfly/flyctl/internal/config" "github.com/superfly/flyctl/internal/flag" + "github.com/superfly/flyctl/internal/flapsutil" "github.com/superfly/flyctl/internal/flyutil" "github.com/superfly/flyctl/internal/logger" "github.com/superfly/flyctl/internal/render" @@ -77,15 +80,16 @@ func run(ctx context.Context) error { var eg *errgroup.Group eg, ctx = errgroup.WithContext(ctx) - var streams []<-chan logs.LogEntry + var streams []<-chan fly.LogEntry if opts.NoTail { - streams = []<-chan logs.LogEntry{ - poll(ctx, eg, client, opts), + streams = []<-chan fly.LogEntry{ + poll(ctx, eg, opts), } } else { + opts.NoTail = true pollingCtx, cancelPolling := context.WithCancel(ctx) - streams = []<-chan logs.LogEntry{ - poll(pollingCtx, eg, client, opts), + streams = []<-chan fly.LogEntry{ + poll(pollingCtx, eg, opts), nats(ctx, eg, client, opts, cancelPolling), } } @@ -97,13 +101,19 @@ func run(ctx context.Context) error { return eg.Wait() } -func poll(ctx context.Context, eg *errgroup.Group, client flyutil.Client, opts *logs.LogOptions) <-chan logs.LogEntry { - c := make(chan logs.LogEntry) +func poll(ctx context.Context, eg *errgroup.Group, opts *logs.LogOptions) <-chan fly.LogEntry { + c := make(chan fly.LogEntry) eg.Go(func() (err error) { defer close(c) - if err = logs.Poll(ctx, c, client, opts); errors.Is(err, context.Canceled) { + flapsClient, err := flapsutil.NewClientWithOptions(ctx, flaps.NewClientOpts{ + AppName: opts.AppName, + }) + if err != nil { + return + } + if err = logs.Poll(ctx, c, flapsClient, opts); errors.Is(err, context.Canceled) { // if the parent context is cancelled then the errorgroup will return // context.Canceled because nats and/or printStreams will return it. err = nil @@ -115,8 +125,8 @@ func poll(ctx context.Context, eg *errgroup.Group, client flyutil.Client, opts * return c } -func nats(ctx context.Context, eg *errgroup.Group, client flyutil.Client, opts *logs.LogOptions, cancelPolling context.CancelFunc) <-chan logs.LogEntry { - c := make(chan logs.LogEntry) +func nats(ctx context.Context, eg *errgroup.Group, client flyutil.Client, opts *logs.LogOptions, cancelPolling context.CancelFunc) <-chan fly.LogEntry { + c := make(chan fly.LogEntry) eg.Go(func() error { defer close(c) @@ -146,7 +156,7 @@ func nats(ctx context.Context, eg *errgroup.Group, client flyutil.Client, opts * return c } -func printStreams(ctx context.Context, streams ...<-chan logs.LogEntry) error { +func printStreams(ctx context.Context, streams ...<-chan fly.LogEntry) error { var eg *errgroup.Group eg, ctx = errgroup.WithContext(ctx) @@ -163,7 +173,7 @@ func printStreams(ctx context.Context, streams ...<-chan logs.LogEntry) error { return eg.Wait() } -func printStream(ctx context.Context, w io.Writer, stream <-chan logs.LogEntry, json bool) error { +func printStream(ctx context.Context, w io.Writer, stream <-chan fly.LogEntry, json bool) error { for { select { case <-ctx.Done(): diff --git a/internal/flapsutil/flaps_client.go b/internal/flapsutil/flaps_client.go index c789d9dcb0..7a29a56d2d 100644 --- a/internal/flapsutil/flaps_client.go +++ b/internal/flapsutil/flaps_client.go @@ -28,6 +28,7 @@ type FlapsClient interface { GenerateSecretKey(ctx context.Context, name string, typ string) (*fly.SetSecretKeyResp, error) Get(ctx context.Context, machineID string) (*fly.Machine, error) GetAllVolumes(ctx context.Context) ([]fly.Volume, error) + GetLogs(ctx context.Context, machineID string) ([]fly.AppLogEntry, error) GetMany(ctx context.Context, machineIDs []string) ([]*fly.Machine, error) GetMetadata(ctx context.Context, machineID string) (map[string]string, error) GetProcesses(ctx context.Context, machineID string) (fly.MachinePsResponse, error) diff --git a/internal/render/logs.go b/internal/render/logs.go index 67f9d7c7ef..8a627d3bfd 100644 --- a/internal/render/logs.go +++ b/internal/render/logs.go @@ -8,7 +8,7 @@ import ( "github.com/logrusorgru/aurora" - "github.com/superfly/flyctl/logs" + "github.com/superfly/fly-go" "github.com/superfly/flyctl/internal/format" ) @@ -43,7 +43,7 @@ func HideAllocID() LogOption { } } -func LogEntry(w io.Writer, entry logs.LogEntry, opts ...LogOption) (err error) { +func LogEntry(w io.Writer, entry fly.LogEntry, opts ...LogOption) (err error) { options := &LogOptions{} for _, opt := range opts { opt(options) diff --git a/logs/entry.go b/logs/entry.go deleted file mode 100644 index fcc03ff60c..0000000000 --- a/logs/entry.go +++ /dev/null @@ -1,54 +0,0 @@ -package logs - -type LogEntry struct { - Level string `json:"level"` - Instance string `json:"instance"` - Message string `json:"message"` - Region string `json:"region"` - Timestamp string `json:"timestamp"` - Meta Meta `json:"meta"` -} - -type Meta struct { - Instance string - Region string - Event struct { - Provider string - } - HTTP struct { - Request struct { - ID string - Method string - Version string - } - Response struct { - StatusCode int `json:"status_code"` - } - } - Error struct { - Code int - Message string - } - URL struct { - Full string - } -} - -type natsLog struct { - Event struct { - Provider string `json:"provider"` - } `json:"event"` - Fly struct { - App struct { - Instance string `json:"instance"` - Name string `json:"name"` - } `json:"app"` - Region string `json:"region"` - } `json:"fly"` - Host string `json:"host"` - Log struct { - Level string `json:"level"` - } `json:"log"` - Message string `json:"message"` - Timestamp string `json:"timestamp"` -} diff --git a/logs/logs.go b/logs/logs.go index e1d31f05ec..a8616c7c1e 100644 --- a/logs/logs.go +++ b/logs/logs.go @@ -45,5 +45,5 @@ func (opts *LogOptions) toNatsSubject() (subject string) { type LogStream interface { Err() error - Stream(ctx context.Context, opts *LogOptions) <-chan LogEntry + Stream(ctx context.Context, opts *LogOptions) <-chan fly.LogEntry } diff --git a/logs/nats.go b/logs/nats.go index 19ebc63d3b..83447b7cef 100644 --- a/logs/nats.go +++ b/logs/nats.go @@ -8,6 +8,7 @@ import ( "github.com/nats-io/nats.go" + "github.com/superfly/fly-go" "github.com/superfly/flyctl/agent" "github.com/superfly/flyctl/internal/config" ) @@ -46,8 +47,8 @@ func NewNatsStream(ctx context.Context, apiClient WebClient, opts *LogOptions) ( } // natsLogStream implements LogStream -func (s *natsLogStream) Stream(ctx context.Context, opts *LogOptions) <-chan LogEntry { - out := make(chan LogEntry) +func (s *natsLogStream) Stream(ctx context.Context, opts *LogOptions) <-chan fly.LogEntry { + out := make(chan fly.LogEntry) go func() { defer close(out) @@ -96,14 +97,14 @@ func (d *natsDialer) Dial(network, address string) (net.Conn, error) { return d.Dialer.DialContext(d.ctx, network, address) } -func fromNats(ctx context.Context, out chan<- LogEntry, nc *nats.Conn, opts *LogOptions) (err error) { +func fromNats(ctx context.Context, out chan<- fly.LogEntry, nc *nats.Conn, opts *LogOptions) (err error) { var sub *nats.Subscription if sub, err = nc.SubscribeSync(opts.toNatsSubject()); err != nil { return } defer sub.Unsubscribe() - var log natsLog + var log fly.AppLogEntry for { var msg *nats.Msg if msg, err = sub.NextMsgWithContext(ctx); err != nil { @@ -116,18 +117,7 @@ func fromNats(ctx context.Context, out chan<- LogEntry, nc *nats.Conn, opts *Log break } - out <- LogEntry{ - Instance: log.Fly.App.Instance, - Level: log.Log.Level, - Message: log.Message, - Region: log.Fly.Region, - Timestamp: log.Timestamp, - Meta: Meta{ - Instance: log.Fly.App.Instance, - Region: log.Fly.Region, - Event: struct{ Provider string }{log.Event.Provider}, - }, - } + out <- log.LogEntry() } return diff --git a/logs/polling.go b/logs/polling.go index f10b341558..59e44fdc40 100644 --- a/logs/polling.go +++ b/logs/polling.go @@ -7,20 +7,21 @@ import ( "github.com/azazeal/pause" "github.com/pkg/errors" - fly "github.com/superfly/fly-go" + "github.com/superfly/fly-go" + "github.com/superfly/flyctl/internal/flapsutil" ) type pollingStream struct { err error - apiClient WebClient + apiClient flapsutil.FlapsClient } -func NewPollingStream(client WebClient) LogStream { +func NewPollingStream(client flapsutil.FlapsClient) LogStream { return &pollingStream{apiClient: client} } -func (s *pollingStream) Stream(ctx context.Context, opts *LogOptions) <-chan LogEntry { - out := make(chan LogEntry) +func (s *pollingStream) Stream(ctx context.Context, opts *LogOptions) <-chan fly.LogEntry { + out := make(chan fly.LogEntry) go func() { defer close(out) @@ -35,7 +36,7 @@ func (s *pollingStream) Err() error { return s.err } -func Poll(ctx context.Context, out chan<- LogEntry, client WebClient, opts *LogOptions) error { +func Poll(ctx context.Context, out chan<- fly.LogEntry, client flapsutil.FlapsClient, opts *LogOptions) error { const ( minWait = time.Millisecond << 6 maxWait = minWait << 6 @@ -43,7 +44,6 @@ func Poll(ctx context.Context, out chan<- LogEntry, client WebClient, opts *LogO var ( errorCount int - nextToken string waitFor = minWait ) @@ -52,7 +52,7 @@ func Poll(ctx context.Context, out chan<- LogEntry, client WebClient, opts *LogO pause.For(ctx, waitFor) } - entries, token, err := client.GetAppLogs(ctx, opts.AppName, nextToken, opts.RegionCode, opts.VMID) + entries, err := client.GetLogs(ctx, opts.VMID) if err != nil { switch errorCount++; { default: @@ -77,19 +77,8 @@ func Poll(ctx context.Context, out chan<- LogEntry, client WebClient, opts *LogO waitFor = 0 - if token != "" { - nextToken = token - } - for _, entry := range entries { - out <- LogEntry{ - Instance: entry.Instance, - Level: entry.Level, - Message: entry.Message, - Region: entry.Region, - Timestamp: entry.Timestamp, - Meta: entry.Meta, - } + out <- entry.LogEntry() } if opts.NoTail { From e7df5ece1fa5339eb47d6261fe893b10fd38b87c Mon Sep 17 00:00:00 2001 From: Will Jordan Date: Tue, 29 Jul 2025 12:13:51 -0700 Subject: [PATCH 2/2] bump fly-go dependency use superfly/fly-go#159 --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 1cf4da7735..f6fffa727d 100644 --- a/go.mod +++ b/go.mod @@ -76,7 +76,7 @@ require ( github.com/spf13/pflag v1.0.6 github.com/spf13/viper v1.20.1 github.com/stretchr/testify v1.10.0 - github.com/superfly/fly-go v0.1.49 + github.com/superfly/fly-go v0.1.50-0.20250729070814-609171ef635a github.com/superfly/graphql v0.2.6 github.com/superfly/lfsc-go v0.1.1 github.com/superfly/macaroon v0.3.0 diff --git a/go.sum b/go.sum index 154a76143a..35a64328a2 100644 --- a/go.sum +++ b/go.sum @@ -630,6 +630,8 @@ github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8 github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/superfly/fly-go v0.1.49 h1:nC+apgggqVRdf42N+ucd97JS8QF8dQfr4o1LLSDZDlU= github.com/superfly/fly-go v0.1.49/go.mod h1:YcVyr0bQm6cdydoMsqIHDPZDQqbmErwCSIuJI4pNwZM= +github.com/superfly/fly-go v0.1.50-0.20250729070814-609171ef635a h1:TjHR2Stuk+q8VBRLKjpbc52px7VE6Ua4JnrqBed6O1Q= +github.com/superfly/fly-go v0.1.50-0.20250729070814-609171ef635a/go.mod h1:YcVyr0bQm6cdydoMsqIHDPZDQqbmErwCSIuJI4pNwZM= github.com/superfly/graphql v0.2.6 h1:zppbodNerWecoXEdjkhrqaNaSjGqobhXNlViHFuZzb4= github.com/superfly/graphql v0.2.6/go.mod h1:CVfDl31srm8HnJ9udwLu6hFNUW/P6GUM2dKcG1YQ8jc= github.com/superfly/lfsc-go v0.1.1 h1:dGjLgt81D09cG+aR9lJZIdmonjZSR5zYCi7s54+ZU2Q=