X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/71e686d5be7e425ab3358505b8edf5098a2f09f2..1d36bef0f6e0a64d4d7660f5db8e9625d99302c6:/services/keepstore/s3_volume.go?ds=sidebyside diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go index 532a0823e2..bdab58927b 100644 --- a/services/keepstore/s3_volume.go +++ b/services/keepstore/s3_volume.go @@ -429,7 +429,7 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error { case <-ctx.Done(): theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err()) // Our pipe might be stuck in Write(), waiting for - // io.Copy() to read. If so, un-stick it. This means + // PutReader() to read. If so, un-stick it. This means // PutReader will get corrupt data, but that's OK: the // size and MD5 won't match, so the write will fail. go io.Copy(ioutil.Discard, bufr) @@ -438,6 +438,8 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error { theConfig.debugLogf("%s: abandoning PutReader goroutine", v) return ctx.Err() case <-ready: + // Unblock pipe in case PutReader did not consume it. + io.Copy(ioutil.Discard, bufr) return v.translateError(err) } } @@ -835,18 +837,22 @@ func (v *S3Volume) EmptyTrash() { atomic.AddInt64(&blocksDeleted, 1) _, err = v.bucket.Head(loc, nil) - if os.IsNotExist(err) { - err = v.bucket.Del("recent/" + loc) - if err != nil { - log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err) - } - } else if err != nil { - log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err) + if err == nil { + log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc) + return + } + if !os.IsNotExist(v.translateError(err)) { + log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err) + return + } + err = v.bucket.Del("recent/" + loc) + if err != nil { + log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err) } } var wg sync.WaitGroup - todo := make(chan *s3.Key) + todo := make(chan *s3.Key, theConfig.EmptyTrashWorkers) for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ { wg.Add(1) go func() {