diff --git a/cmd/ctr/commands/images/pull.go b/cmd/ctr/commands/images/pull.go index 0999b8dc6137..c93c8834db50 100644 --- a/cmd/ctr/commands/images/pull.go +++ b/cmd/ctr/commands/images/pull.go @@ -265,9 +265,14 @@ func ProgressHandler(ctx context.Context, out io.Writer) (transfer.ProgressFunc, statuses = map[string]*progressNode{} roots = []*progressNode{} progress transfer.ProgressFunc - pc = make(chan transfer.Progress, 1) - status string - closeC = make(chan struct{}) + // Use a buffered channel for progress to allow multiple completed + // progress updates to be processed before shutting down the progress + // handler. Currently the progress stream does not have an explicit + // end, however, done indicates the server has already commpleted + // sending all progress. + pc = make(chan transfer.Progress, 5) + status string + closeC = make(chan struct{}) ) progress = func(p transfer.Progress) { @@ -409,7 +414,7 @@ func displayNode(w io.Writer, prefix string, nodes []*progressNode) int64 { name := prefix + pf + displayName(status.Name) switch status.Event { - case "downloading", "uploading": + case "downloading", "uploading", "extracting": var bar progress.Bar if status.Total > 0.0 { bar = progress.Bar(float64(status.Progress) / float64(status.Total)) @@ -425,7 +430,7 @@ func displayNode(w io.Writer, prefix string, nodes []*progressNode) int64 { name, status.Event, bar) - case "complete": + case "complete", "extracted": bar := progress.Bar(1.0) fmt.Fprintf(w, "%-40.40s\t%-11s\t%40r\t\n", name, diff --git a/core/diff/apply/apply.go b/core/diff/apply/apply.go index 62e7a3623336..9acecea35cc4 100644 --- a/core/diff/apply/apply.go +++ b/core/diff/apply/apply.go @@ -71,10 +71,16 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [ if err != nil { return emptyDesc, fmt.Errorf("failed to get reader from content store: %w", err) } - defer ra.Close() + var r io.ReadCloser + if config.Progress != nil { + r = newProgressReader(ra, config.Progress) + } else { + r = newReadCloser(ra) + } + defer r.Close() var processors []diff.StreamProcessor - processor := diff.NewProcessorChain(desc.MediaType, content.NewReader(ra)) + processor := diff.NewProcessorChain(desc.MediaType, r) processors = append(processors, processor) for { if processor, err = diff.GetProcessor(ctx, processor, config.ProcessorPayloads); err != nil { @@ -110,6 +116,7 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [ } } } + return ocispec.Descriptor{ MediaType: ocispec.MediaTypeImageLayer, Size: rc.c, @@ -129,3 +136,46 @@ func (rc *readCounter) Read(p []byte) (n int, err error) { } return } + +type progressReader struct { + rc *readCounter + c io.Closer + p func(int64) +} + +func newProgressReader(ra content.ReaderAt, p func(int64)) io.ReadCloser { + return &progressReader{ + rc: &readCounter{ + r: content.NewReader(ra), + c: 0, + }, + c: ra, + p: p, + } +} + +func (pr *progressReader) Read(p []byte) (n int, err error) { + // Call the progress function with the current count, indicating + // the previously read content has been processed. Initial + // progress of 0 indicates start of processing. + pr.p(pr.rc.c) + n, err = pr.rc.Read(p) + return +} + +func (pr *progressReader) Close() error { + pr.p(pr.rc.c) + return pr.c.Close() +} + +type readCloser struct { + io.Reader + io.Closer +} + +func newReadCloser(ra content.ReaderAt) io.ReadCloser { + return &readCloser{ + Reader: content.NewReader(ra), + Closer: ra, + } +} diff --git a/core/diff/diff.go b/core/diff/diff.go index 4838a1c37e25..4d20044ca75d 100644 --- a/core/diff/diff.go +++ b/core/diff/diff.go @@ -69,6 +69,8 @@ type ApplyConfig struct { ProcessorPayloads map[string]typeurl.Any // SyncFs is to synchronize the underlying filesystem containing files SyncFs bool + // Progress is a function which reports status of processed read data + Progress func(int64) } // ApplyOpt is used to configure an Apply operation @@ -135,6 +137,18 @@ func WithSyncFs(sync bool) ApplyOpt { } } +// WithProgress is used to indicate process of the apply operation, should +// atleast expect a progress of 0 and of the final size. It is up to the applier +// how much progress it reports in between. +func WithProgress(f func(ocispec.Descriptor, int64)) ApplyOpt { + return func(_ context.Context, desc ocispec.Descriptor, c *ApplyConfig) error { + c.Progress = func(state int64) { + f(desc, state) + } + return nil + } +} + // WithSourceDateEpoch specifies the timestamp used to provide control for reproducibility. // See also https://reproducible-builds.org/docs/source-date-epoch/ . // diff --git a/core/diff/proxy/differ.go b/core/diff/proxy/differ.go index 8a5a4abd1ee9..f34e0d65d9ed 100644 --- a/core/diff/proxy/differ.go +++ b/core/diff/proxy/differ.go @@ -56,7 +56,9 @@ func (r *diffRemote) Apply(ctx context.Context, desc ocispec.Descriptor, mounts for k, v := range config.ProcessorPayloads { payloads[k] = typeurl.MarshalProto(v) } - + if config.Progress != nil { + config.Progress(0) + } req := &diffapi.ApplyRequest{ Diff: oci.DescriptorToProto(desc), Mounts: mount.ToProto(mounts), @@ -67,6 +69,9 @@ func (r *diffRemote) Apply(ctx context.Context, desc ocispec.Descriptor, mounts if err != nil { return ocispec.Descriptor{}, errgrpc.ToNative(err) } + if config.Progress != nil { + config.Progress(desc.Size) + } return oci.DescriptorFromProto(resp.Applied), nil } diff --git a/core/transfer/local/progress.go b/core/transfer/local/progress.go index e75c448da7a3..60f0cd359bf0 100644 --- a/core/transfer/local/progress.go +++ b/core/transfer/local/progress.go @@ -35,6 +35,7 @@ type ProgressTracker struct { root string transferState string added chan jobUpdate + extraction chan extractionUpdate waitC chan struct{} parents map[digest.Digest][]ocispec.Descriptor @@ -47,6 +48,8 @@ const ( jobAdded jobState = iota jobInProgress jobComplete + jobExtracting + jobExtracted ) type jobStatus struct { @@ -72,12 +75,18 @@ type StatusTracker interface { Check(context.Context, digest.Digest) (bool, error) } +type extractionUpdate struct { + desc ocispec.Descriptor + progress int64 +} + // NewProgressTracker tracks content download progress func NewProgressTracker(root, transferState string) *ProgressTracker { return &ProgressTracker{ root: root, transferState: transferState, added: make(chan jobUpdate, 1), + extraction: make(chan extractionUpdate, 1), waitC: make(chan struct{}), parents: map[digest.Digest][]ocispec.Descriptor{}, } @@ -97,7 +106,8 @@ func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.Progre log.G(ctx).WithError(err).Error("failed to get statuses for progress") } for dgst, job := range jobs { - if job.state != jobComplete { + switch job.state { + case jobAdded, jobInProgress: status, ok := active.Status(job.name) if ok { if status.Offset > job.progress { @@ -130,6 +140,30 @@ func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.Progre jobs[dgst] = job } } + case jobExtracting: + if job.progress == job.desc.Size { + pf(transfer.Progress{ + Event: "extracted", + Name: job.name, + Parents: job.parents, + Progress: job.desc.Size, + Total: job.desc.Size, + Desc: &job.desc, + }) + job.state = jobExtracted + jobs[dgst] = job + } else { + pf(transfer.Progress{ + Event: "extracting", + Name: job.name, + Parents: job.parents, + Progress: job.progress, + Total: job.desc.Size, + Desc: &job.desc, + }) + } + case jobComplete, jobExtracted: + // No progress to send } } } @@ -161,10 +195,9 @@ func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.Progre } jobs[update.desc.Digest] = job pf(transfer.Progress{ - Event: "waiting", - Name: name, - Parents: parents, - //Digest: desc.Digest.String(), + Event: "waiting", + Name: name, + Parents: parents, Progress: 0, Total: update.desc.Size, Desc: &job.desc, @@ -181,7 +214,37 @@ func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.Progre job.state = jobComplete job.progress = job.desc.Size } + case extraction := <-j.extraction: + job, ok := jobs[extraction.desc.Digest] + if !ok { + // Only captures the parents defined before, + // could handle parent updates in same thread + // if there is a synchronization issue + var parents []string + j.parentL.Lock() + for _, parent := range j.parents[extraction.desc.Digest] { + parents = append(parents, remotes.MakeRefKey(ctx, parent)) + } + j.parentL.Unlock() + if len(parents) == 0 { + parents = []string{j.root} + } + name := remotes.MakeRefKey(ctx, extraction.desc) + job = &jobStatus{ + state: jobExtracting, + name: name, + parents: parents, + progress: extraction.progress, + desc: extraction.desc, + } + jobs[extraction.desc.Digest] = job + } else { + if job.state != jobExtracting { + job.state = jobExtracting + } + job.progress = extraction.progress + } case <-tc.C: update() // Next timer? @@ -226,6 +289,16 @@ func (j *ProgressTracker) AddChildren(desc ocispec.Descriptor, children []ocispe } +func (j *ProgressTracker) ExtractProgress(desc ocispec.Descriptor, progress int64) { + if j == nil { + return + } + j.extraction <- extractionUpdate{ + desc: desc, + progress: progress, + } +} + func (j *ProgressTracker) Wait() { // timeout rather than rely on cancel timeout := time.After(10 * time.Second) diff --git a/core/transfer/local/pull.go b/core/transfer/local/pull.go index 766075206992..92085c67b588 100644 --- a/core/transfer/local/pull.go +++ b/core/transfer/local/pull.go @@ -26,6 +26,7 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/containerd/containerd/v2/core/content" + "github.com/containerd/containerd/v2/core/diff" "github.com/containerd/containerd/v2/core/images" "github.com/containerd/containerd/v2/core/remotes" "github.com/containerd/containerd/v2/core/remotes/docker" @@ -200,6 +201,9 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageFetch if v, ok := mu.SnapshotterExports["enable_remote_snapshot_annotations"]; ok && v == "true" { enableRemoteSnapshotAnnotations = true } + if progressTracker != nil { + mu.ApplyOpts = append(mu.ApplyOpts, diff.WithProgress(progressTracker.ExtractProgress)) + } uopts = append(uopts, unpack.WithUnpackPlatform(mu)) } else { log.G(ctx).WithFields(log.Fields{