Merge branch '11644-mounts-api'
[arvados.git] / services / keepstore / s3_volume.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "encoding/base64"
7         "encoding/hex"
8         "flag"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "net/http"
13         "os"
14         "regexp"
15         "strings"
16         "sync"
17         "time"
18
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "github.com/AdRoll/goamz/aws"
21         "github.com/AdRoll/goamz/s3"
22         log "github.com/Sirupsen/logrus"
23 )
24
25 const (
26         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
27         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
28 )
29
30 var (
31         // ErrS3TrashDisabled is returned by Trash if that operation
32         // is impossible with the current config.
33         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
34
35         s3AccessKeyFile string
36         s3SecretKeyFile string
37         s3RegionName    string
38         s3Endpoint      string
39         s3Replication   int
40         s3UnsafeDelete  bool
41         s3RaceWindow    time.Duration
42
43         s3ACL = s3.Private
44 )
45
46 const (
47         maxClockSkew  = 600 * time.Second
48         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
49 )
50
51 type s3VolumeAdder struct {
52         *Config
53 }
54
55 // String implements flag.Value
56 func (s *s3VolumeAdder) String() string {
57         return "-"
58 }
59
60 func (s *s3VolumeAdder) Set(bucketName string) error {
61         if bucketName == "" {
62                 return fmt.Errorf("no container name given")
63         }
64         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
65                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
66         }
67         if deprecated.flagSerializeIO {
68                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
69         }
70         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
71                 Bucket:        bucketName,
72                 AccessKeyFile: s3AccessKeyFile,
73                 SecretKeyFile: s3SecretKeyFile,
74                 Endpoint:      s3Endpoint,
75                 Region:        s3RegionName,
76                 RaceWindow:    arvados.Duration(s3RaceWindow),
77                 S3Replication: s3Replication,
78                 UnsafeDelete:  s3UnsafeDelete,
79                 ReadOnly:      deprecated.flagReadonly,
80                 IndexPageSize: 1000,
81         })
82         return nil
83 }
84
85 func s3regions() (okList []string) {
86         for r := range aws.Regions {
87                 okList = append(okList, r)
88         }
89         return
90 }
91
92 func init() {
93         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
94
95         flag.Var(&s3VolumeAdder{theConfig},
96                 "s3-bucket-volume",
97                 "Use the given bucket as a storage volume. Can be given multiple times.")
98         flag.StringVar(
99                 &s3RegionName,
100                 "s3-region",
101                 "",
102                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
103         flag.StringVar(
104                 &s3Endpoint,
105                 "s3-endpoint",
106                 "",
107                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
108         flag.StringVar(
109                 &s3AccessKeyFile,
110                 "s3-access-key-file",
111                 "",
112                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
113         flag.StringVar(
114                 &s3SecretKeyFile,
115                 "s3-secret-key-file",
116                 "",
117                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
118         flag.DurationVar(
119                 &s3RaceWindow,
120                 "s3-race-window",
121                 24*time.Hour,
122                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
123         flag.IntVar(
124                 &s3Replication,
125                 "s3-replication",
126                 2,
127                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
128         flag.BoolVar(
129                 &s3UnsafeDelete,
130                 "s3-unsafe-delete",
131                 false,
132                 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
133 }
134
135 // S3Volume implements Volume using an S3 bucket.
136 type S3Volume struct {
137         AccessKeyFile      string
138         SecretKeyFile      string
139         Endpoint           string
140         Region             string
141         Bucket             string
142         LocationConstraint bool
143         IndexPageSize      int
144         S3Replication      int
145         ConnectTimeout     arvados.Duration
146         ReadTimeout        arvados.Duration
147         RaceWindow         arvados.Duration
148         ReadOnly           bool
149         UnsafeDelete       bool
150
151         bucket *s3bucket
152
153         startOnce sync.Once
154 }
155
156 // Examples implements VolumeWithExamples.
157 func (*S3Volume) Examples() []Volume {
158         return []Volume{
159                 &S3Volume{
160                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
161                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
162                         Endpoint:       "",
163                         Region:         "us-east-1",
164                         Bucket:         "example-bucket-name",
165                         IndexPageSize:  1000,
166                         S3Replication:  2,
167                         RaceWindow:     arvados.Duration(24 * time.Hour),
168                         ConnectTimeout: arvados.Duration(time.Minute),
169                         ReadTimeout:    arvados.Duration(5 * time.Minute),
170                 },
171                 &S3Volume{
172                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
173                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
174                         Endpoint:       "https://storage.googleapis.com",
175                         Region:         "",
176                         Bucket:         "example-bucket-name",
177                         IndexPageSize:  1000,
178                         S3Replication:  2,
179                         RaceWindow:     arvados.Duration(24 * time.Hour),
180                         ConnectTimeout: arvados.Duration(time.Minute),
181                         ReadTimeout:    arvados.Duration(5 * time.Minute),
182                 },
183         }
184 }
185
186 // Type implements Volume.
187 func (*S3Volume) Type() string {
188         return "S3"
189 }
190
191 // Start populates private fields and verifies the configuration is
192 // valid.
193 func (v *S3Volume) Start() error {
194         region, ok := aws.Regions[v.Region]
195         if v.Endpoint == "" {
196                 if !ok {
197                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
198                 }
199         } else if ok {
200                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
201                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
202         } else {
203                 region = aws.Region{
204                         Name:                 v.Region,
205                         S3Endpoint:           v.Endpoint,
206                         S3LocationConstraint: v.LocationConstraint,
207                 }
208         }
209
210         var err error
211         var auth aws.Auth
212         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
213         if err != nil {
214                 return err
215         }
216         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
217         if err != nil {
218                 return err
219         }
220
221         // Zero timeouts mean "wait forever", which is a bad
222         // default. Default to long timeouts instead.
223         if v.ConnectTimeout == 0 {
224                 v.ConnectTimeout = s3DefaultConnectTimeout
225         }
226         if v.ReadTimeout == 0 {
227                 v.ReadTimeout = s3DefaultReadTimeout
228         }
229
230         client := s3.New(auth, region)
231         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
232         client.ReadTimeout = time.Duration(v.ReadTimeout)
233         v.bucket = &s3bucket{
234                 Bucket: &s3.Bucket{
235                         S3:   client,
236                         Name: v.Bucket,
237                 },
238         }
239         return nil
240 }
241
242 // DeviceID returns a globally unique ID for the storage bucket.
243 func (v *S3Volume) DeviceID() string {
244         return "s3://" + v.Endpoint + "/" + v.Bucket
245 }
246
247 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
248         ready := make(chan bool)
249         go func() {
250                 rdr, err = v.getReader(loc)
251                 close(ready)
252         }()
253         select {
254         case <-ready:
255                 return
256         case <-ctx.Done():
257                 theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
258                 go func() {
259                         <-ready
260                         if err == nil {
261                                 rdr.Close()
262                         }
263                 }()
264                 return nil, ctx.Err()
265         }
266 }
267
268 // getReader wraps (Bucket)GetReader.
269 //
270 // In situations where (Bucket)GetReader would fail because the block
271 // disappeared in a Trash race, getReader calls fixRace to recover the
272 // data, and tries again.
273 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
274         rdr, err = v.bucket.GetReader(loc)
275         err = v.translateError(err)
276         if err == nil || !os.IsNotExist(err) {
277                 return
278         }
279
280         _, err = v.bucket.Head("recent/"+loc, nil)
281         err = v.translateError(err)
282         if err != nil {
283                 // If we can't read recent/X, there's no point in
284                 // trying fixRace. Give up.
285                 return
286         }
287         if !v.fixRace(loc) {
288                 err = os.ErrNotExist
289                 return
290         }
291
292         rdr, err = v.bucket.GetReader(loc)
293         if err != nil {
294                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
295                 err = v.translateError(err)
296         }
297         return
298 }
299
300 // Get a block: copy the block data into buf, and return the number of
301 // bytes copied.
302 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
303         rdr, err := v.getReaderWithContext(ctx, loc)
304         if err != nil {
305                 return 0, err
306         }
307
308         var n int
309         ready := make(chan bool)
310         go func() {
311                 defer close(ready)
312
313                 defer rdr.Close()
314                 n, err = io.ReadFull(rdr, buf)
315
316                 switch err {
317                 case nil, io.EOF, io.ErrUnexpectedEOF:
318                         err = nil
319                 default:
320                         err = v.translateError(err)
321                 }
322         }()
323         select {
324         case <-ctx.Done():
325                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
326                 rdr.Close()
327                 // Must wait for ReadFull to return, to ensure it
328                 // doesn't write to buf after we return.
329                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
330                 <-ready
331                 return 0, ctx.Err()
332         case <-ready:
333                 return n, err
334         }
335 }
336
337 // Compare the given data with the stored data.
338 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
339         rdr, err := v.getReaderWithContext(ctx, loc)
340         if err != nil {
341                 return err
342         }
343         defer rdr.Close()
344         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
345 }
346
347 // Put writes a block.
348 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
349         if v.ReadOnly {
350                 return MethodDisabledError
351         }
352         var opts s3.Options
353         size := len(block)
354         if size > 0 {
355                 md5, err := hex.DecodeString(loc)
356                 if err != nil {
357                         return err
358                 }
359                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
360         }
361
362         // Send the block data through a pipe, so that (if we need to)
363         // we can close the pipe early and abandon our PutReader()
364         // goroutine, without worrying about PutReader() accessing our
365         // block buffer after we release it.
366         bufr, bufw := io.Pipe()
367         go func() {
368                 io.Copy(bufw, bytes.NewReader(block))
369                 bufw.Close()
370         }()
371
372         var err error
373         ready := make(chan bool)
374         go func() {
375                 defer func() {
376                         if ctx.Err() != nil {
377                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
378                         }
379                 }()
380                 defer close(ready)
381                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
382                 if err != nil {
383                         return
384                 }
385                 err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
386         }()
387         select {
388         case <-ctx.Done():
389                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
390                 // Our pipe might be stuck in Write(), waiting for
391                 // io.Copy() to read. If so, un-stick it. This means
392                 // PutReader will get corrupt data, but that's OK: the
393                 // size and MD5 won't match, so the write will fail.
394                 go io.Copy(ioutil.Discard, bufr)
395                 // CloseWithError() will return once pending I/O is done.
396                 bufw.CloseWithError(ctx.Err())
397                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
398                 return ctx.Err()
399         case <-ready:
400                 return v.translateError(err)
401         }
402 }
403
404 // Touch sets the timestamp for the given locator to the current time.
405 func (v *S3Volume) Touch(loc string) error {
406         if v.ReadOnly {
407                 return MethodDisabledError
408         }
409         _, err := v.bucket.Head(loc, nil)
410         err = v.translateError(err)
411         if os.IsNotExist(err) && v.fixRace(loc) {
412                 // The data object got trashed in a race, but fixRace
413                 // rescued it.
414         } else if err != nil {
415                 return err
416         }
417         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
418         return v.translateError(err)
419 }
420
421 // Mtime returns the stored timestamp for the given locator.
422 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
423         _, err := v.bucket.Head(loc, nil)
424         if err != nil {
425                 return zeroTime, v.translateError(err)
426         }
427         resp, err := v.bucket.Head("recent/"+loc, nil)
428         err = v.translateError(err)
429         if os.IsNotExist(err) {
430                 // The data object X exists, but recent/X is missing.
431                 err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
432                 if err != nil {
433                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
434                         return zeroTime, v.translateError(err)
435                 }
436                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
437                 resp, err = v.bucket.Head("recent/"+loc, nil)
438                 if err != nil {
439                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
440                         return zeroTime, v.translateError(err)
441                 }
442         } else if err != nil {
443                 // HEAD recent/X failed for some other reason.
444                 return zeroTime, err
445         }
446         return v.lastModified(resp)
447 }
448
449 // IndexTo writes a complete list of locators with the given prefix
450 // for which Get() can retrieve data.
451 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
452         // Use a merge sort to find matching sets of X and recent/X.
453         dataL := s3Lister{
454                 Bucket:   v.bucket.Bucket,
455                 Prefix:   prefix,
456                 PageSize: v.IndexPageSize,
457         }
458         recentL := s3Lister{
459                 Bucket:   v.bucket.Bucket,
460                 Prefix:   "recent/" + prefix,
461                 PageSize: v.IndexPageSize,
462         }
463         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
464         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
465         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
466                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
467                 if data.Key >= "g" {
468                         // Conveniently, "recent/*" and "trash/*" are
469                         // lexically greater than all hex-encoded data
470                         // hashes, so stopping here avoids iterating
471                         // over all of them needlessly with dataL.
472                         break
473                 }
474                 if !v.isKeepBlock(data.Key) {
475                         continue
476                 }
477
478                 // stamp is the list entry we should use to report the
479                 // last-modified time for this data block: it will be
480                 // the recent/X entry if one exists, otherwise the
481                 // entry for the data block itself.
482                 stamp := data
483
484                 // Advance to the corresponding recent/X marker, if any
485                 for recent != nil {
486                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
487                                 recent = recentL.Next()
488                                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
489                                 continue
490                         } else if cmp == 0 {
491                                 stamp = recent
492                                 recent = recentL.Next()
493                                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
494                                 break
495                         } else {
496                                 // recent/X marker is missing: we'll
497                                 // use the timestamp on the data
498                                 // object.
499                                 break
500                         }
501                 }
502                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
503                 if err != nil {
504                         return err
505                 }
506                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
507         }
508         return nil
509 }
510
511 // Trash a Keep block.
512 func (v *S3Volume) Trash(loc string) error {
513         if v.ReadOnly {
514                 return MethodDisabledError
515         }
516         if t, err := v.Mtime(loc); err != nil {
517                 return err
518         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
519                 return nil
520         }
521         if theConfig.TrashLifetime == 0 {
522                 if !s3UnsafeDelete {
523                         return ErrS3TrashDisabled
524                 }
525                 return v.translateError(v.bucket.Del(loc))
526         }
527         err := v.checkRaceWindow(loc)
528         if err != nil {
529                 return err
530         }
531         err = v.safeCopy("trash/"+loc, loc)
532         if err != nil {
533                 return err
534         }
535         return v.translateError(v.bucket.Del(loc))
536 }
537
538 // checkRaceWindow returns a non-nil error if trash/loc is, or might
539 // be, in the race window (i.e., it's not safe to trash loc).
540 func (v *S3Volume) checkRaceWindow(loc string) error {
541         resp, err := v.bucket.Head("trash/"+loc, nil)
542         err = v.translateError(err)
543         if os.IsNotExist(err) {
544                 // OK, trash/X doesn't exist so we're not in the race
545                 // window
546                 return nil
547         } else if err != nil {
548                 // Error looking up trash/X. We don't know whether
549                 // we're in the race window
550                 return err
551         }
552         t, err := v.lastModified(resp)
553         if err != nil {
554                 // Can't parse timestamp
555                 return err
556         }
557         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
558         if safeWindow <= 0 {
559                 // We can't count on "touch trash/X" to prolong
560                 // trash/X's lifetime. The new timestamp might not
561                 // become visible until now+raceWindow, and EmptyTrash
562                 // is allowed to delete trash/X before then.
563                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
564         }
565         // trash/X exists, but it won't be eligible for deletion until
566         // after now+raceWindow, so it's safe to overwrite it.
567         return nil
568 }
569
570 // safeCopy calls PutCopy, and checks the response to make sure the
571 // copy succeeded and updated the timestamp on the destination object
572 // (PutCopy returns 200 OK if the request was received, even if the
573 // copy failed).
574 func (v *S3Volume) safeCopy(dst, src string) error {
575         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
576                 ContentType:       "application/octet-stream",
577                 MetadataDirective: "REPLACE",
578         }, v.bucket.Name+"/"+src)
579         err = v.translateError(err)
580         if err != nil {
581                 return err
582         }
583         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
584                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
585         } else if time.Now().Sub(t) > maxClockSkew {
586                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
587         }
588         return nil
589 }
590
591 // Get the LastModified header from resp, and parse it as RFC1123 or
592 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
593 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
594         s := resp.Header.Get("Last-Modified")
595         t, err = time.Parse(time.RFC1123, s)
596         if err != nil && s != "" {
597                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
598                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
599                 // as required by HTTP spec. If it's not a valid HTTP
600                 // header value, it's probably AWS (or s3test) giving
601                 // us a nearly-RFC1123 timestamp.
602                 t, err = time.Parse(nearlyRFC1123, s)
603         }
604         return
605 }
606
607 // Untrash moves block from trash back into store
608 func (v *S3Volume) Untrash(loc string) error {
609         err := v.safeCopy(loc, "trash/"+loc)
610         if err != nil {
611                 return err
612         }
613         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
614         return v.translateError(err)
615 }
616
617 // Status returns a *VolumeStatus representing the current in-use
618 // storage capacity and a fake available capacity that doesn't make
619 // the volume seem full or nearly-full.
620 func (v *S3Volume) Status() *VolumeStatus {
621         return &VolumeStatus{
622                 DeviceNum: 1,
623                 BytesFree: BlockSize * 1000,
624                 BytesUsed: 1,
625         }
626 }
627
628 // InternalStats returns bucket I/O and API call counters.
629 func (v *S3Volume) InternalStats() interface{} {
630         return &v.bucket.stats
631 }
632
633 // String implements fmt.Stringer.
634 func (v *S3Volume) String() string {
635         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
636 }
637
638 // Writable returns false if all future Put, Mtime, and Delete calls
639 // are expected to fail.
640 func (v *S3Volume) Writable() bool {
641         return !v.ReadOnly
642 }
643
644 // Replication returns the storage redundancy of the underlying
645 // device. Configured via command line flag.
646 func (v *S3Volume) Replication() int {
647         return v.S3Replication
648 }
649
650 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
651
652 func (v *S3Volume) isKeepBlock(s string) bool {
653         return s3KeepBlockRegexp.MatchString(s)
654 }
655
656 // fixRace(X) is called when "recent/X" exists but "X" doesn't
657 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
658 // there was a race between Put and Trash, fixRace recovers from the
659 // race by Untrashing the block.
660 func (v *S3Volume) fixRace(loc string) bool {
661         trash, err := v.bucket.Head("trash/"+loc, nil)
662         if err != nil {
663                 if !os.IsNotExist(v.translateError(err)) {
664                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
665                 }
666                 return false
667         }
668         trashTime, err := v.lastModified(trash)
669         if err != nil {
670                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
671                 return false
672         }
673
674         recent, err := v.bucket.Head("recent/"+loc, nil)
675         if err != nil {
676                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
677                 return false
678         }
679         recentTime, err := v.lastModified(recent)
680         if err != nil {
681                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
682                 return false
683         }
684
685         ageWhenTrashed := trashTime.Sub(recentTime)
686         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
687                 // No evidence of a race: block hasn't been written
688                 // since it became eligible for Trash. No fix needed.
689                 return false
690         }
691
692         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
693         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
694         err = v.safeCopy(loc, "trash/"+loc)
695         if err != nil {
696                 log.Printf("error: fixRace: %s", err)
697                 return false
698         }
699         return true
700 }
701
702 func (v *S3Volume) translateError(err error) error {
703         switch err := err.(type) {
704         case *s3.Error:
705                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
706                         strings.Contains(err.Error(), "Not Found") {
707                         return os.ErrNotExist
708                 }
709                 // Other 404 errors like NoSuchVersion and
710                 // NoSuchBucket are different problems which should
711                 // get called out downstream, so we don't convert them
712                 // to os.ErrNotExist.
713         }
714         return err
715 }
716
717 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
718 // and deletes them from the volume.
719 func (v *S3Volume) EmptyTrash() {
720         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
721
722         // Use a merge sort to find matching sets of trash/X and recent/X.
723         trashL := s3Lister{
724                 Bucket:   v.bucket.Bucket,
725                 Prefix:   "trash/",
726                 PageSize: v.IndexPageSize,
727         }
728         // Define "ready to delete" as "...when EmptyTrash started".
729         startT := time.Now()
730         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
731                 loc := trash.Key[6:]
732                 if !v.isKeepBlock(loc) {
733                         continue
734                 }
735                 bytesInTrash += trash.Size
736                 blocksInTrash++
737
738                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
739                 if err != nil {
740                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
741                         continue
742                 }
743                 recent, err := v.bucket.Head("recent/"+loc, nil)
744                 if err != nil && os.IsNotExist(v.translateError(err)) {
745                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
746                         err = v.Untrash(loc)
747                         if err != nil {
748                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
749                         }
750                         continue
751                 } else if err != nil {
752                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
753                         continue
754                 }
755                 recentT, err := v.lastModified(recent)
756                 if err != nil {
757                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
758                         continue
759                 }
760                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
761                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
762                                 // recent/loc is too old to protect
763                                 // loc from being Trashed again during
764                                 // the raceWindow that starts if we
765                                 // delete trash/X now.
766                                 //
767                                 // Note this means (TrashCheckInterval
768                                 // < BlobSignatureTTL - raceWindow) is
769                                 // necessary to avoid starvation.
770                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
771                                 v.fixRace(loc)
772                                 v.Touch(loc)
773                                 continue
774                         }
775                         _, err := v.bucket.Head(loc, nil)
776                         if os.IsNotExist(err) {
777                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
778                                 v.fixRace(loc)
779                                 continue
780                         } else if err != nil {
781                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
782                                 continue
783                         }
784                 }
785                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
786                         continue
787                 }
788                 err = v.bucket.Del(trash.Key)
789                 if err != nil {
790                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
791                         continue
792                 }
793                 bytesDeleted += trash.Size
794                 blocksDeleted++
795
796                 _, err = v.bucket.Head(loc, nil)
797                 if os.IsNotExist(err) {
798                         err = v.bucket.Del("recent/" + loc)
799                         if err != nil {
800                                 log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
801                         }
802                 } else if err != nil {
803                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
804                 }
805         }
806         if err := trashL.Error(); err != nil {
807                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
808         }
809         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
810 }
811
812 type s3Lister struct {
813         Bucket     *s3.Bucket
814         Prefix     string
815         PageSize   int
816         nextMarker string
817         buf        []s3.Key
818         err        error
819 }
820
821 // First fetches the first page and returns the first item. It returns
822 // nil if the response is the empty set or an error occurs.
823 func (lister *s3Lister) First() *s3.Key {
824         lister.getPage()
825         return lister.pop()
826 }
827
828 // Next returns the next item, fetching the next page if necessary. It
829 // returns nil if the last available item has already been fetched, or
830 // an error occurs.
831 func (lister *s3Lister) Next() *s3.Key {
832         if len(lister.buf) == 0 && lister.nextMarker != "" {
833                 lister.getPage()
834         }
835         return lister.pop()
836 }
837
838 // Return the most recent error encountered by First or Next.
839 func (lister *s3Lister) Error() error {
840         return lister.err
841 }
842
843 func (lister *s3Lister) getPage() {
844         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
845         lister.nextMarker = ""
846         if err != nil {
847                 lister.err = err
848                 return
849         }
850         if resp.IsTruncated {
851                 lister.nextMarker = resp.NextMarker
852         }
853         lister.buf = make([]s3.Key, 0, len(resp.Contents))
854         for _, key := range resp.Contents {
855                 if !strings.HasPrefix(key.Key, lister.Prefix) {
856                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
857                         continue
858                 }
859                 lister.buf = append(lister.buf, key)
860         }
861 }
862
863 func (lister *s3Lister) pop() (k *s3.Key) {
864         if len(lister.buf) > 0 {
865                 k = &lister.buf[0]
866                 lister.buf = lister.buf[1:]
867         }
868         return
869 }
870
871 // s3bucket wraps s3.bucket and counts I/O and API usage stats.
872 type s3bucket struct {
873         *s3.Bucket
874         stats s3bucketStats
875 }
876
877 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
878         rdr, err := b.Bucket.GetReader(path)
879         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
880         b.stats.TickErr(err)
881         return NewCountingReader(rdr, b.stats.TickInBytes), err
882 }
883
884 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
885         resp, err := b.Bucket.Head(path, headers)
886         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
887         b.stats.TickErr(err)
888         return resp, err
889 }
890
891 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
892         err := b.Bucket.PutReader(path, NewCountingReader(r, b.stats.TickOutBytes), length, contType, perm, options)
893         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
894         b.stats.TickErr(err)
895         return err
896 }
897
898 func (b *s3bucket) Put(path string, data []byte, contType string, perm s3.ACL, options s3.Options) error {
899         err := b.Bucket.PutReader(path, NewCountingReader(bytes.NewBuffer(data), b.stats.TickOutBytes), int64(len(data)), contType, perm, options)
900         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
901         b.stats.TickErr(err)
902         return err
903 }
904
905 func (b *s3bucket) Del(path string) error {
906         err := b.Bucket.Del(path)
907         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
908         b.stats.TickErr(err)
909         return err
910 }
911
912 type s3bucketStats struct {
913         statsTicker
914         Ops     uint64
915         GetOps  uint64
916         PutOps  uint64
917         HeadOps uint64
918         DelOps  uint64
919         ListOps uint64
920 }
921
922 func (s *s3bucketStats) TickErr(err error) {
923         if err == nil {
924                 return
925         }
926         errType := fmt.Sprintf("%T", err)
927         if err, ok := err.(*s3.Error); ok {
928                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
929         }
930         s.statsTicker.TickErr(err, errType)
931 }