12538: Refactor crunch-run shutdown
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "encoding/base64"
11         "encoding/hex"
12         "flag"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "net/http"
17         "os"
18         "regexp"
19         "strings"
20         "sync"
21         "time"
22
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         "github.com/AdRoll/goamz/aws"
25         "github.com/AdRoll/goamz/s3"
26         log "github.com/Sirupsen/logrus"
27 )
28
29 const (
30         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
31         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
32 )
33
34 var (
35         // ErrS3TrashDisabled is returned by Trash if that operation
36         // is impossible with the current config.
37         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
38
39         s3AccessKeyFile string
40         s3SecretKeyFile string
41         s3RegionName    string
42         s3Endpoint      string
43         s3Replication   int
44         s3UnsafeDelete  bool
45         s3RaceWindow    time.Duration
46
47         s3ACL = s3.Private
48 )
49
50 const (
51         maxClockSkew  = 600 * time.Second
52         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
53 )
54
55 type s3VolumeAdder struct {
56         *Config
57 }
58
59 // String implements flag.Value
60 func (s *s3VolumeAdder) String() string {
61         return "-"
62 }
63
64 func (s *s3VolumeAdder) Set(bucketName string) error {
65         if bucketName == "" {
66                 return fmt.Errorf("no container name given")
67         }
68         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
69                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
70         }
71         if deprecated.flagSerializeIO {
72                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
73         }
74         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
75                 Bucket:        bucketName,
76                 AccessKeyFile: s3AccessKeyFile,
77                 SecretKeyFile: s3SecretKeyFile,
78                 Endpoint:      s3Endpoint,
79                 Region:        s3RegionName,
80                 RaceWindow:    arvados.Duration(s3RaceWindow),
81                 S3Replication: s3Replication,
82                 UnsafeDelete:  s3UnsafeDelete,
83                 ReadOnly:      deprecated.flagReadonly,
84                 IndexPageSize: 1000,
85         })
86         return nil
87 }
88
89 func s3regions() (okList []string) {
90         for r := range aws.Regions {
91                 okList = append(okList, r)
92         }
93         return
94 }
95
96 func init() {
97         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
98
99         flag.Var(&s3VolumeAdder{theConfig},
100                 "s3-bucket-volume",
101                 "Use the given bucket as a storage volume. Can be given multiple times.")
102         flag.StringVar(
103                 &s3RegionName,
104                 "s3-region",
105                 "",
106                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
107         flag.StringVar(
108                 &s3Endpoint,
109                 "s3-endpoint",
110                 "",
111                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
112         flag.StringVar(
113                 &s3AccessKeyFile,
114                 "s3-access-key-file",
115                 "",
116                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
117         flag.StringVar(
118                 &s3SecretKeyFile,
119                 "s3-secret-key-file",
120                 "",
121                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
122         flag.DurationVar(
123                 &s3RaceWindow,
124                 "s3-race-window",
125                 24*time.Hour,
126                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
127         flag.IntVar(
128                 &s3Replication,
129                 "s3-replication",
130                 2,
131                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
132         flag.BoolVar(
133                 &s3UnsafeDelete,
134                 "s3-unsafe-delete",
135                 false,
136                 "EXPERIMENTAL. Enable deletion (garbage collection) even when trash lifetime is zero, even though there are known race conditions that can cause data loss.")
137 }
138
139 // S3Volume implements Volume using an S3 bucket.
140 type S3Volume struct {
141         AccessKeyFile      string
142         SecretKeyFile      string
143         Endpoint           string
144         Region             string
145         Bucket             string
146         LocationConstraint bool
147         IndexPageSize      int
148         S3Replication      int
149         ConnectTimeout     arvados.Duration
150         ReadTimeout        arvados.Duration
151         RaceWindow         arvados.Duration
152         ReadOnly           bool
153         UnsafeDelete       bool
154
155         bucket *s3bucket
156
157         startOnce sync.Once
158 }
159
160 // Examples implements VolumeWithExamples.
161 func (*S3Volume) Examples() []Volume {
162         return []Volume{
163                 &S3Volume{
164                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
165                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
166                         Endpoint:       "",
167                         Region:         "us-east-1",
168                         Bucket:         "example-bucket-name",
169                         IndexPageSize:  1000,
170                         S3Replication:  2,
171                         RaceWindow:     arvados.Duration(24 * time.Hour),
172                         ConnectTimeout: arvados.Duration(time.Minute),
173                         ReadTimeout:    arvados.Duration(5 * time.Minute),
174                 },
175                 &S3Volume{
176                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
177                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
178                         Endpoint:       "https://storage.googleapis.com",
179                         Region:         "",
180                         Bucket:         "example-bucket-name",
181                         IndexPageSize:  1000,
182                         S3Replication:  2,
183                         RaceWindow:     arvados.Duration(24 * time.Hour),
184                         ConnectTimeout: arvados.Duration(time.Minute),
185                         ReadTimeout:    arvados.Duration(5 * time.Minute),
186                 },
187         }
188 }
189
190 // Type implements Volume.
191 func (*S3Volume) Type() string {
192         return "S3"
193 }
194
195 // Start populates private fields and verifies the configuration is
196 // valid.
197 func (v *S3Volume) Start() error {
198         region, ok := aws.Regions[v.Region]
199         if v.Endpoint == "" {
200                 if !ok {
201                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
202                 }
203         } else if ok {
204                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
205                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
206         } else {
207                 region = aws.Region{
208                         Name:                 v.Region,
209                         S3Endpoint:           v.Endpoint,
210                         S3LocationConstraint: v.LocationConstraint,
211                 }
212         }
213
214         var err error
215         var auth aws.Auth
216         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
217         if err != nil {
218                 return err
219         }
220         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
221         if err != nil {
222                 return err
223         }
224
225         // Zero timeouts mean "wait forever", which is a bad
226         // default. Default to long timeouts instead.
227         if v.ConnectTimeout == 0 {
228                 v.ConnectTimeout = s3DefaultConnectTimeout
229         }
230         if v.ReadTimeout == 0 {
231                 v.ReadTimeout = s3DefaultReadTimeout
232         }
233
234         client := s3.New(auth, region)
235         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
236         client.ReadTimeout = time.Duration(v.ReadTimeout)
237         v.bucket = &s3bucket{
238                 Bucket: &s3.Bucket{
239                         S3:   client,
240                         Name: v.Bucket,
241                 },
242         }
243         return nil
244 }
245
246 // DeviceID returns a globally unique ID for the storage bucket.
247 func (v *S3Volume) DeviceID() string {
248         return "s3://" + v.Endpoint + "/" + v.Bucket
249 }
250
251 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
252         ready := make(chan bool)
253         go func() {
254                 rdr, err = v.getReader(loc)
255                 close(ready)
256         }()
257         select {
258         case <-ready:
259                 return
260         case <-ctx.Done():
261                 theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
262                 go func() {
263                         <-ready
264                         if err == nil {
265                                 rdr.Close()
266                         }
267                 }()
268                 return nil, ctx.Err()
269         }
270 }
271
272 // getReader wraps (Bucket)GetReader.
273 //
274 // In situations where (Bucket)GetReader would fail because the block
275 // disappeared in a Trash race, getReader calls fixRace to recover the
276 // data, and tries again.
277 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
278         rdr, err = v.bucket.GetReader(loc)
279         err = v.translateError(err)
280         if err == nil || !os.IsNotExist(err) {
281                 return
282         }
283
284         _, err = v.bucket.Head("recent/"+loc, nil)
285         err = v.translateError(err)
286         if err != nil {
287                 // If we can't read recent/X, there's no point in
288                 // trying fixRace. Give up.
289                 return
290         }
291         if !v.fixRace(loc) {
292                 err = os.ErrNotExist
293                 return
294         }
295
296         rdr, err = v.bucket.GetReader(loc)
297         if err != nil {
298                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
299                 err = v.translateError(err)
300         }
301         return
302 }
303
304 // Get a block: copy the block data into buf, and return the number of
305 // bytes copied.
306 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
307         rdr, err := v.getReaderWithContext(ctx, loc)
308         if err != nil {
309                 return 0, err
310         }
311
312         var n int
313         ready := make(chan bool)
314         go func() {
315                 defer close(ready)
316
317                 defer rdr.Close()
318                 n, err = io.ReadFull(rdr, buf)
319
320                 switch err {
321                 case nil, io.EOF, io.ErrUnexpectedEOF:
322                         err = nil
323                 default:
324                         err = v.translateError(err)
325                 }
326         }()
327         select {
328         case <-ctx.Done():
329                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
330                 rdr.Close()
331                 // Must wait for ReadFull to return, to ensure it
332                 // doesn't write to buf after we return.
333                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
334                 <-ready
335                 return 0, ctx.Err()
336         case <-ready:
337                 return n, err
338         }
339 }
340
341 // Compare the given data with the stored data.
342 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
343         errChan := make(chan error, 1)
344         go func() {
345                 _, err := v.bucket.Head("recent/"+loc, nil)
346                 errChan <- err
347         }()
348         var err error
349         select {
350         case <-ctx.Done():
351                 return ctx.Err()
352         case err = <-errChan:
353         }
354         if err != nil {
355                 // Checking for "loc" itself here would interfere with
356                 // future GET requests.
357                 //
358                 // On AWS, if X doesn't exist, a HEAD or GET request
359                 // for X causes X's non-existence to be cached. Thus,
360                 // if we test for X, then create X and return a
361                 // signature to our client, the client might still get
362                 // 404 from all keepstores when trying to read it.
363                 //
364                 // To avoid this, we avoid doing HEAD X or GET X until
365                 // we know X has been written.
366                 //
367                 // Note that X might exist even though recent/X
368                 // doesn't: for example, the response to HEAD recent/X
369                 // might itself come from a stale cache. In such
370                 // cases, we will return a false negative and
371                 // PutHandler might needlessly create another replica
372                 // on a different volume. That's not ideal, but it's
373                 // better than passing the eventually-consistent
374                 // problem on to our clients.
375                 return v.translateError(err)
376         }
377         rdr, err := v.getReaderWithContext(ctx, loc)
378         if err != nil {
379                 return err
380         }
381         defer rdr.Close()
382         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
383 }
384
385 // Put writes a block.
386 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
387         if v.ReadOnly {
388                 return MethodDisabledError
389         }
390         var opts s3.Options
391         size := len(block)
392         if size > 0 {
393                 md5, err := hex.DecodeString(loc)
394                 if err != nil {
395                         return err
396                 }
397                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
398         }
399
400         // Send the block data through a pipe, so that (if we need to)
401         // we can close the pipe early and abandon our PutReader()
402         // goroutine, without worrying about PutReader() accessing our
403         // block buffer after we release it.
404         bufr, bufw := io.Pipe()
405         go func() {
406                 io.Copy(bufw, bytes.NewReader(block))
407                 bufw.Close()
408         }()
409
410         var err error
411         ready := make(chan bool)
412         go func() {
413                 defer func() {
414                         if ctx.Err() != nil {
415                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
416                         }
417                 }()
418                 defer close(ready)
419                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
420                 if err != nil {
421                         return
422                 }
423                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
424         }()
425         select {
426         case <-ctx.Done():
427                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
428                 // Our pipe might be stuck in Write(), waiting for
429                 // io.Copy() to read. If so, un-stick it. This means
430                 // PutReader will get corrupt data, but that's OK: the
431                 // size and MD5 won't match, so the write will fail.
432                 go io.Copy(ioutil.Discard, bufr)
433                 // CloseWithError() will return once pending I/O is done.
434                 bufw.CloseWithError(ctx.Err())
435                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
436                 return ctx.Err()
437         case <-ready:
438                 return v.translateError(err)
439         }
440 }
441
442 // Touch sets the timestamp for the given locator to the current time.
443 func (v *S3Volume) Touch(loc string) error {
444         if v.ReadOnly {
445                 return MethodDisabledError
446         }
447         _, err := v.bucket.Head(loc, nil)
448         err = v.translateError(err)
449         if os.IsNotExist(err) && v.fixRace(loc) {
450                 // The data object got trashed in a race, but fixRace
451                 // rescued it.
452         } else if err != nil {
453                 return err
454         }
455         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
456         return v.translateError(err)
457 }
458
459 // Mtime returns the stored timestamp for the given locator.
460 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
461         _, err := v.bucket.Head(loc, nil)
462         if err != nil {
463                 return zeroTime, v.translateError(err)
464         }
465         resp, err := v.bucket.Head("recent/"+loc, nil)
466         err = v.translateError(err)
467         if os.IsNotExist(err) {
468                 // The data object X exists, but recent/X is missing.
469                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
470                 if err != nil {
471                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
472                         return zeroTime, v.translateError(err)
473                 }
474                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
475                 resp, err = v.bucket.Head("recent/"+loc, nil)
476                 if err != nil {
477                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
478                         return zeroTime, v.translateError(err)
479                 }
480         } else if err != nil {
481                 // HEAD recent/X failed for some other reason.
482                 return zeroTime, err
483         }
484         return v.lastModified(resp)
485 }
486
487 // IndexTo writes a complete list of locators with the given prefix
488 // for which Get() can retrieve data.
489 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
490         // Use a merge sort to find matching sets of X and recent/X.
491         dataL := s3Lister{
492                 Bucket:   v.bucket.Bucket,
493                 Prefix:   prefix,
494                 PageSize: v.IndexPageSize,
495         }
496         recentL := s3Lister{
497                 Bucket:   v.bucket.Bucket,
498                 Prefix:   "recent/" + prefix,
499                 PageSize: v.IndexPageSize,
500         }
501         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
502         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
503         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
504                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
505                 if data.Key >= "g" {
506                         // Conveniently, "recent/*" and "trash/*" are
507                         // lexically greater than all hex-encoded data
508                         // hashes, so stopping here avoids iterating
509                         // over all of them needlessly with dataL.
510                         break
511                 }
512                 if !v.isKeepBlock(data.Key) {
513                         continue
514                 }
515
516                 // stamp is the list entry we should use to report the
517                 // last-modified time for this data block: it will be
518                 // the recent/X entry if one exists, otherwise the
519                 // entry for the data block itself.
520                 stamp := data
521
522                 // Advance to the corresponding recent/X marker, if any
523                 for recent != nil {
524                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
525                                 recent = recentL.Next()
526                                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
527                                 continue
528                         } else if cmp == 0 {
529                                 stamp = recent
530                                 recent = recentL.Next()
531                                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
532                                 break
533                         } else {
534                                 // recent/X marker is missing: we'll
535                                 // use the timestamp on the data
536                                 // object.
537                                 break
538                         }
539                 }
540                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
541                 if err != nil {
542                         return err
543                 }
544                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
545         }
546         return nil
547 }
548
549 // Trash a Keep block.
550 func (v *S3Volume) Trash(loc string) error {
551         if v.ReadOnly {
552                 return MethodDisabledError
553         }
554         if t, err := v.Mtime(loc); err != nil {
555                 return err
556         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
557                 return nil
558         }
559         if theConfig.TrashLifetime == 0 {
560                 if !s3UnsafeDelete {
561                         return ErrS3TrashDisabled
562                 }
563                 return v.translateError(v.bucket.Del(loc))
564         }
565         err := v.checkRaceWindow(loc)
566         if err != nil {
567                 return err
568         }
569         err = v.safeCopy("trash/"+loc, loc)
570         if err != nil {
571                 return err
572         }
573         return v.translateError(v.bucket.Del(loc))
574 }
575
576 // checkRaceWindow returns a non-nil error if trash/loc is, or might
577 // be, in the race window (i.e., it's not safe to trash loc).
578 func (v *S3Volume) checkRaceWindow(loc string) error {
579         resp, err := v.bucket.Head("trash/"+loc, nil)
580         err = v.translateError(err)
581         if os.IsNotExist(err) {
582                 // OK, trash/X doesn't exist so we're not in the race
583                 // window
584                 return nil
585         } else if err != nil {
586                 // Error looking up trash/X. We don't know whether
587                 // we're in the race window
588                 return err
589         }
590         t, err := v.lastModified(resp)
591         if err != nil {
592                 // Can't parse timestamp
593                 return err
594         }
595         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
596         if safeWindow <= 0 {
597                 // We can't count on "touch trash/X" to prolong
598                 // trash/X's lifetime. The new timestamp might not
599                 // become visible until now+raceWindow, and EmptyTrash
600                 // is allowed to delete trash/X before then.
601                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
602         }
603         // trash/X exists, but it won't be eligible for deletion until
604         // after now+raceWindow, so it's safe to overwrite it.
605         return nil
606 }
607
608 // safeCopy calls PutCopy, and checks the response to make sure the
609 // copy succeeded and updated the timestamp on the destination object
610 // (PutCopy returns 200 OK if the request was received, even if the
611 // copy failed).
612 func (v *S3Volume) safeCopy(dst, src string) error {
613         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
614                 ContentType:       "application/octet-stream",
615                 MetadataDirective: "REPLACE",
616         }, v.bucket.Name+"/"+src)
617         err = v.translateError(err)
618         if err != nil {
619                 return err
620         }
621         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
622                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
623         } else if time.Now().Sub(t) > maxClockSkew {
624                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
625         }
626         return nil
627 }
628
629 // Get the LastModified header from resp, and parse it as RFC1123 or
630 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
631 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
632         s := resp.Header.Get("Last-Modified")
633         t, err = time.Parse(time.RFC1123, s)
634         if err != nil && s != "" {
635                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
636                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
637                 // as required by HTTP spec. If it's not a valid HTTP
638                 // header value, it's probably AWS (or s3test) giving
639                 // us a nearly-RFC1123 timestamp.
640                 t, err = time.Parse(nearlyRFC1123, s)
641         }
642         return
643 }
644
645 // Untrash moves block from trash back into store
646 func (v *S3Volume) Untrash(loc string) error {
647         err := v.safeCopy(loc, "trash/"+loc)
648         if err != nil {
649                 return err
650         }
651         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
652         return v.translateError(err)
653 }
654
655 // Status returns a *VolumeStatus representing the current in-use
656 // storage capacity and a fake available capacity that doesn't make
657 // the volume seem full or nearly-full.
658 func (v *S3Volume) Status() *VolumeStatus {
659         return &VolumeStatus{
660                 DeviceNum: 1,
661                 BytesFree: BlockSize * 1000,
662                 BytesUsed: 1,
663         }
664 }
665
666 // InternalStats returns bucket I/O and API call counters.
667 func (v *S3Volume) InternalStats() interface{} {
668         return &v.bucket.stats
669 }
670
671 // String implements fmt.Stringer.
672 func (v *S3Volume) String() string {
673         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
674 }
675
676 // Writable returns false if all future Put, Mtime, and Delete calls
677 // are expected to fail.
678 func (v *S3Volume) Writable() bool {
679         return !v.ReadOnly
680 }
681
682 // Replication returns the storage redundancy of the underlying
683 // device. Configured via command line flag.
684 func (v *S3Volume) Replication() int {
685         return v.S3Replication
686 }
687
688 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
689
690 func (v *S3Volume) isKeepBlock(s string) bool {
691         return s3KeepBlockRegexp.MatchString(s)
692 }
693
694 // fixRace(X) is called when "recent/X" exists but "X" doesn't
695 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
696 // there was a race between Put and Trash, fixRace recovers from the
697 // race by Untrashing the block.
698 func (v *S3Volume) fixRace(loc string) bool {
699         trash, err := v.bucket.Head("trash/"+loc, nil)
700         if err != nil {
701                 if !os.IsNotExist(v.translateError(err)) {
702                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
703                 }
704                 return false
705         }
706         trashTime, err := v.lastModified(trash)
707         if err != nil {
708                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
709                 return false
710         }
711
712         recent, err := v.bucket.Head("recent/"+loc, nil)
713         if err != nil {
714                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
715                 return false
716         }
717         recentTime, err := v.lastModified(recent)
718         if err != nil {
719                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
720                 return false
721         }
722
723         ageWhenTrashed := trashTime.Sub(recentTime)
724         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
725                 // No evidence of a race: block hasn't been written
726                 // since it became eligible for Trash. No fix needed.
727                 return false
728         }
729
730         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
731         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
732         err = v.safeCopy(loc, "trash/"+loc)
733         if err != nil {
734                 log.Printf("error: fixRace: %s", err)
735                 return false
736         }
737         return true
738 }
739
740 func (v *S3Volume) translateError(err error) error {
741         switch err := err.(type) {
742         case *s3.Error:
743                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
744                         strings.Contains(err.Error(), "Not Found") {
745                         return os.ErrNotExist
746                 }
747                 // Other 404 errors like NoSuchVersion and
748                 // NoSuchBucket are different problems which should
749                 // get called out downstream, so we don't convert them
750                 // to os.ErrNotExist.
751         }
752         return err
753 }
754
755 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
756 // and deletes them from the volume.
757 func (v *S3Volume) EmptyTrash() {
758         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
759
760         // Use a merge sort to find matching sets of trash/X and recent/X.
761         trashL := s3Lister{
762                 Bucket:   v.bucket.Bucket,
763                 Prefix:   "trash/",
764                 PageSize: v.IndexPageSize,
765         }
766         // Define "ready to delete" as "...when EmptyTrash started".
767         startT := time.Now()
768         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
769                 loc := trash.Key[6:]
770                 if !v.isKeepBlock(loc) {
771                         continue
772                 }
773                 bytesInTrash += trash.Size
774                 blocksInTrash++
775
776                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
777                 if err != nil {
778                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
779                         continue
780                 }
781                 recent, err := v.bucket.Head("recent/"+loc, nil)
782                 if err != nil && os.IsNotExist(v.translateError(err)) {
783                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
784                         err = v.Untrash(loc)
785                         if err != nil {
786                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
787                         }
788                         continue
789                 } else if err != nil {
790                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
791                         continue
792                 }
793                 recentT, err := v.lastModified(recent)
794                 if err != nil {
795                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
796                         continue
797                 }
798                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
799                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
800                                 // recent/loc is too old to protect
801                                 // loc from being Trashed again during
802                                 // the raceWindow that starts if we
803                                 // delete trash/X now.
804                                 //
805                                 // Note this means (TrashCheckInterval
806                                 // < BlobSignatureTTL - raceWindow) is
807                                 // necessary to avoid starvation.
808                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
809                                 v.fixRace(loc)
810                                 v.Touch(loc)
811                                 continue
812                         }
813                         _, err := v.bucket.Head(loc, nil)
814                         if os.IsNotExist(err) {
815                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
816                                 v.fixRace(loc)
817                                 continue
818                         } else if err != nil {
819                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
820                                 continue
821                         }
822                 }
823                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
824                         continue
825                 }
826                 err = v.bucket.Del(trash.Key)
827                 if err != nil {
828                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
829                         continue
830                 }
831                 bytesDeleted += trash.Size
832                 blocksDeleted++
833
834                 _, err = v.bucket.Head(loc, nil)
835                 if os.IsNotExist(err) {
836                         err = v.bucket.Del("recent/" + loc)
837                         if err != nil {
838                                 log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
839                         }
840                 } else if err != nil {
841                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
842                 }
843         }
844         if err := trashL.Error(); err != nil {
845                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
846         }
847         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
848 }
849
850 type s3Lister struct {
851         Bucket     *s3.Bucket
852         Prefix     string
853         PageSize   int
854         nextMarker string
855         buf        []s3.Key
856         err        error
857 }
858
859 // First fetches the first page and returns the first item. It returns
860 // nil if the response is the empty set or an error occurs.
861 func (lister *s3Lister) First() *s3.Key {
862         lister.getPage()
863         return lister.pop()
864 }
865
866 // Next returns the next item, fetching the next page if necessary. It
867 // returns nil if the last available item has already been fetched, or
868 // an error occurs.
869 func (lister *s3Lister) Next() *s3.Key {
870         if len(lister.buf) == 0 && lister.nextMarker != "" {
871                 lister.getPage()
872         }
873         return lister.pop()
874 }
875
876 // Return the most recent error encountered by First or Next.
877 func (lister *s3Lister) Error() error {
878         return lister.err
879 }
880
881 func (lister *s3Lister) getPage() {
882         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
883         lister.nextMarker = ""
884         if err != nil {
885                 lister.err = err
886                 return
887         }
888         if resp.IsTruncated {
889                 lister.nextMarker = resp.NextMarker
890         }
891         lister.buf = make([]s3.Key, 0, len(resp.Contents))
892         for _, key := range resp.Contents {
893                 if !strings.HasPrefix(key.Key, lister.Prefix) {
894                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
895                         continue
896                 }
897                 lister.buf = append(lister.buf, key)
898         }
899 }
900
901 func (lister *s3Lister) pop() (k *s3.Key) {
902         if len(lister.buf) > 0 {
903                 k = &lister.buf[0]
904                 lister.buf = lister.buf[1:]
905         }
906         return
907 }
908
909 // s3bucket wraps s3.bucket and counts I/O and API usage stats.
910 type s3bucket struct {
911         *s3.Bucket
912         stats s3bucketStats
913 }
914
915 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
916         rdr, err := b.Bucket.GetReader(path)
917         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
918         b.stats.TickErr(err)
919         return NewCountingReader(rdr, b.stats.TickInBytes), err
920 }
921
922 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
923         resp, err := b.Bucket.Head(path, headers)
924         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
925         b.stats.TickErr(err)
926         return resp, err
927 }
928
929 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
930         if length == 0 {
931                 // goamz will only send Content-Length: 0 when reader
932                 // is nil due to net.http.Request.ContentLength
933                 // behavior.  Otherwise, Content-Length header is
934                 // omitted which will cause some S3 services
935                 // (including AWS and Ceph RadosGW) to fail to create
936                 // empty objects.
937                 r = nil
938         } else {
939                 r = NewCountingReader(r, b.stats.TickOutBytes)
940         }
941         err := b.Bucket.PutReader(path, r, length, contType, perm, options)
942         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
943         b.stats.TickErr(err)
944         return err
945 }
946
947 func (b *s3bucket) Del(path string) error {
948         err := b.Bucket.Del(path)
949         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
950         b.stats.TickErr(err)
951         return err
952 }
953
954 type s3bucketStats struct {
955         statsTicker
956         Ops     uint64
957         GetOps  uint64
958         PutOps  uint64
959         HeadOps uint64
960         DelOps  uint64
961         ListOps uint64
962 }
963
964 func (s *s3bucketStats) TickErr(err error) {
965         if err == nil {
966                 return
967         }
968         errType := fmt.Sprintf("%T", err)
969         if err, ok := err.(*s3.Error); ok {
970                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
971         }
972         s.statsTicker.TickErr(err, errType)
973 }