Merge branch '13142-dispatch-nfiles'
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "encoding/base64"
11         "encoding/hex"
12         "flag"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "net/http"
17         "os"
18         "regexp"
19         "strings"
20         "sync"
21         "time"
22
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         "github.com/AdRoll/goamz/aws"
25         "github.com/AdRoll/goamz/s3"
26 )
27
28 const (
29         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
30         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
31 )
32
33 var (
34         // ErrS3TrashDisabled is returned by Trash if that operation
35         // is impossible with the current config.
36         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
37
38         s3AccessKeyFile string
39         s3SecretKeyFile string
40         s3RegionName    string
41         s3Endpoint      string
42         s3Replication   int
43         s3UnsafeDelete  bool
44         s3RaceWindow    time.Duration
45
46         s3ACL = s3.Private
47
48         zeroTime time.Time
49 )
50
51 const (
52         maxClockSkew  = 600 * time.Second
53         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
54 )
55
56 type s3VolumeAdder struct {
57         *Config
58 }
59
60 // String implements flag.Value
61 func (s *s3VolumeAdder) String() string {
62         return "-"
63 }
64
65 func (s *s3VolumeAdder) Set(bucketName string) error {
66         if bucketName == "" {
67                 return fmt.Errorf("no container name given")
68         }
69         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
70                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
71         }
72         if deprecated.flagSerializeIO {
73                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
74         }
75         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
76                 Bucket:        bucketName,
77                 AccessKeyFile: s3AccessKeyFile,
78                 SecretKeyFile: s3SecretKeyFile,
79                 Endpoint:      s3Endpoint,
80                 Region:        s3RegionName,
81                 RaceWindow:    arvados.Duration(s3RaceWindow),
82                 S3Replication: s3Replication,
83                 UnsafeDelete:  s3UnsafeDelete,
84                 ReadOnly:      deprecated.flagReadonly,
85                 IndexPageSize: 1000,
86         })
87         return nil
88 }
89
90 func s3regions() (okList []string) {
91         for r := range aws.Regions {
92                 okList = append(okList, r)
93         }
94         return
95 }
96
97 func init() {
98         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
99
100         flag.Var(&s3VolumeAdder{theConfig},
101                 "s3-bucket-volume",
102                 "Use the given bucket as a storage volume. Can be given multiple times.")
103         flag.StringVar(
104                 &s3RegionName,
105                 "s3-region",
106                 "",
107                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
108         flag.StringVar(
109                 &s3Endpoint,
110                 "s3-endpoint",
111                 "",
112                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
113         flag.StringVar(
114                 &s3AccessKeyFile,
115                 "s3-access-key-file",
116                 "",
117                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
118         flag.StringVar(
119                 &s3SecretKeyFile,
120                 "s3-secret-key-file",
121                 "",
122                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
123         flag.DurationVar(
124                 &s3RaceWindow,
125                 "s3-race-window",
126                 24*time.Hour,
127                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
128         flag.IntVar(
129                 &s3Replication,
130                 "s3-replication",
131                 2,
132                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
133         flag.BoolVar(
134                 &s3UnsafeDelete,
135                 "s3-unsafe-delete",
136                 false,
137                 "EXPERIMENTAL. Enable deletion (garbage collection) even when trash lifetime is zero, even though there are known race conditions that can cause data loss.")
138 }
139
140 // S3Volume implements Volume using an S3 bucket.
141 type S3Volume struct {
142         AccessKeyFile      string
143         SecretKeyFile      string
144         Endpoint           string
145         Region             string
146         Bucket             string
147         LocationConstraint bool
148         IndexPageSize      int
149         S3Replication      int
150         ConnectTimeout     arvados.Duration
151         ReadTimeout        arvados.Duration
152         RaceWindow         arvados.Duration
153         ReadOnly           bool
154         UnsafeDelete       bool
155         StorageClasses     []string
156
157         bucket *s3bucket
158
159         startOnce sync.Once
160 }
161
162 // Examples implements VolumeWithExamples.
163 func (*S3Volume) Examples() []Volume {
164         return []Volume{
165                 &S3Volume{
166                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
167                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
168                         Endpoint:       "",
169                         Region:         "us-east-1",
170                         Bucket:         "example-bucket-name",
171                         IndexPageSize:  1000,
172                         S3Replication:  2,
173                         RaceWindow:     arvados.Duration(24 * time.Hour),
174                         ConnectTimeout: arvados.Duration(time.Minute),
175                         ReadTimeout:    arvados.Duration(5 * time.Minute),
176                 },
177                 &S3Volume{
178                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
179                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
180                         Endpoint:       "https://storage.googleapis.com",
181                         Region:         "",
182                         Bucket:         "example-bucket-name",
183                         IndexPageSize:  1000,
184                         S3Replication:  2,
185                         RaceWindow:     arvados.Duration(24 * time.Hour),
186                         ConnectTimeout: arvados.Duration(time.Minute),
187                         ReadTimeout:    arvados.Duration(5 * time.Minute),
188                 },
189         }
190 }
191
192 // Type implements Volume.
193 func (*S3Volume) Type() string {
194         return "S3"
195 }
196
197 // Start populates private fields and verifies the configuration is
198 // valid.
199 func (v *S3Volume) Start() error {
200         region, ok := aws.Regions[v.Region]
201         if v.Endpoint == "" {
202                 if !ok {
203                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
204                 }
205         } else if ok {
206                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
207                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
208         } else {
209                 region = aws.Region{
210                         Name:                 v.Region,
211                         S3Endpoint:           v.Endpoint,
212                         S3LocationConstraint: v.LocationConstraint,
213                 }
214         }
215
216         var err error
217         var auth aws.Auth
218         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
219         if err != nil {
220                 return err
221         }
222         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
223         if err != nil {
224                 return err
225         }
226
227         // Zero timeouts mean "wait forever", which is a bad
228         // default. Default to long timeouts instead.
229         if v.ConnectTimeout == 0 {
230                 v.ConnectTimeout = s3DefaultConnectTimeout
231         }
232         if v.ReadTimeout == 0 {
233                 v.ReadTimeout = s3DefaultReadTimeout
234         }
235
236         client := s3.New(auth, region)
237         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
238         client.ReadTimeout = time.Duration(v.ReadTimeout)
239         v.bucket = &s3bucket{
240                 Bucket: &s3.Bucket{
241                         S3:   client,
242                         Name: v.Bucket,
243                 },
244         }
245         return nil
246 }
247
248 // DeviceID returns a globally unique ID for the storage bucket.
249 func (v *S3Volume) DeviceID() string {
250         return "s3://" + v.Endpoint + "/" + v.Bucket
251 }
252
253 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
254         ready := make(chan bool)
255         go func() {
256                 rdr, err = v.getReader(loc)
257                 close(ready)
258         }()
259         select {
260         case <-ready:
261                 return
262         case <-ctx.Done():
263                 theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
264                 go func() {
265                         <-ready
266                         if err == nil {
267                                 rdr.Close()
268                         }
269                 }()
270                 return nil, ctx.Err()
271         }
272 }
273
274 // getReader wraps (Bucket)GetReader.
275 //
276 // In situations where (Bucket)GetReader would fail because the block
277 // disappeared in a Trash race, getReader calls fixRace to recover the
278 // data, and tries again.
279 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
280         rdr, err = v.bucket.GetReader(loc)
281         err = v.translateError(err)
282         if err == nil || !os.IsNotExist(err) {
283                 return
284         }
285
286         _, err = v.bucket.Head("recent/"+loc, nil)
287         err = v.translateError(err)
288         if err != nil {
289                 // If we can't read recent/X, there's no point in
290                 // trying fixRace. Give up.
291                 return
292         }
293         if !v.fixRace(loc) {
294                 err = os.ErrNotExist
295                 return
296         }
297
298         rdr, err = v.bucket.GetReader(loc)
299         if err != nil {
300                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
301                 err = v.translateError(err)
302         }
303         return
304 }
305
306 // Get a block: copy the block data into buf, and return the number of
307 // bytes copied.
308 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
309         rdr, err := v.getReaderWithContext(ctx, loc)
310         if err != nil {
311                 return 0, err
312         }
313
314         var n int
315         ready := make(chan bool)
316         go func() {
317                 defer close(ready)
318
319                 defer rdr.Close()
320                 n, err = io.ReadFull(rdr, buf)
321
322                 switch err {
323                 case nil, io.EOF, io.ErrUnexpectedEOF:
324                         err = nil
325                 default:
326                         err = v.translateError(err)
327                 }
328         }()
329         select {
330         case <-ctx.Done():
331                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
332                 rdr.Close()
333                 // Must wait for ReadFull to return, to ensure it
334                 // doesn't write to buf after we return.
335                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
336                 <-ready
337                 return 0, ctx.Err()
338         case <-ready:
339                 return n, err
340         }
341 }
342
343 // Compare the given data with the stored data.
344 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
345         errChan := make(chan error, 1)
346         go func() {
347                 _, err := v.bucket.Head("recent/"+loc, nil)
348                 errChan <- err
349         }()
350         var err error
351         select {
352         case <-ctx.Done():
353                 return ctx.Err()
354         case err = <-errChan:
355         }
356         if err != nil {
357                 // Checking for "loc" itself here would interfere with
358                 // future GET requests.
359                 //
360                 // On AWS, if X doesn't exist, a HEAD or GET request
361                 // for X causes X's non-existence to be cached. Thus,
362                 // if we test for X, then create X and return a
363                 // signature to our client, the client might still get
364                 // 404 from all keepstores when trying to read it.
365                 //
366                 // To avoid this, we avoid doing HEAD X or GET X until
367                 // we know X has been written.
368                 //
369                 // Note that X might exist even though recent/X
370                 // doesn't: for example, the response to HEAD recent/X
371                 // might itself come from a stale cache. In such
372                 // cases, we will return a false negative and
373                 // PutHandler might needlessly create another replica
374                 // on a different volume. That's not ideal, but it's
375                 // better than passing the eventually-consistent
376                 // problem on to our clients.
377                 return v.translateError(err)
378         }
379         rdr, err := v.getReaderWithContext(ctx, loc)
380         if err != nil {
381                 return err
382         }
383         defer rdr.Close()
384         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
385 }
386
387 // Put writes a block.
388 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
389         if v.ReadOnly {
390                 return MethodDisabledError
391         }
392         var opts s3.Options
393         size := len(block)
394         if size > 0 {
395                 md5, err := hex.DecodeString(loc)
396                 if err != nil {
397                         return err
398                 }
399                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
400         }
401
402         // Send the block data through a pipe, so that (if we need to)
403         // we can close the pipe early and abandon our PutReader()
404         // goroutine, without worrying about PutReader() accessing our
405         // block buffer after we release it.
406         bufr, bufw := io.Pipe()
407         go func() {
408                 io.Copy(bufw, bytes.NewReader(block))
409                 bufw.Close()
410         }()
411
412         var err error
413         ready := make(chan bool)
414         go func() {
415                 defer func() {
416                         if ctx.Err() != nil {
417                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
418                         }
419                 }()
420                 defer close(ready)
421                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
422                 if err != nil {
423                         return
424                 }
425                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
426         }()
427         select {
428         case <-ctx.Done():
429                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
430                 // Our pipe might be stuck in Write(), waiting for
431                 // io.Copy() to read. If so, un-stick it. This means
432                 // PutReader will get corrupt data, but that's OK: the
433                 // size and MD5 won't match, so the write will fail.
434                 go io.Copy(ioutil.Discard, bufr)
435                 // CloseWithError() will return once pending I/O is done.
436                 bufw.CloseWithError(ctx.Err())
437                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
438                 return ctx.Err()
439         case <-ready:
440                 return v.translateError(err)
441         }
442 }
443
444 // Touch sets the timestamp for the given locator to the current time.
445 func (v *S3Volume) Touch(loc string) error {
446         if v.ReadOnly {
447                 return MethodDisabledError
448         }
449         _, err := v.bucket.Head(loc, nil)
450         err = v.translateError(err)
451         if os.IsNotExist(err) && v.fixRace(loc) {
452                 // The data object got trashed in a race, but fixRace
453                 // rescued it.
454         } else if err != nil {
455                 return err
456         }
457         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
458         return v.translateError(err)
459 }
460
461 // Mtime returns the stored timestamp for the given locator.
462 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
463         _, err := v.bucket.Head(loc, nil)
464         if err != nil {
465                 return zeroTime, v.translateError(err)
466         }
467         resp, err := v.bucket.Head("recent/"+loc, nil)
468         err = v.translateError(err)
469         if os.IsNotExist(err) {
470                 // The data object X exists, but recent/X is missing.
471                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
472                 if err != nil {
473                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
474                         return zeroTime, v.translateError(err)
475                 }
476                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
477                 resp, err = v.bucket.Head("recent/"+loc, nil)
478                 if err != nil {
479                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
480                         return zeroTime, v.translateError(err)
481                 }
482         } else if err != nil {
483                 // HEAD recent/X failed for some other reason.
484                 return zeroTime, err
485         }
486         return v.lastModified(resp)
487 }
488
489 // IndexTo writes a complete list of locators with the given prefix
490 // for which Get() can retrieve data.
491 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
492         // Use a merge sort to find matching sets of X and recent/X.
493         dataL := s3Lister{
494                 Bucket:   v.bucket.Bucket,
495                 Prefix:   prefix,
496                 PageSize: v.IndexPageSize,
497         }
498         recentL := s3Lister{
499                 Bucket:   v.bucket.Bucket,
500                 Prefix:   "recent/" + prefix,
501                 PageSize: v.IndexPageSize,
502         }
503         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
504         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
505         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
506                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
507                 if data.Key >= "g" {
508                         // Conveniently, "recent/*" and "trash/*" are
509                         // lexically greater than all hex-encoded data
510                         // hashes, so stopping here avoids iterating
511                         // over all of them needlessly with dataL.
512                         break
513                 }
514                 if !v.isKeepBlock(data.Key) {
515                         continue
516                 }
517
518                 // stamp is the list entry we should use to report the
519                 // last-modified time for this data block: it will be
520                 // the recent/X entry if one exists, otherwise the
521                 // entry for the data block itself.
522                 stamp := data
523
524                 // Advance to the corresponding recent/X marker, if any
525                 for recent != nil {
526                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
527                                 recent = recentL.Next()
528                                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
529                                 continue
530                         } else if cmp == 0 {
531                                 stamp = recent
532                                 recent = recentL.Next()
533                                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
534                                 break
535                         } else {
536                                 // recent/X marker is missing: we'll
537                                 // use the timestamp on the data
538                                 // object.
539                                 break
540                         }
541                 }
542                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
543                 if err != nil {
544                         return err
545                 }
546                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
547         }
548         return nil
549 }
550
551 // Trash a Keep block.
552 func (v *S3Volume) Trash(loc string) error {
553         if v.ReadOnly {
554                 return MethodDisabledError
555         }
556         if t, err := v.Mtime(loc); err != nil {
557                 return err
558         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
559                 return nil
560         }
561         if theConfig.TrashLifetime == 0 {
562                 if !s3UnsafeDelete {
563                         return ErrS3TrashDisabled
564                 }
565                 return v.translateError(v.bucket.Del(loc))
566         }
567         err := v.checkRaceWindow(loc)
568         if err != nil {
569                 return err
570         }
571         err = v.safeCopy("trash/"+loc, loc)
572         if err != nil {
573                 return err
574         }
575         return v.translateError(v.bucket.Del(loc))
576 }
577
578 // checkRaceWindow returns a non-nil error if trash/loc is, or might
579 // be, in the race window (i.e., it's not safe to trash loc).
580 func (v *S3Volume) checkRaceWindow(loc string) error {
581         resp, err := v.bucket.Head("trash/"+loc, nil)
582         err = v.translateError(err)
583         if os.IsNotExist(err) {
584                 // OK, trash/X doesn't exist so we're not in the race
585                 // window
586                 return nil
587         } else if err != nil {
588                 // Error looking up trash/X. We don't know whether
589                 // we're in the race window
590                 return err
591         }
592         t, err := v.lastModified(resp)
593         if err != nil {
594                 // Can't parse timestamp
595                 return err
596         }
597         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
598         if safeWindow <= 0 {
599                 // We can't count on "touch trash/X" to prolong
600                 // trash/X's lifetime. The new timestamp might not
601                 // become visible until now+raceWindow, and EmptyTrash
602                 // is allowed to delete trash/X before then.
603                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
604         }
605         // trash/X exists, but it won't be eligible for deletion until
606         // after now+raceWindow, so it's safe to overwrite it.
607         return nil
608 }
609
610 // safeCopy calls PutCopy, and checks the response to make sure the
611 // copy succeeded and updated the timestamp on the destination object
612 // (PutCopy returns 200 OK if the request was received, even if the
613 // copy failed).
614 func (v *S3Volume) safeCopy(dst, src string) error {
615         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
616                 ContentType:       "application/octet-stream",
617                 MetadataDirective: "REPLACE",
618         }, v.bucket.Name+"/"+src)
619         err = v.translateError(err)
620         if err != nil {
621                 return err
622         }
623         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
624                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
625         } else if time.Now().Sub(t) > maxClockSkew {
626                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
627         }
628         return nil
629 }
630
631 // Get the LastModified header from resp, and parse it as RFC1123 or
632 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
633 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
634         s := resp.Header.Get("Last-Modified")
635         t, err = time.Parse(time.RFC1123, s)
636         if err != nil && s != "" {
637                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
638                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
639                 // as required by HTTP spec. If it's not a valid HTTP
640                 // header value, it's probably AWS (or s3test) giving
641                 // us a nearly-RFC1123 timestamp.
642                 t, err = time.Parse(nearlyRFC1123, s)
643         }
644         return
645 }
646
647 // Untrash moves block from trash back into store
648 func (v *S3Volume) Untrash(loc string) error {
649         err := v.safeCopy(loc, "trash/"+loc)
650         if err != nil {
651                 return err
652         }
653         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
654         return v.translateError(err)
655 }
656
657 // Status returns a *VolumeStatus representing the current in-use
658 // storage capacity and a fake available capacity that doesn't make
659 // the volume seem full or nearly-full.
660 func (v *S3Volume) Status() *VolumeStatus {
661         return &VolumeStatus{
662                 DeviceNum: 1,
663                 BytesFree: BlockSize * 1000,
664                 BytesUsed: 1,
665         }
666 }
667
668 // InternalStats returns bucket I/O and API call counters.
669 func (v *S3Volume) InternalStats() interface{} {
670         return &v.bucket.stats
671 }
672
673 // String implements fmt.Stringer.
674 func (v *S3Volume) String() string {
675         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
676 }
677
678 // Writable returns false if all future Put, Mtime, and Delete calls
679 // are expected to fail.
680 func (v *S3Volume) Writable() bool {
681         return !v.ReadOnly
682 }
683
684 // Replication returns the storage redundancy of the underlying
685 // device. Configured via command line flag.
686 func (v *S3Volume) Replication() int {
687         return v.S3Replication
688 }
689
690 // GetStorageClasses implements Volume
691 func (v *S3Volume) GetStorageClasses() []string {
692         return v.StorageClasses
693 }
694
695 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
696
697 func (v *S3Volume) isKeepBlock(s string) bool {
698         return s3KeepBlockRegexp.MatchString(s)
699 }
700
701 // fixRace(X) is called when "recent/X" exists but "X" doesn't
702 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
703 // there was a race between Put and Trash, fixRace recovers from the
704 // race by Untrashing the block.
705 func (v *S3Volume) fixRace(loc string) bool {
706         trash, err := v.bucket.Head("trash/"+loc, nil)
707         if err != nil {
708                 if !os.IsNotExist(v.translateError(err)) {
709                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
710                 }
711                 return false
712         }
713         trashTime, err := v.lastModified(trash)
714         if err != nil {
715                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
716                 return false
717         }
718
719         recent, err := v.bucket.Head("recent/"+loc, nil)
720         if err != nil {
721                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
722                 return false
723         }
724         recentTime, err := v.lastModified(recent)
725         if err != nil {
726                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
727                 return false
728         }
729
730         ageWhenTrashed := trashTime.Sub(recentTime)
731         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
732                 // No evidence of a race: block hasn't been written
733                 // since it became eligible for Trash. No fix needed.
734                 return false
735         }
736
737         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
738         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
739         err = v.safeCopy(loc, "trash/"+loc)
740         if err != nil {
741                 log.Printf("error: fixRace: %s", err)
742                 return false
743         }
744         return true
745 }
746
747 func (v *S3Volume) translateError(err error) error {
748         switch err := err.(type) {
749         case *s3.Error:
750                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
751                         strings.Contains(err.Error(), "Not Found") {
752                         return os.ErrNotExist
753                 }
754                 // Other 404 errors like NoSuchVersion and
755                 // NoSuchBucket are different problems which should
756                 // get called out downstream, so we don't convert them
757                 // to os.ErrNotExist.
758         }
759         return err
760 }
761
762 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
763 // and deletes them from the volume.
764 func (v *S3Volume) EmptyTrash() {
765         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
766
767         // Use a merge sort to find matching sets of trash/X and recent/X.
768         trashL := s3Lister{
769                 Bucket:   v.bucket.Bucket,
770                 Prefix:   "trash/",
771                 PageSize: v.IndexPageSize,
772         }
773         // Define "ready to delete" as "...when EmptyTrash started".
774         startT := time.Now()
775         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
776                 loc := trash.Key[6:]
777                 if !v.isKeepBlock(loc) {
778                         continue
779                 }
780                 bytesInTrash += trash.Size
781                 blocksInTrash++
782
783                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
784                 if err != nil {
785                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
786                         continue
787                 }
788                 recent, err := v.bucket.Head("recent/"+loc, nil)
789                 if err != nil && os.IsNotExist(v.translateError(err)) {
790                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
791                         err = v.Untrash(loc)
792                         if err != nil {
793                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
794                         }
795                         continue
796                 } else if err != nil {
797                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
798                         continue
799                 }
800                 recentT, err := v.lastModified(recent)
801                 if err != nil {
802                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
803                         continue
804                 }
805                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
806                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
807                                 // recent/loc is too old to protect
808                                 // loc from being Trashed again during
809                                 // the raceWindow that starts if we
810                                 // delete trash/X now.
811                                 //
812                                 // Note this means (TrashCheckInterval
813                                 // < BlobSignatureTTL - raceWindow) is
814                                 // necessary to avoid starvation.
815                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
816                                 v.fixRace(loc)
817                                 v.Touch(loc)
818                                 continue
819                         }
820                         _, err := v.bucket.Head(loc, nil)
821                         if os.IsNotExist(err) {
822                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
823                                 v.fixRace(loc)
824                                 continue
825                         } else if err != nil {
826                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
827                                 continue
828                         }
829                 }
830                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
831                         continue
832                 }
833                 err = v.bucket.Del(trash.Key)
834                 if err != nil {
835                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
836                         continue
837                 }
838                 bytesDeleted += trash.Size
839                 blocksDeleted++
840
841                 _, err = v.bucket.Head(loc, nil)
842                 if os.IsNotExist(err) {
843                         err = v.bucket.Del("recent/" + loc)
844                         if err != nil {
845                                 log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
846                         }
847                 } else if err != nil {
848                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
849                 }
850         }
851         if err := trashL.Error(); err != nil {
852                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
853         }
854         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
855 }
856
857 type s3Lister struct {
858         Bucket     *s3.Bucket
859         Prefix     string
860         PageSize   int
861         nextMarker string
862         buf        []s3.Key
863         err        error
864 }
865
866 // First fetches the first page and returns the first item. It returns
867 // nil if the response is the empty set or an error occurs.
868 func (lister *s3Lister) First() *s3.Key {
869         lister.getPage()
870         return lister.pop()
871 }
872
873 // Next returns the next item, fetching the next page if necessary. It
874 // returns nil if the last available item has already been fetched, or
875 // an error occurs.
876 func (lister *s3Lister) Next() *s3.Key {
877         if len(lister.buf) == 0 && lister.nextMarker != "" {
878                 lister.getPage()
879         }
880         return lister.pop()
881 }
882
883 // Return the most recent error encountered by First or Next.
884 func (lister *s3Lister) Error() error {
885         return lister.err
886 }
887
888 func (lister *s3Lister) getPage() {
889         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
890         lister.nextMarker = ""
891         if err != nil {
892                 lister.err = err
893                 return
894         }
895         if resp.IsTruncated {
896                 lister.nextMarker = resp.NextMarker
897         }
898         lister.buf = make([]s3.Key, 0, len(resp.Contents))
899         for _, key := range resp.Contents {
900                 if !strings.HasPrefix(key.Key, lister.Prefix) {
901                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
902                         continue
903                 }
904                 lister.buf = append(lister.buf, key)
905         }
906 }
907
908 func (lister *s3Lister) pop() (k *s3.Key) {
909         if len(lister.buf) > 0 {
910                 k = &lister.buf[0]
911                 lister.buf = lister.buf[1:]
912         }
913         return
914 }
915
916 // s3bucket wraps s3.bucket and counts I/O and API usage stats.
917 type s3bucket struct {
918         *s3.Bucket
919         stats s3bucketStats
920 }
921
922 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
923         rdr, err := b.Bucket.GetReader(path)
924         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
925         b.stats.TickErr(err)
926         return NewCountingReader(rdr, b.stats.TickInBytes), err
927 }
928
929 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
930         resp, err := b.Bucket.Head(path, headers)
931         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
932         b.stats.TickErr(err)
933         return resp, err
934 }
935
936 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
937         if length == 0 {
938                 // goamz will only send Content-Length: 0 when reader
939                 // is nil due to net.http.Request.ContentLength
940                 // behavior.  Otherwise, Content-Length header is
941                 // omitted which will cause some S3 services
942                 // (including AWS and Ceph RadosGW) to fail to create
943                 // empty objects.
944                 r = nil
945         } else {
946                 r = NewCountingReader(r, b.stats.TickOutBytes)
947         }
948         err := b.Bucket.PutReader(path, r, length, contType, perm, options)
949         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
950         b.stats.TickErr(err)
951         return err
952 }
953
954 func (b *s3bucket) Del(path string) error {
955         err := b.Bucket.Del(path)
956         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
957         b.stats.TickErr(err)
958         return err
959 }
960
961 type s3bucketStats struct {
962         statsTicker
963         Ops     uint64
964         GetOps  uint64
965         PutOps  uint64
966         HeadOps uint64
967         DelOps  uint64
968         ListOps uint64
969 }
970
971 func (s *s3bucketStats) TickErr(err error) {
972         if err == nil {
973                 return
974         }
975         errType := fmt.Sprintf("%T", err)
976         if err, ok := err.(*s3.Error); ok {
977                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
978         }
979         s.statsTicker.TickErr(err, errType)
980 }