This commit is contained in:
Michael Yang 2023-10-20 13:11:25 -07:00
parent 4e09aab8b9
commit 12efcbb057

View file

@ -95,6 +95,7 @@ func (b *blobUpload) Prepare(ctx context.Context, requestURL *url.URL, opts *Reg
size = fi.Size() - offset size = fi.Size() - offset
} }
// set part.N to the current number of parts
b.Parts = append(b.Parts, blobUploadPart{N: len(b.Parts), Offset: offset, Size: size}) b.Parts = append(b.Parts, blobUploadPart{N: len(b.Parts), Offset: offset, Size: size})
offset += size offset += size
} }
@ -111,22 +112,22 @@ func (b *blobUpload) Prepare(ctx context.Context, requestURL *url.URL, opts *Reg
return nil return nil
} }
// Run uploads blob parts to the upstream. If the upstream supports redirection, parts will be uploaded
// in parallel as defined by Prepare. Otherwise, parts will be uploaded serially. Run sets b.err on error.
func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) { func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) {
b.err = b.run(ctx, opts)
}
func (b *blobUpload) run(ctx context.Context, opts *RegistryOptions) error {
defer blobUploadManager.Delete(b.Digest) defer blobUploadManager.Delete(b.Digest)
ctx, b.CancelFunc = context.WithCancel(ctx) ctx, b.CancelFunc = context.WithCancel(ctx)
p, err := GetBlobsPath(b.Digest) p, err := GetBlobsPath(b.Digest)
if err != nil { if err != nil {
return err b.err = err
return
} }
f, err := os.Open(p) f, err := os.Open(p)
if err != nil { if err != nil {
return err b.err = err
return
} }
defer f.Close() defer f.Close()
@ -134,7 +135,9 @@ func (b *blobUpload) run(ctx context.Context, opts *RegistryOptions) error {
g.SetLimit(numUploadParts) g.SetLimit(numUploadParts)
for i := range b.Parts { for i := range b.Parts {
part := &b.Parts[i] part := &b.Parts[i]
requestURL := <-b.nextURL select {
case <-inner.Done():
case requestURL := <-b.nextURL:
g.Go(func() error { g.Go(func() error {
for try := 0; try < maxRetries; try++ { for try := 0; try < maxRetries; try++ {
r := io.NewSectionReader(f, part.Offset, part.Size) r := io.NewSectionReader(f, part.Offset, part.Size)
@ -155,9 +158,11 @@ func (b *blobUpload) run(ctx context.Context, opts *RegistryOptions) error {
return errMaxRetriesExceeded return errMaxRetriesExceeded
}) })
} }
}
if err := g.Wait(); err != nil { if err := g.Wait(); err != nil {
return err b.err = err
return
} }
requestURL := <-b.nextURL requestURL := <-b.nextURL
@ -172,12 +177,12 @@ func (b *blobUpload) run(ctx context.Context, opts *RegistryOptions) error {
resp, err := makeRequest(ctx, "PUT", requestURL, headers, nil, opts) resp, err := makeRequest(ctx, "PUT", requestURL, headers, nil, opts)
if err != nil { if err != nil {
return err b.err = err
return
} }
defer resp.Body.Close() defer resp.Body.Close()
b.done = true b.done = true
return nil
} }
func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL *url.URL, rs io.ReadSeeker, part *blobUploadPart, opts *RegistryOptions) error { func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL *url.URL, rs io.ReadSeeker, part *blobUploadPart, opts *RegistryOptions) error {
@ -219,6 +224,7 @@ func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL
for try := 0; try < maxRetries; try++ { for try := 0; try < maxRetries; try++ {
rs.Seek(0, io.SeekStart) rs.Seek(0, io.SeekStart)
b.Completed.Add(-buw.written) b.Completed.Add(-buw.written)
buw.written = 0
err := b.uploadChunk(ctx, http.MethodPut, redirectURL, rs, part, nil) err := b.uploadChunk(ctx, http.MethodPut, redirectURL, rs, part, nil)
switch { switch {
case errors.Is(err, context.Canceled): case errors.Is(err, context.Canceled):
@ -253,6 +259,7 @@ func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL
rs.Seek(0, io.SeekStart) rs.Seek(0, io.SeekStart)
b.Completed.Add(-buw.written) b.Completed.Add(-buw.written)
buw.written = 0
return fmt.Errorf("http status %d %s: %s", resp.StatusCode, resp.Status, body) return fmt.Errorf("http status %d %s: %s", resp.StatusCode, resp.Status, body)
} }