11469: Docker-managed volumes go in "Volumes" not "Binds".
[arvados.git] / services / keepstore / s3_volume.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "encoding/base64"
7         "encoding/hex"
8         "flag"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "net/http"
13         "os"
14         "regexp"
15         "strings"
16         "sync"
17         "time"
18
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "github.com/AdRoll/goamz/aws"
21         "github.com/AdRoll/goamz/s3"
22         log "github.com/Sirupsen/logrus"
23 )
24
25 const (
26         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
27         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
28 )
29
30 var (
31         // ErrS3TrashDisabled is returned by Trash if that operation
32         // is impossible with the current config.
33         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
34
35         s3AccessKeyFile string
36         s3SecretKeyFile string
37         s3RegionName    string
38         s3Endpoint      string
39         s3Replication   int
40         s3UnsafeDelete  bool
41         s3RaceWindow    time.Duration
42
43         s3ACL = s3.Private
44 )
45
46 const (
47         maxClockSkew  = 600 * time.Second
48         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
49 )
50
51 type s3VolumeAdder struct {
52         *Config
53 }
54
55 // String implements flag.Value
56 func (s *s3VolumeAdder) String() string {
57         return "-"
58 }
59
60 func (s *s3VolumeAdder) Set(bucketName string) error {
61         if bucketName == "" {
62                 return fmt.Errorf("no container name given")
63         }
64         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
65                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
66         }
67         if deprecated.flagSerializeIO {
68                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
69         }
70         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
71                 Bucket:        bucketName,
72                 AccessKeyFile: s3AccessKeyFile,
73                 SecretKeyFile: s3SecretKeyFile,
74                 Endpoint:      s3Endpoint,
75                 Region:        s3RegionName,
76                 RaceWindow:    arvados.Duration(s3RaceWindow),
77                 S3Replication: s3Replication,
78                 UnsafeDelete:  s3UnsafeDelete,
79                 ReadOnly:      deprecated.flagReadonly,
80                 IndexPageSize: 1000,
81         })
82         return nil
83 }
84
85 func s3regions() (okList []string) {
86         for r := range aws.Regions {
87                 okList = append(okList, r)
88         }
89         return
90 }
91
92 func init() {
93         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
94
95         flag.Var(&s3VolumeAdder{theConfig},
96                 "s3-bucket-volume",
97                 "Use the given bucket as a storage volume. Can be given multiple times.")
98         flag.StringVar(
99                 &s3RegionName,
100                 "s3-region",
101                 "",
102                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
103         flag.StringVar(
104                 &s3Endpoint,
105                 "s3-endpoint",
106                 "",
107                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
108         flag.StringVar(
109                 &s3AccessKeyFile,
110                 "s3-access-key-file",
111                 "",
112                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
113         flag.StringVar(
114                 &s3SecretKeyFile,
115                 "s3-secret-key-file",
116                 "",
117                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
118         flag.DurationVar(
119                 &s3RaceWindow,
120                 "s3-race-window",
121                 24*time.Hour,
122                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
123         flag.IntVar(
124                 &s3Replication,
125                 "s3-replication",
126                 2,
127                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
128         flag.BoolVar(
129                 &s3UnsafeDelete,
130                 "s3-unsafe-delete",
131                 false,
132                 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
133 }
134
135 // S3Volume implements Volume using an S3 bucket.
136 type S3Volume struct {
137         AccessKeyFile      string
138         SecretKeyFile      string
139         Endpoint           string
140         Region             string
141         Bucket             string
142         LocationConstraint bool
143         IndexPageSize      int
144         S3Replication      int
145         ConnectTimeout     arvados.Duration
146         ReadTimeout        arvados.Duration
147         RaceWindow         arvados.Duration
148         ReadOnly           bool
149         UnsafeDelete       bool
150
151         bucket *s3bucket
152
153         startOnce sync.Once
154 }
155
156 // Examples implements VolumeWithExamples.
157 func (*S3Volume) Examples() []Volume {
158         return []Volume{
159                 &S3Volume{
160                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
161                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
162                         Endpoint:       "",
163                         Region:         "us-east-1",
164                         Bucket:         "example-bucket-name",
165                         IndexPageSize:  1000,
166                         S3Replication:  2,
167                         RaceWindow:     arvados.Duration(24 * time.Hour),
168                         ConnectTimeout: arvados.Duration(time.Minute),
169                         ReadTimeout:    arvados.Duration(5 * time.Minute),
170                 },
171                 &S3Volume{
172                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
173                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
174                         Endpoint:       "https://storage.googleapis.com",
175                         Region:         "",
176                         Bucket:         "example-bucket-name",
177                         IndexPageSize:  1000,
178                         S3Replication:  2,
179                         RaceWindow:     arvados.Duration(24 * time.Hour),
180                         ConnectTimeout: arvados.Duration(time.Minute),
181                         ReadTimeout:    arvados.Duration(5 * time.Minute),
182                 },
183         }
184 }
185
186 // Type implements Volume.
187 func (*S3Volume) Type() string {
188         return "S3"
189 }
190
191 // Start populates private fields and verifies the configuration is
192 // valid.
193 func (v *S3Volume) Start() error {
194         region, ok := aws.Regions[v.Region]
195         if v.Endpoint == "" {
196                 if !ok {
197                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
198                 }
199         } else if ok {
200                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
201                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
202         } else {
203                 region = aws.Region{
204                         Name:                 v.Region,
205                         S3Endpoint:           v.Endpoint,
206                         S3LocationConstraint: v.LocationConstraint,
207                 }
208         }
209
210         var err error
211         var auth aws.Auth
212         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
213         if err != nil {
214                 return err
215         }
216         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
217         if err != nil {
218                 return err
219         }
220
221         // Zero timeouts mean "wait forever", which is a bad
222         // default. Default to long timeouts instead.
223         if v.ConnectTimeout == 0 {
224                 v.ConnectTimeout = s3DefaultConnectTimeout
225         }
226         if v.ReadTimeout == 0 {
227                 v.ReadTimeout = s3DefaultReadTimeout
228         }
229
230         client := s3.New(auth, region)
231         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
232         client.ReadTimeout = time.Duration(v.ReadTimeout)
233         v.bucket = &s3bucket{
234                 Bucket: &s3.Bucket{
235                         S3:   client,
236                         Name: v.Bucket,
237                 },
238         }
239         return nil
240 }
241
242 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
243         ready := make(chan bool)
244         go func() {
245                 rdr, err = v.getReader(loc)
246                 close(ready)
247         }()
248         select {
249         case <-ready:
250                 return
251         case <-ctx.Done():
252                 theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
253                 go func() {
254                         <-ready
255                         if err == nil {
256                                 rdr.Close()
257                         }
258                 }()
259                 return nil, ctx.Err()
260         }
261 }
262
263 // getReader wraps (Bucket)GetReader.
264 //
265 // In situations where (Bucket)GetReader would fail because the block
266 // disappeared in a Trash race, getReader calls fixRace to recover the
267 // data, and tries again.
268 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
269         rdr, err = v.bucket.GetReader(loc)
270         err = v.translateError(err)
271         if err == nil || !os.IsNotExist(err) {
272                 return
273         }
274
275         _, err = v.bucket.Head("recent/"+loc, nil)
276         err = v.translateError(err)
277         if err != nil {
278                 // If we can't read recent/X, there's no point in
279                 // trying fixRace. Give up.
280                 return
281         }
282         if !v.fixRace(loc) {
283                 err = os.ErrNotExist
284                 return
285         }
286
287         rdr, err = v.bucket.GetReader(loc)
288         if err != nil {
289                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
290                 err = v.translateError(err)
291         }
292         return
293 }
294
295 // Get a block: copy the block data into buf, and return the number of
296 // bytes copied.
297 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
298         rdr, err := v.getReaderWithContext(ctx, loc)
299         if err != nil {
300                 return 0, err
301         }
302
303         var n int
304         ready := make(chan bool)
305         go func() {
306                 defer close(ready)
307
308                 defer rdr.Close()
309                 n, err = io.ReadFull(rdr, buf)
310
311                 switch err {
312                 case nil, io.EOF, io.ErrUnexpectedEOF:
313                         err = nil
314                 default:
315                         err = v.translateError(err)
316                 }
317         }()
318         select {
319         case <-ctx.Done():
320                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
321                 rdr.Close()
322                 // Must wait for ReadFull to return, to ensure it
323                 // doesn't write to buf after we return.
324                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
325                 <-ready
326                 return 0, ctx.Err()
327         case <-ready:
328                 return n, err
329         }
330 }
331
332 // Compare the given data with the stored data.
333 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
334         rdr, err := v.getReaderWithContext(ctx, loc)
335         if err != nil {
336                 return err
337         }
338         defer rdr.Close()
339         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
340 }
341
342 // Put writes a block.
343 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
344         if v.ReadOnly {
345                 return MethodDisabledError
346         }
347         var opts s3.Options
348         size := len(block)
349         if size > 0 {
350                 md5, err := hex.DecodeString(loc)
351                 if err != nil {
352                         return err
353                 }
354                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
355         }
356
357         // Send the block data through a pipe, so that (if we need to)
358         // we can close the pipe early and abandon our PutReader()
359         // goroutine, without worrying about PutReader() accessing our
360         // block buffer after we release it.
361         bufr, bufw := io.Pipe()
362         go func() {
363                 io.Copy(bufw, bytes.NewReader(block))
364                 bufw.Close()
365         }()
366
367         var err error
368         ready := make(chan bool)
369         go func() {
370                 defer func() {
371                         if ctx.Err() != nil {
372                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
373                         }
374                 }()
375                 defer close(ready)
376                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
377                 if err != nil {
378                         return
379                 }
380                 err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
381         }()
382         select {
383         case <-ctx.Done():
384                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
385                 // Our pipe might be stuck in Write(), waiting for
386                 // io.Copy() to read. If so, un-stick it. This means
387                 // PutReader will get corrupt data, but that's OK: the
388                 // size and MD5 won't match, so the write will fail.
389                 go io.Copy(ioutil.Discard, bufr)
390                 // CloseWithError() will return once pending I/O is done.
391                 bufw.CloseWithError(ctx.Err())
392                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
393                 return ctx.Err()
394         case <-ready:
395                 return v.translateError(err)
396         }
397 }
398
399 // Touch sets the timestamp for the given locator to the current time.
400 func (v *S3Volume) Touch(loc string) error {
401         if v.ReadOnly {
402                 return MethodDisabledError
403         }
404         _, err := v.bucket.Head(loc, nil)
405         err = v.translateError(err)
406         if os.IsNotExist(err) && v.fixRace(loc) {
407                 // The data object got trashed in a race, but fixRace
408                 // rescued it.
409         } else if err != nil {
410                 return err
411         }
412         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
413         return v.translateError(err)
414 }
415
416 // Mtime returns the stored timestamp for the given locator.
417 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
418         _, err := v.bucket.Head(loc, nil)
419         if err != nil {
420                 return zeroTime, v.translateError(err)
421         }
422         resp, err := v.bucket.Head("recent/"+loc, nil)
423         err = v.translateError(err)
424         if os.IsNotExist(err) {
425                 // The data object X exists, but recent/X is missing.
426                 err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
427                 if err != nil {
428                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
429                         return zeroTime, v.translateError(err)
430                 }
431                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
432                 resp, err = v.bucket.Head("recent/"+loc, nil)
433                 if err != nil {
434                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
435                         return zeroTime, v.translateError(err)
436                 }
437         } else if err != nil {
438                 // HEAD recent/X failed for some other reason.
439                 return zeroTime, err
440         }
441         return v.lastModified(resp)
442 }
443
444 // IndexTo writes a complete list of locators with the given prefix
445 // for which Get() can retrieve data.
446 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
447         // Use a merge sort to find matching sets of X and recent/X.
448         dataL := s3Lister{
449                 Bucket:   v.bucket.Bucket,
450                 Prefix:   prefix,
451                 PageSize: v.IndexPageSize,
452         }
453         recentL := s3Lister{
454                 Bucket:   v.bucket.Bucket,
455                 Prefix:   "recent/" + prefix,
456                 PageSize: v.IndexPageSize,
457         }
458         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
459         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
460         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
461                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
462                 if data.Key >= "g" {
463                         // Conveniently, "recent/*" and "trash/*" are
464                         // lexically greater than all hex-encoded data
465                         // hashes, so stopping here avoids iterating
466                         // over all of them needlessly with dataL.
467                         break
468                 }
469                 if !v.isKeepBlock(data.Key) {
470                         continue
471                 }
472
473                 // stamp is the list entry we should use to report the
474                 // last-modified time for this data block: it will be
475                 // the recent/X entry if one exists, otherwise the
476                 // entry for the data block itself.
477                 stamp := data
478
479                 // Advance to the corresponding recent/X marker, if any
480                 for recent != nil {
481                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
482                                 recent = recentL.Next()
483                                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
484                                 continue
485                         } else if cmp == 0 {
486                                 stamp = recent
487                                 recent = recentL.Next()
488                                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
489                                 break
490                         } else {
491                                 // recent/X marker is missing: we'll
492                                 // use the timestamp on the data
493                                 // object.
494                                 break
495                         }
496                 }
497                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
498                 if err != nil {
499                         return err
500                 }
501                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
502         }
503         return nil
504 }
505
506 // Trash a Keep block.
507 func (v *S3Volume) Trash(loc string) error {
508         if v.ReadOnly {
509                 return MethodDisabledError
510         }
511         if t, err := v.Mtime(loc); err != nil {
512                 return err
513         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
514                 return nil
515         }
516         if theConfig.TrashLifetime == 0 {
517                 if !s3UnsafeDelete {
518                         return ErrS3TrashDisabled
519                 }
520                 return v.translateError(v.bucket.Del(loc))
521         }
522         err := v.checkRaceWindow(loc)
523         if err != nil {
524                 return err
525         }
526         err = v.safeCopy("trash/"+loc, loc)
527         if err != nil {
528                 return err
529         }
530         return v.translateError(v.bucket.Del(loc))
531 }
532
533 // checkRaceWindow returns a non-nil error if trash/loc is, or might
534 // be, in the race window (i.e., it's not safe to trash loc).
535 func (v *S3Volume) checkRaceWindow(loc string) error {
536         resp, err := v.bucket.Head("trash/"+loc, nil)
537         err = v.translateError(err)
538         if os.IsNotExist(err) {
539                 // OK, trash/X doesn't exist so we're not in the race
540                 // window
541                 return nil
542         } else if err != nil {
543                 // Error looking up trash/X. We don't know whether
544                 // we're in the race window
545                 return err
546         }
547         t, err := v.lastModified(resp)
548         if err != nil {
549                 // Can't parse timestamp
550                 return err
551         }
552         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
553         if safeWindow <= 0 {
554                 // We can't count on "touch trash/X" to prolong
555                 // trash/X's lifetime. The new timestamp might not
556                 // become visible until now+raceWindow, and EmptyTrash
557                 // is allowed to delete trash/X before then.
558                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
559         }
560         // trash/X exists, but it won't be eligible for deletion until
561         // after now+raceWindow, so it's safe to overwrite it.
562         return nil
563 }
564
565 // safeCopy calls PutCopy, and checks the response to make sure the
566 // copy succeeded and updated the timestamp on the destination object
567 // (PutCopy returns 200 OK if the request was received, even if the
568 // copy failed).
569 func (v *S3Volume) safeCopy(dst, src string) error {
570         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
571                 ContentType:       "application/octet-stream",
572                 MetadataDirective: "REPLACE",
573         }, v.bucket.Name+"/"+src)
574         err = v.translateError(err)
575         if err != nil {
576                 return err
577         }
578         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
579                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
580         } else if time.Now().Sub(t) > maxClockSkew {
581                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
582         }
583         return nil
584 }
585
586 // Get the LastModified header from resp, and parse it as RFC1123 or
587 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
588 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
589         s := resp.Header.Get("Last-Modified")
590         t, err = time.Parse(time.RFC1123, s)
591         if err != nil && s != "" {
592                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
593                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
594                 // as required by HTTP spec. If it's not a valid HTTP
595                 // header value, it's probably AWS (or s3test) giving
596                 // us a nearly-RFC1123 timestamp.
597                 t, err = time.Parse(nearlyRFC1123, s)
598         }
599         return
600 }
601
602 // Untrash moves block from trash back into store
603 func (v *S3Volume) Untrash(loc string) error {
604         err := v.safeCopy(loc, "trash/"+loc)
605         if err != nil {
606                 return err
607         }
608         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
609         return v.translateError(err)
610 }
611
612 // Status returns a *VolumeStatus representing the current in-use
613 // storage capacity and a fake available capacity that doesn't make
614 // the volume seem full or nearly-full.
615 func (v *S3Volume) Status() *VolumeStatus {
616         return &VolumeStatus{
617                 DeviceNum: 1,
618                 BytesFree: BlockSize * 1000,
619                 BytesUsed: 1,
620         }
621 }
622
623 // InternalStats returns bucket I/O and API call counters.
624 func (v *S3Volume) InternalStats() interface{} {
625         return &v.bucket.stats
626 }
627
628 // String implements fmt.Stringer.
629 func (v *S3Volume) String() string {
630         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
631 }
632
633 // Writable returns false if all future Put, Mtime, and Delete calls
634 // are expected to fail.
635 func (v *S3Volume) Writable() bool {
636         return !v.ReadOnly
637 }
638
639 // Replication returns the storage redundancy of the underlying
640 // device. Configured via command line flag.
641 func (v *S3Volume) Replication() int {
642         return v.S3Replication
643 }
644
645 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
646
647 func (v *S3Volume) isKeepBlock(s string) bool {
648         return s3KeepBlockRegexp.MatchString(s)
649 }
650
651 // fixRace(X) is called when "recent/X" exists but "X" doesn't
652 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
653 // there was a race between Put and Trash, fixRace recovers from the
654 // race by Untrashing the block.
655 func (v *S3Volume) fixRace(loc string) bool {
656         trash, err := v.bucket.Head("trash/"+loc, nil)
657         if err != nil {
658                 if !os.IsNotExist(v.translateError(err)) {
659                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
660                 }
661                 return false
662         }
663         trashTime, err := v.lastModified(trash)
664         if err != nil {
665                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
666                 return false
667         }
668
669         recent, err := v.bucket.Head("recent/"+loc, nil)
670         if err != nil {
671                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
672                 return false
673         }
674         recentTime, err := v.lastModified(recent)
675         if err != nil {
676                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
677                 return false
678         }
679
680         ageWhenTrashed := trashTime.Sub(recentTime)
681         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
682                 // No evidence of a race: block hasn't been written
683                 // since it became eligible for Trash. No fix needed.
684                 return false
685         }
686
687         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
688         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
689         err = v.safeCopy(loc, "trash/"+loc)
690         if err != nil {
691                 log.Printf("error: fixRace: %s", err)
692                 return false
693         }
694         return true
695 }
696
697 func (v *S3Volume) translateError(err error) error {
698         switch err := err.(type) {
699         case *s3.Error:
700                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
701                         strings.Contains(err.Error(), "Not Found") {
702                         return os.ErrNotExist
703                 }
704                 // Other 404 errors like NoSuchVersion and
705                 // NoSuchBucket are different problems which should
706                 // get called out downstream, so we don't convert them
707                 // to os.ErrNotExist.
708         }
709         return err
710 }
711
712 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
713 // and deletes them from the volume.
714 func (v *S3Volume) EmptyTrash() {
715         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
716
717         // Use a merge sort to find matching sets of trash/X and recent/X.
718         trashL := s3Lister{
719                 Bucket:   v.bucket.Bucket,
720                 Prefix:   "trash/",
721                 PageSize: v.IndexPageSize,
722         }
723         // Define "ready to delete" as "...when EmptyTrash started".
724         startT := time.Now()
725         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
726                 loc := trash.Key[6:]
727                 if !v.isKeepBlock(loc) {
728                         continue
729                 }
730                 bytesInTrash += trash.Size
731                 blocksInTrash++
732
733                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
734                 if err != nil {
735                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
736                         continue
737                 }
738                 recent, err := v.bucket.Head("recent/"+loc, nil)
739                 if err != nil && os.IsNotExist(v.translateError(err)) {
740                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
741                         err = v.Untrash(loc)
742                         if err != nil {
743                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
744                         }
745                         continue
746                 } else if err != nil {
747                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
748                         continue
749                 }
750                 recentT, err := v.lastModified(recent)
751                 if err != nil {
752                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
753                         continue
754                 }
755                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
756                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
757                                 // recent/loc is too old to protect
758                                 // loc from being Trashed again during
759                                 // the raceWindow that starts if we
760                                 // delete trash/X now.
761                                 //
762                                 // Note this means (TrashCheckInterval
763                                 // < BlobSignatureTTL - raceWindow) is
764                                 // necessary to avoid starvation.
765                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
766                                 v.fixRace(loc)
767                                 v.Touch(loc)
768                                 continue
769                         }
770                         _, err := v.bucket.Head(loc, nil)
771                         if os.IsNotExist(err) {
772                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
773                                 v.fixRace(loc)
774                                 continue
775                         } else if err != nil {
776                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
777                                 continue
778                         }
779                 }
780                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
781                         continue
782                 }
783                 err = v.bucket.Del(trash.Key)
784                 if err != nil {
785                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
786                         continue
787                 }
788                 bytesDeleted += trash.Size
789                 blocksDeleted++
790
791                 _, err = v.bucket.Head(loc, nil)
792                 if os.IsNotExist(err) {
793                         err = v.bucket.Del("recent/" + loc)
794                         if err != nil {
795                                 log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
796                         }
797                 } else if err != nil {
798                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
799                 }
800         }
801         if err := trashL.Error(); err != nil {
802                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
803         }
804         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
805 }
806
807 type s3Lister struct {
808         Bucket     *s3.Bucket
809         Prefix     string
810         PageSize   int
811         nextMarker string
812         buf        []s3.Key
813         err        error
814 }
815
816 // First fetches the first page and returns the first item. It returns
817 // nil if the response is the empty set or an error occurs.
818 func (lister *s3Lister) First() *s3.Key {
819         lister.getPage()
820         return lister.pop()
821 }
822
823 // Next returns the next item, fetching the next page if necessary. It
824 // returns nil if the last available item has already been fetched, or
825 // an error occurs.
826 func (lister *s3Lister) Next() *s3.Key {
827         if len(lister.buf) == 0 && lister.nextMarker != "" {
828                 lister.getPage()
829         }
830         return lister.pop()
831 }
832
833 // Return the most recent error encountered by First or Next.
834 func (lister *s3Lister) Error() error {
835         return lister.err
836 }
837
838 func (lister *s3Lister) getPage() {
839         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
840         lister.nextMarker = ""
841         if err != nil {
842                 lister.err = err
843                 return
844         }
845         if resp.IsTruncated {
846                 lister.nextMarker = resp.NextMarker
847         }
848         lister.buf = make([]s3.Key, 0, len(resp.Contents))
849         for _, key := range resp.Contents {
850                 if !strings.HasPrefix(key.Key, lister.Prefix) {
851                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
852                         continue
853                 }
854                 lister.buf = append(lister.buf, key)
855         }
856 }
857
858 func (lister *s3Lister) pop() (k *s3.Key) {
859         if len(lister.buf) > 0 {
860                 k = &lister.buf[0]
861                 lister.buf = lister.buf[1:]
862         }
863         return
864 }
865
866 // s3bucket wraps s3.bucket and counts I/O and API usage stats.
867 type s3bucket struct {
868         *s3.Bucket
869         stats s3bucketStats
870 }
871
872 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
873         rdr, err := b.Bucket.GetReader(path)
874         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
875         b.stats.TickErr(err)
876         return NewCountingReader(rdr, b.stats.TickInBytes), err
877 }
878
879 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
880         resp, err := b.Bucket.Head(path, headers)
881         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
882         b.stats.TickErr(err)
883         return resp, err
884 }
885
886 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
887         err := b.Bucket.PutReader(path, NewCountingReader(r, b.stats.TickOutBytes), length, contType, perm, options)
888         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
889         b.stats.TickErr(err)
890         return err
891 }
892
893 func (b *s3bucket) Put(path string, data []byte, contType string, perm s3.ACL, options s3.Options) error {
894         err := b.Bucket.PutReader(path, NewCountingReader(bytes.NewBuffer(data), b.stats.TickOutBytes), int64(len(data)), contType, perm, options)
895         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
896         b.stats.TickErr(err)
897         return err
898 }
899
900 func (b *s3bucket) Del(path string) error {
901         err := b.Bucket.Del(path)
902         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
903         b.stats.TickErr(err)
904         return err
905 }
906
907 type s3bucketStats struct {
908         statsTicker
909         Ops     uint64
910         GetOps  uint64
911         PutOps  uint64
912         HeadOps uint64
913         DelOps  uint64
914         ListOps uint64
915 }
916
917 func (s *s3bucketStats) TickErr(err error) {
918         if err == nil {
919                 return
920         }
921         errType := fmt.Sprintf("%T", err)
922         if err, ok := err.(*s3.Error); ok {
923                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
924         }
925         s.statsTicker.TickErr(err, errType)
926 }