10467: Move http request context setup out to func.
[arvados.git] / services / keepstore / s3_volume.go
1 package main
2
3 import (
4         "context"
5         "encoding/base64"
6         "encoding/hex"
7         "flag"
8         "fmt"
9         "io"
10         "log"
11         "net/http"
12         "os"
13         "regexp"
14         "strings"
15         "sync"
16         "time"
17
18         "git.curoverse.com/arvados.git/sdk/go/arvados"
19         "github.com/AdRoll/goamz/aws"
20         "github.com/AdRoll/goamz/s3"
21 )
22
23 var (
24         // ErrS3TrashDisabled is returned by Trash if that operation
25         // is impossible with the current config.
26         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
27
28         s3AccessKeyFile string
29         s3SecretKeyFile string
30         s3RegionName    string
31         s3Endpoint      string
32         s3Replication   int
33         s3UnsafeDelete  bool
34         s3RaceWindow    time.Duration
35
36         s3ACL = s3.Private
37 )
38
39 const (
40         maxClockSkew  = 600 * time.Second
41         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
42 )
43
44 type s3VolumeAdder struct {
45         *Config
46 }
47
48 // String implements flag.Value
49 func (s *s3VolumeAdder) String() string {
50         return "-"
51 }
52
53 func (s *s3VolumeAdder) Set(bucketName string) error {
54         if bucketName == "" {
55                 return fmt.Errorf("no container name given")
56         }
57         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
58                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
59         }
60         if deprecated.flagSerializeIO {
61                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
62         }
63         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
64                 Bucket:        bucketName,
65                 AccessKeyFile: s3AccessKeyFile,
66                 SecretKeyFile: s3SecretKeyFile,
67                 Endpoint:      s3Endpoint,
68                 Region:        s3RegionName,
69                 RaceWindow:    arvados.Duration(s3RaceWindow),
70                 S3Replication: s3Replication,
71                 UnsafeDelete:  s3UnsafeDelete,
72                 ReadOnly:      deprecated.flagReadonly,
73                 IndexPageSize: 1000,
74         })
75         return nil
76 }
77
78 func s3regions() (okList []string) {
79         for r := range aws.Regions {
80                 okList = append(okList, r)
81         }
82         return
83 }
84
85 func init() {
86         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
87
88         flag.Var(&s3VolumeAdder{theConfig},
89                 "s3-bucket-volume",
90                 "Use the given bucket as a storage volume. Can be given multiple times.")
91         flag.StringVar(
92                 &s3RegionName,
93                 "s3-region",
94                 "",
95                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
96         flag.StringVar(
97                 &s3Endpoint,
98                 "s3-endpoint",
99                 "",
100                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
101         flag.StringVar(
102                 &s3AccessKeyFile,
103                 "s3-access-key-file",
104                 "",
105                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
106         flag.StringVar(
107                 &s3SecretKeyFile,
108                 "s3-secret-key-file",
109                 "",
110                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
111         flag.DurationVar(
112                 &s3RaceWindow,
113                 "s3-race-window",
114                 24*time.Hour,
115                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
116         flag.IntVar(
117                 &s3Replication,
118                 "s3-replication",
119                 2,
120                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
121         flag.BoolVar(
122                 &s3UnsafeDelete,
123                 "s3-unsafe-delete",
124                 false,
125                 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
126 }
127
128 // S3Volume implements Volume using an S3 bucket.
129 type S3Volume struct {
130         AccessKeyFile      string
131         SecretKeyFile      string
132         Endpoint           string
133         Region             string
134         Bucket             string
135         LocationConstraint bool
136         IndexPageSize      int
137         S3Replication      int
138         ConnectTimeout     arvados.Duration
139         ReadTimeout        arvados.Duration
140         RaceWindow         arvados.Duration
141         ReadOnly           bool
142         UnsafeDelete       bool
143
144         bucket *s3.Bucket
145
146         startOnce sync.Once
147 }
148
149 // Examples implements VolumeWithExamples.
150 func (*S3Volume) Examples() []Volume {
151         return []Volume{
152                 &S3Volume{
153                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
154                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
155                         Endpoint:       "",
156                         Region:         "us-east-1",
157                         Bucket:         "example-bucket-name",
158                         IndexPageSize:  1000,
159                         S3Replication:  2,
160                         RaceWindow:     arvados.Duration(24 * time.Hour),
161                         ConnectTimeout: arvados.Duration(time.Minute),
162                         ReadTimeout:    arvados.Duration(5 * time.Minute),
163                 },
164                 &S3Volume{
165                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
166                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
167                         Endpoint:       "https://storage.googleapis.com",
168                         Region:         "",
169                         Bucket:         "example-bucket-name",
170                         IndexPageSize:  1000,
171                         S3Replication:  2,
172                         RaceWindow:     arvados.Duration(24 * time.Hour),
173                         ConnectTimeout: arvados.Duration(time.Minute),
174                         ReadTimeout:    arvados.Duration(5 * time.Minute),
175                 },
176         }
177 }
178
179 // Type implements Volume.
180 func (*S3Volume) Type() string {
181         return "S3"
182 }
183
184 // Start populates private fields and verifies the configuration is
185 // valid.
186 func (v *S3Volume) Start() error {
187         region, ok := aws.Regions[v.Region]
188         if v.Endpoint == "" {
189                 if !ok {
190                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
191                 }
192         } else if ok {
193                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
194                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
195         } else {
196                 region = aws.Region{
197                         Name:                 v.Region,
198                         S3Endpoint:           v.Endpoint,
199                         S3LocationConstraint: v.LocationConstraint,
200                 }
201         }
202
203         var err error
204         var auth aws.Auth
205         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
206         if err != nil {
207                 return err
208         }
209         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
210         if err != nil {
211                 return err
212         }
213
214         // Zero timeouts mean "wait forever", which is a bad
215         // default. Default to long timeouts instead.
216         if v.ConnectTimeout == 0 {
217                 v.ConnectTimeout = arvados.Duration(time.Minute)
218         }
219         if v.ReadTimeout == 0 {
220                 v.ReadTimeout = arvados.Duration(10 * time.Minute)
221         }
222
223         client := s3.New(auth, region)
224         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
225         client.ReadTimeout = time.Duration(v.ReadTimeout)
226         v.bucket = &s3.Bucket{
227                 S3:   client,
228                 Name: v.Bucket,
229         }
230         return nil
231 }
232
233 // getReader wraps (Bucket)GetReader.
234 //
235 // In situations where (Bucket)GetReader would fail because the block
236 // disappeared in a Trash race, getReader calls fixRace to recover the
237 // data, and tries again.
238 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
239         rdr, err = v.bucket.GetReader(loc)
240         err = v.translateError(err)
241         if err == nil || !os.IsNotExist(err) {
242                 return
243         }
244         _, err = v.bucket.Head("recent/"+loc, nil)
245         err = v.translateError(err)
246         if err != nil {
247                 // If we can't read recent/X, there's no point in
248                 // trying fixRace. Give up.
249                 return
250         }
251         if !v.fixRace(loc) {
252                 err = os.ErrNotExist
253                 return
254         }
255         rdr, err = v.bucket.GetReader(loc)
256         if err != nil {
257                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
258                 err = v.translateError(err)
259         }
260         return
261 }
262
263 // Get a block: copy the block data into buf, and return the number of
264 // bytes copied.
265 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
266         ready := make(chan bool)
267         var rdr io.ReadCloser
268         var err error
269         go func() {
270                 rdr, err = v.getReader(loc)
271                 close(ready)
272         }()
273         select {
274         case <-ctx.Done():
275                 theConfig.debugLogf("s3: abandoning getReader() because %s", ctx.Err())
276                 return 0, ctx.Err()
277         case <-ready:
278                 if err != nil {
279                         return 0, err
280                 }
281         }
282
283         var n int
284         ready = make(chan bool)
285         go func() {
286                 defer close(ready)
287
288                 defer rdr.Close()
289                 n, err = io.ReadFull(rdr, buf)
290
291                 switch err {
292                 case nil, io.EOF, io.ErrUnexpectedEOF:
293                         err = nil
294                 default:
295                         err = v.translateError(err)
296                 }
297         }()
298         select {
299         case <-ctx.Done():
300                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
301                 rdr.Close()
302                 // Must wait for ReadFull to return, to ensure it
303                 // doesn't write to buf after we return.
304                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
305                 <-ready
306                 return 0, ctx.Err()
307         case <-ready:
308                 return n, err
309         }
310 }
311
312 // Compare the given data with the stored data.
313 func (v *S3Volume) Compare(loc string, expect []byte) error {
314         rdr, err := v.getReader(loc)
315         if err != nil {
316                 return err
317         }
318         defer rdr.Close()
319         return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
320 }
321
322 // Put writes a block.
323 func (v *S3Volume) Put(loc string, block []byte) error {
324         if v.ReadOnly {
325                 return MethodDisabledError
326         }
327         var opts s3.Options
328         if len(block) > 0 {
329                 md5, err := hex.DecodeString(loc)
330                 if err != nil {
331                         return err
332                 }
333                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
334         }
335         err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
336         if err != nil {
337                 return v.translateError(err)
338         }
339         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
340         return v.translateError(err)
341 }
342
343 // Touch sets the timestamp for the given locator to the current time.
344 func (v *S3Volume) Touch(loc string) error {
345         if v.ReadOnly {
346                 return MethodDisabledError
347         }
348         _, err := v.bucket.Head(loc, nil)
349         err = v.translateError(err)
350         if os.IsNotExist(err) && v.fixRace(loc) {
351                 // The data object got trashed in a race, but fixRace
352                 // rescued it.
353         } else if err != nil {
354                 return err
355         }
356         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
357         return v.translateError(err)
358 }
359
360 // Mtime returns the stored timestamp for the given locator.
361 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
362         _, err := v.bucket.Head(loc, nil)
363         if err != nil {
364                 return zeroTime, v.translateError(err)
365         }
366         resp, err := v.bucket.Head("recent/"+loc, nil)
367         err = v.translateError(err)
368         if os.IsNotExist(err) {
369                 // The data object X exists, but recent/X is missing.
370                 err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
371                 if err != nil {
372                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
373                         return zeroTime, v.translateError(err)
374                 }
375                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
376                 resp, err = v.bucket.Head("recent/"+loc, nil)
377                 if err != nil {
378                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
379                         return zeroTime, v.translateError(err)
380                 }
381         } else if err != nil {
382                 // HEAD recent/X failed for some other reason.
383                 return zeroTime, err
384         }
385         return v.lastModified(resp)
386 }
387
388 // IndexTo writes a complete list of locators with the given prefix
389 // for which Get() can retrieve data.
390 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
391         // Use a merge sort to find matching sets of X and recent/X.
392         dataL := s3Lister{
393                 Bucket:   v.bucket,
394                 Prefix:   prefix,
395                 PageSize: v.IndexPageSize,
396         }
397         recentL := s3Lister{
398                 Bucket:   v.bucket,
399                 Prefix:   "recent/" + prefix,
400                 PageSize: v.IndexPageSize,
401         }
402         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
403                 if data.Key >= "g" {
404                         // Conveniently, "recent/*" and "trash/*" are
405                         // lexically greater than all hex-encoded data
406                         // hashes, so stopping here avoids iterating
407                         // over all of them needlessly with dataL.
408                         break
409                 }
410                 if !v.isKeepBlock(data.Key) {
411                         continue
412                 }
413
414                 // stamp is the list entry we should use to report the
415                 // last-modified time for this data block: it will be
416                 // the recent/X entry if one exists, otherwise the
417                 // entry for the data block itself.
418                 stamp := data
419
420                 // Advance to the corresponding recent/X marker, if any
421                 for recent != nil {
422                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
423                                 recent = recentL.Next()
424                                 continue
425                         } else if cmp == 0 {
426                                 stamp = recent
427                                 recent = recentL.Next()
428                                 break
429                         } else {
430                                 // recent/X marker is missing: we'll
431                                 // use the timestamp on the data
432                                 // object.
433                                 break
434                         }
435                 }
436                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
437                 if err != nil {
438                         return err
439                 }
440                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
441         }
442         return nil
443 }
444
445 // Trash a Keep block.
446 func (v *S3Volume) Trash(loc string) error {
447         if v.ReadOnly {
448                 return MethodDisabledError
449         }
450         if t, err := v.Mtime(loc); err != nil {
451                 return err
452         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
453                 return nil
454         }
455         if theConfig.TrashLifetime == 0 {
456                 if !s3UnsafeDelete {
457                         return ErrS3TrashDisabled
458                 }
459                 return v.bucket.Del(loc)
460         }
461         err := v.checkRaceWindow(loc)
462         if err != nil {
463                 return err
464         }
465         err = v.safeCopy("trash/"+loc, loc)
466         if err != nil {
467                 return err
468         }
469         return v.translateError(v.bucket.Del(loc))
470 }
471
472 // checkRaceWindow returns a non-nil error if trash/loc is, or might
473 // be, in the race window (i.e., it's not safe to trash loc).
474 func (v *S3Volume) checkRaceWindow(loc string) error {
475         resp, err := v.bucket.Head("trash/"+loc, nil)
476         err = v.translateError(err)
477         if os.IsNotExist(err) {
478                 // OK, trash/X doesn't exist so we're not in the race
479                 // window
480                 return nil
481         } else if err != nil {
482                 // Error looking up trash/X. We don't know whether
483                 // we're in the race window
484                 return err
485         }
486         t, err := v.lastModified(resp)
487         if err != nil {
488                 // Can't parse timestamp
489                 return err
490         }
491         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
492         if safeWindow <= 0 {
493                 // We can't count on "touch trash/X" to prolong
494                 // trash/X's lifetime. The new timestamp might not
495                 // become visible until now+raceWindow, and EmptyTrash
496                 // is allowed to delete trash/X before then.
497                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
498         }
499         // trash/X exists, but it won't be eligible for deletion until
500         // after now+raceWindow, so it's safe to overwrite it.
501         return nil
502 }
503
504 // safeCopy calls PutCopy, and checks the response to make sure the
505 // copy succeeded and updated the timestamp on the destination object
506 // (PutCopy returns 200 OK if the request was received, even if the
507 // copy failed).
508 func (v *S3Volume) safeCopy(dst, src string) error {
509         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
510                 ContentType:       "application/octet-stream",
511                 MetadataDirective: "REPLACE",
512         }, v.bucket.Name+"/"+src)
513         err = v.translateError(err)
514         if err != nil {
515                 return err
516         }
517         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
518                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
519         } else if time.Now().Sub(t) > maxClockSkew {
520                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
521         }
522         return nil
523 }
524
525 // Get the LastModified header from resp, and parse it as RFC1123 or
526 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
527 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
528         s := resp.Header.Get("Last-Modified")
529         t, err = time.Parse(time.RFC1123, s)
530         if err != nil && s != "" {
531                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
532                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
533                 // as required by HTTP spec. If it's not a valid HTTP
534                 // header value, it's probably AWS (or s3test) giving
535                 // us a nearly-RFC1123 timestamp.
536                 t, err = time.Parse(nearlyRFC1123, s)
537         }
538         return
539 }
540
541 // Untrash moves block from trash back into store
542 func (v *S3Volume) Untrash(loc string) error {
543         err := v.safeCopy(loc, "trash/"+loc)
544         if err != nil {
545                 return err
546         }
547         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
548         return v.translateError(err)
549 }
550
551 // Status returns a *VolumeStatus representing the current in-use
552 // storage capacity and a fake available capacity that doesn't make
553 // the volume seem full or nearly-full.
554 func (v *S3Volume) Status() *VolumeStatus {
555         return &VolumeStatus{
556                 DeviceNum: 1,
557                 BytesFree: BlockSize * 1000,
558                 BytesUsed: 1,
559         }
560 }
561
562 // String implements fmt.Stringer.
563 func (v *S3Volume) String() string {
564         return fmt.Sprintf("s3-bucket:%+q", v.bucket.Name)
565 }
566
567 // Writable returns false if all future Put, Mtime, and Delete calls
568 // are expected to fail.
569 func (v *S3Volume) Writable() bool {
570         return !v.ReadOnly
571 }
572
573 // Replication returns the storage redundancy of the underlying
574 // device. Configured via command line flag.
575 func (v *S3Volume) Replication() int {
576         return v.S3Replication
577 }
578
579 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
580
581 func (v *S3Volume) isKeepBlock(s string) bool {
582         return s3KeepBlockRegexp.MatchString(s)
583 }
584
585 // fixRace(X) is called when "recent/X" exists but "X" doesn't
586 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
587 // there was a race between Put and Trash, fixRace recovers from the
588 // race by Untrashing the block.
589 func (v *S3Volume) fixRace(loc string) bool {
590         trash, err := v.bucket.Head("trash/"+loc, nil)
591         if err != nil {
592                 if !os.IsNotExist(v.translateError(err)) {
593                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
594                 }
595                 return false
596         }
597         trashTime, err := v.lastModified(trash)
598         if err != nil {
599                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
600                 return false
601         }
602
603         recent, err := v.bucket.Head("recent/"+loc, nil)
604         if err != nil {
605                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
606                 return false
607         }
608         recentTime, err := v.lastModified(recent)
609         if err != nil {
610                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
611                 return false
612         }
613
614         ageWhenTrashed := trashTime.Sub(recentTime)
615         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
616                 // No evidence of a race: block hasn't been written
617                 // since it became eligible for Trash. No fix needed.
618                 return false
619         }
620
621         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
622         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
623         err = v.safeCopy(loc, "trash/"+loc)
624         if err != nil {
625                 log.Printf("error: fixRace: %s", err)
626                 return false
627         }
628         return true
629 }
630
631 func (v *S3Volume) translateError(err error) error {
632         switch err := err.(type) {
633         case *s3.Error:
634                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
635                         strings.Contains(err.Error(), "Not Found") {
636                         return os.ErrNotExist
637                 }
638                 // Other 404 errors like NoSuchVersion and
639                 // NoSuchBucket are different problems which should
640                 // get called out downstream, so we don't convert them
641                 // to os.ErrNotExist.
642         }
643         return err
644 }
645
646 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
647 // and deletes them from the volume.
648 func (v *S3Volume) EmptyTrash() {
649         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
650
651         // Use a merge sort to find matching sets of trash/X and recent/X.
652         trashL := s3Lister{
653                 Bucket:   v.bucket,
654                 Prefix:   "trash/",
655                 PageSize: v.IndexPageSize,
656         }
657         // Define "ready to delete" as "...when EmptyTrash started".
658         startT := time.Now()
659         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
660                 loc := trash.Key[6:]
661                 if !v.isKeepBlock(loc) {
662                         continue
663                 }
664                 bytesInTrash += trash.Size
665                 blocksInTrash++
666
667                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
668                 if err != nil {
669                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
670                         continue
671                 }
672                 recent, err := v.bucket.Head("recent/"+loc, nil)
673                 if err != nil && os.IsNotExist(v.translateError(err)) {
674                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
675                         err = v.Untrash(loc)
676                         if err != nil {
677                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
678                         }
679                         continue
680                 } else if err != nil {
681                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
682                         continue
683                 }
684                 recentT, err := v.lastModified(recent)
685                 if err != nil {
686                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
687                         continue
688                 }
689                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
690                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
691                                 // recent/loc is too old to protect
692                                 // loc from being Trashed again during
693                                 // the raceWindow that starts if we
694                                 // delete trash/X now.
695                                 //
696                                 // Note this means (TrashCheckInterval
697                                 // < BlobSignatureTTL - raceWindow) is
698                                 // necessary to avoid starvation.
699                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
700                                 v.fixRace(loc)
701                                 v.Touch(loc)
702                                 continue
703                         } else if _, err := v.bucket.Head(loc, nil); os.IsNotExist(err) {
704                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
705                                 v.fixRace(loc)
706                                 continue
707                         } else if err != nil {
708                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
709                                 continue
710                         }
711                 }
712                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
713                         continue
714                 }
715                 err = v.bucket.Del(trash.Key)
716                 if err != nil {
717                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
718                         continue
719                 }
720                 bytesDeleted += trash.Size
721                 blocksDeleted++
722
723                 _, err = v.bucket.Head(loc, nil)
724                 if os.IsNotExist(err) {
725                         err = v.bucket.Del("recent/" + loc)
726                         if err != nil {
727                                 log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
728                         }
729                 } else if err != nil {
730                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
731                 }
732         }
733         if err := trashL.Error(); err != nil {
734                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
735         }
736         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
737 }
738
739 type s3Lister struct {
740         Bucket     *s3.Bucket
741         Prefix     string
742         PageSize   int
743         nextMarker string
744         buf        []s3.Key
745         err        error
746 }
747
748 // First fetches the first page and returns the first item. It returns
749 // nil if the response is the empty set or an error occurs.
750 func (lister *s3Lister) First() *s3.Key {
751         lister.getPage()
752         return lister.pop()
753 }
754
755 // Next returns the next item, fetching the next page if necessary. It
756 // returns nil if the last available item has already been fetched, or
757 // an error occurs.
758 func (lister *s3Lister) Next() *s3.Key {
759         if len(lister.buf) == 0 && lister.nextMarker != "" {
760                 lister.getPage()
761         }
762         return lister.pop()
763 }
764
765 // Return the most recent error encountered by First or Next.
766 func (lister *s3Lister) Error() error {
767         return lister.err
768 }
769
770 func (lister *s3Lister) getPage() {
771         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
772         lister.nextMarker = ""
773         if err != nil {
774                 lister.err = err
775                 return
776         }
777         if resp.IsTruncated {
778                 lister.nextMarker = resp.NextMarker
779         }
780         lister.buf = make([]s3.Key, 0, len(resp.Contents))
781         for _, key := range resp.Contents {
782                 if !strings.HasPrefix(key.Key, lister.Prefix) {
783                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
784                         continue
785                 }
786                 lister.buf = append(lister.buf, key)
787         }
788 }
789
790 func (lister *s3Lister) pop() (k *s3.Key) {
791         if len(lister.buf) > 0 {
792                 k = &lister.buf[0]
793                 lister.buf = lister.buf[1:]
794         }
795         return
796 }