10484: Track non-s3 errors by Go type.
[arvados.git] / services / keepstore / s3_volume.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "encoding/base64"
7         "encoding/hex"
8         "flag"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "log"
13         "net/http"
14         "os"
15         "regexp"
16         "strings"
17         "sync"
18         "sync/atomic"
19         "time"
20
21         "git.curoverse.com/arvados.git/sdk/go/arvados"
22         "github.com/AdRoll/goamz/aws"
23         "github.com/AdRoll/goamz/s3"
24 )
25
26 const (
27         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
28         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
29 )
30
31 var (
32         // ErrS3TrashDisabled is returned by Trash if that operation
33         // is impossible with the current config.
34         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
35
36         s3AccessKeyFile string
37         s3SecretKeyFile string
38         s3RegionName    string
39         s3Endpoint      string
40         s3Replication   int
41         s3UnsafeDelete  bool
42         s3RaceWindow    time.Duration
43
44         s3ACL = s3.Private
45 )
46
47 const (
48         maxClockSkew  = 600 * time.Second
49         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
50 )
51
52 type s3VolumeAdder struct {
53         *Config
54 }
55
56 // String implements flag.Value
57 func (s *s3VolumeAdder) String() string {
58         return "-"
59 }
60
61 func (s *s3VolumeAdder) Set(bucketName string) error {
62         if bucketName == "" {
63                 return fmt.Errorf("no container name given")
64         }
65         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
66                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
67         }
68         if deprecated.flagSerializeIO {
69                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
70         }
71         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
72                 Bucket:        bucketName,
73                 AccessKeyFile: s3AccessKeyFile,
74                 SecretKeyFile: s3SecretKeyFile,
75                 Endpoint:      s3Endpoint,
76                 Region:        s3RegionName,
77                 RaceWindow:    arvados.Duration(s3RaceWindow),
78                 S3Replication: s3Replication,
79                 UnsafeDelete:  s3UnsafeDelete,
80                 ReadOnly:      deprecated.flagReadonly,
81                 IndexPageSize: 1000,
82         })
83         return nil
84 }
85
86 func s3regions() (okList []string) {
87         for r := range aws.Regions {
88                 okList = append(okList, r)
89         }
90         return
91 }
92
93 func init() {
94         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
95
96         flag.Var(&s3VolumeAdder{theConfig},
97                 "s3-bucket-volume",
98                 "Use the given bucket as a storage volume. Can be given multiple times.")
99         flag.StringVar(
100                 &s3RegionName,
101                 "s3-region",
102                 "",
103                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
104         flag.StringVar(
105                 &s3Endpoint,
106                 "s3-endpoint",
107                 "",
108                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
109         flag.StringVar(
110                 &s3AccessKeyFile,
111                 "s3-access-key-file",
112                 "",
113                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
114         flag.StringVar(
115                 &s3SecretKeyFile,
116                 "s3-secret-key-file",
117                 "",
118                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
119         flag.DurationVar(
120                 &s3RaceWindow,
121                 "s3-race-window",
122                 24*time.Hour,
123                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
124         flag.IntVar(
125                 &s3Replication,
126                 "s3-replication",
127                 2,
128                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
129         flag.BoolVar(
130                 &s3UnsafeDelete,
131                 "s3-unsafe-delete",
132                 false,
133                 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
134 }
135
136 // S3Volume implements Volume using an S3 bucket.
137 type S3Volume struct {
138         AccessKeyFile      string
139         SecretKeyFile      string
140         Endpoint           string
141         Region             string
142         Bucket             string
143         LocationConstraint bool
144         IndexPageSize      int
145         S3Replication      int
146         ConnectTimeout     arvados.Duration
147         ReadTimeout        arvados.Duration
148         RaceWindow         arvados.Duration
149         ReadOnly           bool
150         UnsafeDelete       bool
151
152         bucket      *s3.Bucket
153         bucketStats bucketStats
154         volumeStats ioStats
155
156         startOnce sync.Once
157 }
158
159 type bucketStats struct {
160         Errors   uint64
161         Ops      uint64
162         GetOps   uint64
163         PutOps   uint64
164         HeadOps  uint64
165         DelOps   uint64
166         InBytes  uint64
167         OutBytes uint64
168
169         ErrorCodes map[string]uint64 `json:",omitempty"`
170
171         lock sync.Mutex
172 }
173
174 // Examples implements VolumeWithExamples.
175 func (*S3Volume) Examples() []Volume {
176         return []Volume{
177                 &S3Volume{
178                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
179                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
180                         Endpoint:       "",
181                         Region:         "us-east-1",
182                         Bucket:         "example-bucket-name",
183                         IndexPageSize:  1000,
184                         S3Replication:  2,
185                         RaceWindow:     arvados.Duration(24 * time.Hour),
186                         ConnectTimeout: arvados.Duration(time.Minute),
187                         ReadTimeout:    arvados.Duration(5 * time.Minute),
188                 },
189                 &S3Volume{
190                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
191                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
192                         Endpoint:       "https://storage.googleapis.com",
193                         Region:         "",
194                         Bucket:         "example-bucket-name",
195                         IndexPageSize:  1000,
196                         S3Replication:  2,
197                         RaceWindow:     arvados.Duration(24 * time.Hour),
198                         ConnectTimeout: arvados.Duration(time.Minute),
199                         ReadTimeout:    arvados.Duration(5 * time.Minute),
200                 },
201         }
202 }
203
204 // Type implements Volume.
205 func (*S3Volume) Type() string {
206         return "S3"
207 }
208
209 // Start populates private fields and verifies the configuration is
210 // valid.
211 func (v *S3Volume) Start() error {
212         region, ok := aws.Regions[v.Region]
213         if v.Endpoint == "" {
214                 if !ok {
215                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
216                 }
217         } else if ok {
218                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
219                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
220         } else {
221                 region = aws.Region{
222                         Name:                 v.Region,
223                         S3Endpoint:           v.Endpoint,
224                         S3LocationConstraint: v.LocationConstraint,
225                 }
226         }
227
228         var err error
229         var auth aws.Auth
230         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
231         if err != nil {
232                 return err
233         }
234         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
235         if err != nil {
236                 return err
237         }
238
239         // Zero timeouts mean "wait forever", which is a bad
240         // default. Default to long timeouts instead.
241         if v.ConnectTimeout == 0 {
242                 v.ConnectTimeout = s3DefaultConnectTimeout
243         }
244         if v.ReadTimeout == 0 {
245                 v.ReadTimeout = s3DefaultReadTimeout
246         }
247
248         client := s3.New(auth, region)
249         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
250         client.ReadTimeout = time.Duration(v.ReadTimeout)
251         v.bucket = &s3.Bucket{
252                 S3:   client,
253                 Name: v.Bucket,
254         }
255         return nil
256 }
257
258 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
259         ready := make(chan bool)
260         go func() {
261                 rdr, err = v.getReader(loc)
262                 close(ready)
263         }()
264         select {
265         case <-ready:
266                 return
267         case <-ctx.Done():
268                 theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
269                 go func() {
270                         <-ready
271                         if err == nil {
272                                 rdr.Close()
273                         }
274                 }()
275                 return nil, ctx.Err()
276         }
277 }
278
279 // getReader wraps (Bucket)GetReader.
280 //
281 // In situations where (Bucket)GetReader would fail because the block
282 // disappeared in a Trash race, getReader calls fixRace to recover the
283 // data, and tries again.
284 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
285         v.tick(&v.bucketStats.Ops, &v.bucketStats.GetOps)
286         rdr, err = v.bucket.GetReader(loc)
287         err = v.translateError(err)
288         if err == nil {
289                 rdr = NewCountingReader(rdr, v.tickInBytes)
290                 return
291         } else if !os.IsNotExist(v.tickErr(err)) {
292                 return
293         }
294
295         v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps)
296         _, err = v.bucket.Head("recent/"+loc, nil)
297         err = v.translateError(v.tickErr(err))
298         if err != nil {
299                 // If we can't read recent/X, there's no point in
300                 // trying fixRace. Give up.
301                 return
302         }
303         if !v.fixRace(loc) {
304                 err = os.ErrNotExist
305                 return
306         }
307
308         v.tick(&v.bucketStats.Ops, &v.bucketStats.GetOps)
309         rdr, err = v.bucket.GetReader(loc)
310         if err != nil {
311                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
312                 err = v.translateError(v.tickErr(err))
313         }
314         rdr = NewCountingReader(rdr, v.tickInBytes)
315         return
316 }
317
318 // Get a block: copy the block data into buf, and return the number of
319 // bytes copied.
320 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
321         rdr, err := v.getReaderWithContext(ctx, loc)
322         if err != nil {
323                 return 0, err
324         }
325
326         var n int
327         ready := make(chan bool)
328         go func() {
329                 defer close(ready)
330
331                 defer rdr.Close()
332                 n, err = io.ReadFull(rdr, buf)
333
334                 switch err {
335                 case nil, io.EOF, io.ErrUnexpectedEOF:
336                         err = nil
337                 default:
338                         err = v.translateError(err)
339                 }
340         }()
341         select {
342         case <-ctx.Done():
343                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
344                 rdr.Close()
345                 // Must wait for ReadFull to return, to ensure it
346                 // doesn't write to buf after we return.
347                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
348                 <-ready
349                 return 0, ctx.Err()
350         case <-ready:
351                 return n, err
352         }
353 }
354
355 // Compare the given data with the stored data.
356 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
357         rdr, err := v.getReaderWithContext(ctx, loc)
358         if err != nil {
359                 return err
360         }
361         defer rdr.Close()
362         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
363 }
364
365 // Put writes a block.
366 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
367         if v.ReadOnly {
368                 return MethodDisabledError
369         }
370         var opts s3.Options
371         size := len(block)
372         if size > 0 {
373                 md5, err := hex.DecodeString(loc)
374                 if err != nil {
375                         return err
376                 }
377                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
378         }
379
380         // Send the block data through a pipe, so that (if we need to)
381         // we can close the pipe early and abandon our PutReader()
382         // goroutine, without worrying about PutReader() accessing our
383         // block buffer after we release it.
384         bufr, bufw := io.Pipe()
385         go func() {
386                 io.Copy(bufw, bytes.NewReader(block))
387                 bufw.Close()
388         }()
389
390         var err error
391         ready := make(chan bool)
392         go func() {
393                 defer func() {
394                         if ctx.Err() != nil {
395                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
396                         }
397                 }()
398                 defer close(ready)
399                 v.tick(&v.bucketStats.Ops, &v.bucketStats.PutOps)
400                 rdr := NewCountingReader(bufr, v.tickOutBytes)
401                 err = v.bucket.PutReader(loc, rdr, int64(size), "application/octet-stream", s3ACL, opts)
402                 if err != nil {
403                         v.tickErr(err)
404                         return
405                 }
406                 v.tick(&v.bucketStats.Ops, &v.bucketStats.PutOps)
407                 err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
408                 v.tickErr(err)
409         }()
410         select {
411         case <-ctx.Done():
412                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
413                 // Our pipe might be stuck in Write(), waiting for
414                 // io.Copy() to read. If so, un-stick it. This means
415                 // PutReader will get corrupt data, but that's OK: the
416                 // size and MD5 won't match, so the write will fail.
417                 go io.Copy(ioutil.Discard, bufr)
418                 // CloseWithError() will return once pending I/O is done.
419                 bufw.CloseWithError(ctx.Err())
420                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
421                 return ctx.Err()
422         case <-ready:
423                 return v.translateError(err)
424         }
425 }
426
427 // Touch sets the timestamp for the given locator to the current time.
428 func (v *S3Volume) Touch(loc string) error {
429         if v.ReadOnly {
430                 return MethodDisabledError
431         }
432         v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps)
433         _, err := v.bucket.Head(loc, nil)
434         err = v.translateError(v.tickErr(err))
435         if os.IsNotExist(err) && v.fixRace(loc) {
436                 // The data object got trashed in a race, but fixRace
437                 // rescued it.
438         } else if err != nil {
439                 return err
440         }
441         v.tick(&v.bucketStats.Ops, &v.bucketStats.PutOps)
442         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
443         return v.translateError(v.tickErr(err))
444 }
445
446 // Mtime returns the stored timestamp for the given locator.
447 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
448         v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps)
449         _, err := v.bucket.Head(loc, nil)
450         if err != nil {
451                 return zeroTime, v.translateError(v.tickErr(err))
452         }
453         v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps)
454         resp, err := v.bucket.Head("recent/"+loc, nil)
455         err = v.translateError(v.tickErr(err))
456         if os.IsNotExist(err) {
457                 // The data object X exists, but recent/X is missing.
458                 v.tick(&v.bucketStats.Ops, &v.bucketStats.PutOps)
459                 err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
460                 if err != nil {
461                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
462                         return zeroTime, v.translateError(v.tickErr(err))
463                 }
464                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
465                 v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps)
466                 resp, err = v.bucket.Head("recent/"+loc, nil)
467                 if err != nil {
468                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
469                         return zeroTime, v.translateError(v.tickErr(err))
470                 }
471         } else if err != nil {
472                 // HEAD recent/X failed for some other reason.
473                 return zeroTime, err
474         }
475         return v.lastModified(resp)
476 }
477
478 // IndexTo writes a complete list of locators with the given prefix
479 // for which Get() can retrieve data.
480 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
481         // Use a merge sort to find matching sets of X and recent/X.
482         dataL := s3Lister{
483                 Bucket:   v.bucket,
484                 Prefix:   prefix,
485                 PageSize: v.IndexPageSize,
486         }
487         recentL := s3Lister{
488                 Bucket:   v.bucket,
489                 Prefix:   "recent/" + prefix,
490                 PageSize: v.IndexPageSize,
491         }
492         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
493                 if data.Key >= "g" {
494                         // Conveniently, "recent/*" and "trash/*" are
495                         // lexically greater than all hex-encoded data
496                         // hashes, so stopping here avoids iterating
497                         // over all of them needlessly with dataL.
498                         break
499                 }
500                 if !v.isKeepBlock(data.Key) {
501                         continue
502                 }
503
504                 // stamp is the list entry we should use to report the
505                 // last-modified time for this data block: it will be
506                 // the recent/X entry if one exists, otherwise the
507                 // entry for the data block itself.
508                 stamp := data
509
510                 // Advance to the corresponding recent/X marker, if any
511                 for recent != nil {
512                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
513                                 recent = recentL.Next()
514                                 continue
515                         } else if cmp == 0 {
516                                 stamp = recent
517                                 recent = recentL.Next()
518                                 break
519                         } else {
520                                 // recent/X marker is missing: we'll
521                                 // use the timestamp on the data
522                                 // object.
523                                 break
524                         }
525                 }
526                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
527                 if err != nil {
528                         return err
529                 }
530                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
531         }
532         return nil
533 }
534
535 // Trash a Keep block.
536 func (v *S3Volume) Trash(loc string) error {
537         if v.ReadOnly {
538                 return MethodDisabledError
539         }
540         if t, err := v.Mtime(loc); err != nil {
541                 return err
542         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
543                 return nil
544         }
545         if theConfig.TrashLifetime == 0 {
546                 if !s3UnsafeDelete {
547                         return ErrS3TrashDisabled
548                 }
549                 v.tick(&v.bucketStats.Ops, &v.bucketStats.DelOps)
550                 return v.translateError(v.tickErr(v.bucket.Del(loc)))
551         }
552         err := v.checkRaceWindow(loc)
553         if err != nil {
554                 return err
555         }
556         err = v.safeCopy("trash/"+loc, loc)
557         if err != nil {
558                 return err
559         }
560         v.tick(&v.bucketStats.Ops, &v.bucketStats.DelOps)
561         return v.translateError(v.tickErr(v.bucket.Del(loc)))
562 }
563
564 // checkRaceWindow returns a non-nil error if trash/loc is, or might
565 // be, in the race window (i.e., it's not safe to trash loc).
566 func (v *S3Volume) checkRaceWindow(loc string) error {
567         v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps)
568         resp, err := v.bucket.Head("trash/"+loc, nil)
569         err = v.translateError(v.tickErr(err))
570         if os.IsNotExist(err) {
571                 // OK, trash/X doesn't exist so we're not in the race
572                 // window
573                 return nil
574         } else if err != nil {
575                 // Error looking up trash/X. We don't know whether
576                 // we're in the race window
577                 return err
578         }
579         t, err := v.lastModified(resp)
580         if err != nil {
581                 // Can't parse timestamp
582                 return err
583         }
584         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
585         if safeWindow <= 0 {
586                 // We can't count on "touch trash/X" to prolong
587                 // trash/X's lifetime. The new timestamp might not
588                 // become visible until now+raceWindow, and EmptyTrash
589                 // is allowed to delete trash/X before then.
590                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
591         }
592         // trash/X exists, but it won't be eligible for deletion until
593         // after now+raceWindow, so it's safe to overwrite it.
594         return nil
595 }
596
597 // safeCopy calls PutCopy, and checks the response to make sure the
598 // copy succeeded and updated the timestamp on the destination object
599 // (PutCopy returns 200 OK if the request was received, even if the
600 // copy failed).
601 func (v *S3Volume) safeCopy(dst, src string) error {
602         v.tick(&v.bucketStats.Ops, &v.bucketStats.PutOps)
603         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
604                 ContentType:       "application/octet-stream",
605                 MetadataDirective: "REPLACE",
606         }, v.bucket.Name+"/"+src)
607         err = v.translateError(v.tickErr(err))
608         if err != nil {
609                 return err
610         }
611         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
612                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
613         } else if time.Now().Sub(t) > maxClockSkew {
614                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
615         }
616         return nil
617 }
618
619 // Get the LastModified header from resp, and parse it as RFC1123 or
620 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
621 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
622         s := resp.Header.Get("Last-Modified")
623         t, err = time.Parse(time.RFC1123, s)
624         if err != nil && s != "" {
625                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
626                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
627                 // as required by HTTP spec. If it's not a valid HTTP
628                 // header value, it's probably AWS (or s3test) giving
629                 // us a nearly-RFC1123 timestamp.
630                 t, err = time.Parse(nearlyRFC1123, s)
631         }
632         return
633 }
634
635 // Untrash moves block from trash back into store
636 func (v *S3Volume) Untrash(loc string) error {
637         err := v.safeCopy(loc, "trash/"+loc)
638         if err != nil {
639                 return err
640         }
641         v.tick(&v.bucketStats.Ops, &v.bucketStats.PutOps)
642         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
643         return v.translateError(v.tickErr(err))
644 }
645
646 // Status returns a *VolumeStatus representing the current in-use
647 // storage capacity and a fake available capacity that doesn't make
648 // the volume seem full or nearly-full.
649 func (v *S3Volume) Status() *VolumeStatus {
650         return &VolumeStatus{
651                 DeviceNum: 1,
652                 BytesFree: BlockSize * 1000,
653                 BytesUsed: 1,
654         }
655 }
656
657 // IOStatus implements InternalStatser.
658 func (v *S3Volume) InternalStats() interface{} {
659         return &v.bucketStats
660 }
661
662 // String implements fmt.Stringer.
663 func (v *S3Volume) String() string {
664         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
665 }
666
667 // Writable returns false if all future Put, Mtime, and Delete calls
668 // are expected to fail.
669 func (v *S3Volume) Writable() bool {
670         return !v.ReadOnly
671 }
672
673 // Replication returns the storage redundancy of the underlying
674 // device. Configured via command line flag.
675 func (v *S3Volume) Replication() int {
676         return v.S3Replication
677 }
678
679 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
680
681 func (v *S3Volume) isKeepBlock(s string) bool {
682         return s3KeepBlockRegexp.MatchString(s)
683 }
684
685 // fixRace(X) is called when "recent/X" exists but "X" doesn't
686 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
687 // there was a race between Put and Trash, fixRace recovers from the
688 // race by Untrashing the block.
689 func (v *S3Volume) fixRace(loc string) bool {
690         v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps)
691         trash, err := v.bucket.Head("trash/"+loc, nil)
692         if err != nil {
693                 if !os.IsNotExist(v.translateError(v.tickErr(err))) {
694                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
695                 }
696                 return false
697         }
698         trashTime, err := v.lastModified(trash)
699         if err != nil {
700                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
701                 return false
702         }
703
704         v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps)
705         recent, err := v.bucket.Head("recent/"+loc, nil)
706         if err != nil {
707                 v.tickErr(err)
708                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
709                 return false
710         }
711         recentTime, err := v.lastModified(recent)
712         if err != nil {
713                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
714                 return false
715         }
716
717         ageWhenTrashed := trashTime.Sub(recentTime)
718         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
719                 // No evidence of a race: block hasn't been written
720                 // since it became eligible for Trash. No fix needed.
721                 return false
722         }
723
724         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
725         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
726         err = v.safeCopy(loc, "trash/"+loc)
727         if err != nil {
728                 log.Printf("error: fixRace: %s", err)
729                 return false
730         }
731         return true
732 }
733
734 func (v *S3Volume) translateError(err error) error {
735         switch err := err.(type) {
736         case *s3.Error:
737                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
738                         strings.Contains(err.Error(), "Not Found") {
739                         return os.ErrNotExist
740                 }
741                 // Other 404 errors like NoSuchVersion and
742                 // NoSuchBucket are different problems which should
743                 // get called out downstream, so we don't convert them
744                 // to os.ErrNotExist.
745         }
746         return err
747 }
748
749 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
750 // and deletes them from the volume.
751 func (v *S3Volume) EmptyTrash() {
752         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
753
754         // Use a merge sort to find matching sets of trash/X and recent/X.
755         trashL := s3Lister{
756                 Bucket:   v.bucket,
757                 Prefix:   "trash/",
758                 PageSize: v.IndexPageSize,
759         }
760         // Define "ready to delete" as "...when EmptyTrash started".
761         startT := time.Now()
762         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
763                 loc := trash.Key[6:]
764                 if !v.isKeepBlock(loc) {
765                         continue
766                 }
767                 bytesInTrash += trash.Size
768                 blocksInTrash++
769
770                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
771                 if err != nil {
772                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
773                         continue
774                 }
775                 v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps)
776                 recent, err := v.bucket.Head("recent/"+loc, nil)
777                 if err != nil && os.IsNotExist(v.translateError(v.tickErr(err))) {
778                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
779                         err = v.Untrash(loc)
780                         if err != nil {
781                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
782                         }
783                         continue
784                 } else if err != nil {
785                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
786                         continue
787                 }
788                 recentT, err := v.lastModified(recent)
789                 if err != nil {
790                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
791                         continue
792                 }
793                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
794                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
795                                 // recent/loc is too old to protect
796                                 // loc from being Trashed again during
797                                 // the raceWindow that starts if we
798                                 // delete trash/X now.
799                                 //
800                                 // Note this means (TrashCheckInterval
801                                 // < BlobSignatureTTL - raceWindow) is
802                                 // necessary to avoid starvation.
803                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
804                                 v.fixRace(loc)
805                                 v.Touch(loc)
806                                 continue
807                         }
808                         v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps)
809                         _, err := v.bucket.Head(loc, nil)
810                         if os.IsNotExist(v.tickErr(err)) {
811                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
812                                 v.fixRace(loc)
813                                 continue
814                         } else if err != nil {
815                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
816                                 continue
817                         }
818                 }
819                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
820                         continue
821                 }
822                 v.tick(&v.bucketStats.Ops, &v.bucketStats.DelOps)
823                 err = v.bucket.Del(trash.Key)
824                 if err != nil {
825                         v.tickErr(err)
826                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
827                         continue
828                 }
829                 bytesDeleted += trash.Size
830                 blocksDeleted++
831
832                 v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps)
833                 _, err = v.bucket.Head(loc, nil)
834                 if os.IsNotExist(v.tickErr(err)) {
835                         v.tick(&v.bucketStats.Ops, &v.bucketStats.DelOps)
836                         err = v.bucket.Del("recent/" + loc)
837                         if err != nil {
838                                 v.tickErr(err)
839                                 log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
840                         }
841                 } else if err != nil {
842                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
843                 }
844         }
845         if err := trashL.Error(); err != nil {
846                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
847         }
848         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
849 }
850
851 func (v *S3Volume) tick(counters ...*uint64) {
852         for _, counter := range counters {
853                 atomic.AddUint64(counter, 1)
854         }
855 }
856
857 func (v *S3Volume) tickErr(err error) error {
858         if err == nil {
859                 return nil
860         }
861         atomic.AddUint64(&v.bucketStats.Errors, 1)
862         errStr := fmt.Sprintf("%T", err)
863         if err, ok := err.(*s3.Error); ok {
864                 errStr = errStr + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
865         }
866         v.bucketStats.lock.Lock()
867         if v.bucketStats.ErrorCodes == nil {
868                 v.bucketStats.ErrorCodes = make(map[string]uint64)
869         }
870         v.bucketStats.ErrorCodes[errStr]++
871         v.bucketStats.lock.Unlock()
872         return err
873 }
874
875 func (v *S3Volume) tickInBytes(n uint64) {
876         atomic.AddUint64(&v.bucketStats.InBytes, n)
877 }
878
879 func (v *S3Volume) tickOutBytes(n uint64) {
880         atomic.AddUint64(&v.bucketStats.OutBytes, n)
881 }
882
883 type s3Lister struct {
884         Bucket     *s3.Bucket
885         Prefix     string
886         PageSize   int
887         nextMarker string
888         buf        []s3.Key
889         err        error
890 }
891
892 // First fetches the first page and returns the first item. It returns
893 // nil if the response is the empty set or an error occurs.
894 func (lister *s3Lister) First() *s3.Key {
895         lister.getPage()
896         return lister.pop()
897 }
898
899 // Next returns the next item, fetching the next page if necessary. It
900 // returns nil if the last available item has already been fetched, or
901 // an error occurs.
902 func (lister *s3Lister) Next() *s3.Key {
903         if len(lister.buf) == 0 && lister.nextMarker != "" {
904                 lister.getPage()
905         }
906         return lister.pop()
907 }
908
909 // Return the most recent error encountered by First or Next.
910 func (lister *s3Lister) Error() error {
911         return lister.err
912 }
913
914 func (lister *s3Lister) getPage() {
915         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
916         lister.nextMarker = ""
917         if err != nil {
918                 lister.err = err
919                 return
920         }
921         if resp.IsTruncated {
922                 lister.nextMarker = resp.NextMarker
923         }
924         lister.buf = make([]s3.Key, 0, len(resp.Contents))
925         for _, key := range resp.Contents {
926                 if !strings.HasPrefix(key.Key, lister.Prefix) {
927                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
928                         continue
929                 }
930                 lister.buf = append(lister.buf, key)
931         }
932 }
933
934 func (lister *s3Lister) pop() (k *s3.Key) {
935         if len(lister.buf) > 0 {
936                 k = &lister.buf[0]
937                 lister.buf = lister.buf[1:]
938         }
939         return
940 }