c52f616d1ad98a81872ee41c93aacebd6881d8b6
[arvados.git] / services / keepstore / s3_volume.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "encoding/base64"
7         "encoding/hex"
8         "flag"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "log"
13         "net/http"
14         "os"
15         "regexp"
16         "strings"
17         "sync"
18         "sync/atomic"
19         "time"
20
21         "git.curoverse.com/arvados.git/sdk/go/arvados"
22         "github.com/AdRoll/goamz/aws"
23         "github.com/AdRoll/goamz/s3"
24 )
25
26 const (
27         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
28         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
29 )
30
31 var (
32         // ErrS3TrashDisabled is returned by Trash if that operation
33         // is impossible with the current config.
34         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
35
36         s3AccessKeyFile string
37         s3SecretKeyFile string
38         s3RegionName    string
39         s3Endpoint      string
40         s3Replication   int
41         s3UnsafeDelete  bool
42         s3RaceWindow    time.Duration
43
44         s3ACL = s3.Private
45 )
46
47 const (
48         maxClockSkew  = 600 * time.Second
49         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
50 )
51
52 type s3VolumeAdder struct {
53         *Config
54 }
55
56 // String implements flag.Value
57 func (s *s3VolumeAdder) String() string {
58         return "-"
59 }
60
61 func (s *s3VolumeAdder) Set(bucketName string) error {
62         if bucketName == "" {
63                 return fmt.Errorf("no container name given")
64         }
65         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
66                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
67         }
68         if deprecated.flagSerializeIO {
69                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
70         }
71         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
72                 Bucket:        bucketName,
73                 AccessKeyFile: s3AccessKeyFile,
74                 SecretKeyFile: s3SecretKeyFile,
75                 Endpoint:      s3Endpoint,
76                 Region:        s3RegionName,
77                 RaceWindow:    arvados.Duration(s3RaceWindow),
78                 S3Replication: s3Replication,
79                 UnsafeDelete:  s3UnsafeDelete,
80                 ReadOnly:      deprecated.flagReadonly,
81                 IndexPageSize: 1000,
82         })
83         return nil
84 }
85
86 func s3regions() (okList []string) {
87         for r := range aws.Regions {
88                 okList = append(okList, r)
89         }
90         return
91 }
92
93 func init() {
94         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
95
96         flag.Var(&s3VolumeAdder{theConfig},
97                 "s3-bucket-volume",
98                 "Use the given bucket as a storage volume. Can be given multiple times.")
99         flag.StringVar(
100                 &s3RegionName,
101                 "s3-region",
102                 "",
103                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
104         flag.StringVar(
105                 &s3Endpoint,
106                 "s3-endpoint",
107                 "",
108                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
109         flag.StringVar(
110                 &s3AccessKeyFile,
111                 "s3-access-key-file",
112                 "",
113                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
114         flag.StringVar(
115                 &s3SecretKeyFile,
116                 "s3-secret-key-file",
117                 "",
118                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
119         flag.DurationVar(
120                 &s3RaceWindow,
121                 "s3-race-window",
122                 24*time.Hour,
123                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
124         flag.IntVar(
125                 &s3Replication,
126                 "s3-replication",
127                 2,
128                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
129         flag.BoolVar(
130                 &s3UnsafeDelete,
131                 "s3-unsafe-delete",
132                 false,
133                 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
134 }
135
136 // S3Volume implements Volume using an S3 bucket.
137 type S3Volume struct {
138         AccessKeyFile      string
139         SecretKeyFile      string
140         Endpoint           string
141         Region             string
142         Bucket             string
143         LocationConstraint bool
144         IndexPageSize      int
145         S3Replication      int
146         ConnectTimeout     arvados.Duration
147         ReadTimeout        arvados.Duration
148         RaceWindow         arvados.Duration
149         ReadOnly           bool
150         UnsafeDelete       bool
151
152         bucket      *s3.Bucket
153         bucketStats bucketStats
154         volumeStats ioStats
155
156         startOnce sync.Once
157 }
158
159 type bucketStats struct {
160         Errors   uint64
161         Ops      uint64
162         GetOps   uint64
163         PutOps   uint64
164         HeadOps  uint64
165         DelOps   uint64
166         InBytes  uint64
167         OutBytes uint64
168 }
169
170 // Examples implements VolumeWithExamples.
171 func (*S3Volume) Examples() []Volume {
172         return []Volume{
173                 &S3Volume{
174                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
175                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
176                         Endpoint:       "",
177                         Region:         "us-east-1",
178                         Bucket:         "example-bucket-name",
179                         IndexPageSize:  1000,
180                         S3Replication:  2,
181                         RaceWindow:     arvados.Duration(24 * time.Hour),
182                         ConnectTimeout: arvados.Duration(time.Minute),
183                         ReadTimeout:    arvados.Duration(5 * time.Minute),
184                 },
185                 &S3Volume{
186                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
187                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
188                         Endpoint:       "https://storage.googleapis.com",
189                         Region:         "",
190                         Bucket:         "example-bucket-name",
191                         IndexPageSize:  1000,
192                         S3Replication:  2,
193                         RaceWindow:     arvados.Duration(24 * time.Hour),
194                         ConnectTimeout: arvados.Duration(time.Minute),
195                         ReadTimeout:    arvados.Duration(5 * time.Minute),
196                 },
197         }
198 }
199
200 // Type implements Volume.
201 func (*S3Volume) Type() string {
202         return "S3"
203 }
204
205 // Start populates private fields and verifies the configuration is
206 // valid.
207 func (v *S3Volume) Start() error {
208         region, ok := aws.Regions[v.Region]
209         if v.Endpoint == "" {
210                 if !ok {
211                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
212                 }
213         } else if ok {
214                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
215                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
216         } else {
217                 region = aws.Region{
218                         Name:                 v.Region,
219                         S3Endpoint:           v.Endpoint,
220                         S3LocationConstraint: v.LocationConstraint,
221                 }
222         }
223
224         var err error
225         var auth aws.Auth
226         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
227         if err != nil {
228                 return err
229         }
230         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
231         if err != nil {
232                 return err
233         }
234
235         // Zero timeouts mean "wait forever", which is a bad
236         // default. Default to long timeouts instead.
237         if v.ConnectTimeout == 0 {
238                 v.ConnectTimeout = s3DefaultConnectTimeout
239         }
240         if v.ReadTimeout == 0 {
241                 v.ReadTimeout = s3DefaultReadTimeout
242         }
243
244         client := s3.New(auth, region)
245         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
246         client.ReadTimeout = time.Duration(v.ReadTimeout)
247         v.bucket = &s3.Bucket{
248                 S3:   client,
249                 Name: v.Bucket,
250         }
251         return nil
252 }
253
254 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
255         ready := make(chan bool)
256         go func() {
257                 rdr, err = v.getReader(loc)
258                 close(ready)
259         }()
260         select {
261         case <-ready:
262                 return
263         case <-ctx.Done():
264                 theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
265                 go func() {
266                         <-ready
267                         if err == nil {
268                                 rdr.Close()
269                         }
270                 }()
271                 return nil, ctx.Err()
272         }
273 }
274
275 // getReader wraps (Bucket)GetReader.
276 //
277 // In situations where (Bucket)GetReader would fail because the block
278 // disappeared in a Trash race, getReader calls fixRace to recover the
279 // data, and tries again.
280 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
281         v.tick(&v.bucketStats.Ops, &v.bucketStats.GetOps)
282         rdr, err = v.bucket.GetReader(loc)
283         err = v.translateError(err)
284         if err == nil {
285                 rdr = NewCountingReader(rdr, v.tickInBytes)
286                 return
287         } else if !os.IsNotExist(v.tickErr(err)) {
288                 return
289         }
290
291         v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps)
292         _, err = v.bucket.Head("recent/"+loc, nil)
293         err = v.translateError(v.tickErr(err))
294         if err != nil {
295                 // If we can't read recent/X, there's no point in
296                 // trying fixRace. Give up.
297                 return
298         }
299         if !v.fixRace(loc) {
300                 err = os.ErrNotExist
301                 return
302         }
303
304         v.tick(&v.bucketStats.Ops, &v.bucketStats.GetOps)
305         rdr, err = v.bucket.GetReader(loc)
306         if err != nil {
307                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
308                 err = v.translateError(v.tickErr(err))
309         }
310         rdr = NewCountingReader(rdr, v.tickInBytes)
311         return
312 }
313
314 // Get a block: copy the block data into buf, and return the number of
315 // bytes copied.
316 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
317         rdr, err := v.getReaderWithContext(ctx, loc)
318         if err != nil {
319                 return 0, err
320         }
321
322         var n int
323         ready := make(chan bool)
324         go func() {
325                 defer close(ready)
326
327                 defer rdr.Close()
328                 n, err = io.ReadFull(rdr, buf)
329
330                 switch err {
331                 case nil, io.EOF, io.ErrUnexpectedEOF:
332                         err = nil
333                 default:
334                         err = v.translateError(err)
335                 }
336         }()
337         select {
338         case <-ctx.Done():
339                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
340                 rdr.Close()
341                 // Must wait for ReadFull to return, to ensure it
342                 // doesn't write to buf after we return.
343                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
344                 <-ready
345                 return 0, ctx.Err()
346         case <-ready:
347                 return n, err
348         }
349 }
350
351 // Compare the given data with the stored data.
352 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
353         rdr, err := v.getReaderWithContext(ctx, loc)
354         if err != nil {
355                 return err
356         }
357         defer rdr.Close()
358         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
359 }
360
361 // Put writes a block.
362 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
363         if v.ReadOnly {
364                 return MethodDisabledError
365         }
366         var opts s3.Options
367         size := len(block)
368         if size > 0 {
369                 md5, err := hex.DecodeString(loc)
370                 if err != nil {
371                         return err
372                 }
373                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
374         }
375
376         // Send the block data through a pipe, so that (if we need to)
377         // we can close the pipe early and abandon our PutReader()
378         // goroutine, without worrying about PutReader() accessing our
379         // block buffer after we release it.
380         bufr, bufw := io.Pipe()
381         go func() {
382                 io.Copy(bufw, bytes.NewReader(block))
383                 bufw.Close()
384         }()
385
386         var err error
387         ready := make(chan bool)
388         go func() {
389                 defer func() {
390                         if ctx.Err() != nil {
391                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
392                         }
393                 }()
394                 defer close(ready)
395                 v.tick(&v.bucketStats.Ops, &v.bucketStats.PutOps)
396                 rdr := NewCountingReader(bufr, v.tickOutBytes)
397                 err = v.bucket.PutReader(loc, rdr, int64(size), "application/octet-stream", s3ACL, opts)
398                 if err != nil {
399                         v.tickErr(err)
400                         return
401                 }
402                 v.tick(&v.bucketStats.Ops, &v.bucketStats.PutOps)
403                 err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
404                 v.tickErr(err)
405         }()
406         select {
407         case <-ctx.Done():
408                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
409                 // Our pipe might be stuck in Write(), waiting for
410                 // io.Copy() to read. If so, un-stick it. This means
411                 // PutReader will get corrupt data, but that's OK: the
412                 // size and MD5 won't match, so the write will fail.
413                 go io.Copy(ioutil.Discard, bufr)
414                 // CloseWithError() will return once pending I/O is done.
415                 bufw.CloseWithError(ctx.Err())
416                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
417                 return ctx.Err()
418         case <-ready:
419                 return v.translateError(err)
420         }
421 }
422
423 // Touch sets the timestamp for the given locator to the current time.
424 func (v *S3Volume) Touch(loc string) error {
425         if v.ReadOnly {
426                 return MethodDisabledError
427         }
428         v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps)
429         _, err := v.bucket.Head(loc, nil)
430         err = v.translateError(v.tickErr(err))
431         if os.IsNotExist(err) && v.fixRace(loc) {
432                 // The data object got trashed in a race, but fixRace
433                 // rescued it.
434         } else if err != nil {
435                 return err
436         }
437         v.tick(&v.bucketStats.Ops, &v.bucketStats.PutOps)
438         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
439         return v.translateError(v.tickErr(err))
440 }
441
442 // Mtime returns the stored timestamp for the given locator.
443 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
444         v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps)
445         _, err := v.bucket.Head(loc, nil)
446         if err != nil {
447                 return zeroTime, v.translateError(v.tickErr(err))
448         }
449         v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps)
450         resp, err := v.bucket.Head("recent/"+loc, nil)
451         err = v.translateError(v.tickErr(err))
452         if os.IsNotExist(err) {
453                 // The data object X exists, but recent/X is missing.
454                 v.tick(&v.bucketStats.Ops, &v.bucketStats.PutOps)
455                 err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
456                 if err != nil {
457                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
458                         return zeroTime, v.translateError(v.tickErr(err))
459                 }
460                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
461                 v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps)
462                 resp, err = v.bucket.Head("recent/"+loc, nil)
463                 if err != nil {
464                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
465                         return zeroTime, v.translateError(v.tickErr(err))
466                 }
467         } else if err != nil {
468                 // HEAD recent/X failed for some other reason.
469                 return zeroTime, err
470         }
471         return v.lastModified(resp)
472 }
473
474 // IndexTo writes a complete list of locators with the given prefix
475 // for which Get() can retrieve data.
476 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
477         // Use a merge sort to find matching sets of X and recent/X.
478         dataL := s3Lister{
479                 Bucket:   v.bucket,
480                 Prefix:   prefix,
481                 PageSize: v.IndexPageSize,
482         }
483         recentL := s3Lister{
484                 Bucket:   v.bucket,
485                 Prefix:   "recent/" + prefix,
486                 PageSize: v.IndexPageSize,
487         }
488         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
489                 if data.Key >= "g" {
490                         // Conveniently, "recent/*" and "trash/*" are
491                         // lexically greater than all hex-encoded data
492                         // hashes, so stopping here avoids iterating
493                         // over all of them needlessly with dataL.
494                         break
495                 }
496                 if !v.isKeepBlock(data.Key) {
497                         continue
498                 }
499
500                 // stamp is the list entry we should use to report the
501                 // last-modified time for this data block: it will be
502                 // the recent/X entry if one exists, otherwise the
503                 // entry for the data block itself.
504                 stamp := data
505
506                 // Advance to the corresponding recent/X marker, if any
507                 for recent != nil {
508                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
509                                 recent = recentL.Next()
510                                 continue
511                         } else if cmp == 0 {
512                                 stamp = recent
513                                 recent = recentL.Next()
514                                 break
515                         } else {
516                                 // recent/X marker is missing: we'll
517                                 // use the timestamp on the data
518                                 // object.
519                                 break
520                         }
521                 }
522                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
523                 if err != nil {
524                         return err
525                 }
526                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
527         }
528         return nil
529 }
530
531 // Trash a Keep block.
532 func (v *S3Volume) Trash(loc string) error {
533         if v.ReadOnly {
534                 return MethodDisabledError
535         }
536         if t, err := v.Mtime(loc); err != nil {
537                 return err
538         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
539                 return nil
540         }
541         if theConfig.TrashLifetime == 0 {
542                 if !s3UnsafeDelete {
543                         return ErrS3TrashDisabled
544                 }
545                 v.tick(&v.bucketStats.Ops, &v.bucketStats.DelOps)
546                 return v.translateError(v.tickErr(v.bucket.Del(loc)))
547         }
548         err := v.checkRaceWindow(loc)
549         if err != nil {
550                 return err
551         }
552         err = v.safeCopy("trash/"+loc, loc)
553         if err != nil {
554                 return err
555         }
556         v.tick(&v.bucketStats.Ops, &v.bucketStats.DelOps)
557         return v.translateError(v.tickErr(v.bucket.Del(loc)))
558 }
559
560 // checkRaceWindow returns a non-nil error if trash/loc is, or might
561 // be, in the race window (i.e., it's not safe to trash loc).
562 func (v *S3Volume) checkRaceWindow(loc string) error {
563         v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps)
564         resp, err := v.bucket.Head("trash/"+loc, nil)
565         err = v.translateError(v.tickErr(err))
566         if os.IsNotExist(err) {
567                 // OK, trash/X doesn't exist so we're not in the race
568                 // window
569                 return nil
570         } else if err != nil {
571                 // Error looking up trash/X. We don't know whether
572                 // we're in the race window
573                 return err
574         }
575         t, err := v.lastModified(resp)
576         if err != nil {
577                 // Can't parse timestamp
578                 return err
579         }
580         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
581         if safeWindow <= 0 {
582                 // We can't count on "touch trash/X" to prolong
583                 // trash/X's lifetime. The new timestamp might not
584                 // become visible until now+raceWindow, and EmptyTrash
585                 // is allowed to delete trash/X before then.
586                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
587         }
588         // trash/X exists, but it won't be eligible for deletion until
589         // after now+raceWindow, so it's safe to overwrite it.
590         return nil
591 }
592
593 // safeCopy calls PutCopy, and checks the response to make sure the
594 // copy succeeded and updated the timestamp on the destination object
595 // (PutCopy returns 200 OK if the request was received, even if the
596 // copy failed).
597 func (v *S3Volume) safeCopy(dst, src string) error {
598         v.tick(&v.bucketStats.Ops, &v.bucketStats.PutOps)
599         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
600                 ContentType:       "application/octet-stream",
601                 MetadataDirective: "REPLACE",
602         }, v.bucket.Name+"/"+src)
603         err = v.translateError(v.tickErr(err))
604         if err != nil {
605                 return err
606         }
607         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
608                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
609         } else if time.Now().Sub(t) > maxClockSkew {
610                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
611         }
612         return nil
613 }
614
615 // Get the LastModified header from resp, and parse it as RFC1123 or
616 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
617 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
618         s := resp.Header.Get("Last-Modified")
619         t, err = time.Parse(time.RFC1123, s)
620         if err != nil && s != "" {
621                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
622                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
623                 // as required by HTTP spec. If it's not a valid HTTP
624                 // header value, it's probably AWS (or s3test) giving
625                 // us a nearly-RFC1123 timestamp.
626                 t, err = time.Parse(nearlyRFC1123, s)
627         }
628         return
629 }
630
631 // Untrash moves block from trash back into store
632 func (v *S3Volume) Untrash(loc string) error {
633         err := v.safeCopy(loc, "trash/"+loc)
634         if err != nil {
635                 return err
636         }
637         v.tick(&v.bucketStats.Ops, &v.bucketStats.PutOps)
638         err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
639         return v.translateError(v.tickErr(err))
640 }
641
642 // Status returns a *VolumeStatus representing the current in-use
643 // storage capacity and a fake available capacity that doesn't make
644 // the volume seem full or nearly-full.
645 func (v *S3Volume) Status() *VolumeStatus {
646         return &VolumeStatus{
647                 DeviceNum: 1,
648                 BytesFree: BlockSize * 1000,
649                 BytesUsed: 1,
650         }
651 }
652
653 // IOStatus implements InternalStatser.
654 func (v *S3Volume) InternalStats() interface{} {
655         return &v.bucketStats
656 }
657
658 // String implements fmt.Stringer.
659 func (v *S3Volume) String() string {
660         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
661 }
662
663 // Writable returns false if all future Put, Mtime, and Delete calls
664 // are expected to fail.
665 func (v *S3Volume) Writable() bool {
666         return !v.ReadOnly
667 }
668
669 // Replication returns the storage redundancy of the underlying
670 // device. Configured via command line flag.
671 func (v *S3Volume) Replication() int {
672         return v.S3Replication
673 }
674
675 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
676
677 func (v *S3Volume) isKeepBlock(s string) bool {
678         return s3KeepBlockRegexp.MatchString(s)
679 }
680
681 // fixRace(X) is called when "recent/X" exists but "X" doesn't
682 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
683 // there was a race between Put and Trash, fixRace recovers from the
684 // race by Untrashing the block.
685 func (v *S3Volume) fixRace(loc string) bool {
686         v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps)
687         trash, err := v.bucket.Head("trash/"+loc, nil)
688         if err != nil {
689                 if !os.IsNotExist(v.translateError(v.tickErr(err))) {
690                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
691                 }
692                 return false
693         }
694         trashTime, err := v.lastModified(trash)
695         if err != nil {
696                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
697                 return false
698         }
699
700         v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps)
701         recent, err := v.bucket.Head("recent/"+loc, nil)
702         if err != nil {
703                 v.tickErr(err)
704                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
705                 return false
706         }
707         recentTime, err := v.lastModified(recent)
708         if err != nil {
709                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
710                 return false
711         }
712
713         ageWhenTrashed := trashTime.Sub(recentTime)
714         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
715                 // No evidence of a race: block hasn't been written
716                 // since it became eligible for Trash. No fix needed.
717                 return false
718         }
719
720         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
721         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
722         err = v.safeCopy(loc, "trash/"+loc)
723         if err != nil {
724                 log.Printf("error: fixRace: %s", err)
725                 return false
726         }
727         return true
728 }
729
730 func (v *S3Volume) translateError(err error) error {
731         switch err := err.(type) {
732         case *s3.Error:
733                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
734                         strings.Contains(err.Error(), "Not Found") {
735                         return os.ErrNotExist
736                 }
737                 // Other 404 errors like NoSuchVersion and
738                 // NoSuchBucket are different problems which should
739                 // get called out downstream, so we don't convert them
740                 // to os.ErrNotExist.
741         }
742         return err
743 }
744
745 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
746 // and deletes them from the volume.
747 func (v *S3Volume) EmptyTrash() {
748         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
749
750         // Use a merge sort to find matching sets of trash/X and recent/X.
751         trashL := s3Lister{
752                 Bucket:   v.bucket,
753                 Prefix:   "trash/",
754                 PageSize: v.IndexPageSize,
755         }
756         // Define "ready to delete" as "...when EmptyTrash started".
757         startT := time.Now()
758         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
759                 loc := trash.Key[6:]
760                 if !v.isKeepBlock(loc) {
761                         continue
762                 }
763                 bytesInTrash += trash.Size
764                 blocksInTrash++
765
766                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
767                 if err != nil {
768                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
769                         continue
770                 }
771                 v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps)
772                 recent, err := v.bucket.Head("recent/"+loc, nil)
773                 if err != nil && os.IsNotExist(v.translateError(v.tickErr(err))) {
774                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
775                         err = v.Untrash(loc)
776                         if err != nil {
777                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
778                         }
779                         continue
780                 } else if err != nil {
781                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
782                         continue
783                 }
784                 recentT, err := v.lastModified(recent)
785                 if err != nil {
786                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
787                         continue
788                 }
789                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
790                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
791                                 // recent/loc is too old to protect
792                                 // loc from being Trashed again during
793                                 // the raceWindow that starts if we
794                                 // delete trash/X now.
795                                 //
796                                 // Note this means (TrashCheckInterval
797                                 // < BlobSignatureTTL - raceWindow) is
798                                 // necessary to avoid starvation.
799                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
800                                 v.fixRace(loc)
801                                 v.Touch(loc)
802                                 continue
803                         }
804                         v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps)
805                         _, err := v.bucket.Head(loc, nil)
806                         if os.IsNotExist(v.tickErr(err)) {
807                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
808                                 v.fixRace(loc)
809                                 continue
810                         } else if err != nil {
811                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
812                                 continue
813                         }
814                 }
815                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
816                         continue
817                 }
818                 v.tick(&v.bucketStats.Ops, &v.bucketStats.DelOps)
819                 err = v.bucket.Del(trash.Key)
820                 if err != nil {
821                         v.tickErr(err)
822                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
823                         continue
824                 }
825                 bytesDeleted += trash.Size
826                 blocksDeleted++
827
828                 v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps)
829                 _, err = v.bucket.Head(loc, nil)
830                 if os.IsNotExist(v.tickErr(err)) {
831                         v.tick(&v.bucketStats.Ops, &v.bucketStats.DelOps)
832                         err = v.bucket.Del("recent/" + loc)
833                         if err != nil {
834                                 v.tickErr(err)
835                                 log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
836                         }
837                 } else if err != nil {
838                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
839                 }
840         }
841         if err := trashL.Error(); err != nil {
842                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
843         }
844         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
845 }
846
847 func (v *S3Volume) tick(counters ...*uint64) {
848         for _, counter := range counters {
849                 atomic.AddUint64(counter, 1)
850         }
851 }
852
853 func (v *S3Volume) tickErr(err error) error {
854         if err != nil {
855                 atomic.AddUint64(&v.bucketStats.Errors, 1)
856         }
857         return err
858 }
859
860 func (v *S3Volume) tickInBytes(n uint64) {
861         atomic.AddUint64(&v.bucketStats.InBytes, n)
862 }
863
864 func (v *S3Volume) tickOutBytes(n uint64) {
865         atomic.AddUint64(&v.bucketStats.OutBytes, n)
866 }
867
868 type s3Lister struct {
869         Bucket     *s3.Bucket
870         Prefix     string
871         PageSize   int
872         nextMarker string
873         buf        []s3.Key
874         err        error
875 }
876
877 // First fetches the first page and returns the first item. It returns
878 // nil if the response is the empty set or an error occurs.
879 func (lister *s3Lister) First() *s3.Key {
880         lister.getPage()
881         return lister.pop()
882 }
883
884 // Next returns the next item, fetching the next page if necessary. It
885 // returns nil if the last available item has already been fetched, or
886 // an error occurs.
887 func (lister *s3Lister) Next() *s3.Key {
888         if len(lister.buf) == 0 && lister.nextMarker != "" {
889                 lister.getPage()
890         }
891         return lister.pop()
892 }
893
894 // Return the most recent error encountered by First or Next.
895 func (lister *s3Lister) Error() error {
896         return lister.err
897 }
898
899 func (lister *s3Lister) getPage() {
900         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
901         lister.nextMarker = ""
902         if err != nil {
903                 lister.err = err
904                 return
905         }
906         if resp.IsTruncated {
907                 lister.nextMarker = resp.NextMarker
908         }
909         lister.buf = make([]s3.Key, 0, len(resp.Contents))
910         for _, key := range resp.Contents {
911                 if !strings.HasPrefix(key.Key, lister.Prefix) {
912                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
913                         continue
914                 }
915                 lister.buf = append(lister.buf, key)
916         }
917 }
918
919 func (lister *s3Lister) pop() (k *s3.Key) {
920         if len(lister.buf) > 0 {
921                 k = &lister.buf[0]
922                 lister.buf = lister.buf[1:]
923         }
924         return
925 }