10467: Fix context error not propagated.
[arvados.git] / services / keepstore / s3_volume.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "encoding/base64"
7         "encoding/hex"
8         "flag"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "log"
13         "net/http"
14         "os"
15         "regexp"
16         "strings"
17         "sync"
18         "time"
19
20         "git.curoverse.com/arvados.git/sdk/go/arvados"
21         "github.com/AdRoll/goamz/aws"
22         "github.com/AdRoll/goamz/s3"
23 )
24
25 var (
26         // ErrS3TrashDisabled is returned by Trash if that operation
27         // is impossible with the current config.
28         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
29
30         s3AccessKeyFile string
31         s3SecretKeyFile string
32         s3RegionName    string
33         s3Endpoint      string
34         s3Replication   int
35         s3UnsafeDelete  bool
36         s3RaceWindow    time.Duration
37
38         s3ACL = s3.Private
39 )
40
41 const (
42         maxClockSkew  = 600 * time.Second
43         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
44 )
45
46 type s3VolumeAdder struct {
47         *Config
48 }
49
50 // String implements flag.Value
51 func (s *s3VolumeAdder) String() string {
52         return "-"
53 }
54
55 func (s *s3VolumeAdder) Set(bucketName string) error {
56         if bucketName == "" {
57                 return fmt.Errorf("no container name given")
58         }
59         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
60                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
61         }
62         if deprecated.flagSerializeIO {
63                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
64         }
65         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
66                 Bucket:        bucketName,
67                 AccessKeyFile: s3AccessKeyFile,
68                 SecretKeyFile: s3SecretKeyFile,
69                 Endpoint:      s3Endpoint,
70                 Region:        s3RegionName,
71                 RaceWindow:    arvados.Duration(s3RaceWindow),
72                 S3Replication: s3Replication,
73                 UnsafeDelete:  s3UnsafeDelete,
74                 ReadOnly:      deprecated.flagReadonly,
75                 IndexPageSize: 1000,
76         })
77         return nil
78 }
79
80 func s3regions() (okList []string) {
81         for r := range aws.Regions {
82                 okList = append(okList, r)
83         }
84         return
85 }
86
87 func init() {
88         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
89
90         flag.Var(&s3VolumeAdder{theConfig},
91                 "s3-bucket-volume",
92                 "Use the given bucket as a storage volume. Can be given multiple times.")
93         flag.StringVar(
94                 &s3RegionName,
95                 "s3-region",
96                 "",
97                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
98         flag.StringVar(
99                 &s3Endpoint,
100                 "s3-endpoint",
101                 "",
102                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
103         flag.StringVar(
104                 &s3AccessKeyFile,
105                 "s3-access-key-file",
106                 "",
107                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
108         flag.StringVar(
109                 &s3SecretKeyFile,
110                 "s3-secret-key-file",
111                 "",
112                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
113         flag.DurationVar(
114                 &s3RaceWindow,
115                 "s3-race-window",
116                 24*time.Hour,
117                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
118         flag.IntVar(
119                 &s3Replication,
120                 "s3-replication",
121                 2,
122                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
123         flag.BoolVar(
124                 &s3UnsafeDelete,
125                 "s3-unsafe-delete",
126                 false,
127                 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
128 }
129
130 // S3Volume implements Volume using an S3 bucket.
131 type S3Volume struct {
132         AccessKeyFile      string
133         SecretKeyFile      string
134         Endpoint           string
135         Region             string
136         Bucket             string
137         LocationConstraint bool
138         IndexPageSize      int
139         S3Replication      int
140         ConnectTimeout     arvados.Duration
141         ReadTimeout        arvados.Duration
142         RaceWindow         arvados.Duration
143         ReadOnly           bool
144         UnsafeDelete       bool
145
146         bucket *s3.Bucket
147
148         startOnce sync.Once
149 }
150
151 // Examples implements VolumeWithExamples.
152 func (*S3Volume) Examples() []Volume {
153         return []Volume{
154                 &S3Volume{
155                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
156                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
157                         Endpoint:       "",
158                         Region:         "us-east-1",
159                         Bucket:         "example-bucket-name",
160                         IndexPageSize:  1000,
161                         S3Replication:  2,
162                         RaceWindow:     arvados.Duration(24 * time.Hour),
163                         ConnectTimeout: arvados.Duration(time.Minute),
164                         ReadTimeout:    arvados.Duration(5 * time.Minute),
165                 },
166                 &S3Volume{
167                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
168                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
169                         Endpoint:       "https://storage.googleapis.com",
170                         Region:         "",
171                         Bucket:         "example-bucket-name",
172                         IndexPageSize:  1000,
173                         S3Replication:  2,
174                         RaceWindow:     arvados.Duration(24 * time.Hour),
175                         ConnectTimeout: arvados.Duration(time.Minute),
176                         ReadTimeout:    arvados.Duration(5 * time.Minute),
177                 },
178         }
179 }
180
181 // Type implements Volume.
182 func (*S3Volume) Type() string {
183         return "S3"
184 }
185
186 // Start populates private fields and verifies the configuration is
187 // valid.
188 func (v *S3Volume) Start() error {
189         region, ok := aws.Regions[v.Region]
190         if v.Endpoint == "" {
191                 if !ok {
192                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
193                 }
194         } else if ok {
195                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
196                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
197         } else {
198                 region = aws.Region{
199                         Name:                 v.Region,
200                         S3Endpoint:           v.Endpoint,
201                         S3LocationConstraint: v.LocationConstraint,
202                 }
203         }
204
205         var err error
206         var auth aws.Auth
207         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
208         if err != nil {
209                 return err
210         }
211         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
212         if err != nil {
213                 return err
214         }
215
216         // Zero timeouts mean "wait forever", which is a bad
217         // default. Default to long timeouts instead.
218         if v.ConnectTimeout == 0 {
219                 v.ConnectTimeout = arvados.Duration(time.Minute)
220         }
221         if v.ReadTimeout == 0 {
222                 v.ReadTimeout = arvados.Duration(10 * time.Minute)
223         }
224
225         client := s3.New(auth, region)
226         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
227         client.ReadTimeout = time.Duration(v.ReadTimeout)
228         v.bucket = &s3.Bucket{
229                 S3:   client,
230                 Name: v.Bucket,
231         }
232         return nil
233 }
234
235 // getReader wraps (Bucket)GetReader.
236 //
237 // In situations where (Bucket)GetReader would fail because the block
238 // disappeared in a Trash race, getReader calls fixRace to recover the
239 // data, and tries again.
240 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
241         rdr, err = v.bucket.GetReader(loc)
242         err = v.translateError(err)
243         if err == nil || !os.IsNotExist(err) {
244                 return
245         }
246         _, err = v.bucket.Head("recent/"+loc, nil)
247         err = v.translateError(err)
248         if err != nil {
249                 // If we can't read recent/X, there's no point in
250                 // trying fixRace. Give up.
251                 return
252         }
253         if !v.fixRace(loc) {
254                 err = os.ErrNotExist
255                 return
256         }
257         rdr, err = v.bucket.GetReader(loc)
258         if err != nil {
259                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
260                 err = v.translateError(err)
261         }
262         return
263 }
264
265 // Get a block: copy the block data into buf, and return the number of
266 // bytes copied.
267 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
268         ready := make(chan bool)
269         var rdr io.ReadCloser
270         var err error
271         go func() {
272                 rdr, err = v.getReader(loc)
273                 close(ready)
274         }()
275         select {
276         case <-ctx.Done():
277                 theConfig.debugLogf("s3: abandoning getReader() because %s", ctx.Err())
278                 return 0, ctx.Err()
279         case <-ready:
280                 if err != nil {
281                         return 0, err
282                 }
283         }
284
285         var n int
286         ready = make(chan bool)
287         go func() {
288                 defer close(ready)
289
290                 defer rdr.Close()
291                 n, err = io.ReadFull(rdr, buf)
292
293                 switch err {
294                 case nil, io.EOF, io.ErrUnexpectedEOF:
295                         err = nil
296                 default:
297                         err = v.translateError(err)
298                 }
299         }()
300         select {
301         case <-ctx.Done():
302                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
303                 rdr.Close()
304                 // Must wait for ReadFull to return, to ensure it
305                 // doesn't write to buf after we return.
306                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
307                 <-ready
308                 return 0, ctx.Err()
309         case <-ready:
310                 return n, err
311         }
312 }
313
314 // Compare the given data with the stored data.
315 func (v *S3Volume) Compare(loc string, expect []byte) error {
316         rdr, err := v.getReader(loc)
317         if err != nil {
318                 return err
319         }
320         defer rdr.Close()
321         return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
322 }
323
324 // Put writes a block.
325 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
326         if v.ReadOnly {
327                 return MethodDisabledError
328         }
329         var opts s3.Options
330         size := len(block)
331         if size > 0 {
332                 md5, err := hex.DecodeString(loc)
333                 if err != nil {
334                         return err
335                 }
336                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
337         }
338
339         // Send the block data through a pipe, so that (if we need to)
340         // we can close the pipe early and abandon our PutReader()
341         // goroutine, without worrying about PutReader() accessing our
342         // block buffer after we release it.
343         bufr, bufw := io.Pipe()
344         go func() {
345                 io.Copy(bufw, bytes.NewReader(block))
346                 bufw.Close()
347         }()
348
349         var err error
350         ready := make(chan bool)
351         go func() {
352                 defer func() {
353                         if ctx.Err() != nil {
354                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
355                         }
356                 }()
357                 defer close(ready)
358                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
359                 if err != nil {
360                         return
361                 }
362                 err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
363         }()
364         select {
365         case <-ctx.Done():
366                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
367                 // Our pipe might be stuck in Write(), waiting for
368                 // io.Copy() to read. If so, un-stick it. This means
369                 // PutReader will get corrupt data, but that's OK: the
370                 // size and MD5 won't match, so the write will fail.
371                 go io.Copy(ioutil.Discard, bufr)
372                 // CloseWithError() will return once pending I/O is done.
373                 bufw.CloseWithError(ctx.Err())
374                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
375                 return ctx.Err()
376         case <-ready:
377                 return v.translateError(err)
378         }
379 }
380
381 // Touch sets the timestamp for the given locator to the current time.
382 func (v *S3Volume) Touch(loc string) error {
383         if v.ReadOnly {
384                 return MethodDisabledError
385         }
386         _, err := v.bucket.Head(loc, nil)
387         err = v.translateError(err)
388         if os.IsNotExist(err) && v.fixRace(loc) {
389                 // The data object got trashed in a race, but fixRace
390                 // rescued it.
391         } else if err != nil {
392                 return err
393         }
394         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
395         return v.translateError(err)
396 }
397
398 // Mtime returns the stored timestamp for the given locator.
399 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
400         _, err := v.bucket.Head(loc, nil)
401         if err != nil {
402                 return zeroTime, v.translateError(err)
403         }
404         resp, err := v.bucket.Head("recent/"+loc, nil)
405         err = v.translateError(err)
406         if os.IsNotExist(err) {
407                 // The data object X exists, but recent/X is missing.
408                 err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
409                 if err != nil {
410                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
411                         return zeroTime, v.translateError(err)
412                 }
413                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
414                 resp, err = v.bucket.Head("recent/"+loc, nil)
415                 if err != nil {
416                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
417                         return zeroTime, v.translateError(err)
418                 }
419         } else if err != nil {
420                 // HEAD recent/X failed for some other reason.
421                 return zeroTime, err
422         }
423         return v.lastModified(resp)
424 }
425
426 // IndexTo writes a complete list of locators with the given prefix
427 // for which Get() can retrieve data.
428 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
429         // Use a merge sort to find matching sets of X and recent/X.
430         dataL := s3Lister{
431                 Bucket:   v.bucket,
432                 Prefix:   prefix,
433                 PageSize: v.IndexPageSize,
434         }
435         recentL := s3Lister{
436                 Bucket:   v.bucket,
437                 Prefix:   "recent/" + prefix,
438                 PageSize: v.IndexPageSize,
439         }
440         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
441                 if data.Key >= "g" {
442                         // Conveniently, "recent/*" and "trash/*" are
443                         // lexically greater than all hex-encoded data
444                         // hashes, so stopping here avoids iterating
445                         // over all of them needlessly with dataL.
446                         break
447                 }
448                 if !v.isKeepBlock(data.Key) {
449                         continue
450                 }
451
452                 // stamp is the list entry we should use to report the
453                 // last-modified time for this data block: it will be
454                 // the recent/X entry if one exists, otherwise the
455                 // entry for the data block itself.
456                 stamp := data
457
458                 // Advance to the corresponding recent/X marker, if any
459                 for recent != nil {
460                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
461                                 recent = recentL.Next()
462                                 continue
463                         } else if cmp == 0 {
464                                 stamp = recent
465                                 recent = recentL.Next()
466                                 break
467                         } else {
468                                 // recent/X marker is missing: we'll
469                                 // use the timestamp on the data
470                                 // object.
471                                 break
472                         }
473                 }
474                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
475                 if err != nil {
476                         return err
477                 }
478                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
479         }
480         return nil
481 }
482
483 // Trash a Keep block.
484 func (v *S3Volume) Trash(loc string) error {
485         if v.ReadOnly {
486                 return MethodDisabledError
487         }
488         if t, err := v.Mtime(loc); err != nil {
489                 return err
490         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
491                 return nil
492         }
493         if theConfig.TrashLifetime == 0 {
494                 if !s3UnsafeDelete {
495                         return ErrS3TrashDisabled
496                 }
497                 return v.bucket.Del(loc)
498         }
499         err := v.checkRaceWindow(loc)
500         if err != nil {
501                 return err
502         }
503         err = v.safeCopy("trash/"+loc, loc)
504         if err != nil {
505                 return err
506         }
507         return v.translateError(v.bucket.Del(loc))
508 }
509
510 // checkRaceWindow returns a non-nil error if trash/loc is, or might
511 // be, in the race window (i.e., it's not safe to trash loc).
512 func (v *S3Volume) checkRaceWindow(loc string) error {
513         resp, err := v.bucket.Head("trash/"+loc, nil)
514         err = v.translateError(err)
515         if os.IsNotExist(err) {
516                 // OK, trash/X doesn't exist so we're not in the race
517                 // window
518                 return nil
519         } else if err != nil {
520                 // Error looking up trash/X. We don't know whether
521                 // we're in the race window
522                 return err
523         }
524         t, err := v.lastModified(resp)
525         if err != nil {
526                 // Can't parse timestamp
527                 return err
528         }
529         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
530         if safeWindow <= 0 {
531                 // We can't count on "touch trash/X" to prolong
532                 // trash/X's lifetime. The new timestamp might not
533                 // become visible until now+raceWindow, and EmptyTrash
534                 // is allowed to delete trash/X before then.
535                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
536         }
537         // trash/X exists, but it won't be eligible for deletion until
538         // after now+raceWindow, so it's safe to overwrite it.
539         return nil
540 }
541
542 // safeCopy calls PutCopy, and checks the response to make sure the
543 // copy succeeded and updated the timestamp on the destination object
544 // (PutCopy returns 200 OK if the request was received, even if the
545 // copy failed).
546 func (v *S3Volume) safeCopy(dst, src string) error {
547         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
548                 ContentType:       "application/octet-stream",
549                 MetadataDirective: "REPLACE",
550         }, v.bucket.Name+"/"+src)
551         err = v.translateError(err)
552         if err != nil {
553                 return err
554         }
555         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
556                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
557         } else if time.Now().Sub(t) > maxClockSkew {
558                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
559         }
560         return nil
561 }
562
563 // Get the LastModified header from resp, and parse it as RFC1123 or
564 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
565 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
566         s := resp.Header.Get("Last-Modified")
567         t, err = time.Parse(time.RFC1123, s)
568         if err != nil && s != "" {
569                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
570                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
571                 // as required by HTTP spec. If it's not a valid HTTP
572                 // header value, it's probably AWS (or s3test) giving
573                 // us a nearly-RFC1123 timestamp.
574                 t, err = time.Parse(nearlyRFC1123, s)
575         }
576         return
577 }
578
579 // Untrash moves block from trash back into store
580 func (v *S3Volume) Untrash(loc string) error {
581         err := v.safeCopy(loc, "trash/"+loc)
582         if err != nil {
583                 return err
584         }
585         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
586         return v.translateError(err)
587 }
588
589 // Status returns a *VolumeStatus representing the current in-use
590 // storage capacity and a fake available capacity that doesn't make
591 // the volume seem full or nearly-full.
592 func (v *S3Volume) Status() *VolumeStatus {
593         return &VolumeStatus{
594                 DeviceNum: 1,
595                 BytesFree: BlockSize * 1000,
596                 BytesUsed: 1,
597         }
598 }
599
600 // String implements fmt.Stringer.
601 func (v *S3Volume) String() string {
602         return fmt.Sprintf("s3-bucket:%+q", v.bucket.Name)
603 }
604
605 // Writable returns false if all future Put, Mtime, and Delete calls
606 // are expected to fail.
607 func (v *S3Volume) Writable() bool {
608         return !v.ReadOnly
609 }
610
611 // Replication returns the storage redundancy of the underlying
612 // device. Configured via command line flag.
613 func (v *S3Volume) Replication() int {
614         return v.S3Replication
615 }
616
617 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
618
619 func (v *S3Volume) isKeepBlock(s string) bool {
620         return s3KeepBlockRegexp.MatchString(s)
621 }
622
623 // fixRace(X) is called when "recent/X" exists but "X" doesn't
624 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
625 // there was a race between Put and Trash, fixRace recovers from the
626 // race by Untrashing the block.
627 func (v *S3Volume) fixRace(loc string) bool {
628         trash, err := v.bucket.Head("trash/"+loc, nil)
629         if err != nil {
630                 if !os.IsNotExist(v.translateError(err)) {
631                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
632                 }
633                 return false
634         }
635         trashTime, err := v.lastModified(trash)
636         if err != nil {
637                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
638                 return false
639         }
640
641         recent, err := v.bucket.Head("recent/"+loc, nil)
642         if err != nil {
643                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
644                 return false
645         }
646         recentTime, err := v.lastModified(recent)
647         if err != nil {
648                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
649                 return false
650         }
651
652         ageWhenTrashed := trashTime.Sub(recentTime)
653         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
654                 // No evidence of a race: block hasn't been written
655                 // since it became eligible for Trash. No fix needed.
656                 return false
657         }
658
659         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
660         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
661         err = v.safeCopy(loc, "trash/"+loc)
662         if err != nil {
663                 log.Printf("error: fixRace: %s", err)
664                 return false
665         }
666         return true
667 }
668
669 func (v *S3Volume) translateError(err error) error {
670         switch err := err.(type) {
671         case *s3.Error:
672                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
673                         strings.Contains(err.Error(), "Not Found") {
674                         return os.ErrNotExist
675                 }
676                 // Other 404 errors like NoSuchVersion and
677                 // NoSuchBucket are different problems which should
678                 // get called out downstream, so we don't convert them
679                 // to os.ErrNotExist.
680         }
681         return err
682 }
683
684 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
685 // and deletes them from the volume.
686 func (v *S3Volume) EmptyTrash() {
687         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
688
689         // Use a merge sort to find matching sets of trash/X and recent/X.
690         trashL := s3Lister{
691                 Bucket:   v.bucket,
692                 Prefix:   "trash/",
693                 PageSize: v.IndexPageSize,
694         }
695         // Define "ready to delete" as "...when EmptyTrash started".
696         startT := time.Now()
697         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
698                 loc := trash.Key[6:]
699                 if !v.isKeepBlock(loc) {
700                         continue
701                 }
702                 bytesInTrash += trash.Size
703                 blocksInTrash++
704
705                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
706                 if err != nil {
707                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
708                         continue
709                 }
710                 recent, err := v.bucket.Head("recent/"+loc, nil)
711                 if err != nil && os.IsNotExist(v.translateError(err)) {
712                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
713                         err = v.Untrash(loc)
714                         if err != nil {
715                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
716                         }
717                         continue
718                 } else if err != nil {
719                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
720                         continue
721                 }
722                 recentT, err := v.lastModified(recent)
723                 if err != nil {
724                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
725                         continue
726                 }
727                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
728                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
729                                 // recent/loc is too old to protect
730                                 // loc from being Trashed again during
731                                 // the raceWindow that starts if we
732                                 // delete trash/X now.
733                                 //
734                                 // Note this means (TrashCheckInterval
735                                 // < BlobSignatureTTL - raceWindow) is
736                                 // necessary to avoid starvation.
737                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
738                                 v.fixRace(loc)
739                                 v.Touch(loc)
740                                 continue
741                         } else if _, err := v.bucket.Head(loc, nil); os.IsNotExist(err) {
742                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
743                                 v.fixRace(loc)
744                                 continue
745                         } else if err != nil {
746                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
747                                 continue
748                         }
749                 }
750                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
751                         continue
752                 }
753                 err = v.bucket.Del(trash.Key)
754                 if err != nil {
755                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
756                         continue
757                 }
758                 bytesDeleted += trash.Size
759                 blocksDeleted++
760
761                 _, err = v.bucket.Head(loc, nil)
762                 if os.IsNotExist(err) {
763                         err = v.bucket.Del("recent/" + loc)
764                         if err != nil {
765                                 log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
766                         }
767                 } else if err != nil {
768                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
769                 }
770         }
771         if err := trashL.Error(); err != nil {
772                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
773         }
774         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
775 }
776
777 type s3Lister struct {
778         Bucket     *s3.Bucket
779         Prefix     string
780         PageSize   int
781         nextMarker string
782         buf        []s3.Key
783         err        error
784 }
785
786 // First fetches the first page and returns the first item. It returns
787 // nil if the response is the empty set or an error occurs.
788 func (lister *s3Lister) First() *s3.Key {
789         lister.getPage()
790         return lister.pop()
791 }
792
793 // Next returns the next item, fetching the next page if necessary. It
794 // returns nil if the last available item has already been fetched, or
795 // an error occurs.
796 func (lister *s3Lister) Next() *s3.Key {
797         if len(lister.buf) == 0 && lister.nextMarker != "" {
798                 lister.getPage()
799         }
800         return lister.pop()
801 }
802
803 // Return the most recent error encountered by First or Next.
804 func (lister *s3Lister) Error() error {
805         return lister.err
806 }
807
808 func (lister *s3Lister) getPage() {
809         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
810         lister.nextMarker = ""
811         if err != nil {
812                 lister.err = err
813                 return
814         }
815         if resp.IsTruncated {
816                 lister.nextMarker = resp.NextMarker
817         }
818         lister.buf = make([]s3.Key, 0, len(resp.Contents))
819         for _, key := range resp.Contents {
820                 if !strings.HasPrefix(key.Key, lister.Prefix) {
821                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
822                         continue
823                 }
824                 lister.buf = append(lister.buf, key)
825         }
826 }
827
828 func (lister *s3Lister) pop() (k *s3.Key) {
829         if len(lister.buf) > 0 {
830                 k = &lister.buf[0]
831                 lister.buf = lister.buf[1:]
832         }
833         return
834 }