Merge branch '13365-default-secondaryFiles' refs #13365
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "encoding/base64"
11         "encoding/hex"
12         "flag"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "net/http"
17         "os"
18         "regexp"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "time"
23
24         "git.curoverse.com/arvados.git/sdk/go/arvados"
25         "github.com/AdRoll/goamz/aws"
26         "github.com/AdRoll/goamz/s3"
27 )
28
29 const (
30         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
31         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
32 )
33
34 var (
35         // ErrS3TrashDisabled is returned by Trash if that operation
36         // is impossible with the current config.
37         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
38
39         s3AccessKeyFile string
40         s3SecretKeyFile string
41         s3RegionName    string
42         s3Endpoint      string
43         s3Replication   int
44         s3UnsafeDelete  bool
45         s3RaceWindow    time.Duration
46
47         s3ACL = s3.Private
48
49         zeroTime time.Time
50 )
51
52 const (
53         maxClockSkew  = 600 * time.Second
54         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
55 )
56
57 type s3VolumeAdder struct {
58         *Config
59 }
60
61 // String implements flag.Value
62 func (s *s3VolumeAdder) String() string {
63         return "-"
64 }
65
66 func (s *s3VolumeAdder) Set(bucketName string) error {
67         if bucketName == "" {
68                 return fmt.Errorf("no container name given")
69         }
70         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
71                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
72         }
73         if deprecated.flagSerializeIO {
74                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
75         }
76         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
77                 Bucket:        bucketName,
78                 AccessKeyFile: s3AccessKeyFile,
79                 SecretKeyFile: s3SecretKeyFile,
80                 Endpoint:      s3Endpoint,
81                 Region:        s3RegionName,
82                 RaceWindow:    arvados.Duration(s3RaceWindow),
83                 S3Replication: s3Replication,
84                 UnsafeDelete:  s3UnsafeDelete,
85                 ReadOnly:      deprecated.flagReadonly,
86                 IndexPageSize: 1000,
87         })
88         return nil
89 }
90
91 func s3regions() (okList []string) {
92         for r := range aws.Regions {
93                 okList = append(okList, r)
94         }
95         return
96 }
97
98 func init() {
99         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
100
101         flag.Var(&s3VolumeAdder{theConfig},
102                 "s3-bucket-volume",
103                 "Use the given bucket as a storage volume. Can be given multiple times.")
104         flag.StringVar(
105                 &s3RegionName,
106                 "s3-region",
107                 "",
108                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
109         flag.StringVar(
110                 &s3Endpoint,
111                 "s3-endpoint",
112                 "",
113                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
114         flag.StringVar(
115                 &s3AccessKeyFile,
116                 "s3-access-key-file",
117                 "",
118                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
119         flag.StringVar(
120                 &s3SecretKeyFile,
121                 "s3-secret-key-file",
122                 "",
123                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
124         flag.DurationVar(
125                 &s3RaceWindow,
126                 "s3-race-window",
127                 24*time.Hour,
128                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
129         flag.IntVar(
130                 &s3Replication,
131                 "s3-replication",
132                 2,
133                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
134         flag.BoolVar(
135                 &s3UnsafeDelete,
136                 "s3-unsafe-delete",
137                 false,
138                 "EXPERIMENTAL. Enable deletion (garbage collection) even when trash lifetime is zero, even though there are known race conditions that can cause data loss.")
139 }
140
141 // S3Volume implements Volume using an S3 bucket.
142 type S3Volume struct {
143         AccessKeyFile      string
144         SecretKeyFile      string
145         Endpoint           string
146         Region             string
147         Bucket             string
148         LocationConstraint bool
149         IndexPageSize      int
150         S3Replication      int
151         ConnectTimeout     arvados.Duration
152         ReadTimeout        arvados.Duration
153         RaceWindow         arvados.Duration
154         ReadOnly           bool
155         UnsafeDelete       bool
156         StorageClasses     []string
157
158         bucket *s3bucket
159
160         startOnce sync.Once
161 }
162
163 // Examples implements VolumeWithExamples.
164 func (*S3Volume) Examples() []Volume {
165         return []Volume{
166                 &S3Volume{
167                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
168                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
169                         Endpoint:       "",
170                         Region:         "us-east-1",
171                         Bucket:         "example-bucket-name",
172                         IndexPageSize:  1000,
173                         S3Replication:  2,
174                         RaceWindow:     arvados.Duration(24 * time.Hour),
175                         ConnectTimeout: arvados.Duration(time.Minute),
176                         ReadTimeout:    arvados.Duration(5 * time.Minute),
177                 },
178                 &S3Volume{
179                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
180                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
181                         Endpoint:       "https://storage.googleapis.com",
182                         Region:         "",
183                         Bucket:         "example-bucket-name",
184                         IndexPageSize:  1000,
185                         S3Replication:  2,
186                         RaceWindow:     arvados.Duration(24 * time.Hour),
187                         ConnectTimeout: arvados.Duration(time.Minute),
188                         ReadTimeout:    arvados.Duration(5 * time.Minute),
189                 },
190         }
191 }
192
193 // Type implements Volume.
194 func (*S3Volume) Type() string {
195         return "S3"
196 }
197
198 // Start populates private fields and verifies the configuration is
199 // valid.
200 func (v *S3Volume) Start() error {
201         region, ok := aws.Regions[v.Region]
202         if v.Endpoint == "" {
203                 if !ok {
204                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
205                 }
206         } else if ok {
207                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
208                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
209         } else {
210                 region = aws.Region{
211                         Name:                 v.Region,
212                         S3Endpoint:           v.Endpoint,
213                         S3LocationConstraint: v.LocationConstraint,
214                 }
215         }
216
217         var err error
218         var auth aws.Auth
219         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
220         if err != nil {
221                 return err
222         }
223         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
224         if err != nil {
225                 return err
226         }
227
228         // Zero timeouts mean "wait forever", which is a bad
229         // default. Default to long timeouts instead.
230         if v.ConnectTimeout == 0 {
231                 v.ConnectTimeout = s3DefaultConnectTimeout
232         }
233         if v.ReadTimeout == 0 {
234                 v.ReadTimeout = s3DefaultReadTimeout
235         }
236
237         client := s3.New(auth, region)
238         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
239         client.ReadTimeout = time.Duration(v.ReadTimeout)
240         v.bucket = &s3bucket{
241                 Bucket: &s3.Bucket{
242                         S3:   client,
243                         Name: v.Bucket,
244                 },
245         }
246         return nil
247 }
248
249 // DeviceID returns a globally unique ID for the storage bucket.
250 func (v *S3Volume) DeviceID() string {
251         return "s3://" + v.Endpoint + "/" + v.Bucket
252 }
253
254 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
255         ready := make(chan bool)
256         go func() {
257                 rdr, err = v.getReader(loc)
258                 close(ready)
259         }()
260         select {
261         case <-ready:
262                 return
263         case <-ctx.Done():
264                 theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
265                 go func() {
266                         <-ready
267                         if err == nil {
268                                 rdr.Close()
269                         }
270                 }()
271                 return nil, ctx.Err()
272         }
273 }
274
275 // getReader wraps (Bucket)GetReader.
276 //
277 // In situations where (Bucket)GetReader would fail because the block
278 // disappeared in a Trash race, getReader calls fixRace to recover the
279 // data, and tries again.
280 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
281         rdr, err = v.bucket.GetReader(loc)
282         err = v.translateError(err)
283         if err == nil || !os.IsNotExist(err) {
284                 return
285         }
286
287         _, err = v.bucket.Head("recent/"+loc, nil)
288         err = v.translateError(err)
289         if err != nil {
290                 // If we can't read recent/X, there's no point in
291                 // trying fixRace. Give up.
292                 return
293         }
294         if !v.fixRace(loc) {
295                 err = os.ErrNotExist
296                 return
297         }
298
299         rdr, err = v.bucket.GetReader(loc)
300         if err != nil {
301                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
302                 err = v.translateError(err)
303         }
304         return
305 }
306
307 // Get a block: copy the block data into buf, and return the number of
308 // bytes copied.
309 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
310         rdr, err := v.getReaderWithContext(ctx, loc)
311         if err != nil {
312                 return 0, err
313         }
314
315         var n int
316         ready := make(chan bool)
317         go func() {
318                 defer close(ready)
319
320                 defer rdr.Close()
321                 n, err = io.ReadFull(rdr, buf)
322
323                 switch err {
324                 case nil, io.EOF, io.ErrUnexpectedEOF:
325                         err = nil
326                 default:
327                         err = v.translateError(err)
328                 }
329         }()
330         select {
331         case <-ctx.Done():
332                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
333                 rdr.Close()
334                 // Must wait for ReadFull to return, to ensure it
335                 // doesn't write to buf after we return.
336                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
337                 <-ready
338                 return 0, ctx.Err()
339         case <-ready:
340                 return n, err
341         }
342 }
343
344 // Compare the given data with the stored data.
345 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
346         errChan := make(chan error, 1)
347         go func() {
348                 _, err := v.bucket.Head("recent/"+loc, nil)
349                 errChan <- err
350         }()
351         var err error
352         select {
353         case <-ctx.Done():
354                 return ctx.Err()
355         case err = <-errChan:
356         }
357         if err != nil {
358                 // Checking for "loc" itself here would interfere with
359                 // future GET requests.
360                 //
361                 // On AWS, if X doesn't exist, a HEAD or GET request
362                 // for X causes X's non-existence to be cached. Thus,
363                 // if we test for X, then create X and return a
364                 // signature to our client, the client might still get
365                 // 404 from all keepstores when trying to read it.
366                 //
367                 // To avoid this, we avoid doing HEAD X or GET X until
368                 // we know X has been written.
369                 //
370                 // Note that X might exist even though recent/X
371                 // doesn't: for example, the response to HEAD recent/X
372                 // might itself come from a stale cache. In such
373                 // cases, we will return a false negative and
374                 // PutHandler might needlessly create another replica
375                 // on a different volume. That's not ideal, but it's
376                 // better than passing the eventually-consistent
377                 // problem on to our clients.
378                 return v.translateError(err)
379         }
380         rdr, err := v.getReaderWithContext(ctx, loc)
381         if err != nil {
382                 return err
383         }
384         defer rdr.Close()
385         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
386 }
387
388 // Put writes a block.
389 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
390         if v.ReadOnly {
391                 return MethodDisabledError
392         }
393         var opts s3.Options
394         size := len(block)
395         if size > 0 {
396                 md5, err := hex.DecodeString(loc)
397                 if err != nil {
398                         return err
399                 }
400                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
401         }
402
403         // Send the block data through a pipe, so that (if we need to)
404         // we can close the pipe early and abandon our PutReader()
405         // goroutine, without worrying about PutReader() accessing our
406         // block buffer after we release it.
407         bufr, bufw := io.Pipe()
408         go func() {
409                 io.Copy(bufw, bytes.NewReader(block))
410                 bufw.Close()
411         }()
412
413         var err error
414         ready := make(chan bool)
415         go func() {
416                 defer func() {
417                         if ctx.Err() != nil {
418                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
419                         }
420                 }()
421                 defer close(ready)
422                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
423                 if err != nil {
424                         return
425                 }
426                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
427         }()
428         select {
429         case <-ctx.Done():
430                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
431                 // Our pipe might be stuck in Write(), waiting for
432                 // io.Copy() to read. If so, un-stick it. This means
433                 // PutReader will get corrupt data, but that's OK: the
434                 // size and MD5 won't match, so the write will fail.
435                 go io.Copy(ioutil.Discard, bufr)
436                 // CloseWithError() will return once pending I/O is done.
437                 bufw.CloseWithError(ctx.Err())
438                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
439                 return ctx.Err()
440         case <-ready:
441                 return v.translateError(err)
442         }
443 }
444
445 // Touch sets the timestamp for the given locator to the current time.
446 func (v *S3Volume) Touch(loc string) error {
447         if v.ReadOnly {
448                 return MethodDisabledError
449         }
450         _, err := v.bucket.Head(loc, nil)
451         err = v.translateError(err)
452         if os.IsNotExist(err) && v.fixRace(loc) {
453                 // The data object got trashed in a race, but fixRace
454                 // rescued it.
455         } else if err != nil {
456                 return err
457         }
458         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
459         return v.translateError(err)
460 }
461
462 // Mtime returns the stored timestamp for the given locator.
463 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
464         _, err := v.bucket.Head(loc, nil)
465         if err != nil {
466                 return zeroTime, v.translateError(err)
467         }
468         resp, err := v.bucket.Head("recent/"+loc, nil)
469         err = v.translateError(err)
470         if os.IsNotExist(err) {
471                 // The data object X exists, but recent/X is missing.
472                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
473                 if err != nil {
474                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
475                         return zeroTime, v.translateError(err)
476                 }
477                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
478                 resp, err = v.bucket.Head("recent/"+loc, nil)
479                 if err != nil {
480                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
481                         return zeroTime, v.translateError(err)
482                 }
483         } else if err != nil {
484                 // HEAD recent/X failed for some other reason.
485                 return zeroTime, err
486         }
487         return v.lastModified(resp)
488 }
489
490 // IndexTo writes a complete list of locators with the given prefix
491 // for which Get() can retrieve data.
492 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
493         // Use a merge sort to find matching sets of X and recent/X.
494         dataL := s3Lister{
495                 Bucket:   v.bucket.Bucket,
496                 Prefix:   prefix,
497                 PageSize: v.IndexPageSize,
498         }
499         recentL := s3Lister{
500                 Bucket:   v.bucket.Bucket,
501                 Prefix:   "recent/" + prefix,
502                 PageSize: v.IndexPageSize,
503         }
504         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
505         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
506         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
507                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
508                 if data.Key >= "g" {
509                         // Conveniently, "recent/*" and "trash/*" are
510                         // lexically greater than all hex-encoded data
511                         // hashes, so stopping here avoids iterating
512                         // over all of them needlessly with dataL.
513                         break
514                 }
515                 if !v.isKeepBlock(data.Key) {
516                         continue
517                 }
518
519                 // stamp is the list entry we should use to report the
520                 // last-modified time for this data block: it will be
521                 // the recent/X entry if one exists, otherwise the
522                 // entry for the data block itself.
523                 stamp := data
524
525                 // Advance to the corresponding recent/X marker, if any
526                 for recent != nil {
527                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
528                                 recent = recentL.Next()
529                                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
530                                 continue
531                         } else if cmp == 0 {
532                                 stamp = recent
533                                 recent = recentL.Next()
534                                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
535                                 break
536                         } else {
537                                 // recent/X marker is missing: we'll
538                                 // use the timestamp on the data
539                                 // object.
540                                 break
541                         }
542                 }
543                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
544                 if err != nil {
545                         return err
546                 }
547                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
548         }
549         return nil
550 }
551
552 // Trash a Keep block.
553 func (v *S3Volume) Trash(loc string) error {
554         if v.ReadOnly {
555                 return MethodDisabledError
556         }
557         if t, err := v.Mtime(loc); err != nil {
558                 return err
559         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
560                 return nil
561         }
562         if theConfig.TrashLifetime == 0 {
563                 if !s3UnsafeDelete {
564                         return ErrS3TrashDisabled
565                 }
566                 return v.translateError(v.bucket.Del(loc))
567         }
568         err := v.checkRaceWindow(loc)
569         if err != nil {
570                 return err
571         }
572         err = v.safeCopy("trash/"+loc, loc)
573         if err != nil {
574                 return err
575         }
576         return v.translateError(v.bucket.Del(loc))
577 }
578
579 // checkRaceWindow returns a non-nil error if trash/loc is, or might
580 // be, in the race window (i.e., it's not safe to trash loc).
581 func (v *S3Volume) checkRaceWindow(loc string) error {
582         resp, err := v.bucket.Head("trash/"+loc, nil)
583         err = v.translateError(err)
584         if os.IsNotExist(err) {
585                 // OK, trash/X doesn't exist so we're not in the race
586                 // window
587                 return nil
588         } else if err != nil {
589                 // Error looking up trash/X. We don't know whether
590                 // we're in the race window
591                 return err
592         }
593         t, err := v.lastModified(resp)
594         if err != nil {
595                 // Can't parse timestamp
596                 return err
597         }
598         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
599         if safeWindow <= 0 {
600                 // We can't count on "touch trash/X" to prolong
601                 // trash/X's lifetime. The new timestamp might not
602                 // become visible until now+raceWindow, and EmptyTrash
603                 // is allowed to delete trash/X before then.
604                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
605         }
606         // trash/X exists, but it won't be eligible for deletion until
607         // after now+raceWindow, so it's safe to overwrite it.
608         return nil
609 }
610
611 // safeCopy calls PutCopy, and checks the response to make sure the
612 // copy succeeded and updated the timestamp on the destination object
613 // (PutCopy returns 200 OK if the request was received, even if the
614 // copy failed).
615 func (v *S3Volume) safeCopy(dst, src string) error {
616         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
617                 ContentType:       "application/octet-stream",
618                 MetadataDirective: "REPLACE",
619         }, v.bucket.Name+"/"+src)
620         err = v.translateError(err)
621         if err != nil {
622                 return err
623         }
624         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
625                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
626         } else if time.Now().Sub(t) > maxClockSkew {
627                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
628         }
629         return nil
630 }
631
632 // Get the LastModified header from resp, and parse it as RFC1123 or
633 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
634 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
635         s := resp.Header.Get("Last-Modified")
636         t, err = time.Parse(time.RFC1123, s)
637         if err != nil && s != "" {
638                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
639                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
640                 // as required by HTTP spec. If it's not a valid HTTP
641                 // header value, it's probably AWS (or s3test) giving
642                 // us a nearly-RFC1123 timestamp.
643                 t, err = time.Parse(nearlyRFC1123, s)
644         }
645         return
646 }
647
648 // Untrash moves block from trash back into store
649 func (v *S3Volume) Untrash(loc string) error {
650         err := v.safeCopy(loc, "trash/"+loc)
651         if err != nil {
652                 return err
653         }
654         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
655         return v.translateError(err)
656 }
657
658 // Status returns a *VolumeStatus representing the current in-use
659 // storage capacity and a fake available capacity that doesn't make
660 // the volume seem full or nearly-full.
661 func (v *S3Volume) Status() *VolumeStatus {
662         return &VolumeStatus{
663                 DeviceNum: 1,
664                 BytesFree: BlockSize * 1000,
665                 BytesUsed: 1,
666         }
667 }
668
669 // InternalStats returns bucket I/O and API call counters.
670 func (v *S3Volume) InternalStats() interface{} {
671         return &v.bucket.stats
672 }
673
674 // String implements fmt.Stringer.
675 func (v *S3Volume) String() string {
676         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
677 }
678
679 // Writable returns false if all future Put, Mtime, and Delete calls
680 // are expected to fail.
681 func (v *S3Volume) Writable() bool {
682         return !v.ReadOnly
683 }
684
685 // Replication returns the storage redundancy of the underlying
686 // device. Configured via command line flag.
687 func (v *S3Volume) Replication() int {
688         return v.S3Replication
689 }
690
691 // GetStorageClasses implements Volume
692 func (v *S3Volume) GetStorageClasses() []string {
693         return v.StorageClasses
694 }
695
696 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
697
698 func (v *S3Volume) isKeepBlock(s string) bool {
699         return s3KeepBlockRegexp.MatchString(s)
700 }
701
702 // fixRace(X) is called when "recent/X" exists but "X" doesn't
703 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
704 // there was a race between Put and Trash, fixRace recovers from the
705 // race by Untrashing the block.
706 func (v *S3Volume) fixRace(loc string) bool {
707         trash, err := v.bucket.Head("trash/"+loc, nil)
708         if err != nil {
709                 if !os.IsNotExist(v.translateError(err)) {
710                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
711                 }
712                 return false
713         }
714         trashTime, err := v.lastModified(trash)
715         if err != nil {
716                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
717                 return false
718         }
719
720         recent, err := v.bucket.Head("recent/"+loc, nil)
721         if err != nil {
722                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
723                 return false
724         }
725         recentTime, err := v.lastModified(recent)
726         if err != nil {
727                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
728                 return false
729         }
730
731         ageWhenTrashed := trashTime.Sub(recentTime)
732         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
733                 // No evidence of a race: block hasn't been written
734                 // since it became eligible for Trash. No fix needed.
735                 return false
736         }
737
738         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
739         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
740         err = v.safeCopy(loc, "trash/"+loc)
741         if err != nil {
742                 log.Printf("error: fixRace: %s", err)
743                 return false
744         }
745         return true
746 }
747
748 func (v *S3Volume) translateError(err error) error {
749         switch err := err.(type) {
750         case *s3.Error:
751                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
752                         strings.Contains(err.Error(), "Not Found") {
753                         return os.ErrNotExist
754                 }
755                 // Other 404 errors like NoSuchVersion and
756                 // NoSuchBucket are different problems which should
757                 // get called out downstream, so we don't convert them
758                 // to os.ErrNotExist.
759         }
760         return err
761 }
762
763 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
764 // and deletes them from the volume.
765 func (v *S3Volume) EmptyTrash() {
766         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
767
768         // Define "ready to delete" as "...when EmptyTrash started".
769         startT := time.Now()
770
771         emptyOneKey := func(trash *s3.Key) {
772                 loc := trash.Key[6:]
773                 if !v.isKeepBlock(loc) {
774                         return
775                 }
776                 atomic.AddInt64(&bytesInTrash, trash.Size)
777                 atomic.AddInt64(&blocksInTrash, 1)
778
779                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
780                 if err != nil {
781                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
782                         return
783                 }
784                 recent, err := v.bucket.Head("recent/"+loc, nil)
785                 if err != nil && os.IsNotExist(v.translateError(err)) {
786                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
787                         err = v.Untrash(loc)
788                         if err != nil {
789                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
790                         }
791                         return
792                 } else if err != nil {
793                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
794                         return
795                 }
796                 recentT, err := v.lastModified(recent)
797                 if err != nil {
798                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
799                         return
800                 }
801                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
802                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
803                                 // recent/loc is too old to protect
804                                 // loc from being Trashed again during
805                                 // the raceWindow that starts if we
806                                 // delete trash/X now.
807                                 //
808                                 // Note this means (TrashCheckInterval
809                                 // < BlobSignatureTTL - raceWindow) is
810                                 // necessary to avoid starvation.
811                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
812                                 v.fixRace(loc)
813                                 v.Touch(loc)
814                                 return
815                         }
816                         _, err := v.bucket.Head(loc, nil)
817                         if os.IsNotExist(err) {
818                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
819                                 v.fixRace(loc)
820                                 return
821                         } else if err != nil {
822                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
823                                 return
824                         }
825                 }
826                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
827                         return
828                 }
829                 err = v.bucket.Del(trash.Key)
830                 if err != nil {
831                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
832                         return
833                 }
834                 atomic.AddInt64(&bytesDeleted, trash.Size)
835                 atomic.AddInt64(&blocksDeleted, 1)
836
837                 _, err = v.bucket.Head(loc, nil)
838                 if err == nil {
839                         log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
840                         return
841                 }
842                 if !os.IsNotExist(v.translateError(err)) {
843                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
844                         return
845                 }
846                 err = v.bucket.Del("recent/" + loc)
847                 if err != nil {
848                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
849                 }
850         }
851
852         var wg sync.WaitGroup
853         todo := make(chan *s3.Key, theConfig.EmptyTrashWorkers)
854         for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
855                 wg.Add(1)
856                 go func() {
857                         defer wg.Done()
858                         for key := range todo {
859                                 emptyOneKey(key)
860                         }
861                 }()
862         }
863
864         trashL := s3Lister{
865                 Bucket:   v.bucket.Bucket,
866                 Prefix:   "trash/",
867                 PageSize: v.IndexPageSize,
868         }
869         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
870                 todo <- trash
871         }
872         close(todo)
873         wg.Wait()
874
875         if err := trashL.Error(); err != nil {
876                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
877         }
878         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
879 }
880
881 type s3Lister struct {
882         Bucket     *s3.Bucket
883         Prefix     string
884         PageSize   int
885         nextMarker string
886         buf        []s3.Key
887         err        error
888 }
889
890 // First fetches the first page and returns the first item. It returns
891 // nil if the response is the empty set or an error occurs.
892 func (lister *s3Lister) First() *s3.Key {
893         lister.getPage()
894         return lister.pop()
895 }
896
897 // Next returns the next item, fetching the next page if necessary. It
898 // returns nil if the last available item has already been fetched, or
899 // an error occurs.
900 func (lister *s3Lister) Next() *s3.Key {
901         if len(lister.buf) == 0 && lister.nextMarker != "" {
902                 lister.getPage()
903         }
904         return lister.pop()
905 }
906
907 // Return the most recent error encountered by First or Next.
908 func (lister *s3Lister) Error() error {
909         return lister.err
910 }
911
912 func (lister *s3Lister) getPage() {
913         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
914         lister.nextMarker = ""
915         if err != nil {
916                 lister.err = err
917                 return
918         }
919         if resp.IsTruncated {
920                 lister.nextMarker = resp.NextMarker
921         }
922         lister.buf = make([]s3.Key, 0, len(resp.Contents))
923         for _, key := range resp.Contents {
924                 if !strings.HasPrefix(key.Key, lister.Prefix) {
925                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
926                         continue
927                 }
928                 lister.buf = append(lister.buf, key)
929         }
930 }
931
932 func (lister *s3Lister) pop() (k *s3.Key) {
933         if len(lister.buf) > 0 {
934                 k = &lister.buf[0]
935                 lister.buf = lister.buf[1:]
936         }
937         return
938 }
939
940 // s3bucket wraps s3.bucket and counts I/O and API usage stats.
941 type s3bucket struct {
942         *s3.Bucket
943         stats s3bucketStats
944 }
945
946 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
947         rdr, err := b.Bucket.GetReader(path)
948         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
949         b.stats.TickErr(err)
950         return NewCountingReader(rdr, b.stats.TickInBytes), err
951 }
952
953 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
954         resp, err := b.Bucket.Head(path, headers)
955         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
956         b.stats.TickErr(err)
957         return resp, err
958 }
959
960 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
961         if length == 0 {
962                 // goamz will only send Content-Length: 0 when reader
963                 // is nil due to net.http.Request.ContentLength
964                 // behavior.  Otherwise, Content-Length header is
965                 // omitted which will cause some S3 services
966                 // (including AWS and Ceph RadosGW) to fail to create
967                 // empty objects.
968                 r = nil
969         } else {
970                 r = NewCountingReader(r, b.stats.TickOutBytes)
971         }
972         err := b.Bucket.PutReader(path, r, length, contType, perm, options)
973         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
974         b.stats.TickErr(err)
975         return err
976 }
977
978 func (b *s3bucket) Del(path string) error {
979         err := b.Bucket.Del(path)
980         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
981         b.stats.TickErr(err)
982         return err
983 }
984
985 type s3bucketStats struct {
986         statsTicker
987         Ops     uint64
988         GetOps  uint64
989         PutOps  uint64
990         HeadOps uint64
991         DelOps  uint64
992         ListOps uint64
993 }
994
995 func (s *s3bucketStats) TickErr(err error) {
996         if err == nil {
997                 return
998         }
999         errType := fmt.Sprintf("%T", err)
1000         if err, ok := err.(*s3.Error); ok {
1001                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
1002         }
1003         s.statsTicker.TickErr(err, errType)
1004 }