diff --git a/server/upload.go b/server/upload.go index 3666d7d5..f4dda888 100644 --- a/server/upload.go +++ b/server/upload.go @@ -123,19 +123,6 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) { defer blobUploadManager.Delete(b.Digest) ctx, b.CancelFunc = context.WithCancel(ctx) - p, err := GetBlobsPath(b.Digest) - if err != nil { - b.err = err - return - } - - f, err := os.Open(p) - if err != nil { - b.err = err - return - } - defer f.Close() - g, inner := errgroup.WithContext(ctx) g.SetLimit(numUploadParts) for i := range b.Parts { @@ -146,7 +133,6 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) { g.Go(func() error { var err error for try := 0; try < maxRetries; try++ { - part.ReadSeeker = io.NewSectionReader(f, part.Offset, part.Size) err = b.uploadChunk(inner, http.MethodPatch, requestURL, part, opts) switch { case errors.Is(err, context.Canceled): @@ -209,18 +195,27 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) { } func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL *url.URL, part *blobUploadPart, opts *RegistryOptions) error { - part.Reset() - headers := make(http.Header) headers.Set("Content-Type", "application/octet-stream") headers.Set("Content-Length", fmt.Sprintf("%d", part.Size)) - headers.Set("X-Redirect-Uploads", "1") if method == http.MethodPatch { + headers.Set("X-Redirect-Uploads", "1") headers.Set("Content-Range", fmt.Sprintf("%d-%d", part.Offset, part.Offset+part.Size-1)) } - resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(part.ReadSeeker, io.MultiWriter(part, part.Hash)), opts) + p, err := GetBlobsPath(b.Digest) + if err != nil { + return err + } + + f, err := os.Open(p) + if err != nil { + return err + } + defer f.Close() + + resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(io.NewSectionReader(f, part.Offset, part.Size), part), opts) if err != nil { return err } @@ -335,7 +330,6 @@ type blobUploadPart struct { written int64 - io.ReadSeeker *blobUpload } @@ -343,14 +337,20 @@ func (p *blobUploadPart) Write(b []byte) (n int, err error) { n = len(b) p.written += int64(n) p.Completed.Add(int64(n)) + + if p.Hash == nil { + p.Hash = md5.New() + } + + p.Hash.Write(b) + return n, nil } func (p *blobUploadPart) Reset() { - p.Seek(0, io.SeekStart) p.Completed.Add(-int64(p.written)) p.written = 0 - p.Hash = md5.New() + p.Hash.Reset() } func uploadBlob(ctx context.Context, mp ModelPath, layer *Layer, opts *RegistryOptions, fn func(api.ProgressResponse)) error {