8460: Rename pg -> event_source.go
[arvados.git] / services / keepstore / s3_volume.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "encoding/base64"
7         "encoding/hex"
8         "flag"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "net/http"
13         "os"
14         "regexp"
15         "strings"
16         "sync"
17         "sync/atomic"
18         "time"
19
20         "git.curoverse.com/arvados.git/sdk/go/arvados"
21         "github.com/AdRoll/goamz/aws"
22         "github.com/AdRoll/goamz/s3"
23         log "github.com/Sirupsen/logrus"
24 )
25
26 const (
27         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
28         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
29 )
30
31 var (
32         // ErrS3TrashDisabled is returned by Trash if that operation
33         // is impossible with the current config.
34         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
35
36         s3AccessKeyFile string
37         s3SecretKeyFile string
38         s3RegionName    string
39         s3Endpoint      string
40         s3Replication   int
41         s3UnsafeDelete  bool
42         s3RaceWindow    time.Duration
43
44         s3ACL = s3.Private
45 )
46
47 const (
48         maxClockSkew  = 600 * time.Second
49         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
50 )
51
52 type s3VolumeAdder struct {
53         *Config
54 }
55
56 // String implements flag.Value
57 func (s *s3VolumeAdder) String() string {
58         return "-"
59 }
60
61 func (s *s3VolumeAdder) Set(bucketName string) error {
62         if bucketName == "" {
63                 return fmt.Errorf("no container name given")
64         }
65         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
66                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
67         }
68         if deprecated.flagSerializeIO {
69                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
70         }
71         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
72                 Bucket:        bucketName,
73                 AccessKeyFile: s3AccessKeyFile,
74                 SecretKeyFile: s3SecretKeyFile,
75                 Endpoint:      s3Endpoint,
76                 Region:        s3RegionName,
77                 RaceWindow:    arvados.Duration(s3RaceWindow),
78                 S3Replication: s3Replication,
79                 UnsafeDelete:  s3UnsafeDelete,
80                 ReadOnly:      deprecated.flagReadonly,
81                 IndexPageSize: 1000,
82         })
83         return nil
84 }
85
86 func s3regions() (okList []string) {
87         for r := range aws.Regions {
88                 okList = append(okList, r)
89         }
90         return
91 }
92
93 func init() {
94         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
95
96         flag.Var(&s3VolumeAdder{theConfig},
97                 "s3-bucket-volume",
98                 "Use the given bucket as a storage volume. Can be given multiple times.")
99         flag.StringVar(
100                 &s3RegionName,
101                 "s3-region",
102                 "",
103                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
104         flag.StringVar(
105                 &s3Endpoint,
106                 "s3-endpoint",
107                 "",
108                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
109         flag.StringVar(
110                 &s3AccessKeyFile,
111                 "s3-access-key-file",
112                 "",
113                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
114         flag.StringVar(
115                 &s3SecretKeyFile,
116                 "s3-secret-key-file",
117                 "",
118                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
119         flag.DurationVar(
120                 &s3RaceWindow,
121                 "s3-race-window",
122                 24*time.Hour,
123                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
124         flag.IntVar(
125                 &s3Replication,
126                 "s3-replication",
127                 2,
128                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
129         flag.BoolVar(
130                 &s3UnsafeDelete,
131                 "s3-unsafe-delete",
132                 false,
133                 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
134 }
135
136 // S3Volume implements Volume using an S3 bucket.
137 type S3Volume struct {
138         AccessKeyFile      string
139         SecretKeyFile      string
140         Endpoint           string
141         Region             string
142         Bucket             string
143         LocationConstraint bool
144         IndexPageSize      int
145         S3Replication      int
146         ConnectTimeout     arvados.Duration
147         ReadTimeout        arvados.Duration
148         RaceWindow         arvados.Duration
149         ReadOnly           bool
150         UnsafeDelete       bool
151
152         bucket *s3bucket
153
154         startOnce sync.Once
155 }
156
157 // Examples implements VolumeWithExamples.
158 func (*S3Volume) Examples() []Volume {
159         return []Volume{
160                 &S3Volume{
161                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
162                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
163                         Endpoint:       "",
164                         Region:         "us-east-1",
165                         Bucket:         "example-bucket-name",
166                         IndexPageSize:  1000,
167                         S3Replication:  2,
168                         RaceWindow:     arvados.Duration(24 * time.Hour),
169                         ConnectTimeout: arvados.Duration(time.Minute),
170                         ReadTimeout:    arvados.Duration(5 * time.Minute),
171                 },
172                 &S3Volume{
173                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
174                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
175                         Endpoint:       "https://storage.googleapis.com",
176                         Region:         "",
177                         Bucket:         "example-bucket-name",
178                         IndexPageSize:  1000,
179                         S3Replication:  2,
180                         RaceWindow:     arvados.Duration(24 * time.Hour),
181                         ConnectTimeout: arvados.Duration(time.Minute),
182                         ReadTimeout:    arvados.Duration(5 * time.Minute),
183                 },
184         }
185 }
186
187 // Type implements Volume.
188 func (*S3Volume) Type() string {
189         return "S3"
190 }
191
192 // Start populates private fields and verifies the configuration is
193 // valid.
194 func (v *S3Volume) Start() error {
195         region, ok := aws.Regions[v.Region]
196         if v.Endpoint == "" {
197                 if !ok {
198                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
199                 }
200         } else if ok {
201                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
202                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
203         } else {
204                 region = aws.Region{
205                         Name:                 v.Region,
206                         S3Endpoint:           v.Endpoint,
207                         S3LocationConstraint: v.LocationConstraint,
208                 }
209         }
210
211         var err error
212         var auth aws.Auth
213         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
214         if err != nil {
215                 return err
216         }
217         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
218         if err != nil {
219                 return err
220         }
221
222         // Zero timeouts mean "wait forever", which is a bad
223         // default. Default to long timeouts instead.
224         if v.ConnectTimeout == 0 {
225                 v.ConnectTimeout = s3DefaultConnectTimeout
226         }
227         if v.ReadTimeout == 0 {
228                 v.ReadTimeout = s3DefaultReadTimeout
229         }
230
231         client := s3.New(auth, region)
232         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
233         client.ReadTimeout = time.Duration(v.ReadTimeout)
234         v.bucket = &s3bucket{
235                 Bucket: &s3.Bucket{
236                         S3:   client,
237                         Name: v.Bucket,
238                 },
239         }
240         return nil
241 }
242
243 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
244         ready := make(chan bool)
245         go func() {
246                 rdr, err = v.getReader(loc)
247                 close(ready)
248         }()
249         select {
250         case <-ready:
251                 return
252         case <-ctx.Done():
253                 theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
254                 go func() {
255                         <-ready
256                         if err == nil {
257                                 rdr.Close()
258                         }
259                 }()
260                 return nil, ctx.Err()
261         }
262 }
263
264 // getReader wraps (Bucket)GetReader.
265 //
266 // In situations where (Bucket)GetReader would fail because the block
267 // disappeared in a Trash race, getReader calls fixRace to recover the
268 // data, and tries again.
269 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
270         rdr, err = v.bucket.GetReader(loc)
271         err = v.translateError(err)
272         if err == nil || !os.IsNotExist(err) {
273                 return
274         }
275
276         _, err = v.bucket.Head("recent/"+loc, nil)
277         err = v.translateError(err)
278         if err != nil {
279                 // If we can't read recent/X, there's no point in
280                 // trying fixRace. Give up.
281                 return
282         }
283         if !v.fixRace(loc) {
284                 err = os.ErrNotExist
285                 return
286         }
287
288         rdr, err = v.bucket.GetReader(loc)
289         if err != nil {
290                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
291                 err = v.translateError(err)
292         }
293         return
294 }
295
296 // Get a block: copy the block data into buf, and return the number of
297 // bytes copied.
298 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
299         rdr, err := v.getReaderWithContext(ctx, loc)
300         if err != nil {
301                 return 0, err
302         }
303
304         var n int
305         ready := make(chan bool)
306         go func() {
307                 defer close(ready)
308
309                 defer rdr.Close()
310                 n, err = io.ReadFull(rdr, buf)
311
312                 switch err {
313                 case nil, io.EOF, io.ErrUnexpectedEOF:
314                         err = nil
315                 default:
316                         err = v.translateError(err)
317                 }
318         }()
319         select {
320         case <-ctx.Done():
321                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
322                 rdr.Close()
323                 // Must wait for ReadFull to return, to ensure it
324                 // doesn't write to buf after we return.
325                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
326                 <-ready
327                 return 0, ctx.Err()
328         case <-ready:
329                 return n, err
330         }
331 }
332
333 // Compare the given data with the stored data.
334 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
335         rdr, err := v.getReaderWithContext(ctx, loc)
336         if err != nil {
337                 return err
338         }
339         defer rdr.Close()
340         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
341 }
342
343 // Put writes a block.
344 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
345         if v.ReadOnly {
346                 return MethodDisabledError
347         }
348         var opts s3.Options
349         size := len(block)
350         if size > 0 {
351                 md5, err := hex.DecodeString(loc)
352                 if err != nil {
353                         return err
354                 }
355                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
356         }
357
358         // Send the block data through a pipe, so that (if we need to)
359         // we can close the pipe early and abandon our PutReader()
360         // goroutine, without worrying about PutReader() accessing our
361         // block buffer after we release it.
362         bufr, bufw := io.Pipe()
363         go func() {
364                 io.Copy(bufw, bytes.NewReader(block))
365                 bufw.Close()
366         }()
367
368         var err error
369         ready := make(chan bool)
370         go func() {
371                 defer func() {
372                         if ctx.Err() != nil {
373                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
374                         }
375                 }()
376                 defer close(ready)
377                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
378                 if err != nil {
379                         return
380                 }
381                 err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
382         }()
383         select {
384         case <-ctx.Done():
385                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
386                 // Our pipe might be stuck in Write(), waiting for
387                 // io.Copy() to read. If so, un-stick it. This means
388                 // PutReader will get corrupt data, but that's OK: the
389                 // size and MD5 won't match, so the write will fail.
390                 go io.Copy(ioutil.Discard, bufr)
391                 // CloseWithError() will return once pending I/O is done.
392                 bufw.CloseWithError(ctx.Err())
393                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
394                 return ctx.Err()
395         case <-ready:
396                 return v.translateError(err)
397         }
398 }
399
400 // Touch sets the timestamp for the given locator to the current time.
401 func (v *S3Volume) Touch(loc string) error {
402         if v.ReadOnly {
403                 return MethodDisabledError
404         }
405         _, err := v.bucket.Head(loc, nil)
406         err = v.translateError(err)
407         if os.IsNotExist(err) && v.fixRace(loc) {
408                 // The data object got trashed in a race, but fixRace
409                 // rescued it.
410         } else if err != nil {
411                 return err
412         }
413         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
414         return v.translateError(err)
415 }
416
417 // Mtime returns the stored timestamp for the given locator.
418 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
419         _, err := v.bucket.Head(loc, nil)
420         if err != nil {
421                 return zeroTime, v.translateError(err)
422         }
423         resp, err := v.bucket.Head("recent/"+loc, nil)
424         err = v.translateError(err)
425         if os.IsNotExist(err) {
426                 // The data object X exists, but recent/X is missing.
427                 err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
428                 if err != nil {
429                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
430                         return zeroTime, v.translateError(err)
431                 }
432                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
433                 resp, err = v.bucket.Head("recent/"+loc, nil)
434                 if err != nil {
435                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
436                         return zeroTime, v.translateError(err)
437                 }
438         } else if err != nil {
439                 // HEAD recent/X failed for some other reason.
440                 return zeroTime, err
441         }
442         return v.lastModified(resp)
443 }
444
445 // IndexTo writes a complete list of locators with the given prefix
446 // for which Get() can retrieve data.
447 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
448         // Use a merge sort to find matching sets of X and recent/X.
449         dataL := s3Lister{
450                 Bucket:   v.bucket.Bucket,
451                 Prefix:   prefix,
452                 PageSize: v.IndexPageSize,
453         }
454         recentL := s3Lister{
455                 Bucket:   v.bucket.Bucket,
456                 Prefix:   "recent/" + prefix,
457                 PageSize: v.IndexPageSize,
458         }
459         v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
460         v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
461         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
462                 v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
463                 if data.Key >= "g" {
464                         // Conveniently, "recent/*" and "trash/*" are
465                         // lexically greater than all hex-encoded data
466                         // hashes, so stopping here avoids iterating
467                         // over all of them needlessly with dataL.
468                         break
469                 }
470                 if !v.isKeepBlock(data.Key) {
471                         continue
472                 }
473
474                 // stamp is the list entry we should use to report the
475                 // last-modified time for this data block: it will be
476                 // the recent/X entry if one exists, otherwise the
477                 // entry for the data block itself.
478                 stamp := data
479
480                 // Advance to the corresponding recent/X marker, if any
481                 for recent != nil {
482                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
483                                 recent = recentL.Next()
484                                 v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
485                                 continue
486                         } else if cmp == 0 {
487                                 stamp = recent
488                                 recent = recentL.Next()
489                                 v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
490                                 break
491                         } else {
492                                 // recent/X marker is missing: we'll
493                                 // use the timestamp on the data
494                                 // object.
495                                 break
496                         }
497                 }
498                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
499                 if err != nil {
500                         return err
501                 }
502                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
503         }
504         return nil
505 }
506
507 // Trash a Keep block.
508 func (v *S3Volume) Trash(loc string) error {
509         if v.ReadOnly {
510                 return MethodDisabledError
511         }
512         if t, err := v.Mtime(loc); err != nil {
513                 return err
514         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
515                 return nil
516         }
517         if theConfig.TrashLifetime == 0 {
518                 if !s3UnsafeDelete {
519                         return ErrS3TrashDisabled
520                 }
521                 return v.translateError(v.bucket.Del(loc))
522         }
523         err := v.checkRaceWindow(loc)
524         if err != nil {
525                 return err
526         }
527         err = v.safeCopy("trash/"+loc, loc)
528         if err != nil {
529                 return err
530         }
531         return v.translateError(v.bucket.Del(loc))
532 }
533
534 // checkRaceWindow returns a non-nil error if trash/loc is, or might
535 // be, in the race window (i.e., it's not safe to trash loc).
536 func (v *S3Volume) checkRaceWindow(loc string) error {
537         resp, err := v.bucket.Head("trash/"+loc, nil)
538         err = v.translateError(err)
539         if os.IsNotExist(err) {
540                 // OK, trash/X doesn't exist so we're not in the race
541                 // window
542                 return nil
543         } else if err != nil {
544                 // Error looking up trash/X. We don't know whether
545                 // we're in the race window
546                 return err
547         }
548         t, err := v.lastModified(resp)
549         if err != nil {
550                 // Can't parse timestamp
551                 return err
552         }
553         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
554         if safeWindow <= 0 {
555                 // We can't count on "touch trash/X" to prolong
556                 // trash/X's lifetime. The new timestamp might not
557                 // become visible until now+raceWindow, and EmptyTrash
558                 // is allowed to delete trash/X before then.
559                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
560         }
561         // trash/X exists, but it won't be eligible for deletion until
562         // after now+raceWindow, so it's safe to overwrite it.
563         return nil
564 }
565
566 // safeCopy calls PutCopy, and checks the response to make sure the
567 // copy succeeded and updated the timestamp on the destination object
568 // (PutCopy returns 200 OK if the request was received, even if the
569 // copy failed).
570 func (v *S3Volume) safeCopy(dst, src string) error {
571         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
572                 ContentType:       "application/octet-stream",
573                 MetadataDirective: "REPLACE",
574         }, v.bucket.Name+"/"+src)
575         err = v.translateError(err)
576         if err != nil {
577                 return err
578         }
579         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
580                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
581         } else if time.Now().Sub(t) > maxClockSkew {
582                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
583         }
584         return nil
585 }
586
587 // Get the LastModified header from resp, and parse it as RFC1123 or
588 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
589 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
590         s := resp.Header.Get("Last-Modified")
591         t, err = time.Parse(time.RFC1123, s)
592         if err != nil && s != "" {
593                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
594                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
595                 // as required by HTTP spec. If it's not a valid HTTP
596                 // header value, it's probably AWS (or s3test) giving
597                 // us a nearly-RFC1123 timestamp.
598                 t, err = time.Parse(nearlyRFC1123, s)
599         }
600         return
601 }
602
603 // Untrash moves block from trash back into store
604 func (v *S3Volume) Untrash(loc string) error {
605         err := v.safeCopy(loc, "trash/"+loc)
606         if err != nil {
607                 return err
608         }
609         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
610         return v.translateError(err)
611 }
612
613 // Status returns a *VolumeStatus representing the current in-use
614 // storage capacity and a fake available capacity that doesn't make
615 // the volume seem full or nearly-full.
616 func (v *S3Volume) Status() *VolumeStatus {
617         return &VolumeStatus{
618                 DeviceNum: 1,
619                 BytesFree: BlockSize * 1000,
620                 BytesUsed: 1,
621         }
622 }
623
624 // InternalStats returns bucket I/O and API call counters.
625 func (v *S3Volume) InternalStats() interface{} {
626         return &v.bucket.stats
627 }
628
629 // String implements fmt.Stringer.
630 func (v *S3Volume) String() string {
631         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
632 }
633
634 // Writable returns false if all future Put, Mtime, and Delete calls
635 // are expected to fail.
636 func (v *S3Volume) Writable() bool {
637         return !v.ReadOnly
638 }
639
640 // Replication returns the storage redundancy of the underlying
641 // device. Configured via command line flag.
642 func (v *S3Volume) Replication() int {
643         return v.S3Replication
644 }
645
646 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
647
648 func (v *S3Volume) isKeepBlock(s string) bool {
649         return s3KeepBlockRegexp.MatchString(s)
650 }
651
652 // fixRace(X) is called when "recent/X" exists but "X" doesn't
653 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
654 // there was a race between Put and Trash, fixRace recovers from the
655 // race by Untrashing the block.
656 func (v *S3Volume) fixRace(loc string) bool {
657         trash, err := v.bucket.Head("trash/"+loc, nil)
658         if err != nil {
659                 if !os.IsNotExist(v.translateError(err)) {
660                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
661                 }
662                 return false
663         }
664         trashTime, err := v.lastModified(trash)
665         if err != nil {
666                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
667                 return false
668         }
669
670         recent, err := v.bucket.Head("recent/"+loc, nil)
671         if err != nil {
672                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
673                 return false
674         }
675         recentTime, err := v.lastModified(recent)
676         if err != nil {
677                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
678                 return false
679         }
680
681         ageWhenTrashed := trashTime.Sub(recentTime)
682         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
683                 // No evidence of a race: block hasn't been written
684                 // since it became eligible for Trash. No fix needed.
685                 return false
686         }
687
688         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
689         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
690         err = v.safeCopy(loc, "trash/"+loc)
691         if err != nil {
692                 log.Printf("error: fixRace: %s", err)
693                 return false
694         }
695         return true
696 }
697
698 func (v *S3Volume) translateError(err error) error {
699         switch err := err.(type) {
700         case *s3.Error:
701                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
702                         strings.Contains(err.Error(), "Not Found") {
703                         return os.ErrNotExist
704                 }
705                 // Other 404 errors like NoSuchVersion and
706                 // NoSuchBucket are different problems which should
707                 // get called out downstream, so we don't convert them
708                 // to os.ErrNotExist.
709         }
710         return err
711 }
712
713 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
714 // and deletes them from the volume.
715 func (v *S3Volume) EmptyTrash() {
716         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
717
718         // Use a merge sort to find matching sets of trash/X and recent/X.
719         trashL := s3Lister{
720                 Bucket:   v.bucket.Bucket,
721                 Prefix:   "trash/",
722                 PageSize: v.IndexPageSize,
723         }
724         // Define "ready to delete" as "...when EmptyTrash started".
725         startT := time.Now()
726         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
727                 loc := trash.Key[6:]
728                 if !v.isKeepBlock(loc) {
729                         continue
730                 }
731                 bytesInTrash += trash.Size
732                 blocksInTrash++
733
734                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
735                 if err != nil {
736                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
737                         continue
738                 }
739                 recent, err := v.bucket.Head("recent/"+loc, nil)
740                 if err != nil && os.IsNotExist(v.translateError(err)) {
741                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
742                         err = v.Untrash(loc)
743                         if err != nil {
744                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
745                         }
746                         continue
747                 } else if err != nil {
748                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
749                         continue
750                 }
751                 recentT, err := v.lastModified(recent)
752                 if err != nil {
753                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
754                         continue
755                 }
756                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
757                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
758                                 // recent/loc is too old to protect
759                                 // loc from being Trashed again during
760                                 // the raceWindow that starts if we
761                                 // delete trash/X now.
762                                 //
763                                 // Note this means (TrashCheckInterval
764                                 // < BlobSignatureTTL - raceWindow) is
765                                 // necessary to avoid starvation.
766                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
767                                 v.fixRace(loc)
768                                 v.Touch(loc)
769                                 continue
770                         }
771                         _, err := v.bucket.Head(loc, nil)
772                         if os.IsNotExist(err) {
773                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
774                                 v.fixRace(loc)
775                                 continue
776                         } else if err != nil {
777                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
778                                 continue
779                         }
780                 }
781                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
782                         continue
783                 }
784                 err = v.bucket.Del(trash.Key)
785                 if err != nil {
786                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
787                         continue
788                 }
789                 bytesDeleted += trash.Size
790                 blocksDeleted++
791
792                 _, err = v.bucket.Head(loc, nil)
793                 if os.IsNotExist(err) {
794                         err = v.bucket.Del("recent/" + loc)
795                         if err != nil {
796                                 log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
797                         }
798                 } else if err != nil {
799                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
800                 }
801         }
802         if err := trashL.Error(); err != nil {
803                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
804         }
805         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
806 }
807
808 type s3Lister struct {
809         Bucket     *s3.Bucket
810         Prefix     string
811         PageSize   int
812         nextMarker string
813         buf        []s3.Key
814         err        error
815 }
816
817 // First fetches the first page and returns the first item. It returns
818 // nil if the response is the empty set or an error occurs.
819 func (lister *s3Lister) First() *s3.Key {
820         lister.getPage()
821         return lister.pop()
822 }
823
824 // Next returns the next item, fetching the next page if necessary. It
825 // returns nil if the last available item has already been fetched, or
826 // an error occurs.
827 func (lister *s3Lister) Next() *s3.Key {
828         if len(lister.buf) == 0 && lister.nextMarker != "" {
829                 lister.getPage()
830         }
831         return lister.pop()
832 }
833
834 // Return the most recent error encountered by First or Next.
835 func (lister *s3Lister) Error() error {
836         return lister.err
837 }
838
839 func (lister *s3Lister) getPage() {
840         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
841         lister.nextMarker = ""
842         if err != nil {
843                 lister.err = err
844                 return
845         }
846         if resp.IsTruncated {
847                 lister.nextMarker = resp.NextMarker
848         }
849         lister.buf = make([]s3.Key, 0, len(resp.Contents))
850         for _, key := range resp.Contents {
851                 if !strings.HasPrefix(key.Key, lister.Prefix) {
852                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
853                         continue
854                 }
855                 lister.buf = append(lister.buf, key)
856         }
857 }
858
859 func (lister *s3Lister) pop() (k *s3.Key) {
860         if len(lister.buf) > 0 {
861                 k = &lister.buf[0]
862                 lister.buf = lister.buf[1:]
863         }
864         return
865 }
866
867 // s3bucket wraps s3.bucket and counts I/O and API usage stats.
868 type s3bucket struct {
869         *s3.Bucket
870         stats s3bucketStats
871 }
872
873 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
874         rdr, err := b.Bucket.GetReader(path)
875         b.stats.tick(&b.stats.Ops, &b.stats.GetOps)
876         b.stats.tickErr(err)
877         return NewCountingReader(rdr, b.stats.tickInBytes), err
878 }
879
880 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
881         resp, err := b.Bucket.Head(path, headers)
882         b.stats.tick(&b.stats.Ops, &b.stats.HeadOps)
883         b.stats.tickErr(err)
884         return resp, err
885 }
886
887 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
888         err := b.Bucket.PutReader(path, NewCountingReader(r, b.stats.tickOutBytes), length, contType, perm, options)
889         b.stats.tick(&b.stats.Ops, &b.stats.PutOps)
890         b.stats.tickErr(err)
891         return err
892 }
893
894 func (b *s3bucket) Put(path string, data []byte, contType string, perm s3.ACL, options s3.Options) error {
895         err := b.Bucket.PutReader(path, NewCountingReader(bytes.NewBuffer(data), b.stats.tickOutBytes), int64(len(data)), contType, perm, options)
896         b.stats.tick(&b.stats.Ops, &b.stats.PutOps)
897         b.stats.tickErr(err)
898         return err
899 }
900
901 func (b *s3bucket) Del(path string) error {
902         err := b.Bucket.Del(path)
903         b.stats.tick(&b.stats.Ops, &b.stats.DelOps)
904         b.stats.tickErr(err)
905         return err
906 }
907
908 type s3bucketStats struct {
909         Errors   uint64
910         Ops      uint64
911         GetOps   uint64
912         PutOps   uint64
913         HeadOps  uint64
914         DelOps   uint64
915         ListOps  uint64
916         InBytes  uint64
917         OutBytes uint64
918
919         ErrorCodes map[string]uint64 `json:",omitempty"`
920
921         lock sync.Mutex
922 }
923
924 func (s *s3bucketStats) tickInBytes(n uint64) {
925         atomic.AddUint64(&s.InBytes, n)
926 }
927
928 func (s *s3bucketStats) tickOutBytes(n uint64) {
929         atomic.AddUint64(&s.OutBytes, n)
930 }
931
932 func (s *s3bucketStats) tick(counters ...*uint64) {
933         for _, counter := range counters {
934                 atomic.AddUint64(counter, 1)
935         }
936 }
937
938 func (s *s3bucketStats) tickErr(err error) {
939         if err == nil {
940                 return
941         }
942         atomic.AddUint64(&s.Errors, 1)
943         errStr := fmt.Sprintf("%T", err)
944         if err, ok := err.(*s3.Error); ok {
945                 errStr = errStr + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
946         }
947         s.lock.Lock()
948         if s.ErrorCodes == nil {
949                 s.ErrorCodes = make(map[string]uint64)
950         }
951         s.ErrorCodes[errStr]++
952         s.lock.Unlock()
953 }