10484: Tidy up stats-tracking code into a bucket proxy type.
[arvados.git] / services / keepstore / s3_volume.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "encoding/base64"
7         "encoding/hex"
8         "flag"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "log"
13         "net/http"
14         "os"
15         "regexp"
16         "strings"
17         "sync"
18         "sync/atomic"
19         "time"
20
21         "git.curoverse.com/arvados.git/sdk/go/arvados"
22         "github.com/AdRoll/goamz/aws"
23         "github.com/AdRoll/goamz/s3"
24 )
25
26 const (
27         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
28         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
29 )
30
31 var (
32         // ErrS3TrashDisabled is returned by Trash if that operation
33         // is impossible with the current config.
34         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
35
36         s3AccessKeyFile string
37         s3SecretKeyFile string
38         s3RegionName    string
39         s3Endpoint      string
40         s3Replication   int
41         s3UnsafeDelete  bool
42         s3RaceWindow    time.Duration
43
44         s3ACL = s3.Private
45 )
46
47 const (
48         maxClockSkew  = 600 * time.Second
49         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
50 )
51
52 type s3VolumeAdder struct {
53         *Config
54 }
55
56 // String implements flag.Value
57 func (s *s3VolumeAdder) String() string {
58         return "-"
59 }
60
61 func (s *s3VolumeAdder) Set(bucketName string) error {
62         if bucketName == "" {
63                 return fmt.Errorf("no container name given")
64         }
65         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
66                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
67         }
68         if deprecated.flagSerializeIO {
69                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
70         }
71         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
72                 Bucket:        bucketName,
73                 AccessKeyFile: s3AccessKeyFile,
74                 SecretKeyFile: s3SecretKeyFile,
75                 Endpoint:      s3Endpoint,
76                 Region:        s3RegionName,
77                 RaceWindow:    arvados.Duration(s3RaceWindow),
78                 S3Replication: s3Replication,
79                 UnsafeDelete:  s3UnsafeDelete,
80                 ReadOnly:      deprecated.flagReadonly,
81                 IndexPageSize: 1000,
82         })
83         return nil
84 }
85
86 func s3regions() (okList []string) {
87         for r := range aws.Regions {
88                 okList = append(okList, r)
89         }
90         return
91 }
92
93 func init() {
94         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
95
96         flag.Var(&s3VolumeAdder{theConfig},
97                 "s3-bucket-volume",
98                 "Use the given bucket as a storage volume. Can be given multiple times.")
99         flag.StringVar(
100                 &s3RegionName,
101                 "s3-region",
102                 "",
103                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
104         flag.StringVar(
105                 &s3Endpoint,
106                 "s3-endpoint",
107                 "",
108                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
109         flag.StringVar(
110                 &s3AccessKeyFile,
111                 "s3-access-key-file",
112                 "",
113                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
114         flag.StringVar(
115                 &s3SecretKeyFile,
116                 "s3-secret-key-file",
117                 "",
118                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
119         flag.DurationVar(
120                 &s3RaceWindow,
121                 "s3-race-window",
122                 24*time.Hour,
123                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
124         flag.IntVar(
125                 &s3Replication,
126                 "s3-replication",
127                 2,
128                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
129         flag.BoolVar(
130                 &s3UnsafeDelete,
131                 "s3-unsafe-delete",
132                 false,
133                 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
134 }
135
136 // S3Volume implements Volume using an S3 bucket.
137 type S3Volume struct {
138         AccessKeyFile      string
139         SecretKeyFile      string
140         Endpoint           string
141         Region             string
142         Bucket             string
143         LocationConstraint bool
144         IndexPageSize      int
145         S3Replication      int
146         ConnectTimeout     arvados.Duration
147         ReadTimeout        arvados.Duration
148         RaceWindow         arvados.Duration
149         ReadOnly           bool
150         UnsafeDelete       bool
151
152         bucket      *s3bucket
153         volumeStats ioStats
154
155         startOnce sync.Once
156 }
157
158 // Examples implements VolumeWithExamples.
159 func (*S3Volume) Examples() []Volume {
160         return []Volume{
161                 &S3Volume{
162                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
163                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
164                         Endpoint:       "",
165                         Region:         "us-east-1",
166                         Bucket:         "example-bucket-name",
167                         IndexPageSize:  1000,
168                         S3Replication:  2,
169                         RaceWindow:     arvados.Duration(24 * time.Hour),
170                         ConnectTimeout: arvados.Duration(time.Minute),
171                         ReadTimeout:    arvados.Duration(5 * time.Minute),
172                 },
173                 &S3Volume{
174                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
175                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
176                         Endpoint:       "https://storage.googleapis.com",
177                         Region:         "",
178                         Bucket:         "example-bucket-name",
179                         IndexPageSize:  1000,
180                         S3Replication:  2,
181                         RaceWindow:     arvados.Duration(24 * time.Hour),
182                         ConnectTimeout: arvados.Duration(time.Minute),
183                         ReadTimeout:    arvados.Duration(5 * time.Minute),
184                 },
185         }
186 }
187
188 // Type implements Volume.
189 func (*S3Volume) Type() string {
190         return "S3"
191 }
192
193 // Start populates private fields and verifies the configuration is
194 // valid.
195 func (v *S3Volume) Start() error {
196         region, ok := aws.Regions[v.Region]
197         if v.Endpoint == "" {
198                 if !ok {
199                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
200                 }
201         } else if ok {
202                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
203                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
204         } else {
205                 region = aws.Region{
206                         Name:                 v.Region,
207                         S3Endpoint:           v.Endpoint,
208                         S3LocationConstraint: v.LocationConstraint,
209                 }
210         }
211
212         var err error
213         var auth aws.Auth
214         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
215         if err != nil {
216                 return err
217         }
218         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
219         if err != nil {
220                 return err
221         }
222
223         // Zero timeouts mean "wait forever", which is a bad
224         // default. Default to long timeouts instead.
225         if v.ConnectTimeout == 0 {
226                 v.ConnectTimeout = s3DefaultConnectTimeout
227         }
228         if v.ReadTimeout == 0 {
229                 v.ReadTimeout = s3DefaultReadTimeout
230         }
231
232         client := s3.New(auth, region)
233         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
234         client.ReadTimeout = time.Duration(v.ReadTimeout)
235         v.bucket = &s3bucket{
236                 Bucket: &s3.Bucket{
237                         S3:   client,
238                         Name: v.Bucket,
239                 },
240         }
241         return nil
242 }
243
244 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
245         ready := make(chan bool)
246         go func() {
247                 rdr, err = v.getReader(loc)
248                 close(ready)
249         }()
250         select {
251         case <-ready:
252                 return
253         case <-ctx.Done():
254                 theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
255                 go func() {
256                         <-ready
257                         if err == nil {
258                                 rdr.Close()
259                         }
260                 }()
261                 return nil, ctx.Err()
262         }
263 }
264
265 // getReader wraps (Bucket)GetReader.
266 //
267 // In situations where (Bucket)GetReader would fail because the block
268 // disappeared in a Trash race, getReader calls fixRace to recover the
269 // data, and tries again.
270 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
271         rdr, err = v.bucket.GetReader(loc)
272         err = v.translateError(err)
273         if err == nil || !os.IsNotExist(err) {
274                 return
275         }
276
277         _, err = v.bucket.Head("recent/"+loc, nil)
278         err = v.translateError(err)
279         if err != nil {
280                 // If we can't read recent/X, there's no point in
281                 // trying fixRace. Give up.
282                 return
283         }
284         if !v.fixRace(loc) {
285                 err = os.ErrNotExist
286                 return
287         }
288
289         rdr, err = v.bucket.GetReader(loc)
290         if err != nil {
291                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
292                 err = v.translateError(err)
293         }
294         return
295 }
296
297 // Get a block: copy the block data into buf, and return the number of
298 // bytes copied.
299 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
300         rdr, err := v.getReaderWithContext(ctx, loc)
301         if err != nil {
302                 return 0, err
303         }
304
305         var n int
306         ready := make(chan bool)
307         go func() {
308                 defer close(ready)
309
310                 defer rdr.Close()
311                 n, err = io.ReadFull(rdr, buf)
312
313                 switch err {
314                 case nil, io.EOF, io.ErrUnexpectedEOF:
315                         err = nil
316                 default:
317                         err = v.translateError(err)
318                 }
319         }()
320         select {
321         case <-ctx.Done():
322                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
323                 rdr.Close()
324                 // Must wait for ReadFull to return, to ensure it
325                 // doesn't write to buf after we return.
326                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
327                 <-ready
328                 return 0, ctx.Err()
329         case <-ready:
330                 return n, err
331         }
332 }
333
334 // Compare the given data with the stored data.
335 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
336         rdr, err := v.getReaderWithContext(ctx, loc)
337         if err != nil {
338                 return err
339         }
340         defer rdr.Close()
341         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
342 }
343
344 // Put writes a block.
345 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
346         if v.ReadOnly {
347                 return MethodDisabledError
348         }
349         var opts s3.Options
350         size := len(block)
351         if size > 0 {
352                 md5, err := hex.DecodeString(loc)
353                 if err != nil {
354                         return err
355                 }
356                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
357         }
358
359         // Send the block data through a pipe, so that (if we need to)
360         // we can close the pipe early and abandon our PutReader()
361         // goroutine, without worrying about PutReader() accessing our
362         // block buffer after we release it.
363         bufr, bufw := io.Pipe()
364         go func() {
365                 io.Copy(bufw, bytes.NewReader(block))
366                 bufw.Close()
367         }()
368
369         var err error
370         ready := make(chan bool)
371         go func() {
372                 defer func() {
373                         if ctx.Err() != nil {
374                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
375                         }
376                 }()
377                 defer close(ready)
378                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
379                 if err != nil {
380                         return
381                 }
382                 err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
383         }()
384         select {
385         case <-ctx.Done():
386                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
387                 // Our pipe might be stuck in Write(), waiting for
388                 // io.Copy() to read. If so, un-stick it. This means
389                 // PutReader will get corrupt data, but that's OK: the
390                 // size and MD5 won't match, so the write will fail.
391                 go io.Copy(ioutil.Discard, bufr)
392                 // CloseWithError() will return once pending I/O is done.
393                 bufw.CloseWithError(ctx.Err())
394                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
395                 return ctx.Err()
396         case <-ready:
397                 return v.translateError(err)
398         }
399 }
400
401 // Touch sets the timestamp for the given locator to the current time.
402 func (v *S3Volume) Touch(loc string) error {
403         if v.ReadOnly {
404                 return MethodDisabledError
405         }
406         _, err := v.bucket.Head(loc, nil)
407         err = v.translateError(err)
408         if os.IsNotExist(err) && v.fixRace(loc) {
409                 // The data object got trashed in a race, but fixRace
410                 // rescued it.
411         } else if err != nil {
412                 return err
413         }
414         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
415         return v.translateError(err)
416 }
417
418 // Mtime returns the stored timestamp for the given locator.
419 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
420         _, err := v.bucket.Head(loc, nil)
421         if err != nil {
422                 return zeroTime, v.translateError(err)
423         }
424         resp, err := v.bucket.Head("recent/"+loc, nil)
425         err = v.translateError(err)
426         if os.IsNotExist(err) {
427                 // The data object X exists, but recent/X is missing.
428                 err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
429                 if err != nil {
430                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
431                         return zeroTime, v.translateError(err)
432                 }
433                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
434                 resp, err = v.bucket.Head("recent/"+loc, nil)
435                 if err != nil {
436                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
437                         return zeroTime, v.translateError(err)
438                 }
439         } else if err != nil {
440                 // HEAD recent/X failed for some other reason.
441                 return zeroTime, err
442         }
443         return v.lastModified(resp)
444 }
445
446 // IndexTo writes a complete list of locators with the given prefix
447 // for which Get() can retrieve data.
448 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
449         // Use a merge sort to find matching sets of X and recent/X.
450         dataL := s3Lister{
451                 Bucket:   v.bucket.Bucket,
452                 Prefix:   prefix,
453                 PageSize: v.IndexPageSize,
454         }
455         recentL := s3Lister{
456                 Bucket:   v.bucket.Bucket,
457                 Prefix:   "recent/" + prefix,
458                 PageSize: v.IndexPageSize,
459         }
460         v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
461         v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
462         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
463                 v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
464                 if data.Key >= "g" {
465                         // Conveniently, "recent/*" and "trash/*" are
466                         // lexically greater than all hex-encoded data
467                         // hashes, so stopping here avoids iterating
468                         // over all of them needlessly with dataL.
469                         break
470                 }
471                 if !v.isKeepBlock(data.Key) {
472                         continue
473                 }
474
475                 // stamp is the list entry we should use to report the
476                 // last-modified time for this data block: it will be
477                 // the recent/X entry if one exists, otherwise the
478                 // entry for the data block itself.
479                 stamp := data
480
481                 // Advance to the corresponding recent/X marker, if any
482                 for recent != nil {
483                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
484                                 recent = recentL.Next()
485                                 v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
486                                 continue
487                         } else if cmp == 0 {
488                                 stamp = recent
489                                 recent = recentL.Next()
490                                 v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
491                                 break
492                         } else {
493                                 // recent/X marker is missing: we'll
494                                 // use the timestamp on the data
495                                 // object.
496                                 break
497                         }
498                 }
499                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
500                 if err != nil {
501                         return err
502                 }
503                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
504         }
505         return nil
506 }
507
508 // Trash a Keep block.
509 func (v *S3Volume) Trash(loc string) error {
510         if v.ReadOnly {
511                 return MethodDisabledError
512         }
513         if t, err := v.Mtime(loc); err != nil {
514                 return err
515         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
516                 return nil
517         }
518         if theConfig.TrashLifetime == 0 {
519                 if !s3UnsafeDelete {
520                         return ErrS3TrashDisabled
521                 }
522                 return v.translateError(v.bucket.Del(loc))
523         }
524         err := v.checkRaceWindow(loc)
525         if err != nil {
526                 return err
527         }
528         err = v.safeCopy("trash/"+loc, loc)
529         if err != nil {
530                 return err
531         }
532         return v.translateError(v.bucket.Del(loc))
533 }
534
535 // checkRaceWindow returns a non-nil error if trash/loc is, or might
536 // be, in the race window (i.e., it's not safe to trash loc).
537 func (v *S3Volume) checkRaceWindow(loc string) error {
538         resp, err := v.bucket.Head("trash/"+loc, nil)
539         err = v.translateError(err)
540         if os.IsNotExist(err) {
541                 // OK, trash/X doesn't exist so we're not in the race
542                 // window
543                 return nil
544         } else if err != nil {
545                 // Error looking up trash/X. We don't know whether
546                 // we're in the race window
547                 return err
548         }
549         t, err := v.lastModified(resp)
550         if err != nil {
551                 // Can't parse timestamp
552                 return err
553         }
554         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
555         if safeWindow <= 0 {
556                 // We can't count on "touch trash/X" to prolong
557                 // trash/X's lifetime. The new timestamp might not
558                 // become visible until now+raceWindow, and EmptyTrash
559                 // is allowed to delete trash/X before then.
560                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
561         }
562         // trash/X exists, but it won't be eligible for deletion until
563         // after now+raceWindow, so it's safe to overwrite it.
564         return nil
565 }
566
567 // safeCopy calls PutCopy, and checks the response to make sure the
568 // copy succeeded and updated the timestamp on the destination object
569 // (PutCopy returns 200 OK if the request was received, even if the
570 // copy failed).
571 func (v *S3Volume) safeCopy(dst, src string) error {
572         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
573                 ContentType:       "application/octet-stream",
574                 MetadataDirective: "REPLACE",
575         }, v.bucket.Name+"/"+src)
576         err = v.translateError(err)
577         if err != nil {
578                 return err
579         }
580         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
581                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
582         } else if time.Now().Sub(t) > maxClockSkew {
583                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
584         }
585         return nil
586 }
587
588 // Get the LastModified header from resp, and parse it as RFC1123 or
589 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
590 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
591         s := resp.Header.Get("Last-Modified")
592         t, err = time.Parse(time.RFC1123, s)
593         if err != nil && s != "" {
594                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
595                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
596                 // as required by HTTP spec. If it's not a valid HTTP
597                 // header value, it's probably AWS (or s3test) giving
598                 // us a nearly-RFC1123 timestamp.
599                 t, err = time.Parse(nearlyRFC1123, s)
600         }
601         return
602 }
603
604 // Untrash moves block from trash back into store
605 func (v *S3Volume) Untrash(loc string) error {
606         err := v.safeCopy(loc, "trash/"+loc)
607         if err != nil {
608                 return err
609         }
610         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
611         return v.translateError(err)
612 }
613
614 // Status returns a *VolumeStatus representing the current in-use
615 // storage capacity and a fake available capacity that doesn't make
616 // the volume seem full or nearly-full.
617 func (v *S3Volume) Status() *VolumeStatus {
618         return &VolumeStatus{
619                 DeviceNum: 1,
620                 BytesFree: BlockSize * 1000,
621                 BytesUsed: 1,
622         }
623 }
624
625 // InternalStats returns bucket I/O and API call counters.
626 func (v *S3Volume) InternalStats() interface{} {
627         return &v.bucket.stats
628 }
629
630 // String implements fmt.Stringer.
631 func (v *S3Volume) String() string {
632         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
633 }
634
635 // Writable returns false if all future Put, Mtime, and Delete calls
636 // are expected to fail.
637 func (v *S3Volume) Writable() bool {
638         return !v.ReadOnly
639 }
640
641 // Replication returns the storage redundancy of the underlying
642 // device. Configured via command line flag.
643 func (v *S3Volume) Replication() int {
644         return v.S3Replication
645 }
646
647 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
648
649 func (v *S3Volume) isKeepBlock(s string) bool {
650         return s3KeepBlockRegexp.MatchString(s)
651 }
652
653 // fixRace(X) is called when "recent/X" exists but "X" doesn't
654 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
655 // there was a race between Put and Trash, fixRace recovers from the
656 // race by Untrashing the block.
657 func (v *S3Volume) fixRace(loc string) bool {
658         trash, err := v.bucket.Head("trash/"+loc, nil)
659         if err != nil {
660                 if !os.IsNotExist(v.translateError(err)) {
661                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
662                 }
663                 return false
664         }
665         trashTime, err := v.lastModified(trash)
666         if err != nil {
667                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
668                 return false
669         }
670
671         recent, err := v.bucket.Head("recent/"+loc, nil)
672         if err != nil {
673                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
674                 return false
675         }
676         recentTime, err := v.lastModified(recent)
677         if err != nil {
678                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
679                 return false
680         }
681
682         ageWhenTrashed := trashTime.Sub(recentTime)
683         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
684                 // No evidence of a race: block hasn't been written
685                 // since it became eligible for Trash. No fix needed.
686                 return false
687         }
688
689         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
690         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
691         err = v.safeCopy(loc, "trash/"+loc)
692         if err != nil {
693                 log.Printf("error: fixRace: %s", err)
694                 return false
695         }
696         return true
697 }
698
699 func (v *S3Volume) translateError(err error) error {
700         switch err := err.(type) {
701         case *s3.Error:
702                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
703                         strings.Contains(err.Error(), "Not Found") {
704                         return os.ErrNotExist
705                 }
706                 // Other 404 errors like NoSuchVersion and
707                 // NoSuchBucket are different problems which should
708                 // get called out downstream, so we don't convert them
709                 // to os.ErrNotExist.
710         }
711         return err
712 }
713
714 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
715 // and deletes them from the volume.
716 func (v *S3Volume) EmptyTrash() {
717         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
718
719         // Use a merge sort to find matching sets of trash/X and recent/X.
720         trashL := s3Lister{
721                 Bucket:   v.bucket.Bucket,
722                 Prefix:   "trash/",
723                 PageSize: v.IndexPageSize,
724         }
725         // Define "ready to delete" as "...when EmptyTrash started".
726         startT := time.Now()
727         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
728                 loc := trash.Key[6:]
729                 if !v.isKeepBlock(loc) {
730                         continue
731                 }
732                 bytesInTrash += trash.Size
733                 blocksInTrash++
734
735                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
736                 if err != nil {
737                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
738                         continue
739                 }
740                 recent, err := v.bucket.Head("recent/"+loc, nil)
741                 if err != nil && os.IsNotExist(v.translateError(err)) {
742                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
743                         err = v.Untrash(loc)
744                         if err != nil {
745                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
746                         }
747                         continue
748                 } else if err != nil {
749                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
750                         continue
751                 }
752                 recentT, err := v.lastModified(recent)
753                 if err != nil {
754                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
755                         continue
756                 }
757                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
758                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
759                                 // recent/loc is too old to protect
760                                 // loc from being Trashed again during
761                                 // the raceWindow that starts if we
762                                 // delete trash/X now.
763                                 //
764                                 // Note this means (TrashCheckInterval
765                                 // < BlobSignatureTTL - raceWindow) is
766                                 // necessary to avoid starvation.
767                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
768                                 v.fixRace(loc)
769                                 v.Touch(loc)
770                                 continue
771                         }
772                         _, err := v.bucket.Head(loc, nil)
773                         if os.IsNotExist(err) {
774                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
775                                 v.fixRace(loc)
776                                 continue
777                         } else if err != nil {
778                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
779                                 continue
780                         }
781                 }
782                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
783                         continue
784                 }
785                 err = v.bucket.Del(trash.Key)
786                 if err != nil {
787                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
788                         continue
789                 }
790                 bytesDeleted += trash.Size
791                 blocksDeleted++
792
793                 _, err = v.bucket.Head(loc, nil)
794                 if os.IsNotExist(err) {
795                         err = v.bucket.Del("recent/" + loc)
796                         if err != nil {
797                                 log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
798                         }
799                 } else if err != nil {
800                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
801                 }
802         }
803         if err := trashL.Error(); err != nil {
804                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
805         }
806         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
807 }
808
809 type s3Lister struct {
810         Bucket     *s3.Bucket
811         Prefix     string
812         PageSize   int
813         nextMarker string
814         buf        []s3.Key
815         err        error
816 }
817
818 // First fetches the first page and returns the first item. It returns
819 // nil if the response is the empty set or an error occurs.
820 func (lister *s3Lister) First() *s3.Key {
821         lister.getPage()
822         return lister.pop()
823 }
824
825 // Next returns the next item, fetching the next page if necessary. It
826 // returns nil if the last available item has already been fetched, or
827 // an error occurs.
828 func (lister *s3Lister) Next() *s3.Key {
829         if len(lister.buf) == 0 && lister.nextMarker != "" {
830                 lister.getPage()
831         }
832         return lister.pop()
833 }
834
835 // Return the most recent error encountered by First or Next.
836 func (lister *s3Lister) Error() error {
837         return lister.err
838 }
839
840 func (lister *s3Lister) getPage() {
841         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
842         lister.nextMarker = ""
843         if err != nil {
844                 lister.err = err
845                 return
846         }
847         if resp.IsTruncated {
848                 lister.nextMarker = resp.NextMarker
849         }
850         lister.buf = make([]s3.Key, 0, len(resp.Contents))
851         for _, key := range resp.Contents {
852                 if !strings.HasPrefix(key.Key, lister.Prefix) {
853                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
854                         continue
855                 }
856                 lister.buf = append(lister.buf, key)
857         }
858 }
859
860 func (lister *s3Lister) pop() (k *s3.Key) {
861         if len(lister.buf) > 0 {
862                 k = &lister.buf[0]
863                 lister.buf = lister.buf[1:]
864         }
865         return
866 }
867
868 // s3bucket wraps s3.bucket and counts I/O and API usage stats.
869 type s3bucket struct {
870         *s3.Bucket
871         stats s3bucketStats
872 }
873
874 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
875         rdr, err := b.Bucket.GetReader(path)
876         b.stats.tick(&b.stats.Ops, &b.stats.GetOps)
877         b.stats.tickErr(err)
878         return NewCountingReader(rdr, b.stats.tickInBytes), err
879 }
880
881 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
882         resp, err := b.Bucket.Head(path, headers)
883         b.stats.tick(&b.stats.Ops, &b.stats.HeadOps)
884         b.stats.tickErr(err)
885         return resp, err
886 }
887
888 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
889         err := b.Bucket.PutReader(path, NewCountingReader(r, b.stats.tickOutBytes), length, contType, perm, options)
890         b.stats.tick(&b.stats.Ops, &b.stats.PutOps)
891         b.stats.tickErr(err)
892         return err
893 }
894
895 func (b *s3bucket) Put(path string, data []byte, contType string, perm s3.ACL, options s3.Options) error {
896         err := b.Bucket.PutReader(path, NewCountingReader(bytes.NewBuffer(data), b.stats.tickOutBytes), int64(len(data)), contType, perm, options)
897         b.stats.tick(&b.stats.Ops, &b.stats.PutOps)
898         b.stats.tickErr(err)
899         return err
900 }
901
902 func (b *s3bucket) Del(path string) error {
903         err := b.Bucket.Del(path)
904         b.stats.tick(&b.stats.Ops, &b.stats.DelOps)
905         b.stats.tickErr(err)
906         return err
907 }
908
909 type s3bucketStats struct {
910         Errors   uint64
911         Ops      uint64
912         GetOps   uint64
913         PutOps   uint64
914         HeadOps  uint64
915         DelOps   uint64
916         ListOps  uint64
917         InBytes  uint64
918         OutBytes uint64
919
920         ErrorCodes map[string]uint64 `json:",omitempty"`
921
922         lock sync.Mutex
923 }
924
925 func (s *s3bucketStats) tickInBytes(n uint64) {
926         atomic.AddUint64(&s.InBytes, n)
927 }
928
929 func (s *s3bucketStats) tickOutBytes(n uint64) {
930         atomic.AddUint64(&s.OutBytes, n)
931 }
932
933 func (s *s3bucketStats) tick(counters ...*uint64) {
934         for _, counter := range counters {
935                 atomic.AddUint64(counter, 1)
936         }
937 }
938
939 func (s *s3bucketStats) tickErr(err error) {
940         if err == nil {
941                 return
942         }
943         atomic.AddUint64(&s.Errors, 1)
944         errStr := fmt.Sprintf("%T", err)
945         if err, ok := err.(*s3.Error); ok {
946                 errStr = errStr + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
947         }
948         s.lock.Lock()
949         if s.ErrorCodes == nil {
950                 s.ErrorCodes = make(map[string]uint64)
951         }
952         s.ErrorCodes[errStr]++
953         s.lock.Unlock()
954 }