case <-ctx.Done():
theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
// Our pipe might be stuck in Write(), waiting for
- // io.Copy() to read. If so, un-stick it. This means
+ // PutReader() to read. If so, un-stick it. This means
// PutReader will get corrupt data, but that's OK: the
// size and MD5 won't match, so the write will fail.
go io.Copy(ioutil.Discard, bufr)
theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
return ctx.Err()
case <-ready:
+ // Unblock pipe in case PutReader did not consume it.
+ io.Copy(ioutil.Discard, bufr)
return v.translateError(err)
}
}
atomic.AddInt64(&blocksDeleted, 1)
_, err = v.bucket.Head(loc, nil)
- if os.IsNotExist(err) {
- err = v.bucket.Del("recent/" + loc)
- if err != nil {
- log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
- }
- } else if err != nil {
- log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
+ if err == nil {
+ log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
+ return
+ }
+ if !os.IsNotExist(v.translateError(err)) {
+ log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+ return
+ }
+ err = v.bucket.Del("recent/" + loc)
+ if err != nil {
+ log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
}
}
var wg sync.WaitGroup
- todo := make(chan *s3.Key)
+ todo := make(chan *s3.Key, theConfig.EmptyTrashWorkers)
for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
wg.Add(1)
go func() {