Merge branch 'crunch-job_finds_newer_docker_hashes' of https://github.com/tmooney...
[arvados.git] / services / keepstore / s3_volume.go
1 package main
2
3 import (
4         "encoding/base64"
5         "encoding/hex"
6         "flag"
7         "fmt"
8         "io"
9         "log"
10         "net/http"
11         "os"
12         "regexp"
13         "strings"
14         "sync"
15         "time"
16
17         "git.curoverse.com/arvados.git/sdk/go/arvados"
18         "github.com/AdRoll/goamz/aws"
19         "github.com/AdRoll/goamz/s3"
20 )
21
22 var (
23         // ErrS3TrashDisabled is returned by Trash if that operation
24         // is impossible with the current config.
25         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
26
27         s3AccessKeyFile string
28         s3SecretKeyFile string
29         s3RegionName    string
30         s3Endpoint      string
31         s3Replication   int
32         s3UnsafeDelete  bool
33         s3RaceWindow    time.Duration
34
35         s3ACL = s3.Private
36 )
37
38 const (
39         maxClockSkew  = 600 * time.Second
40         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
41 )
42
43 type s3VolumeAdder struct {
44         *Config
45 }
46
47 // String implements flag.Value
48 func (s *s3VolumeAdder) String() string {
49         return "-"
50 }
51
52 func (s *s3VolumeAdder) Set(bucketName string) error {
53         if bucketName == "" {
54                 return fmt.Errorf("no container name given")
55         }
56         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
57                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
58         }
59         if deprecated.flagSerializeIO {
60                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
61         }
62         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
63                 Bucket:        bucketName,
64                 AccessKeyFile: s3AccessKeyFile,
65                 SecretKeyFile: s3SecretKeyFile,
66                 Endpoint:      s3Endpoint,
67                 Region:        s3RegionName,
68                 RaceWindow:    arvados.Duration(s3RaceWindow),
69                 S3Replication: s3Replication,
70                 UnsafeDelete:  s3UnsafeDelete,
71                 ReadOnly:      deprecated.flagReadonly,
72                 IndexPageSize: 1000,
73         })
74         return nil
75 }
76
77 func s3regions() (okList []string) {
78         for r := range aws.Regions {
79                 okList = append(okList, r)
80         }
81         return
82 }
83
84 func init() {
85         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
86
87         flag.Var(&s3VolumeAdder{theConfig},
88                 "s3-bucket-volume",
89                 "Use the given bucket as a storage volume. Can be given multiple times.")
90         flag.StringVar(
91                 &s3RegionName,
92                 "s3-region",
93                 "",
94                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
95         flag.StringVar(
96                 &s3Endpoint,
97                 "s3-endpoint",
98                 "",
99                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
100         flag.StringVar(
101                 &s3AccessKeyFile,
102                 "s3-access-key-file",
103                 "",
104                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
105         flag.StringVar(
106                 &s3SecretKeyFile,
107                 "s3-secret-key-file",
108                 "",
109                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
110         flag.DurationVar(
111                 &s3RaceWindow,
112                 "s3-race-window",
113                 24*time.Hour,
114                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
115         flag.IntVar(
116                 &s3Replication,
117                 "s3-replication",
118                 2,
119                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
120         flag.BoolVar(
121                 &s3UnsafeDelete,
122                 "s3-unsafe-delete",
123                 false,
124                 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
125 }
126
127 // S3Volume implements Volume using an S3 bucket.
128 type S3Volume struct {
129         AccessKeyFile      string
130         SecretKeyFile      string
131         Endpoint           string
132         Region             string
133         Bucket             string
134         LocationConstraint bool
135         IndexPageSize      int
136         S3Replication      int
137         RaceWindow         arvados.Duration
138         ReadOnly           bool
139         UnsafeDelete       bool
140
141         bucket *s3.Bucket
142
143         startOnce sync.Once
144 }
145
146 // Examples implements VolumeWithExamples.
147 func (*S3Volume) Examples() []Volume {
148         return []Volume{
149                 &S3Volume{
150                         AccessKeyFile: "/etc/aws_s3_access_key.txt",
151                         SecretKeyFile: "/etc/aws_s3_secret_key.txt",
152                         Endpoint:      "",
153                         Region:        "us-east-1",
154                         Bucket:        "example-bucket-name",
155                         IndexPageSize: 1000,
156                         S3Replication: 2,
157                         RaceWindow:    arvados.Duration(24 * time.Hour),
158                 },
159                 &S3Volume{
160                         AccessKeyFile: "/etc/gce_s3_access_key.txt",
161                         SecretKeyFile: "/etc/gce_s3_secret_key.txt",
162                         Endpoint:      "https://storage.googleapis.com",
163                         Region:        "",
164                         Bucket:        "example-bucket-name",
165                         IndexPageSize: 1000,
166                         S3Replication: 2,
167                         RaceWindow:    arvados.Duration(24 * time.Hour),
168                 },
169         }
170 }
171
172 // Type implements Volume.
173 func (*S3Volume) Type() string {
174         return "S3"
175 }
176
177 // Start populates private fields and verifies the configuration is
178 // valid.
179 func (v *S3Volume) Start() error {
180         region, ok := aws.Regions[v.Region]
181         if v.Endpoint == "" {
182                 if !ok {
183                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
184                 }
185         } else if ok {
186                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
187                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
188         } else {
189                 region = aws.Region{
190                         Name:                 v.Region,
191                         S3Endpoint:           v.Endpoint,
192                         S3LocationConstraint: v.LocationConstraint,
193                 }
194         }
195
196         var err error
197         var auth aws.Auth
198         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
199         if err != nil {
200                 return err
201         }
202         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
203         if err != nil {
204                 return err
205         }
206         v.bucket = &s3.Bucket{
207                 S3:   s3.New(auth, region),
208                 Name: v.Bucket,
209         }
210         return nil
211 }
212
213 // getReader wraps (Bucket)GetReader.
214 //
215 // In situations where (Bucket)GetReader would fail because the block
216 // disappeared in a Trash race, getReader calls fixRace to recover the
217 // data, and tries again.
218 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
219         rdr, err = v.bucket.GetReader(loc)
220         err = v.translateError(err)
221         if err == nil || !os.IsNotExist(err) {
222                 return
223         }
224         _, err = v.bucket.Head("recent/"+loc, nil)
225         err = v.translateError(err)
226         if err != nil {
227                 // If we can't read recent/X, there's no point in
228                 // trying fixRace. Give up.
229                 return
230         }
231         if !v.fixRace(loc) {
232                 err = os.ErrNotExist
233                 return
234         }
235         rdr, err = v.bucket.GetReader(loc)
236         if err != nil {
237                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
238                 err = v.translateError(err)
239         }
240         return
241 }
242
243 // Get a block: copy the block data into buf, and return the number of
244 // bytes copied.
245 func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
246         rdr, err := v.getReader(loc)
247         if err != nil {
248                 return 0, err
249         }
250         defer rdr.Close()
251         n, err := io.ReadFull(rdr, buf)
252         switch err {
253         case nil, io.EOF, io.ErrUnexpectedEOF:
254                 return n, nil
255         default:
256                 return 0, v.translateError(err)
257         }
258 }
259
260 // Compare the given data with the stored data.
261 func (v *S3Volume) Compare(loc string, expect []byte) error {
262         rdr, err := v.getReader(loc)
263         if err != nil {
264                 return err
265         }
266         defer rdr.Close()
267         return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
268 }
269
270 // Put writes a block.
271 func (v *S3Volume) Put(loc string, block []byte) error {
272         if v.ReadOnly {
273                 return MethodDisabledError
274         }
275         var opts s3.Options
276         if len(block) > 0 {
277                 md5, err := hex.DecodeString(loc)
278                 if err != nil {
279                         return err
280                 }
281                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
282         }
283         err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
284         if err != nil {
285                 return v.translateError(err)
286         }
287         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
288         return v.translateError(err)
289 }
290
291 // Touch sets the timestamp for the given locator to the current time.
292 func (v *S3Volume) Touch(loc string) error {
293         if v.ReadOnly {
294                 return MethodDisabledError
295         }
296         _, err := v.bucket.Head(loc, nil)
297         err = v.translateError(err)
298         if os.IsNotExist(err) && v.fixRace(loc) {
299                 // The data object got trashed in a race, but fixRace
300                 // rescued it.
301         } else if err != nil {
302                 return err
303         }
304         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
305         return v.translateError(err)
306 }
307
308 // Mtime returns the stored timestamp for the given locator.
309 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
310         _, err := v.bucket.Head(loc, nil)
311         if err != nil {
312                 return zeroTime, v.translateError(err)
313         }
314         resp, err := v.bucket.Head("recent/"+loc, nil)
315         err = v.translateError(err)
316         if os.IsNotExist(err) {
317                 // The data object X exists, but recent/X is missing.
318                 err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
319                 if err != nil {
320                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
321                         return zeroTime, v.translateError(err)
322                 }
323                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
324                 resp, err = v.bucket.Head("recent/"+loc, nil)
325                 if err != nil {
326                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
327                         return zeroTime, v.translateError(err)
328                 }
329         } else if err != nil {
330                 // HEAD recent/X failed for some other reason.
331                 return zeroTime, err
332         }
333         return v.lastModified(resp)
334 }
335
336 // IndexTo writes a complete list of locators with the given prefix
337 // for which Get() can retrieve data.
338 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
339         // Use a merge sort to find matching sets of X and recent/X.
340         dataL := s3Lister{
341                 Bucket:   v.bucket,
342                 Prefix:   prefix,
343                 PageSize: v.IndexPageSize,
344         }
345         recentL := s3Lister{
346                 Bucket:   v.bucket,
347                 Prefix:   "recent/" + prefix,
348                 PageSize: v.IndexPageSize,
349         }
350         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
351                 if data.Key >= "g" {
352                         // Conveniently, "recent/*" and "trash/*" are
353                         // lexically greater than all hex-encoded data
354                         // hashes, so stopping here avoids iterating
355                         // over all of them needlessly with dataL.
356                         break
357                 }
358                 if !v.isKeepBlock(data.Key) {
359                         continue
360                 }
361
362                 // stamp is the list entry we should use to report the
363                 // last-modified time for this data block: it will be
364                 // the recent/X entry if one exists, otherwise the
365                 // entry for the data block itself.
366                 stamp := data
367
368                 // Advance to the corresponding recent/X marker, if any
369                 for recent != nil {
370                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
371                                 recent = recentL.Next()
372                                 continue
373                         } else if cmp == 0 {
374                                 stamp = recent
375                                 recent = recentL.Next()
376                                 break
377                         } else {
378                                 // recent/X marker is missing: we'll
379                                 // use the timestamp on the data
380                                 // object.
381                                 break
382                         }
383                 }
384                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
385                 if err != nil {
386                         return err
387                 }
388                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
389         }
390         return nil
391 }
392
393 // Trash a Keep block.
394 func (v *S3Volume) Trash(loc string) error {
395         if v.ReadOnly {
396                 return MethodDisabledError
397         }
398         if t, err := v.Mtime(loc); err != nil {
399                 return err
400         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
401                 return nil
402         }
403         if theConfig.TrashLifetime == 0 {
404                 if !s3UnsafeDelete {
405                         return ErrS3TrashDisabled
406                 }
407                 return v.bucket.Del(loc)
408         }
409         err := v.checkRaceWindow(loc)
410         if err != nil {
411                 return err
412         }
413         err = v.safeCopy("trash/"+loc, loc)
414         if err != nil {
415                 return err
416         }
417         return v.translateError(v.bucket.Del(loc))
418 }
419
420 // checkRaceWindow returns a non-nil error if trash/loc is, or might
421 // be, in the race window (i.e., it's not safe to trash loc).
422 func (v *S3Volume) checkRaceWindow(loc string) error {
423         resp, err := v.bucket.Head("trash/"+loc, nil)
424         err = v.translateError(err)
425         if os.IsNotExist(err) {
426                 // OK, trash/X doesn't exist so we're not in the race
427                 // window
428                 return nil
429         } else if err != nil {
430                 // Error looking up trash/X. We don't know whether
431                 // we're in the race window
432                 return err
433         }
434         t, err := v.lastModified(resp)
435         if err != nil {
436                 // Can't parse timestamp
437                 return err
438         }
439         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
440         if safeWindow <= 0 {
441                 // We can't count on "touch trash/X" to prolong
442                 // trash/X's lifetime. The new timestamp might not
443                 // become visible until now+raceWindow, and EmptyTrash
444                 // is allowed to delete trash/X before then.
445                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
446         }
447         // trash/X exists, but it won't be eligible for deletion until
448         // after now+raceWindow, so it's safe to overwrite it.
449         return nil
450 }
451
452 // safeCopy calls PutCopy, and checks the response to make sure the
453 // copy succeeded and updated the timestamp on the destination object
454 // (PutCopy returns 200 OK if the request was received, even if the
455 // copy failed).
456 func (v *S3Volume) safeCopy(dst, src string) error {
457         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
458                 ContentType:       "application/octet-stream",
459                 MetadataDirective: "REPLACE",
460         }, v.bucket.Name+"/"+src)
461         err = v.translateError(err)
462         if err != nil {
463                 return err
464         }
465         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
466                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
467         } else if time.Now().Sub(t) > maxClockSkew {
468                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
469         }
470         return nil
471 }
472
473 // Get the LastModified header from resp, and parse it as RFC1123 or
474 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
475 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
476         s := resp.Header.Get("Last-Modified")
477         t, err = time.Parse(time.RFC1123, s)
478         if err != nil && s != "" {
479                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
480                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
481                 // as required by HTTP spec. If it's not a valid HTTP
482                 // header value, it's probably AWS (or s3test) giving
483                 // us a nearly-RFC1123 timestamp.
484                 t, err = time.Parse(nearlyRFC1123, s)
485         }
486         return
487 }
488
489 // Untrash moves block from trash back into store
490 func (v *S3Volume) Untrash(loc string) error {
491         err := v.safeCopy(loc, "trash/"+loc)
492         if err != nil {
493                 return err
494         }
495         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
496         return v.translateError(err)
497 }
498
499 // Status returns a *VolumeStatus representing the current in-use
500 // storage capacity and a fake available capacity that doesn't make
501 // the volume seem full or nearly-full.
502 func (v *S3Volume) Status() *VolumeStatus {
503         return &VolumeStatus{
504                 DeviceNum: 1,
505                 BytesFree: BlockSize * 1000,
506                 BytesUsed: 1,
507         }
508 }
509
510 // String implements fmt.Stringer.
511 func (v *S3Volume) String() string {
512         return fmt.Sprintf("s3-bucket:%+q", v.bucket.Name)
513 }
514
515 // Writable returns false if all future Put, Mtime, and Delete calls
516 // are expected to fail.
517 func (v *S3Volume) Writable() bool {
518         return !v.ReadOnly
519 }
520
521 // Replication returns the storage redundancy of the underlying
522 // device. Configured via command line flag.
523 func (v *S3Volume) Replication() int {
524         return v.S3Replication
525 }
526
527 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
528
529 func (v *S3Volume) isKeepBlock(s string) bool {
530         return s3KeepBlockRegexp.MatchString(s)
531 }
532
533 // fixRace(X) is called when "recent/X" exists but "X" doesn't
534 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
535 // there was a race between Put and Trash, fixRace recovers from the
536 // race by Untrashing the block.
537 func (v *S3Volume) fixRace(loc string) bool {
538         trash, err := v.bucket.Head("trash/"+loc, nil)
539         if err != nil {
540                 if !os.IsNotExist(v.translateError(err)) {
541                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
542                 }
543                 return false
544         }
545         trashTime, err := v.lastModified(trash)
546         if err != nil {
547                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
548                 return false
549         }
550
551         recent, err := v.bucket.Head("recent/"+loc, nil)
552         if err != nil {
553                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
554                 return false
555         }
556         recentTime, err := v.lastModified(recent)
557         if err != nil {
558                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
559                 return false
560         }
561
562         ageWhenTrashed := trashTime.Sub(recentTime)
563         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
564                 // No evidence of a race: block hasn't been written
565                 // since it became eligible for Trash. No fix needed.
566                 return false
567         }
568
569         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
570         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
571         err = v.safeCopy(loc, "trash/"+loc)
572         if err != nil {
573                 log.Printf("error: fixRace: %s", err)
574                 return false
575         }
576         return true
577 }
578
579 func (v *S3Volume) translateError(err error) error {
580         switch err := err.(type) {
581         case *s3.Error:
582                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
583                         strings.Contains(err.Error(), "Not Found") {
584                         return os.ErrNotExist
585                 }
586                 // Other 404 errors like NoSuchVersion and
587                 // NoSuchBucket are different problems which should
588                 // get called out downstream, so we don't convert them
589                 // to os.ErrNotExist.
590         }
591         return err
592 }
593
594 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
595 // and deletes them from the volume.
596 func (v *S3Volume) EmptyTrash() {
597         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
598
599         // Use a merge sort to find matching sets of trash/X and recent/X.
600         trashL := s3Lister{
601                 Bucket:   v.bucket,
602                 Prefix:   "trash/",
603                 PageSize: v.IndexPageSize,
604         }
605         // Define "ready to delete" as "...when EmptyTrash started".
606         startT := time.Now()
607         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
608                 loc := trash.Key[6:]
609                 if !v.isKeepBlock(loc) {
610                         continue
611                 }
612                 bytesInTrash += trash.Size
613                 blocksInTrash++
614
615                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
616                 if err != nil {
617                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
618                         continue
619                 }
620                 recent, err := v.bucket.Head("recent/"+loc, nil)
621                 if err != nil && os.IsNotExist(v.translateError(err)) {
622                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
623                         err = v.Untrash(loc)
624                         if err != nil {
625                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
626                         }
627                         continue
628                 } else if err != nil {
629                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
630                         continue
631                 }
632                 recentT, err := v.lastModified(recent)
633                 if err != nil {
634                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
635                         continue
636                 }
637                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
638                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
639                                 // recent/loc is too old to protect
640                                 // loc from being Trashed again during
641                                 // the raceWindow that starts if we
642                                 // delete trash/X now.
643                                 //
644                                 // Note this means (TrashCheckInterval
645                                 // < BlobSignatureTTL - raceWindow) is
646                                 // necessary to avoid starvation.
647                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
648                                 v.fixRace(loc)
649                                 v.Touch(loc)
650                                 continue
651                         } else if _, err := v.bucket.Head(loc, nil); os.IsNotExist(err) {
652                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
653                                 v.fixRace(loc)
654                                 continue
655                         } else if err != nil {
656                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
657                                 continue
658                         }
659                 }
660                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
661                         continue
662                 }
663                 err = v.bucket.Del(trash.Key)
664                 if err != nil {
665                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
666                         continue
667                 }
668                 bytesDeleted += trash.Size
669                 blocksDeleted++
670
671                 _, err = v.bucket.Head(loc, nil)
672                 if os.IsNotExist(err) {
673                         err = v.bucket.Del("recent/" + loc)
674                         if err != nil {
675                                 log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
676                         }
677                 } else if err != nil {
678                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
679                 }
680         }
681         if err := trashL.Error(); err != nil {
682                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
683         }
684         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
685 }
686
687 type s3Lister struct {
688         Bucket     *s3.Bucket
689         Prefix     string
690         PageSize   int
691         nextMarker string
692         buf        []s3.Key
693         err        error
694 }
695
696 // First fetches the first page and returns the first item. It returns
697 // nil if the response is the empty set or an error occurs.
698 func (lister *s3Lister) First() *s3.Key {
699         lister.getPage()
700         return lister.pop()
701 }
702
703 // Next returns the next item, fetching the next page if necessary. It
704 // returns nil if the last available item has already been fetched, or
705 // an error occurs.
706 func (lister *s3Lister) Next() *s3.Key {
707         if len(lister.buf) == 0 && lister.nextMarker != "" {
708                 lister.getPage()
709         }
710         return lister.pop()
711 }
712
713 // Return the most recent error encountered by First or Next.
714 func (lister *s3Lister) Error() error {
715         return lister.err
716 }
717
718 func (lister *s3Lister) getPage() {
719         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
720         lister.nextMarker = ""
721         if err != nil {
722                 lister.err = err
723                 return
724         }
725         if resp.IsTruncated {
726                 lister.nextMarker = resp.NextMarker
727         }
728         lister.buf = make([]s3.Key, 0, len(resp.Contents))
729         for _, key := range resp.Contents {
730                 if !strings.HasPrefix(key.Key, lister.Prefix) {
731                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
732                         continue
733                 }
734                 lister.buf = append(lister.buf, key)
735         }
736 }
737
738 func (lister *s3Lister) pop() (k *s3.Key) {
739         if len(lister.buf) > 0 {
740                 k = &lister.buf[0]
741                 lister.buf = lister.buf[1:]
742         }
743         return
744 }