Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions cmd/ctr/commands/images/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,14 @@ func ProgressHandler(ctx context.Context, out io.Writer) (transfer.ProgressFunc,
statuses = map[string]*progressNode{}
roots = []*progressNode{}
progress transfer.ProgressFunc
pc = make(chan transfer.Progress, 1)
status string
closeC = make(chan struct{})
// Use a buffered channel for progress to allow multiple completed
// progress updates to be processed before shutting down the progress
// handler. Currently the progress stream does not have an explicit
// end, however, done indicates the server has already commpleted
// sending all progress.
pc = make(chan transfer.Progress, 5)
status string
closeC = make(chan struct{})
)

progress = func(p transfer.Progress) {
Expand Down Expand Up @@ -409,7 +414,7 @@ func displayNode(w io.Writer, prefix string, nodes []*progressNode) int64 {
name := prefix + pf + displayName(status.Name)

switch status.Event {
case "downloading", "uploading":
case "downloading", "uploading", "extracting":
var bar progress.Bar
if status.Total > 0.0 {
bar = progress.Bar(float64(status.Progress) / float64(status.Total))
Expand All @@ -425,7 +430,7 @@ func displayNode(w io.Writer, prefix string, nodes []*progressNode) int64 {
name,
status.Event,
bar)
case "complete":
case "complete", "extracted":
bar := progress.Bar(1.0)
fmt.Fprintf(w, "%-40.40s\t%-11s\t%40r\t\n",
name,
Expand Down
54 changes: 52 additions & 2 deletions core/diff/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,16 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [
if err != nil {
return emptyDesc, fmt.Errorf("failed to get reader from content store: %w", err)
}
defer ra.Close()
var r io.ReadCloser
if config.Progress != nil {
r = newProgressReader(ra, config.Progress)
} else {
r = newReadCloser(ra)
}
defer r.Close()

var processors []diff.StreamProcessor
processor := diff.NewProcessorChain(desc.MediaType, content.NewReader(ra))
processor := diff.NewProcessorChain(desc.MediaType, r)
processors = append(processors, processor)
for {
if processor, err = diff.GetProcessor(ctx, processor, config.ProcessorPayloads); err != nil {
Expand Down Expand Up @@ -110,6 +116,7 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [
}
}
}

return ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageLayer,
Size: rc.c,
Expand All @@ -129,3 +136,46 @@ func (rc *readCounter) Read(p []byte) (n int, err error) {
}
return
}

type progressReader struct {
rc *readCounter
c io.Closer
p func(int64)
}

func newProgressReader(ra content.ReaderAt, p func(int64)) io.ReadCloser {
return &progressReader{
rc: &readCounter{
r: content.NewReader(ra),
c: 0,
},
c: ra,
p: p,
}
}

func (pr *progressReader) Read(p []byte) (n int, err error) {
// Call the progress function with the current count, indicating
// the previously read content has been processed. Initial
// progress of 0 indicates start of processing.
pr.p(pr.rc.c)
n, err = pr.rc.Read(p)
return
}

func (pr *progressReader) Close() error {
pr.p(pr.rc.c)
return pr.c.Close()
}

type readCloser struct {
io.Reader
io.Closer
}

func newReadCloser(ra content.ReaderAt) io.ReadCloser {
return &readCloser{
Reader: content.NewReader(ra),
Closer: ra,
}
}
14 changes: 14 additions & 0 deletions core/diff/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type ApplyConfig struct {
ProcessorPayloads map[string]typeurl.Any
// SyncFs is to synchronize the underlying filesystem containing files
SyncFs bool
// Progress is a function which reports status of processed read data
Progress func(int64)
}

// ApplyOpt is used to configure an Apply operation
Expand Down Expand Up @@ -135,6 +137,18 @@ func WithSyncFs(sync bool) ApplyOpt {
}
}

// WithProgress is used to indicate process of the apply operation, should
// atleast expect a progress of 0 and of the final size. It is up to the applier
// how much progress it reports in between.
func WithProgress(f func(ocispec.Descriptor, int64)) ApplyOpt {
return func(_ context.Context, desc ocispec.Descriptor, c *ApplyConfig) error {
c.Progress = func(state int64) {
f(desc, state)
}
return nil
}
}

// WithSourceDateEpoch specifies the timestamp used to provide control for reproducibility.
// See also https://reproducible-builds.org/docs/source-date-epoch/ .
//
Expand Down
7 changes: 6 additions & 1 deletion core/diff/proxy/differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ func (r *diffRemote) Apply(ctx context.Context, desc ocispec.Descriptor, mounts
for k, v := range config.ProcessorPayloads {
payloads[k] = typeurl.MarshalProto(v)
}

if config.Progress != nil {
config.Progress(0)
}
req := &diffapi.ApplyRequest{
Diff: oci.DescriptorToProto(desc),
Mounts: mount.ToProto(mounts),
Expand All @@ -67,6 +69,9 @@ func (r *diffRemote) Apply(ctx context.Context, desc ocispec.Descriptor, mounts
if err != nil {
return ocispec.Descriptor{}, errgrpc.ToNative(err)
}
if config.Progress != nil {
config.Progress(desc.Size)
}
return oci.DescriptorFromProto(resp.Applied), nil
}

Expand Down
83 changes: 78 additions & 5 deletions core/transfer/local/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type ProgressTracker struct {
root string
transferState string
added chan jobUpdate
extraction chan extractionUpdate
waitC chan struct{}

parents map[digest.Digest][]ocispec.Descriptor
Expand All @@ -47,6 +48,8 @@ const (
jobAdded jobState = iota
jobInProgress
jobComplete
jobExtracting
jobExtracted
)

type jobStatus struct {
Expand All @@ -72,12 +75,18 @@ type StatusTracker interface {
Check(context.Context, digest.Digest) (bool, error)
}

type extractionUpdate struct {
desc ocispec.Descriptor
progress int64
}

// NewProgressTracker tracks content download progress
func NewProgressTracker(root, transferState string) *ProgressTracker {
return &ProgressTracker{
root: root,
transferState: transferState,
added: make(chan jobUpdate, 1),
extraction: make(chan extractionUpdate, 1),
waitC: make(chan struct{}),
parents: map[digest.Digest][]ocispec.Descriptor{},
}
Expand All @@ -97,7 +106,8 @@ func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.Progre
log.G(ctx).WithError(err).Error("failed to get statuses for progress")
}
for dgst, job := range jobs {
if job.state != jobComplete {
switch job.state {
case jobAdded, jobInProgress:
status, ok := active.Status(job.name)
if ok {
if status.Offset > job.progress {
Expand Down Expand Up @@ -130,6 +140,30 @@ func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.Progre
jobs[dgst] = job
}
}
case jobExtracting:
if job.progress == job.desc.Size {
pf(transfer.Progress{
Event: "extracted",
Name: job.name,
Parents: job.parents,
Progress: job.desc.Size,
Total: job.desc.Size,
Desc: &job.desc,
})
job.state = jobExtracted
jobs[dgst] = job
} else {
pf(transfer.Progress{
Event: "extracting",
Name: job.name,
Parents: job.parents,
Progress: job.progress,
Total: job.desc.Size,
Desc: &job.desc,
})
}
case jobComplete, jobExtracted:
// No progress to send
}
}
}
Expand Down Expand Up @@ -161,10 +195,9 @@ func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.Progre
}
jobs[update.desc.Digest] = job
pf(transfer.Progress{
Event: "waiting",
Name: name,
Parents: parents,
//Digest: desc.Digest.String(),
Event: "waiting",
Name: name,
Parents: parents,
Progress: 0,
Total: update.desc.Size,
Desc: &job.desc,
Expand All @@ -181,7 +214,37 @@ func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.Progre
job.state = jobComplete
job.progress = job.desc.Size
}
case extraction := <-j.extraction:
job, ok := jobs[extraction.desc.Digest]
if !ok {
// Only captures the parents defined before,
// could handle parent updates in same thread
// if there is a synchronization issue
var parents []string
j.parentL.Lock()
for _, parent := range j.parents[extraction.desc.Digest] {
parents = append(parents, remotes.MakeRefKey(ctx, parent))
}
j.parentL.Unlock()
if len(parents) == 0 {
parents = []string{j.root}
}
name := remotes.MakeRefKey(ctx, extraction.desc)

job = &jobStatus{
state: jobExtracting,
name: name,
parents: parents,
progress: extraction.progress,
desc: extraction.desc,
}
jobs[extraction.desc.Digest] = job
} else {
if job.state != jobExtracting {
job.state = jobExtracting
}
job.progress = extraction.progress
}
case <-tc.C:
update()
// Next timer?
Expand Down Expand Up @@ -226,6 +289,16 @@ func (j *ProgressTracker) AddChildren(desc ocispec.Descriptor, children []ocispe

}

func (j *ProgressTracker) ExtractProgress(desc ocispec.Descriptor, progress int64) {
if j == nil {
return
}
j.extraction <- extractionUpdate{
desc: desc,
progress: progress,
}
}

func (j *ProgressTracker) Wait() {
// timeout rather than rely on cancel
timeout := time.After(10 * time.Second)
Expand Down
4 changes: 4 additions & 0 deletions core/transfer/local/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
ocispec "github.com/opencontainers/image-spec/specs-go/v1"

"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/core/diff"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/remotes"
"github.com/containerd/containerd/v2/core/remotes/docker"
Expand Down Expand Up @@ -200,6 +201,9 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageFetch
if v, ok := mu.SnapshotterExports["enable_remote_snapshot_annotations"]; ok && v == "true" {
enableRemoteSnapshotAnnotations = true
}
if progressTracker != nil {
mu.ApplyOpts = append(mu.ApplyOpts, diff.WithProgress(progressTracker.ExtractProgress))
}
uopts = append(uopts, unpack.WithUnpackPlatform(mu))
} else {
log.G(ctx).WithFields(log.Fields{
Expand Down
Loading