10467: Abort S3 and release buffer if caller disconnects during S3 PUT request.
[arvados.git] / services / keepstore / s3_volume.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "encoding/base64"
7         "encoding/hex"
8         "flag"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "log"
13         "net/http"
14         "os"
15         "regexp"
16         "strings"
17         "sync"
18         "time"
19
20         "git.curoverse.com/arvados.git/sdk/go/arvados"
21         "github.com/AdRoll/goamz/aws"
22         "github.com/AdRoll/goamz/s3"
23 )
24
25 var (
26         // ErrS3TrashDisabled is returned by Trash if that operation
27         // is impossible with the current config.
28         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
29
30         s3AccessKeyFile string
31         s3SecretKeyFile string
32         s3RegionName    string
33         s3Endpoint      string
34         s3Replication   int
35         s3UnsafeDelete  bool
36         s3RaceWindow    time.Duration
37
38         s3ACL = s3.Private
39 )
40
41 const (
42         maxClockSkew  = 600 * time.Second
43         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
44 )
45
46 type s3VolumeAdder struct {
47         *Config
48 }
49
50 // String implements flag.Value
51 func (s *s3VolumeAdder) String() string {
52         return "-"
53 }
54
55 func (s *s3VolumeAdder) Set(bucketName string) error {
56         if bucketName == "" {
57                 return fmt.Errorf("no container name given")
58         }
59         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
60                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
61         }
62         if deprecated.flagSerializeIO {
63                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
64         }
65         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
66                 Bucket:        bucketName,
67                 AccessKeyFile: s3AccessKeyFile,
68                 SecretKeyFile: s3SecretKeyFile,
69                 Endpoint:      s3Endpoint,
70                 Region:        s3RegionName,
71                 RaceWindow:    arvados.Duration(s3RaceWindow),
72                 S3Replication: s3Replication,
73                 UnsafeDelete:  s3UnsafeDelete,
74                 ReadOnly:      deprecated.flagReadonly,
75                 IndexPageSize: 1000,
76         })
77         return nil
78 }
79
80 func s3regions() (okList []string) {
81         for r := range aws.Regions {
82                 okList = append(okList, r)
83         }
84         return
85 }
86
87 func init() {
88         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
89
90         flag.Var(&s3VolumeAdder{theConfig},
91                 "s3-bucket-volume",
92                 "Use the given bucket as a storage volume. Can be given multiple times.")
93         flag.StringVar(
94                 &s3RegionName,
95                 "s3-region",
96                 "",
97                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
98         flag.StringVar(
99                 &s3Endpoint,
100                 "s3-endpoint",
101                 "",
102                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
103         flag.StringVar(
104                 &s3AccessKeyFile,
105                 "s3-access-key-file",
106                 "",
107                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
108         flag.StringVar(
109                 &s3SecretKeyFile,
110                 "s3-secret-key-file",
111                 "",
112                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
113         flag.DurationVar(
114                 &s3RaceWindow,
115                 "s3-race-window",
116                 24*time.Hour,
117                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
118         flag.IntVar(
119                 &s3Replication,
120                 "s3-replication",
121                 2,
122                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
123         flag.BoolVar(
124                 &s3UnsafeDelete,
125                 "s3-unsafe-delete",
126                 false,
127                 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
128 }
129
130 // S3Volume implements Volume using an S3 bucket.
131 type S3Volume struct {
132         AccessKeyFile      string
133         SecretKeyFile      string
134         Endpoint           string
135         Region             string
136         Bucket             string
137         LocationConstraint bool
138         IndexPageSize      int
139         S3Replication      int
140         ConnectTimeout     arvados.Duration
141         ReadTimeout        arvados.Duration
142         RaceWindow         arvados.Duration
143         ReadOnly           bool
144         UnsafeDelete       bool
145
146         bucket *s3.Bucket
147
148         startOnce sync.Once
149 }
150
151 // Examples implements VolumeWithExamples.
152 func (*S3Volume) Examples() []Volume {
153         return []Volume{
154                 &S3Volume{
155                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
156                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
157                         Endpoint:       "",
158                         Region:         "us-east-1",
159                         Bucket:         "example-bucket-name",
160                         IndexPageSize:  1000,
161                         S3Replication:  2,
162                         RaceWindow:     arvados.Duration(24 * time.Hour),
163                         ConnectTimeout: arvados.Duration(time.Minute),
164                         ReadTimeout:    arvados.Duration(5 * time.Minute),
165                 },
166                 &S3Volume{
167                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
168                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
169                         Endpoint:       "https://storage.googleapis.com",
170                         Region:         "",
171                         Bucket:         "example-bucket-name",
172                         IndexPageSize:  1000,
173                         S3Replication:  2,
174                         RaceWindow:     arvados.Duration(24 * time.Hour),
175                         ConnectTimeout: arvados.Duration(time.Minute),
176                         ReadTimeout:    arvados.Duration(5 * time.Minute),
177                 },
178         }
179 }
180
181 // Type implements Volume.
182 func (*S3Volume) Type() string {
183         return "S3"
184 }
185
186 // Start populates private fields and verifies the configuration is
187 // valid.
188 func (v *S3Volume) Start() error {
189         region, ok := aws.Regions[v.Region]
190         if v.Endpoint == "" {
191                 if !ok {
192                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
193                 }
194         } else if ok {
195                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
196                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
197         } else {
198                 region = aws.Region{
199                         Name:                 v.Region,
200                         S3Endpoint:           v.Endpoint,
201                         S3LocationConstraint: v.LocationConstraint,
202                 }
203         }
204
205         var err error
206         var auth aws.Auth
207         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
208         if err != nil {
209                 return err
210         }
211         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
212         if err != nil {
213                 return err
214         }
215
216         // Zero timeouts mean "wait forever", which is a bad
217         // default. Default to long timeouts instead.
218         if v.ConnectTimeout == 0 {
219                 v.ConnectTimeout = arvados.Duration(time.Minute)
220         }
221         if v.ReadTimeout == 0 {
222                 v.ReadTimeout = arvados.Duration(10 * time.Minute)
223         }
224
225         client := s3.New(auth, region)
226         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
227         client.ReadTimeout = time.Duration(v.ReadTimeout)
228         v.bucket = &s3.Bucket{
229                 S3:   client,
230                 Name: v.Bucket,
231         }
232         return nil
233 }
234
235 // getReader wraps (Bucket)GetReader.
236 //
237 // In situations where (Bucket)GetReader would fail because the block
238 // disappeared in a Trash race, getReader calls fixRace to recover the
239 // data, and tries again.
240 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
241         rdr, err = v.bucket.GetReader(loc)
242         err = v.translateError(err)
243         if err == nil || !os.IsNotExist(err) {
244                 return
245         }
246         _, err = v.bucket.Head("recent/"+loc, nil)
247         err = v.translateError(err)
248         if err != nil {
249                 // If we can't read recent/X, there's no point in
250                 // trying fixRace. Give up.
251                 return
252         }
253         if !v.fixRace(loc) {
254                 err = os.ErrNotExist
255                 return
256         }
257         rdr, err = v.bucket.GetReader(loc)
258         if err != nil {
259                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
260                 err = v.translateError(err)
261         }
262         return
263 }
264
265 // Get a block: copy the block data into buf, and return the number of
266 // bytes copied.
267 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
268         ready := make(chan bool)
269         var rdr io.ReadCloser
270         var err error
271         go func() {
272                 rdr, err = v.getReader(loc)
273                 close(ready)
274         }()
275         select {
276         case <-ctx.Done():
277                 theConfig.debugLogf("s3: abandoning getReader() because %s", ctx.Err())
278                 return 0, ctx.Err()
279         case <-ready:
280                 if err != nil {
281                         return 0, err
282                 }
283         }
284
285         var n int
286         ready = make(chan bool)
287         go func() {
288                 defer close(ready)
289
290                 defer rdr.Close()
291                 n, err = io.ReadFull(rdr, buf)
292
293                 switch err {
294                 case nil, io.EOF, io.ErrUnexpectedEOF:
295                         err = nil
296                 default:
297                         err = v.translateError(err)
298                 }
299         }()
300         select {
301         case <-ctx.Done():
302                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
303                 rdr.Close()
304                 // Must wait for ReadFull to return, to ensure it
305                 // doesn't write to buf after we return.
306                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
307                 <-ready
308                 return 0, ctx.Err()
309         case <-ready:
310                 return n, err
311         }
312 }
313
314 // Compare the given data with the stored data.
315 func (v *S3Volume) Compare(loc string, expect []byte) error {
316         rdr, err := v.getReader(loc)
317         if err != nil {
318                 return err
319         }
320         defer rdr.Close()
321         return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
322 }
323
324 // Put writes a block.
325 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
326         if v.ReadOnly {
327                 return MethodDisabledError
328         }
329         var opts s3.Options
330         size := len(block)
331         if size > 0 {
332                 md5, err := hex.DecodeString(loc)
333                 if err != nil {
334                         return err
335                 }
336                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
337         }
338
339         // Send the block data through a pipe, so that (if we need to)
340         // we can close the pipe early and abandon our PutReader()
341         // goroutine, without worrying about PutReader() accessing our
342         // block buffer after we release it.
343         bufr, bufw := io.Pipe()
344         go func() {
345                 io.Copy(bufw, bytes.NewReader(block))
346                 bufw.Close()
347         }()
348
349         var err error
350         ready := make(chan bool)
351         go func() {
352                 defer func() {
353                         select {
354                         case <-ctx.Done():
355                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
356                         default:
357                         }
358                 }()
359                 defer close(ready)
360                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
361                 if err != nil {
362                         err = v.translateError(err)
363                         return
364                 }
365                 err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
366                 err = v.translateError(err)
367         }()
368         select {
369         case <-ctx.Done():
370                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
371                 // Our pipe might be stuck in Write(), waiting for
372                 // io.Copy() to read. If so, un-stick it. This means
373                 // PutReader will get corrupt data, but that's OK: the
374                 // size and MD5 won't match, so the write will fail.
375                 go io.Copy(ioutil.Discard, bufr)
376                 // CloseWithError() will return once pending I/O is done.
377                 bufw.CloseWithError(ctx.Err())
378                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
379                 return ctx.Err()
380         case <-ready:
381                 return err
382         }
383 }
384
385 // Touch sets the timestamp for the given locator to the current time.
386 func (v *S3Volume) Touch(loc string) error {
387         if v.ReadOnly {
388                 return MethodDisabledError
389         }
390         _, err := v.bucket.Head(loc, nil)
391         err = v.translateError(err)
392         if os.IsNotExist(err) && v.fixRace(loc) {
393                 // The data object got trashed in a race, but fixRace
394                 // rescued it.
395         } else if err != nil {
396                 return err
397         }
398         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
399         return v.translateError(err)
400 }
401
402 // Mtime returns the stored timestamp for the given locator.
403 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
404         _, err := v.bucket.Head(loc, nil)
405         if err != nil {
406                 return zeroTime, v.translateError(err)
407         }
408         resp, err := v.bucket.Head("recent/"+loc, nil)
409         err = v.translateError(err)
410         if os.IsNotExist(err) {
411                 // The data object X exists, but recent/X is missing.
412                 err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
413                 if err != nil {
414                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
415                         return zeroTime, v.translateError(err)
416                 }
417                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
418                 resp, err = v.bucket.Head("recent/"+loc, nil)
419                 if err != nil {
420                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
421                         return zeroTime, v.translateError(err)
422                 }
423         } else if err != nil {
424                 // HEAD recent/X failed for some other reason.
425                 return zeroTime, err
426         }
427         return v.lastModified(resp)
428 }
429
430 // IndexTo writes a complete list of locators with the given prefix
431 // for which Get() can retrieve data.
432 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
433         // Use a merge sort to find matching sets of X and recent/X.
434         dataL := s3Lister{
435                 Bucket:   v.bucket,
436                 Prefix:   prefix,
437                 PageSize: v.IndexPageSize,
438         }
439         recentL := s3Lister{
440                 Bucket:   v.bucket,
441                 Prefix:   "recent/" + prefix,
442                 PageSize: v.IndexPageSize,
443         }
444         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
445                 if data.Key >= "g" {
446                         // Conveniently, "recent/*" and "trash/*" are
447                         // lexically greater than all hex-encoded data
448                         // hashes, so stopping here avoids iterating
449                         // over all of them needlessly with dataL.
450                         break
451                 }
452                 if !v.isKeepBlock(data.Key) {
453                         continue
454                 }
455
456                 // stamp is the list entry we should use to report the
457                 // last-modified time for this data block: it will be
458                 // the recent/X entry if one exists, otherwise the
459                 // entry for the data block itself.
460                 stamp := data
461
462                 // Advance to the corresponding recent/X marker, if any
463                 for recent != nil {
464                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
465                                 recent = recentL.Next()
466                                 continue
467                         } else if cmp == 0 {
468                                 stamp = recent
469                                 recent = recentL.Next()
470                                 break
471                         } else {
472                                 // recent/X marker is missing: we'll
473                                 // use the timestamp on the data
474                                 // object.
475                                 break
476                         }
477                 }
478                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
479                 if err != nil {
480                         return err
481                 }
482                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
483         }
484         return nil
485 }
486
487 // Trash a Keep block.
488 func (v *S3Volume) Trash(loc string) error {
489         if v.ReadOnly {
490                 return MethodDisabledError
491         }
492         if t, err := v.Mtime(loc); err != nil {
493                 return err
494         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
495                 return nil
496         }
497         if theConfig.TrashLifetime == 0 {
498                 if !s3UnsafeDelete {
499                         return ErrS3TrashDisabled
500                 }
501                 return v.bucket.Del(loc)
502         }
503         err := v.checkRaceWindow(loc)
504         if err != nil {
505                 return err
506         }
507         err = v.safeCopy("trash/"+loc, loc)
508         if err != nil {
509                 return err
510         }
511         return v.translateError(v.bucket.Del(loc))
512 }
513
514 // checkRaceWindow returns a non-nil error if trash/loc is, or might
515 // be, in the race window (i.e., it's not safe to trash loc).
516 func (v *S3Volume) checkRaceWindow(loc string) error {
517         resp, err := v.bucket.Head("trash/"+loc, nil)
518         err = v.translateError(err)
519         if os.IsNotExist(err) {
520                 // OK, trash/X doesn't exist so we're not in the race
521                 // window
522                 return nil
523         } else if err != nil {
524                 // Error looking up trash/X. We don't know whether
525                 // we're in the race window
526                 return err
527         }
528         t, err := v.lastModified(resp)
529         if err != nil {
530                 // Can't parse timestamp
531                 return err
532         }
533         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
534         if safeWindow <= 0 {
535                 // We can't count on "touch trash/X" to prolong
536                 // trash/X's lifetime. The new timestamp might not
537                 // become visible until now+raceWindow, and EmptyTrash
538                 // is allowed to delete trash/X before then.
539                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
540         }
541         // trash/X exists, but it won't be eligible for deletion until
542         // after now+raceWindow, so it's safe to overwrite it.
543         return nil
544 }
545
546 // safeCopy calls PutCopy, and checks the response to make sure the
547 // copy succeeded and updated the timestamp on the destination object
548 // (PutCopy returns 200 OK if the request was received, even if the
549 // copy failed).
550 func (v *S3Volume) safeCopy(dst, src string) error {
551         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
552                 ContentType:       "application/octet-stream",
553                 MetadataDirective: "REPLACE",
554         }, v.bucket.Name+"/"+src)
555         err = v.translateError(err)
556         if err != nil {
557                 return err
558         }
559         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
560                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
561         } else if time.Now().Sub(t) > maxClockSkew {
562                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
563         }
564         return nil
565 }
566
567 // Get the LastModified header from resp, and parse it as RFC1123 or
568 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
569 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
570         s := resp.Header.Get("Last-Modified")
571         t, err = time.Parse(time.RFC1123, s)
572         if err != nil && s != "" {
573                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
574                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
575                 // as required by HTTP spec. If it's not a valid HTTP
576                 // header value, it's probably AWS (or s3test) giving
577                 // us a nearly-RFC1123 timestamp.
578                 t, err = time.Parse(nearlyRFC1123, s)
579         }
580         return
581 }
582
583 // Untrash moves block from trash back into store
584 func (v *S3Volume) Untrash(loc string) error {
585         err := v.safeCopy(loc, "trash/"+loc)
586         if err != nil {
587                 return err
588         }
589         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
590         return v.translateError(err)
591 }
592
593 // Status returns a *VolumeStatus representing the current in-use
594 // storage capacity and a fake available capacity that doesn't make
595 // the volume seem full or nearly-full.
596 func (v *S3Volume) Status() *VolumeStatus {
597         return &VolumeStatus{
598                 DeviceNum: 1,
599                 BytesFree: BlockSize * 1000,
600                 BytesUsed: 1,
601         }
602 }
603
604 // String implements fmt.Stringer.
605 func (v *S3Volume) String() string {
606         return fmt.Sprintf("s3-bucket:%+q", v.bucket.Name)
607 }
608
609 // Writable returns false if all future Put, Mtime, and Delete calls
610 // are expected to fail.
611 func (v *S3Volume) Writable() bool {
612         return !v.ReadOnly
613 }
614
615 // Replication returns the storage redundancy of the underlying
616 // device. Configured via command line flag.
617 func (v *S3Volume) Replication() int {
618         return v.S3Replication
619 }
620
621 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
622
623 func (v *S3Volume) isKeepBlock(s string) bool {
624         return s3KeepBlockRegexp.MatchString(s)
625 }
626
627 // fixRace(X) is called when "recent/X" exists but "X" doesn't
628 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
629 // there was a race between Put and Trash, fixRace recovers from the
630 // race by Untrashing the block.
631 func (v *S3Volume) fixRace(loc string) bool {
632         trash, err := v.bucket.Head("trash/"+loc, nil)
633         if err != nil {
634                 if !os.IsNotExist(v.translateError(err)) {
635                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
636                 }
637                 return false
638         }
639         trashTime, err := v.lastModified(trash)
640         if err != nil {
641                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
642                 return false
643         }
644
645         recent, err := v.bucket.Head("recent/"+loc, nil)
646         if err != nil {
647                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
648                 return false
649         }
650         recentTime, err := v.lastModified(recent)
651         if err != nil {
652                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
653                 return false
654         }
655
656         ageWhenTrashed := trashTime.Sub(recentTime)
657         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
658                 // No evidence of a race: block hasn't been written
659                 // since it became eligible for Trash. No fix needed.
660                 return false
661         }
662
663         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
664         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
665         err = v.safeCopy(loc, "trash/"+loc)
666         if err != nil {
667                 log.Printf("error: fixRace: %s", err)
668                 return false
669         }
670         return true
671 }
672
673 func (v *S3Volume) translateError(err error) error {
674         switch err := err.(type) {
675         case *s3.Error:
676                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
677                         strings.Contains(err.Error(), "Not Found") {
678                         return os.ErrNotExist
679                 }
680                 // Other 404 errors like NoSuchVersion and
681                 // NoSuchBucket are different problems which should
682                 // get called out downstream, so we don't convert them
683                 // to os.ErrNotExist.
684         }
685         return err
686 }
687
688 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
689 // and deletes them from the volume.
690 func (v *S3Volume) EmptyTrash() {
691         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
692
693         // Use a merge sort to find matching sets of trash/X and recent/X.
694         trashL := s3Lister{
695                 Bucket:   v.bucket,
696                 Prefix:   "trash/",
697                 PageSize: v.IndexPageSize,
698         }
699         // Define "ready to delete" as "...when EmptyTrash started".
700         startT := time.Now()
701         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
702                 loc := trash.Key[6:]
703                 if !v.isKeepBlock(loc) {
704                         continue
705                 }
706                 bytesInTrash += trash.Size
707                 blocksInTrash++
708
709                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
710                 if err != nil {
711                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
712                         continue
713                 }
714                 recent, err := v.bucket.Head("recent/"+loc, nil)
715                 if err != nil && os.IsNotExist(v.translateError(err)) {
716                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
717                         err = v.Untrash(loc)
718                         if err != nil {
719                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
720                         }
721                         continue
722                 } else if err != nil {
723                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
724                         continue
725                 }
726                 recentT, err := v.lastModified(recent)
727                 if err != nil {
728                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
729                         continue
730                 }
731                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
732                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
733                                 // recent/loc is too old to protect
734                                 // loc from being Trashed again during
735                                 // the raceWindow that starts if we
736                                 // delete trash/X now.
737                                 //
738                                 // Note this means (TrashCheckInterval
739                                 // < BlobSignatureTTL - raceWindow) is
740                                 // necessary to avoid starvation.
741                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
742                                 v.fixRace(loc)
743                                 v.Touch(loc)
744                                 continue
745                         } else if _, err := v.bucket.Head(loc, nil); os.IsNotExist(err) {
746                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
747                                 v.fixRace(loc)
748                                 continue
749                         } else if err != nil {
750                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
751                                 continue
752                         }
753                 }
754                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
755                         continue
756                 }
757                 err = v.bucket.Del(trash.Key)
758                 if err != nil {
759                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
760                         continue
761                 }
762                 bytesDeleted += trash.Size
763                 blocksDeleted++
764
765                 _, err = v.bucket.Head(loc, nil)
766                 if os.IsNotExist(err) {
767                         err = v.bucket.Del("recent/" + loc)
768                         if err != nil {
769                                 log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
770                         }
771                 } else if err != nil {
772                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
773                 }
774         }
775         if err := trashL.Error(); err != nil {
776                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
777         }
778         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
779 }
780
781 type s3Lister struct {
782         Bucket     *s3.Bucket
783         Prefix     string
784         PageSize   int
785         nextMarker string
786         buf        []s3.Key
787         err        error
788 }
789
790 // First fetches the first page and returns the first item. It returns
791 // nil if the response is the empty set or an error occurs.
792 func (lister *s3Lister) First() *s3.Key {
793         lister.getPage()
794         return lister.pop()
795 }
796
797 // Next returns the next item, fetching the next page if necessary. It
798 // returns nil if the last available item has already been fetched, or
799 // an error occurs.
800 func (lister *s3Lister) Next() *s3.Key {
801         if len(lister.buf) == 0 && lister.nextMarker != "" {
802                 lister.getPage()
803         }
804         return lister.pop()
805 }
806
807 // Return the most recent error encountered by First or Next.
808 func (lister *s3Lister) Error() error {
809         return lister.err
810 }
811
812 func (lister *s3Lister) getPage() {
813         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
814         lister.nextMarker = ""
815         if err != nil {
816                 lister.err = err
817                 return
818         }
819         if resp.IsTruncated {
820                 lister.nextMarker = resp.NextMarker
821         }
822         lister.buf = make([]s3.Key, 0, len(resp.Contents))
823         for _, key := range resp.Contents {
824                 if !strings.HasPrefix(key.Key, lister.Prefix) {
825                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
826                         continue
827                 }
828                 lister.buf = append(lister.buf, key)
829         }
830 }
831
832 func (lister *s3Lister) pop() (k *s3.Key) {
833         if len(lister.buf) > 0 {
834                 k = &lister.buf[0]
835                 lister.buf = lister.buf[1:]
836         }
837         return
838 }