From 84725ec7e3195e0d9ef2dece5ff4f8d8db5fb472 Mon Sep 17 00:00:00 2001 From: Michael Yang Date: Wed, 1 Nov 2023 17:10:21 -0700 Subject: [PATCH 1/2] refactor part reset --- server/upload.go | 55 ++++++++++++++++++++++++------------------------ 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/server/upload.go b/server/upload.go index 7cd7aadf..c6e0656b 100644 --- a/server/upload.go +++ b/server/upload.go @@ -40,14 +40,6 @@ type blobUpload struct { references atomic.Int32 } -type blobUploadPart struct { - // N is the part number - N int - Offset int64 - Size int64 - hash.Hash -} - const ( numUploadParts = 64 minUploadPartSize int64 = 95 * 1000 * 1000 @@ -100,7 +92,7 @@ func (b *blobUpload) Prepare(ctx context.Context, requestURL *url.URL, opts *Reg } // set part.N to the current number of parts - b.Parts = append(b.Parts, blobUploadPart{N: len(b.Parts), Offset: offset, Size: size, Hash: md5.New()}) + b.Parts = append(b.Parts, blobUploadPart{blobUpload: b, N: len(b.Parts), Offset: offset, Size: size}) offset += size } @@ -144,8 +136,8 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) { case requestURL := <-b.nextURL: g.Go(func() error { for try := 0; try < maxRetries; try++ { - r := io.NewSectionReader(f, part.Offset, part.Size) - err := b.uploadChunk(inner, http.MethodPatch, requestURL, r, part, opts) + part.ReadSeeker = io.NewSectionReader(f, part.Offset, part.Size) + err := b.uploadChunk(inner, http.MethodPatch, requestURL, part, opts) switch { case errors.Is(err, context.Canceled): return err @@ -197,7 +189,9 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) { b.done = true } -func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL *url.URL, rs io.ReadSeeker, part *blobUploadPart, opts *RegistryOptions) error { +func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL *url.URL, part *blobUploadPart, opts *RegistryOptions) error { + part.Reset() + headers := make(http.Header) headers.Set("Content-Type", "application/octet-stream") headers.Set("Content-Length", fmt.Sprintf("%d", part.Size)) @@ -207,8 +201,7 @@ func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL headers.Set("Content-Range", fmt.Sprintf("%d-%d", part.Offset, part.Offset+part.Size-1)) } - buw := blobUploadWriter{blobUpload: b} - resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(rs, io.MultiWriter(&buw, part.Hash)), opts) + resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(part.ReadSeeker, io.MultiWriter(part, part.Hash)), opts) if err != nil { return err } @@ -234,11 +227,7 @@ func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL } for try := 0; try < maxRetries; try++ { - rs.Seek(0, io.SeekStart) - b.Completed.Add(-buw.written) - buw.written = 0 - part.Hash = md5.New() - err := b.uploadChunk(ctx, http.MethodPut, redirectURL, rs, part, nil) + err := b.uploadChunk(ctx, http.MethodPut, redirectURL, part, nil) switch { case errors.Is(err, context.Canceled): return err @@ -270,9 +259,6 @@ func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL return err } - rs.Seek(0, io.SeekStart) - b.Completed.Add(-buw.written) - buw.written = 0 return fmt.Errorf("http status %d %s: %s", resp.StatusCode, resp.Status, body) } @@ -318,18 +304,33 @@ func (b *blobUpload) Wait(ctx context.Context, fn func(api.ProgressResponse)) er } } -type blobUploadWriter struct { +type blobUploadPart struct { + // N is the part number + N int + Offset int64 + Size int64 + hash.Hash + written int64 + + io.ReadSeeker *blobUpload } -func (b *blobUploadWriter) Write(p []byte) (n int, err error) { - n = len(p) - b.written += int64(n) - b.Completed.Add(int64(n)) +func (p *blobUploadPart) Write(b []byte) (n int, err error) { + n = len(b) + p.written += int64(n) + p.Completed.Add(int64(n)) return n, nil } +func (p *blobUploadPart) Reset() { + p.Seek(0, io.SeekStart) + p.Completed.Add(-int64(p.written)) + p.written = 0 + p.Hash = md5.New() +} + func uploadBlob(ctx context.Context, mp ModelPath, layer *Layer, opts *RegistryOptions, fn func(api.ProgressResponse)) error { requestURL := mp.BaseURL() requestURL = requestURL.JoinPath("v2", mp.GetNamespaceRepository(), "blobs", layer.Digest) From 434a6f9d46aef19d50306adc6e7cba20538db989 Mon Sep 17 00:00:00 2001 From: Michael Yang Date: Fri, 3 Nov 2023 16:49:51 -0700 Subject: [PATCH 2/2] return last error --- server/download.go | 5 +++-- server/upload.go | 9 +++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/server/download.go b/server/download.go index 68798afd..f9d47e49 100644 --- a/server/download.go +++ b/server/download.go @@ -149,9 +149,10 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *Regis i := i g.Go(func() error { + var err error for try := 0; try < maxRetries; try++ { w := io.NewOffsetWriter(file, part.StartsAt()) - err := b.downloadChunk(inner, requestURL, w, part, opts) + err = b.downloadChunk(inner, requestURL, w, part, opts) switch { case errors.Is(err, context.Canceled), errors.Is(err, syscall.ENOSPC): // return immediately if the context is canceled or the device is out of space @@ -164,7 +165,7 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *Regis } } - return errMaxRetriesExceeded + return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err) }) } diff --git a/server/upload.go b/server/upload.go index c6e0656b..04575560 100644 --- a/server/upload.go +++ b/server/upload.go @@ -135,9 +135,10 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) { case <-inner.Done(): case requestURL := <-b.nextURL: g.Go(func() error { + var err error for try := 0; try < maxRetries; try++ { part.ReadSeeker = io.NewSectionReader(f, part.Offset, part.Size) - err := b.uploadChunk(inner, http.MethodPatch, requestURL, part, opts) + err = b.uploadChunk(inner, http.MethodPatch, requestURL, part, opts) switch { case errors.Is(err, context.Canceled): return err @@ -151,7 +152,7 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) { return nil } - return errMaxRetriesExceeded + return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err) }) } } @@ -227,7 +228,7 @@ func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL } for try := 0; try < maxRetries; try++ { - err := b.uploadChunk(ctx, http.MethodPut, redirectURL, part, nil) + err = b.uploadChunk(ctx, http.MethodPut, redirectURL, part, nil) switch { case errors.Is(err, context.Canceled): return err @@ -241,7 +242,7 @@ func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL return nil } - return errMaxRetriesExceeded + return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err) case resp.StatusCode == http.StatusUnauthorized: auth := resp.Header.Get("www-authenticate")