10467: Abort S3 request and release buffer if caller disconnects while server is...
[arvados.git] / services / keepstore / s3_volume.go
1 package main
2
3 import (
4         "context"
5         "encoding/base64"
6         "encoding/hex"
7         "flag"
8         "fmt"
9         "io"
10         "log"
11         "net/http"
12         "os"
13         "regexp"
14         "strings"
15         "sync"
16         "time"
17
18         "git.curoverse.com/arvados.git/sdk/go/arvados"
19         "github.com/AdRoll/goamz/aws"
20         "github.com/AdRoll/goamz/s3"
21 )
22
23 var (
24         // ErrS3TrashDisabled is returned by Trash if that operation
25         // is impossible with the current config.
26         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
27
28         s3AccessKeyFile string
29         s3SecretKeyFile string
30         s3RegionName    string
31         s3Endpoint      string
32         s3Replication   int
33         s3UnsafeDelete  bool
34         s3RaceWindow    time.Duration
35
36         s3ACL = s3.Private
37 )
38
39 const (
40         maxClockSkew  = 600 * time.Second
41         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
42 )
43
44 type s3VolumeAdder struct {
45         *Config
46 }
47
48 // String implements flag.Value
49 func (s *s3VolumeAdder) String() string {
50         return "-"
51 }
52
53 func (s *s3VolumeAdder) Set(bucketName string) error {
54         if bucketName == "" {
55                 return fmt.Errorf("no container name given")
56         }
57         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
58                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
59         }
60         if deprecated.flagSerializeIO {
61                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
62         }
63         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
64                 Bucket:        bucketName,
65                 AccessKeyFile: s3AccessKeyFile,
66                 SecretKeyFile: s3SecretKeyFile,
67                 Endpoint:      s3Endpoint,
68                 Region:        s3RegionName,
69                 RaceWindow:    arvados.Duration(s3RaceWindow),
70                 S3Replication: s3Replication,
71                 UnsafeDelete:  s3UnsafeDelete,
72                 ReadOnly:      deprecated.flagReadonly,
73                 IndexPageSize: 1000,
74         })
75         return nil
76 }
77
78 func s3regions() (okList []string) {
79         for r := range aws.Regions {
80                 okList = append(okList, r)
81         }
82         return
83 }
84
85 func init() {
86         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
87
88         flag.Var(&s3VolumeAdder{theConfig},
89                 "s3-bucket-volume",
90                 "Use the given bucket as a storage volume. Can be given multiple times.")
91         flag.StringVar(
92                 &s3RegionName,
93                 "s3-region",
94                 "",
95                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
96         flag.StringVar(
97                 &s3Endpoint,
98                 "s3-endpoint",
99                 "",
100                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
101         flag.StringVar(
102                 &s3AccessKeyFile,
103                 "s3-access-key-file",
104                 "",
105                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
106         flag.StringVar(
107                 &s3SecretKeyFile,
108                 "s3-secret-key-file",
109                 "",
110                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
111         flag.DurationVar(
112                 &s3RaceWindow,
113                 "s3-race-window",
114                 24*time.Hour,
115                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
116         flag.IntVar(
117                 &s3Replication,
118                 "s3-replication",
119                 2,
120                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
121         flag.BoolVar(
122                 &s3UnsafeDelete,
123                 "s3-unsafe-delete",
124                 false,
125                 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
126 }
127
128 // S3Volume implements Volume using an S3 bucket.
129 type S3Volume struct {
130         AccessKeyFile      string
131         SecretKeyFile      string
132         Endpoint           string
133         Region             string
134         Bucket             string
135         LocationConstraint bool
136         IndexPageSize      int
137         S3Replication      int
138         ConnectTimeout     arvados.Duration
139         ReadTimeout        arvados.Duration
140         RaceWindow         arvados.Duration
141         ReadOnly           bool
142         UnsafeDelete       bool
143
144         bucket *s3.Bucket
145
146         startOnce sync.Once
147 }
148
149 // Examples implements VolumeWithExamples.
150 func (*S3Volume) Examples() []Volume {
151         return []Volume{
152                 &S3Volume{
153                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
154                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
155                         Endpoint:       "",
156                         Region:         "us-east-1",
157                         Bucket:         "example-bucket-name",
158                         IndexPageSize:  1000,
159                         S3Replication:  2,
160                         RaceWindow:     arvados.Duration(24 * time.Hour),
161                         ConnectTimeout: arvados.Duration(time.Minute),
162                         ReadTimeout:    arvados.Duration(5 * time.Minute),
163                 },
164                 &S3Volume{
165                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
166                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
167                         Endpoint:       "https://storage.googleapis.com",
168                         Region:         "",
169                         Bucket:         "example-bucket-name",
170                         IndexPageSize:  1000,
171                         S3Replication:  2,
172                         RaceWindow:     arvados.Duration(24 * time.Hour),
173                         ConnectTimeout: arvados.Duration(time.Minute),
174                         ReadTimeout:    arvados.Duration(5 * time.Minute),
175                 },
176         }
177 }
178
179 // Type implements Volume.
180 func (*S3Volume) Type() string {
181         return "S3"
182 }
183
184 // Start populates private fields and verifies the configuration is
185 // valid.
186 func (v *S3Volume) Start() error {
187         region, ok := aws.Regions[v.Region]
188         if v.Endpoint == "" {
189                 if !ok {
190                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
191                 }
192         } else if ok {
193                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
194                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
195         } else {
196                 region = aws.Region{
197                         Name:                 v.Region,
198                         S3Endpoint:           v.Endpoint,
199                         S3LocationConstraint: v.LocationConstraint,
200                 }
201         }
202
203         var err error
204         var auth aws.Auth
205         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
206         if err != nil {
207                 return err
208         }
209         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
210         if err != nil {
211                 return err
212         }
213
214         // Zero timeouts mean "wait forever", which is a bad
215         // default. Default to long timeouts instead.
216         if v.ConnectTimeout == 0 {
217                 v.ConnectTimeout = arvados.Duration(time.Minute)
218         }
219         if v.ReadTimeout == 0 {
220                 v.ReadTimeout = arvados.Duration(10 * time.Minute)
221         }
222
223         client := s3.New(auth, region)
224         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
225         client.ReadTimeout = time.Duration(v.ReadTimeout)
226         v.bucket = &s3.Bucket{
227                 S3:   client,
228                 Name: v.Bucket,
229         }
230         return nil
231 }
232
233 // getReader wraps (Bucket)GetReader.
234 //
235 // In situations where (Bucket)GetReader would fail because the block
236 // disappeared in a Trash race, getReader calls fixRace to recover the
237 // data, and tries again.
238 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
239         rdr, err = v.bucket.GetReader(loc)
240         err = v.translateError(err)
241         if err == nil || !os.IsNotExist(err) {
242                 return
243         }
244         _, err = v.bucket.Head("recent/"+loc, nil)
245         err = v.translateError(err)
246         if err != nil {
247                 // If we can't read recent/X, there's no point in
248                 // trying fixRace. Give up.
249                 return
250         }
251         if !v.fixRace(loc) {
252                 err = os.ErrNotExist
253                 return
254         }
255         rdr, err = v.bucket.GetReader(loc)
256         if err != nil {
257                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
258                 err = v.translateError(err)
259         }
260         return
261 }
262
263 // Get a block: copy the block data into buf, and return the number of
264 // bytes copied.
265 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
266         ready := make(chan bool)
267         var rdr io.ReadCloser
268         var err error
269         go func() {
270                 rdr, err = v.getReader(loc)
271                 close(ready)
272         }()
273         select {
274         case <-ctx.Done():
275                 // Client hung up before we could even send our S3 request
276                 return 0, ctx.Err()
277         case <-ready:
278                 if err != nil {
279                         return 0, err
280                 }
281         }
282
283         var n int
284         ready = make(chan bool)
285         go func() {
286                 defer close(ready)
287
288                 defer rdr.Close()
289                 n, err = io.ReadFull(rdr, buf)
290
291                 switch err {
292                 case nil, io.EOF, io.ErrUnexpectedEOF:
293                         err = nil
294                 default:
295                         err = v.translateError(err)
296                 }
297         }()
298         select {
299         case <-ctx.Done():
300                 rdr.Close()
301                 // Must wait for ReadFull to return, to ensure it
302                 // doesn't write to buf after we return.
303                 <-ready
304                 return 0, ctx.Err()
305         case <-ready:
306                 return n, err
307         }
308 }
309
310 // Compare the given data with the stored data.
311 func (v *S3Volume) Compare(loc string, expect []byte) error {
312         rdr, err := v.getReader(loc)
313         if err != nil {
314                 return err
315         }
316         defer rdr.Close()
317         return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
318 }
319
320 // Put writes a block.
321 func (v *S3Volume) Put(loc string, block []byte) error {
322         if v.ReadOnly {
323                 return MethodDisabledError
324         }
325         var opts s3.Options
326         if len(block) > 0 {
327                 md5, err := hex.DecodeString(loc)
328                 if err != nil {
329                         return err
330                 }
331                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
332         }
333         err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
334         if err != nil {
335                 return v.translateError(err)
336         }
337         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
338         return v.translateError(err)
339 }
340
341 // Touch sets the timestamp for the given locator to the current time.
342 func (v *S3Volume) Touch(loc string) error {
343         if v.ReadOnly {
344                 return MethodDisabledError
345         }
346         _, err := v.bucket.Head(loc, nil)
347         err = v.translateError(err)
348         if os.IsNotExist(err) && v.fixRace(loc) {
349                 // The data object got trashed in a race, but fixRace
350                 // rescued it.
351         } else if err != nil {
352                 return err
353         }
354         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
355         return v.translateError(err)
356 }
357
358 // Mtime returns the stored timestamp for the given locator.
359 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
360         _, err := v.bucket.Head(loc, nil)
361         if err != nil {
362                 return zeroTime, v.translateError(err)
363         }
364         resp, err := v.bucket.Head("recent/"+loc, nil)
365         err = v.translateError(err)
366         if os.IsNotExist(err) {
367                 // The data object X exists, but recent/X is missing.
368                 err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
369                 if err != nil {
370                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
371                         return zeroTime, v.translateError(err)
372                 }
373                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
374                 resp, err = v.bucket.Head("recent/"+loc, nil)
375                 if err != nil {
376                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
377                         return zeroTime, v.translateError(err)
378                 }
379         } else if err != nil {
380                 // HEAD recent/X failed for some other reason.
381                 return zeroTime, err
382         }
383         return v.lastModified(resp)
384 }
385
386 // IndexTo writes a complete list of locators with the given prefix
387 // for which Get() can retrieve data.
388 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
389         // Use a merge sort to find matching sets of X and recent/X.
390         dataL := s3Lister{
391                 Bucket:   v.bucket,
392                 Prefix:   prefix,
393                 PageSize: v.IndexPageSize,
394         }
395         recentL := s3Lister{
396                 Bucket:   v.bucket,
397                 Prefix:   "recent/" + prefix,
398                 PageSize: v.IndexPageSize,
399         }
400         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
401                 if data.Key >= "g" {
402                         // Conveniently, "recent/*" and "trash/*" are
403                         // lexically greater than all hex-encoded data
404                         // hashes, so stopping here avoids iterating
405                         // over all of them needlessly with dataL.
406                         break
407                 }
408                 if !v.isKeepBlock(data.Key) {
409                         continue
410                 }
411
412                 // stamp is the list entry we should use to report the
413                 // last-modified time for this data block: it will be
414                 // the recent/X entry if one exists, otherwise the
415                 // entry for the data block itself.
416                 stamp := data
417
418                 // Advance to the corresponding recent/X marker, if any
419                 for recent != nil {
420                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
421                                 recent = recentL.Next()
422                                 continue
423                         } else if cmp == 0 {
424                                 stamp = recent
425                                 recent = recentL.Next()
426                                 break
427                         } else {
428                                 // recent/X marker is missing: we'll
429                                 // use the timestamp on the data
430                                 // object.
431                                 break
432                         }
433                 }
434                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
435                 if err != nil {
436                         return err
437                 }
438                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
439         }
440         return nil
441 }
442
443 // Trash a Keep block.
444 func (v *S3Volume) Trash(loc string) error {
445         if v.ReadOnly {
446                 return MethodDisabledError
447         }
448         if t, err := v.Mtime(loc); err != nil {
449                 return err
450         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
451                 return nil
452         }
453         if theConfig.TrashLifetime == 0 {
454                 if !s3UnsafeDelete {
455                         return ErrS3TrashDisabled
456                 }
457                 return v.bucket.Del(loc)
458         }
459         err := v.checkRaceWindow(loc)
460         if err != nil {
461                 return err
462         }
463         err = v.safeCopy("trash/"+loc, loc)
464         if err != nil {
465                 return err
466         }
467         return v.translateError(v.bucket.Del(loc))
468 }
469
470 // checkRaceWindow returns a non-nil error if trash/loc is, or might
471 // be, in the race window (i.e., it's not safe to trash loc).
472 func (v *S3Volume) checkRaceWindow(loc string) error {
473         resp, err := v.bucket.Head("trash/"+loc, nil)
474         err = v.translateError(err)
475         if os.IsNotExist(err) {
476                 // OK, trash/X doesn't exist so we're not in the race
477                 // window
478                 return nil
479         } else if err != nil {
480                 // Error looking up trash/X. We don't know whether
481                 // we're in the race window
482                 return err
483         }
484         t, err := v.lastModified(resp)
485         if err != nil {
486                 // Can't parse timestamp
487                 return err
488         }
489         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
490         if safeWindow <= 0 {
491                 // We can't count on "touch trash/X" to prolong
492                 // trash/X's lifetime. The new timestamp might not
493                 // become visible until now+raceWindow, and EmptyTrash
494                 // is allowed to delete trash/X before then.
495                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
496         }
497         // trash/X exists, but it won't be eligible for deletion until
498         // after now+raceWindow, so it's safe to overwrite it.
499         return nil
500 }
501
502 // safeCopy calls PutCopy, and checks the response to make sure the
503 // copy succeeded and updated the timestamp on the destination object
504 // (PutCopy returns 200 OK if the request was received, even if the
505 // copy failed).
506 func (v *S3Volume) safeCopy(dst, src string) error {
507         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
508                 ContentType:       "application/octet-stream",
509                 MetadataDirective: "REPLACE",
510         }, v.bucket.Name+"/"+src)
511         err = v.translateError(err)
512         if err != nil {
513                 return err
514         }
515         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
516                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
517         } else if time.Now().Sub(t) > maxClockSkew {
518                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
519         }
520         return nil
521 }
522
523 // Get the LastModified header from resp, and parse it as RFC1123 or
524 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
525 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
526         s := resp.Header.Get("Last-Modified")
527         t, err = time.Parse(time.RFC1123, s)
528         if err != nil && s != "" {
529                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
530                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
531                 // as required by HTTP spec. If it's not a valid HTTP
532                 // header value, it's probably AWS (or s3test) giving
533                 // us a nearly-RFC1123 timestamp.
534                 t, err = time.Parse(nearlyRFC1123, s)
535         }
536         return
537 }
538
539 // Untrash moves block from trash back into store
540 func (v *S3Volume) Untrash(loc string) error {
541         err := v.safeCopy(loc, "trash/"+loc)
542         if err != nil {
543                 return err
544         }
545         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
546         return v.translateError(err)
547 }
548
549 // Status returns a *VolumeStatus representing the current in-use
550 // storage capacity and a fake available capacity that doesn't make
551 // the volume seem full or nearly-full.
552 func (v *S3Volume) Status() *VolumeStatus {
553         return &VolumeStatus{
554                 DeviceNum: 1,
555                 BytesFree: BlockSize * 1000,
556                 BytesUsed: 1,
557         }
558 }
559
560 // String implements fmt.Stringer.
561 func (v *S3Volume) String() string {
562         return fmt.Sprintf("s3-bucket:%+q", v.bucket.Name)
563 }
564
565 // Writable returns false if all future Put, Mtime, and Delete calls
566 // are expected to fail.
567 func (v *S3Volume) Writable() bool {
568         return !v.ReadOnly
569 }
570
571 // Replication returns the storage redundancy of the underlying
572 // device. Configured via command line flag.
573 func (v *S3Volume) Replication() int {
574         return v.S3Replication
575 }
576
577 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
578
579 func (v *S3Volume) isKeepBlock(s string) bool {
580         return s3KeepBlockRegexp.MatchString(s)
581 }
582
583 // fixRace(X) is called when "recent/X" exists but "X" doesn't
584 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
585 // there was a race between Put and Trash, fixRace recovers from the
586 // race by Untrashing the block.
587 func (v *S3Volume) fixRace(loc string) bool {
588         trash, err := v.bucket.Head("trash/"+loc, nil)
589         if err != nil {
590                 if !os.IsNotExist(v.translateError(err)) {
591                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
592                 }
593                 return false
594         }
595         trashTime, err := v.lastModified(trash)
596         if err != nil {
597                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
598                 return false
599         }
600
601         recent, err := v.bucket.Head("recent/"+loc, nil)
602         if err != nil {
603                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
604                 return false
605         }
606         recentTime, err := v.lastModified(recent)
607         if err != nil {
608                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
609                 return false
610         }
611
612         ageWhenTrashed := trashTime.Sub(recentTime)
613         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
614                 // No evidence of a race: block hasn't been written
615                 // since it became eligible for Trash. No fix needed.
616                 return false
617         }
618
619         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
620         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
621         err = v.safeCopy(loc, "trash/"+loc)
622         if err != nil {
623                 log.Printf("error: fixRace: %s", err)
624                 return false
625         }
626         return true
627 }
628
629 func (v *S3Volume) translateError(err error) error {
630         switch err := err.(type) {
631         case *s3.Error:
632                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
633                         strings.Contains(err.Error(), "Not Found") {
634                         return os.ErrNotExist
635                 }
636                 // Other 404 errors like NoSuchVersion and
637                 // NoSuchBucket are different problems which should
638                 // get called out downstream, so we don't convert them
639                 // to os.ErrNotExist.
640         }
641         return err
642 }
643
644 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
645 // and deletes them from the volume.
646 func (v *S3Volume) EmptyTrash() {
647         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
648
649         // Use a merge sort to find matching sets of trash/X and recent/X.
650         trashL := s3Lister{
651                 Bucket:   v.bucket,
652                 Prefix:   "trash/",
653                 PageSize: v.IndexPageSize,
654         }
655         // Define "ready to delete" as "...when EmptyTrash started".
656         startT := time.Now()
657         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
658                 loc := trash.Key[6:]
659                 if !v.isKeepBlock(loc) {
660                         continue
661                 }
662                 bytesInTrash += trash.Size
663                 blocksInTrash++
664
665                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
666                 if err != nil {
667                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
668                         continue
669                 }
670                 recent, err := v.bucket.Head("recent/"+loc, nil)
671                 if err != nil && os.IsNotExist(v.translateError(err)) {
672                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
673                         err = v.Untrash(loc)
674                         if err != nil {
675                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
676                         }
677                         continue
678                 } else if err != nil {
679                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
680                         continue
681                 }
682                 recentT, err := v.lastModified(recent)
683                 if err != nil {
684                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
685                         continue
686                 }
687                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
688                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
689                                 // recent/loc is too old to protect
690                                 // loc from being Trashed again during
691                                 // the raceWindow that starts if we
692                                 // delete trash/X now.
693                                 //
694                                 // Note this means (TrashCheckInterval
695                                 // < BlobSignatureTTL - raceWindow) is
696                                 // necessary to avoid starvation.
697                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
698                                 v.fixRace(loc)
699                                 v.Touch(loc)
700                                 continue
701                         } else if _, err := v.bucket.Head(loc, nil); os.IsNotExist(err) {
702                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
703                                 v.fixRace(loc)
704                                 continue
705                         } else if err != nil {
706                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
707                                 continue
708                         }
709                 }
710                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
711                         continue
712                 }
713                 err = v.bucket.Del(trash.Key)
714                 if err != nil {
715                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
716                         continue
717                 }
718                 bytesDeleted += trash.Size
719                 blocksDeleted++
720
721                 _, err = v.bucket.Head(loc, nil)
722                 if os.IsNotExist(err) {
723                         err = v.bucket.Del("recent/" + loc)
724                         if err != nil {
725                                 log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
726                         }
727                 } else if err != nil {
728                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
729                 }
730         }
731         if err := trashL.Error(); err != nil {
732                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
733         }
734         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
735 }
736
737 type s3Lister struct {
738         Bucket     *s3.Bucket
739         Prefix     string
740         PageSize   int
741         nextMarker string
742         buf        []s3.Key
743         err        error
744 }
745
746 // First fetches the first page and returns the first item. It returns
747 // nil if the response is the empty set or an error occurs.
748 func (lister *s3Lister) First() *s3.Key {
749         lister.getPage()
750         return lister.pop()
751 }
752
753 // Next returns the next item, fetching the next page if necessary. It
754 // returns nil if the last available item has already been fetched, or
755 // an error occurs.
756 func (lister *s3Lister) Next() *s3.Key {
757         if len(lister.buf) == 0 && lister.nextMarker != "" {
758                 lister.getPage()
759         }
760         return lister.pop()
761 }
762
763 // Return the most recent error encountered by First or Next.
764 func (lister *s3Lister) Error() error {
765         return lister.err
766 }
767
768 func (lister *s3Lister) getPage() {
769         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
770         lister.nextMarker = ""
771         if err != nil {
772                 lister.err = err
773                 return
774         }
775         if resp.IsTruncated {
776                 lister.nextMarker = resp.NextMarker
777         }
778         lister.buf = make([]s3.Key, 0, len(resp.Contents))
779         for _, key := range resp.Contents {
780                 if !strings.HasPrefix(key.Key, lister.Prefix) {
781                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
782                         continue
783                 }
784                 lister.buf = append(lister.buf, key)
785         }
786 }
787
788 func (lister *s3Lister) pop() (k *s3.Key) {
789         if len(lister.buf) > 0 {
790                 k = &lister.buf[0]
791                 lister.buf = lister.buf[1:]
792         }
793         return
794 }