Merge branch '8784-dir-listings'
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "encoding/base64"
11         "encoding/hex"
12         "flag"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "net/http"
17         "os"
18         "regexp"
19         "strings"
20         "sync"
21         "time"
22
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         "github.com/AdRoll/goamz/aws"
25         "github.com/AdRoll/goamz/s3"
26         log "github.com/Sirupsen/logrus"
27 )
28
29 const (
30         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
31         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
32 )
33
34 var (
35         // ErrS3TrashDisabled is returned by Trash if that operation
36         // is impossible with the current config.
37         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
38
39         s3AccessKeyFile string
40         s3SecretKeyFile string
41         s3RegionName    string
42         s3Endpoint      string
43         s3Replication   int
44         s3UnsafeDelete  bool
45         s3RaceWindow    time.Duration
46
47         s3ACL = s3.Private
48 )
49
50 const (
51         maxClockSkew  = 600 * time.Second
52         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
53 )
54
55 type s3VolumeAdder struct {
56         *Config
57 }
58
59 // String implements flag.Value
60 func (s *s3VolumeAdder) String() string {
61         return "-"
62 }
63
64 func (s *s3VolumeAdder) Set(bucketName string) error {
65         if bucketName == "" {
66                 return fmt.Errorf("no container name given")
67         }
68         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
69                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
70         }
71         if deprecated.flagSerializeIO {
72                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
73         }
74         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
75                 Bucket:        bucketName,
76                 AccessKeyFile: s3AccessKeyFile,
77                 SecretKeyFile: s3SecretKeyFile,
78                 Endpoint:      s3Endpoint,
79                 Region:        s3RegionName,
80                 RaceWindow:    arvados.Duration(s3RaceWindow),
81                 S3Replication: s3Replication,
82                 UnsafeDelete:  s3UnsafeDelete,
83                 ReadOnly:      deprecated.flagReadonly,
84                 IndexPageSize: 1000,
85         })
86         return nil
87 }
88
89 func s3regions() (okList []string) {
90         for r := range aws.Regions {
91                 okList = append(okList, r)
92         }
93         return
94 }
95
96 func init() {
97         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
98
99         flag.Var(&s3VolumeAdder{theConfig},
100                 "s3-bucket-volume",
101                 "Use the given bucket as a storage volume. Can be given multiple times.")
102         flag.StringVar(
103                 &s3RegionName,
104                 "s3-region",
105                 "",
106                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
107         flag.StringVar(
108                 &s3Endpoint,
109                 "s3-endpoint",
110                 "",
111                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
112         flag.StringVar(
113                 &s3AccessKeyFile,
114                 "s3-access-key-file",
115                 "",
116                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
117         flag.StringVar(
118                 &s3SecretKeyFile,
119                 "s3-secret-key-file",
120                 "",
121                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
122         flag.DurationVar(
123                 &s3RaceWindow,
124                 "s3-race-window",
125                 24*time.Hour,
126                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
127         flag.IntVar(
128                 &s3Replication,
129                 "s3-replication",
130                 2,
131                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
132         flag.BoolVar(
133                 &s3UnsafeDelete,
134                 "s3-unsafe-delete",
135                 false,
136                 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
137 }
138
139 // S3Volume implements Volume using an S3 bucket.
140 type S3Volume struct {
141         AccessKeyFile      string
142         SecretKeyFile      string
143         Endpoint           string
144         Region             string
145         Bucket             string
146         LocationConstraint bool
147         IndexPageSize      int
148         S3Replication      int
149         ConnectTimeout     arvados.Duration
150         ReadTimeout        arvados.Duration
151         RaceWindow         arvados.Duration
152         ReadOnly           bool
153         UnsafeDelete       bool
154
155         bucket *s3bucket
156
157         startOnce sync.Once
158 }
159
160 // Examples implements VolumeWithExamples.
161 func (*S3Volume) Examples() []Volume {
162         return []Volume{
163                 &S3Volume{
164                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
165                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
166                         Endpoint:       "",
167                         Region:         "us-east-1",
168                         Bucket:         "example-bucket-name",
169                         IndexPageSize:  1000,
170                         S3Replication:  2,
171                         RaceWindow:     arvados.Duration(24 * time.Hour),
172                         ConnectTimeout: arvados.Duration(time.Minute),
173                         ReadTimeout:    arvados.Duration(5 * time.Minute),
174                 },
175                 &S3Volume{
176                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
177                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
178                         Endpoint:       "https://storage.googleapis.com",
179                         Region:         "",
180                         Bucket:         "example-bucket-name",
181                         IndexPageSize:  1000,
182                         S3Replication:  2,
183                         RaceWindow:     arvados.Duration(24 * time.Hour),
184                         ConnectTimeout: arvados.Duration(time.Minute),
185                         ReadTimeout:    arvados.Duration(5 * time.Minute),
186                 },
187         }
188 }
189
190 // Type implements Volume.
191 func (*S3Volume) Type() string {
192         return "S3"
193 }
194
195 // Start populates private fields and verifies the configuration is
196 // valid.
197 func (v *S3Volume) Start() error {
198         region, ok := aws.Regions[v.Region]
199         if v.Endpoint == "" {
200                 if !ok {
201                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
202                 }
203         } else if ok {
204                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
205                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
206         } else {
207                 region = aws.Region{
208                         Name:                 v.Region,
209                         S3Endpoint:           v.Endpoint,
210                         S3LocationConstraint: v.LocationConstraint,
211                 }
212         }
213
214         var err error
215         var auth aws.Auth
216         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
217         if err != nil {
218                 return err
219         }
220         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
221         if err != nil {
222                 return err
223         }
224
225         // Zero timeouts mean "wait forever", which is a bad
226         // default. Default to long timeouts instead.
227         if v.ConnectTimeout == 0 {
228                 v.ConnectTimeout = s3DefaultConnectTimeout
229         }
230         if v.ReadTimeout == 0 {
231                 v.ReadTimeout = s3DefaultReadTimeout
232         }
233
234         client := s3.New(auth, region)
235         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
236         client.ReadTimeout = time.Duration(v.ReadTimeout)
237         v.bucket = &s3bucket{
238                 Bucket: &s3.Bucket{
239                         S3:   client,
240                         Name: v.Bucket,
241                 },
242         }
243         return nil
244 }
245
246 // DeviceID returns a globally unique ID for the storage bucket.
247 func (v *S3Volume) DeviceID() string {
248         return "s3://" + v.Endpoint + "/" + v.Bucket
249 }
250
251 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
252         ready := make(chan bool)
253         go func() {
254                 rdr, err = v.getReader(loc)
255                 close(ready)
256         }()
257         select {
258         case <-ready:
259                 return
260         case <-ctx.Done():
261                 theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
262                 go func() {
263                         <-ready
264                         if err == nil {
265                                 rdr.Close()
266                         }
267                 }()
268                 return nil, ctx.Err()
269         }
270 }
271
272 // getReader wraps (Bucket)GetReader.
273 //
274 // In situations where (Bucket)GetReader would fail because the block
275 // disappeared in a Trash race, getReader calls fixRace to recover the
276 // data, and tries again.
277 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
278         rdr, err = v.bucket.GetReader(loc)
279         err = v.translateError(err)
280         if err == nil || !os.IsNotExist(err) {
281                 return
282         }
283
284         _, err = v.bucket.Head("recent/"+loc, nil)
285         err = v.translateError(err)
286         if err != nil {
287                 // If we can't read recent/X, there's no point in
288                 // trying fixRace. Give up.
289                 return
290         }
291         if !v.fixRace(loc) {
292                 err = os.ErrNotExist
293                 return
294         }
295
296         rdr, err = v.bucket.GetReader(loc)
297         if err != nil {
298                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
299                 err = v.translateError(err)
300         }
301         return
302 }
303
304 // Get a block: copy the block data into buf, and return the number of
305 // bytes copied.
306 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
307         rdr, err := v.getReaderWithContext(ctx, loc)
308         if err != nil {
309                 return 0, err
310         }
311
312         var n int
313         ready := make(chan bool)
314         go func() {
315                 defer close(ready)
316
317                 defer rdr.Close()
318                 n, err = io.ReadFull(rdr, buf)
319
320                 switch err {
321                 case nil, io.EOF, io.ErrUnexpectedEOF:
322                         err = nil
323                 default:
324                         err = v.translateError(err)
325                 }
326         }()
327         select {
328         case <-ctx.Done():
329                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
330                 rdr.Close()
331                 // Must wait for ReadFull to return, to ensure it
332                 // doesn't write to buf after we return.
333                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
334                 <-ready
335                 return 0, ctx.Err()
336         case <-ready:
337                 return n, err
338         }
339 }
340
341 // Compare the given data with the stored data.
342 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
343         rdr, err := v.getReaderWithContext(ctx, loc)
344         if err != nil {
345                 return err
346         }
347         defer rdr.Close()
348         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
349 }
350
351 // Put writes a block.
352 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
353         if v.ReadOnly {
354                 return MethodDisabledError
355         }
356         var opts s3.Options
357         size := len(block)
358         if size > 0 {
359                 md5, err := hex.DecodeString(loc)
360                 if err != nil {
361                         return err
362                 }
363                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
364         }
365
366         // Send the block data through a pipe, so that (if we need to)
367         // we can close the pipe early and abandon our PutReader()
368         // goroutine, without worrying about PutReader() accessing our
369         // block buffer after we release it.
370         bufr, bufw := io.Pipe()
371         go func() {
372                 io.Copy(bufw, bytes.NewReader(block))
373                 bufw.Close()
374         }()
375
376         var err error
377         ready := make(chan bool)
378         go func() {
379                 defer func() {
380                         if ctx.Err() != nil {
381                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
382                         }
383                 }()
384                 defer close(ready)
385                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
386                 if err != nil {
387                         return
388                 }
389                 err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
390         }()
391         select {
392         case <-ctx.Done():
393                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
394                 // Our pipe might be stuck in Write(), waiting for
395                 // io.Copy() to read. If so, un-stick it. This means
396                 // PutReader will get corrupt data, but that's OK: the
397                 // size and MD5 won't match, so the write will fail.
398                 go io.Copy(ioutil.Discard, bufr)
399                 // CloseWithError() will return once pending I/O is done.
400                 bufw.CloseWithError(ctx.Err())
401                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
402                 return ctx.Err()
403         case <-ready:
404                 return v.translateError(err)
405         }
406 }
407
408 // Touch sets the timestamp for the given locator to the current time.
409 func (v *S3Volume) Touch(loc string) error {
410         if v.ReadOnly {
411                 return MethodDisabledError
412         }
413         _, err := v.bucket.Head(loc, nil)
414         err = v.translateError(err)
415         if os.IsNotExist(err) && v.fixRace(loc) {
416                 // The data object got trashed in a race, but fixRace
417                 // rescued it.
418         } else if err != nil {
419                 return err
420         }
421         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
422         return v.translateError(err)
423 }
424
425 // Mtime returns the stored timestamp for the given locator.
426 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
427         _, err := v.bucket.Head(loc, nil)
428         if err != nil {
429                 return zeroTime, v.translateError(err)
430         }
431         resp, err := v.bucket.Head("recent/"+loc, nil)
432         err = v.translateError(err)
433         if os.IsNotExist(err) {
434                 // The data object X exists, but recent/X is missing.
435                 err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
436                 if err != nil {
437                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
438                         return zeroTime, v.translateError(err)
439                 }
440                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
441                 resp, err = v.bucket.Head("recent/"+loc, nil)
442                 if err != nil {
443                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
444                         return zeroTime, v.translateError(err)
445                 }
446         } else if err != nil {
447                 // HEAD recent/X failed for some other reason.
448                 return zeroTime, err
449         }
450         return v.lastModified(resp)
451 }
452
453 // IndexTo writes a complete list of locators with the given prefix
454 // for which Get() can retrieve data.
455 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
456         // Use a merge sort to find matching sets of X and recent/X.
457         dataL := s3Lister{
458                 Bucket:   v.bucket.Bucket,
459                 Prefix:   prefix,
460                 PageSize: v.IndexPageSize,
461         }
462         recentL := s3Lister{
463                 Bucket:   v.bucket.Bucket,
464                 Prefix:   "recent/" + prefix,
465                 PageSize: v.IndexPageSize,
466         }
467         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
468         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
469         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
470                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
471                 if data.Key >= "g" {
472                         // Conveniently, "recent/*" and "trash/*" are
473                         // lexically greater than all hex-encoded data
474                         // hashes, so stopping here avoids iterating
475                         // over all of them needlessly with dataL.
476                         break
477                 }
478                 if !v.isKeepBlock(data.Key) {
479                         continue
480                 }
481
482                 // stamp is the list entry we should use to report the
483                 // last-modified time for this data block: it will be
484                 // the recent/X entry if one exists, otherwise the
485                 // entry for the data block itself.
486                 stamp := data
487
488                 // Advance to the corresponding recent/X marker, if any
489                 for recent != nil {
490                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
491                                 recent = recentL.Next()
492                                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
493                                 continue
494                         } else if cmp == 0 {
495                                 stamp = recent
496                                 recent = recentL.Next()
497                                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
498                                 break
499                         } else {
500                                 // recent/X marker is missing: we'll
501                                 // use the timestamp on the data
502                                 // object.
503                                 break
504                         }
505                 }
506                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
507                 if err != nil {
508                         return err
509                 }
510                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
511         }
512         return nil
513 }
514
515 // Trash a Keep block.
516 func (v *S3Volume) Trash(loc string) error {
517         if v.ReadOnly {
518                 return MethodDisabledError
519         }
520         if t, err := v.Mtime(loc); err != nil {
521                 return err
522         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
523                 return nil
524         }
525         if theConfig.TrashLifetime == 0 {
526                 if !s3UnsafeDelete {
527                         return ErrS3TrashDisabled
528                 }
529                 return v.translateError(v.bucket.Del(loc))
530         }
531         err := v.checkRaceWindow(loc)
532         if err != nil {
533                 return err
534         }
535         err = v.safeCopy("trash/"+loc, loc)
536         if err != nil {
537                 return err
538         }
539         return v.translateError(v.bucket.Del(loc))
540 }
541
542 // checkRaceWindow returns a non-nil error if trash/loc is, or might
543 // be, in the race window (i.e., it's not safe to trash loc).
544 func (v *S3Volume) checkRaceWindow(loc string) error {
545         resp, err := v.bucket.Head("trash/"+loc, nil)
546         err = v.translateError(err)
547         if os.IsNotExist(err) {
548                 // OK, trash/X doesn't exist so we're not in the race
549                 // window
550                 return nil
551         } else if err != nil {
552                 // Error looking up trash/X. We don't know whether
553                 // we're in the race window
554                 return err
555         }
556         t, err := v.lastModified(resp)
557         if err != nil {
558                 // Can't parse timestamp
559                 return err
560         }
561         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
562         if safeWindow <= 0 {
563                 // We can't count on "touch trash/X" to prolong
564                 // trash/X's lifetime. The new timestamp might not
565                 // become visible until now+raceWindow, and EmptyTrash
566                 // is allowed to delete trash/X before then.
567                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
568         }
569         // trash/X exists, but it won't be eligible for deletion until
570         // after now+raceWindow, so it's safe to overwrite it.
571         return nil
572 }
573
574 // safeCopy calls PutCopy, and checks the response to make sure the
575 // copy succeeded and updated the timestamp on the destination object
576 // (PutCopy returns 200 OK if the request was received, even if the
577 // copy failed).
578 func (v *S3Volume) safeCopy(dst, src string) error {
579         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
580                 ContentType:       "application/octet-stream",
581                 MetadataDirective: "REPLACE",
582         }, v.bucket.Name+"/"+src)
583         err = v.translateError(err)
584         if err != nil {
585                 return err
586         }
587         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
588                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
589         } else if time.Now().Sub(t) > maxClockSkew {
590                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
591         }
592         return nil
593 }
594
595 // Get the LastModified header from resp, and parse it as RFC1123 or
596 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
597 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
598         s := resp.Header.Get("Last-Modified")
599         t, err = time.Parse(time.RFC1123, s)
600         if err != nil && s != "" {
601                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
602                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
603                 // as required by HTTP spec. If it's not a valid HTTP
604                 // header value, it's probably AWS (or s3test) giving
605                 // us a nearly-RFC1123 timestamp.
606                 t, err = time.Parse(nearlyRFC1123, s)
607         }
608         return
609 }
610
611 // Untrash moves block from trash back into store
612 func (v *S3Volume) Untrash(loc string) error {
613         err := v.safeCopy(loc, "trash/"+loc)
614         if err != nil {
615                 return err
616         }
617         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
618         return v.translateError(err)
619 }
620
621 // Status returns a *VolumeStatus representing the current in-use
622 // storage capacity and a fake available capacity that doesn't make
623 // the volume seem full or nearly-full.
624 func (v *S3Volume) Status() *VolumeStatus {
625         return &VolumeStatus{
626                 DeviceNum: 1,
627                 BytesFree: BlockSize * 1000,
628                 BytesUsed: 1,
629         }
630 }
631
632 // InternalStats returns bucket I/O and API call counters.
633 func (v *S3Volume) InternalStats() interface{} {
634         return &v.bucket.stats
635 }
636
637 // String implements fmt.Stringer.
638 func (v *S3Volume) String() string {
639         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
640 }
641
642 // Writable returns false if all future Put, Mtime, and Delete calls
643 // are expected to fail.
644 func (v *S3Volume) Writable() bool {
645         return !v.ReadOnly
646 }
647
648 // Replication returns the storage redundancy of the underlying
649 // device. Configured via command line flag.
650 func (v *S3Volume) Replication() int {
651         return v.S3Replication
652 }
653
654 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
655
656 func (v *S3Volume) isKeepBlock(s string) bool {
657         return s3KeepBlockRegexp.MatchString(s)
658 }
659
660 // fixRace(X) is called when "recent/X" exists but "X" doesn't
661 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
662 // there was a race between Put and Trash, fixRace recovers from the
663 // race by Untrashing the block.
664 func (v *S3Volume) fixRace(loc string) bool {
665         trash, err := v.bucket.Head("trash/"+loc, nil)
666         if err != nil {
667                 if !os.IsNotExist(v.translateError(err)) {
668                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
669                 }
670                 return false
671         }
672         trashTime, err := v.lastModified(trash)
673         if err != nil {
674                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
675                 return false
676         }
677
678         recent, err := v.bucket.Head("recent/"+loc, nil)
679         if err != nil {
680                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
681                 return false
682         }
683         recentTime, err := v.lastModified(recent)
684         if err != nil {
685                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
686                 return false
687         }
688
689         ageWhenTrashed := trashTime.Sub(recentTime)
690         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
691                 // No evidence of a race: block hasn't been written
692                 // since it became eligible for Trash. No fix needed.
693                 return false
694         }
695
696         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
697         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
698         err = v.safeCopy(loc, "trash/"+loc)
699         if err != nil {
700                 log.Printf("error: fixRace: %s", err)
701                 return false
702         }
703         return true
704 }
705
706 func (v *S3Volume) translateError(err error) error {
707         switch err := err.(type) {
708         case *s3.Error:
709                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
710                         strings.Contains(err.Error(), "Not Found") {
711                         return os.ErrNotExist
712                 }
713                 // Other 404 errors like NoSuchVersion and
714                 // NoSuchBucket are different problems which should
715                 // get called out downstream, so we don't convert them
716                 // to os.ErrNotExist.
717         }
718         return err
719 }
720
721 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
722 // and deletes them from the volume.
723 func (v *S3Volume) EmptyTrash() {
724         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
725
726         // Use a merge sort to find matching sets of trash/X and recent/X.
727         trashL := s3Lister{
728                 Bucket:   v.bucket.Bucket,
729                 Prefix:   "trash/",
730                 PageSize: v.IndexPageSize,
731         }
732         // Define "ready to delete" as "...when EmptyTrash started".
733         startT := time.Now()
734         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
735                 loc := trash.Key[6:]
736                 if !v.isKeepBlock(loc) {
737                         continue
738                 }
739                 bytesInTrash += trash.Size
740                 blocksInTrash++
741
742                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
743                 if err != nil {
744                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
745                         continue
746                 }
747                 recent, err := v.bucket.Head("recent/"+loc, nil)
748                 if err != nil && os.IsNotExist(v.translateError(err)) {
749                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
750                         err = v.Untrash(loc)
751                         if err != nil {
752                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
753                         }
754                         continue
755                 } else if err != nil {
756                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
757                         continue
758                 }
759                 recentT, err := v.lastModified(recent)
760                 if err != nil {
761                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
762                         continue
763                 }
764                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
765                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
766                                 // recent/loc is too old to protect
767                                 // loc from being Trashed again during
768                                 // the raceWindow that starts if we
769                                 // delete trash/X now.
770                                 //
771                                 // Note this means (TrashCheckInterval
772                                 // < BlobSignatureTTL - raceWindow) is
773                                 // necessary to avoid starvation.
774                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
775                                 v.fixRace(loc)
776                                 v.Touch(loc)
777                                 continue
778                         }
779                         _, err := v.bucket.Head(loc, nil)
780                         if os.IsNotExist(err) {
781                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
782                                 v.fixRace(loc)
783                                 continue
784                         } else if err != nil {
785                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
786                                 continue
787                         }
788                 }
789                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
790                         continue
791                 }
792                 err = v.bucket.Del(trash.Key)
793                 if err != nil {
794                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
795                         continue
796                 }
797                 bytesDeleted += trash.Size
798                 blocksDeleted++
799
800                 _, err = v.bucket.Head(loc, nil)
801                 if os.IsNotExist(err) {
802                         err = v.bucket.Del("recent/" + loc)
803                         if err != nil {
804                                 log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
805                         }
806                 } else if err != nil {
807                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
808                 }
809         }
810         if err := trashL.Error(); err != nil {
811                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
812         }
813         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
814 }
815
816 type s3Lister struct {
817         Bucket     *s3.Bucket
818         Prefix     string
819         PageSize   int
820         nextMarker string
821         buf        []s3.Key
822         err        error
823 }
824
825 // First fetches the first page and returns the first item. It returns
826 // nil if the response is the empty set or an error occurs.
827 func (lister *s3Lister) First() *s3.Key {
828         lister.getPage()
829         return lister.pop()
830 }
831
832 // Next returns the next item, fetching the next page if necessary. It
833 // returns nil if the last available item has already been fetched, or
834 // an error occurs.
835 func (lister *s3Lister) Next() *s3.Key {
836         if len(lister.buf) == 0 && lister.nextMarker != "" {
837                 lister.getPage()
838         }
839         return lister.pop()
840 }
841
842 // Return the most recent error encountered by First or Next.
843 func (lister *s3Lister) Error() error {
844         return lister.err
845 }
846
847 func (lister *s3Lister) getPage() {
848         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
849         lister.nextMarker = ""
850         if err != nil {
851                 lister.err = err
852                 return
853         }
854         if resp.IsTruncated {
855                 lister.nextMarker = resp.NextMarker
856         }
857         lister.buf = make([]s3.Key, 0, len(resp.Contents))
858         for _, key := range resp.Contents {
859                 if !strings.HasPrefix(key.Key, lister.Prefix) {
860                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
861                         continue
862                 }
863                 lister.buf = append(lister.buf, key)
864         }
865 }
866
867 func (lister *s3Lister) pop() (k *s3.Key) {
868         if len(lister.buf) > 0 {
869                 k = &lister.buf[0]
870                 lister.buf = lister.buf[1:]
871         }
872         return
873 }
874
875 // s3bucket wraps s3.bucket and counts I/O and API usage stats.
876 type s3bucket struct {
877         *s3.Bucket
878         stats s3bucketStats
879 }
880
881 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
882         rdr, err := b.Bucket.GetReader(path)
883         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
884         b.stats.TickErr(err)
885         return NewCountingReader(rdr, b.stats.TickInBytes), err
886 }
887
888 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
889         resp, err := b.Bucket.Head(path, headers)
890         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
891         b.stats.TickErr(err)
892         return resp, err
893 }
894
895 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
896         err := b.Bucket.PutReader(path, NewCountingReader(r, b.stats.TickOutBytes), length, contType, perm, options)
897         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
898         b.stats.TickErr(err)
899         return err
900 }
901
902 func (b *s3bucket) Put(path string, data []byte, contType string, perm s3.ACL, options s3.Options) error {
903         err := b.Bucket.PutReader(path, NewCountingReader(bytes.NewBuffer(data), b.stats.TickOutBytes), int64(len(data)), contType, perm, options)
904         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
905         b.stats.TickErr(err)
906         return err
907 }
908
909 func (b *s3bucket) Del(path string) error {
910         err := b.Bucket.Del(path)
911         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
912         b.stats.TickErr(err)
913         return err
914 }
915
916 type s3bucketStats struct {
917         statsTicker
918         Ops     uint64
919         GetOps  uint64
920         PutOps  uint64
921         HeadOps uint64
922         DelOps  uint64
923         ListOps uint64
924 }
925
926 func (s *s3bucketStats) TickErr(err error) {
927         if err == nil {
928                 return
929         }
930         errType := fmt.Sprintf("%T", err)
931         if err, ok := err.(*s3.Error); ok {
932                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
933         }
934         s.statsTicker.TickErr(err, errType)
935 }