Merge branch '13517-buffer-leak'
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "encoding/base64"
11         "encoding/hex"
12         "flag"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "net/http"
17         "os"
18         "regexp"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "time"
23
24         "git.curoverse.com/arvados.git/sdk/go/arvados"
25         "github.com/AdRoll/goamz/aws"
26         "github.com/AdRoll/goamz/s3"
27 )
28
29 const (
30         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
31         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
32 )
33
34 var (
35         // ErrS3TrashDisabled is returned by Trash if that operation
36         // is impossible with the current config.
37         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
38
39         s3AccessKeyFile string
40         s3SecretKeyFile string
41         s3RegionName    string
42         s3Endpoint      string
43         s3Replication   int
44         s3UnsafeDelete  bool
45         s3RaceWindow    time.Duration
46
47         s3ACL = s3.Private
48
49         zeroTime time.Time
50 )
51
52 const (
53         maxClockSkew  = 600 * time.Second
54         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
55 )
56
57 type s3VolumeAdder struct {
58         *Config
59 }
60
61 // String implements flag.Value
62 func (s *s3VolumeAdder) String() string {
63         return "-"
64 }
65
66 func (s *s3VolumeAdder) Set(bucketName string) error {
67         if bucketName == "" {
68                 return fmt.Errorf("no container name given")
69         }
70         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
71                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
72         }
73         if deprecated.flagSerializeIO {
74                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
75         }
76         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
77                 Bucket:        bucketName,
78                 AccessKeyFile: s3AccessKeyFile,
79                 SecretKeyFile: s3SecretKeyFile,
80                 Endpoint:      s3Endpoint,
81                 Region:        s3RegionName,
82                 RaceWindow:    arvados.Duration(s3RaceWindow),
83                 S3Replication: s3Replication,
84                 UnsafeDelete:  s3UnsafeDelete,
85                 ReadOnly:      deprecated.flagReadonly,
86                 IndexPageSize: 1000,
87         })
88         return nil
89 }
90
91 func s3regions() (okList []string) {
92         for r := range aws.Regions {
93                 okList = append(okList, r)
94         }
95         return
96 }
97
98 func init() {
99         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
100
101         flag.Var(&s3VolumeAdder{theConfig},
102                 "s3-bucket-volume",
103                 "Use the given bucket as a storage volume. Can be given multiple times.")
104         flag.StringVar(
105                 &s3RegionName,
106                 "s3-region",
107                 "",
108                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
109         flag.StringVar(
110                 &s3Endpoint,
111                 "s3-endpoint",
112                 "",
113                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
114         flag.StringVar(
115                 &s3AccessKeyFile,
116                 "s3-access-key-file",
117                 "",
118                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
119         flag.StringVar(
120                 &s3SecretKeyFile,
121                 "s3-secret-key-file",
122                 "",
123                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
124         flag.DurationVar(
125                 &s3RaceWindow,
126                 "s3-race-window",
127                 24*time.Hour,
128                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
129         flag.IntVar(
130                 &s3Replication,
131                 "s3-replication",
132                 2,
133                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
134         flag.BoolVar(
135                 &s3UnsafeDelete,
136                 "s3-unsafe-delete",
137                 false,
138                 "EXPERIMENTAL. Enable deletion (garbage collection) even when trash lifetime is zero, even though there are known race conditions that can cause data loss.")
139 }
140
141 // S3Volume implements Volume using an S3 bucket.
142 type S3Volume struct {
143         AccessKeyFile      string
144         SecretKeyFile      string
145         Endpoint           string
146         Region             string
147         Bucket             string
148         LocationConstraint bool
149         IndexPageSize      int
150         S3Replication      int
151         ConnectTimeout     arvados.Duration
152         ReadTimeout        arvados.Duration
153         RaceWindow         arvados.Duration
154         ReadOnly           bool
155         UnsafeDelete       bool
156         StorageClasses     []string
157
158         bucket *s3bucket
159
160         startOnce sync.Once
161 }
162
163 // Examples implements VolumeWithExamples.
164 func (*S3Volume) Examples() []Volume {
165         return []Volume{
166                 &S3Volume{
167                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
168                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
169                         Endpoint:       "",
170                         Region:         "us-east-1",
171                         Bucket:         "example-bucket-name",
172                         IndexPageSize:  1000,
173                         S3Replication:  2,
174                         RaceWindow:     arvados.Duration(24 * time.Hour),
175                         ConnectTimeout: arvados.Duration(time.Minute),
176                         ReadTimeout:    arvados.Duration(5 * time.Minute),
177                 },
178                 &S3Volume{
179                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
180                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
181                         Endpoint:       "https://storage.googleapis.com",
182                         Region:         "",
183                         Bucket:         "example-bucket-name",
184                         IndexPageSize:  1000,
185                         S3Replication:  2,
186                         RaceWindow:     arvados.Duration(24 * time.Hour),
187                         ConnectTimeout: arvados.Duration(time.Minute),
188                         ReadTimeout:    arvados.Duration(5 * time.Minute),
189                 },
190         }
191 }
192
193 // Type implements Volume.
194 func (*S3Volume) Type() string {
195         return "S3"
196 }
197
198 // Start populates private fields and verifies the configuration is
199 // valid.
200 func (v *S3Volume) Start() error {
201         region, ok := aws.Regions[v.Region]
202         if v.Endpoint == "" {
203                 if !ok {
204                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
205                 }
206         } else if ok {
207                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
208                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
209         } else {
210                 region = aws.Region{
211                         Name:                 v.Region,
212                         S3Endpoint:           v.Endpoint,
213                         S3LocationConstraint: v.LocationConstraint,
214                 }
215         }
216
217         var err error
218         var auth aws.Auth
219         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
220         if err != nil {
221                 return err
222         }
223         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
224         if err != nil {
225                 return err
226         }
227
228         // Zero timeouts mean "wait forever", which is a bad
229         // default. Default to long timeouts instead.
230         if v.ConnectTimeout == 0 {
231                 v.ConnectTimeout = s3DefaultConnectTimeout
232         }
233         if v.ReadTimeout == 0 {
234                 v.ReadTimeout = s3DefaultReadTimeout
235         }
236
237         client := s3.New(auth, region)
238         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
239         client.ReadTimeout = time.Duration(v.ReadTimeout)
240         v.bucket = &s3bucket{
241                 Bucket: &s3.Bucket{
242                         S3:   client,
243                         Name: v.Bucket,
244                 },
245         }
246         return nil
247 }
248
249 // DeviceID returns a globally unique ID for the storage bucket.
250 func (v *S3Volume) DeviceID() string {
251         return "s3://" + v.Endpoint + "/" + v.Bucket
252 }
253
254 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
255         ready := make(chan bool)
256         go func() {
257                 rdr, err = v.getReader(loc)
258                 close(ready)
259         }()
260         select {
261         case <-ready:
262                 return
263         case <-ctx.Done():
264                 theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
265                 go func() {
266                         <-ready
267                         if err == nil {
268                                 rdr.Close()
269                         }
270                 }()
271                 return nil, ctx.Err()
272         }
273 }
274
275 // getReader wraps (Bucket)GetReader.
276 //
277 // In situations where (Bucket)GetReader would fail because the block
278 // disappeared in a Trash race, getReader calls fixRace to recover the
279 // data, and tries again.
280 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
281         rdr, err = v.bucket.GetReader(loc)
282         err = v.translateError(err)
283         if err == nil || !os.IsNotExist(err) {
284                 return
285         }
286
287         _, err = v.bucket.Head("recent/"+loc, nil)
288         err = v.translateError(err)
289         if err != nil {
290                 // If we can't read recent/X, there's no point in
291                 // trying fixRace. Give up.
292                 return
293         }
294         if !v.fixRace(loc) {
295                 err = os.ErrNotExist
296                 return
297         }
298
299         rdr, err = v.bucket.GetReader(loc)
300         if err != nil {
301                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
302                 err = v.translateError(err)
303         }
304         return
305 }
306
307 // Get a block: copy the block data into buf, and return the number of
308 // bytes copied.
309 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
310         rdr, err := v.getReaderWithContext(ctx, loc)
311         if err != nil {
312                 return 0, err
313         }
314
315         var n int
316         ready := make(chan bool)
317         go func() {
318                 defer close(ready)
319
320                 defer rdr.Close()
321                 n, err = io.ReadFull(rdr, buf)
322
323                 switch err {
324                 case nil, io.EOF, io.ErrUnexpectedEOF:
325                         err = nil
326                 default:
327                         err = v.translateError(err)
328                 }
329         }()
330         select {
331         case <-ctx.Done():
332                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
333                 rdr.Close()
334                 // Must wait for ReadFull to return, to ensure it
335                 // doesn't write to buf after we return.
336                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
337                 <-ready
338                 return 0, ctx.Err()
339         case <-ready:
340                 return n, err
341         }
342 }
343
344 // Compare the given data with the stored data.
345 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
346         errChan := make(chan error, 1)
347         go func() {
348                 _, err := v.bucket.Head("recent/"+loc, nil)
349                 errChan <- err
350         }()
351         var err error
352         select {
353         case <-ctx.Done():
354                 return ctx.Err()
355         case err = <-errChan:
356         }
357         if err != nil {
358                 // Checking for "loc" itself here would interfere with
359                 // future GET requests.
360                 //
361                 // On AWS, if X doesn't exist, a HEAD or GET request
362                 // for X causes X's non-existence to be cached. Thus,
363                 // if we test for X, then create X and return a
364                 // signature to our client, the client might still get
365                 // 404 from all keepstores when trying to read it.
366                 //
367                 // To avoid this, we avoid doing HEAD X or GET X until
368                 // we know X has been written.
369                 //
370                 // Note that X might exist even though recent/X
371                 // doesn't: for example, the response to HEAD recent/X
372                 // might itself come from a stale cache. In such
373                 // cases, we will return a false negative and
374                 // PutHandler might needlessly create another replica
375                 // on a different volume. That's not ideal, but it's
376                 // better than passing the eventually-consistent
377                 // problem on to our clients.
378                 return v.translateError(err)
379         }
380         rdr, err := v.getReaderWithContext(ctx, loc)
381         if err != nil {
382                 return err
383         }
384         defer rdr.Close()
385         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
386 }
387
388 // Put writes a block.
389 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
390         if v.ReadOnly {
391                 return MethodDisabledError
392         }
393         var opts s3.Options
394         size := len(block)
395         if size > 0 {
396                 md5, err := hex.DecodeString(loc)
397                 if err != nil {
398                         return err
399                 }
400                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
401         }
402
403         // Send the block data through a pipe, so that (if we need to)
404         // we can close the pipe early and abandon our PutReader()
405         // goroutine, without worrying about PutReader() accessing our
406         // block buffer after we release it.
407         bufr, bufw := io.Pipe()
408         go func() {
409                 io.Copy(bufw, bytes.NewReader(block))
410                 bufw.Close()
411         }()
412
413         var err error
414         ready := make(chan bool)
415         go func() {
416                 defer func() {
417                         if ctx.Err() != nil {
418                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
419                         }
420                 }()
421                 defer close(ready)
422                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
423                 if err != nil {
424                         return
425                 }
426                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
427         }()
428         select {
429         case <-ctx.Done():
430                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
431                 // Our pipe might be stuck in Write(), waiting for
432                 // PutReader() to read. If so, un-stick it. This means
433                 // PutReader will get corrupt data, but that's OK: the
434                 // size and MD5 won't match, so the write will fail.
435                 go io.Copy(ioutil.Discard, bufr)
436                 // CloseWithError() will return once pending I/O is done.
437                 bufw.CloseWithError(ctx.Err())
438                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
439                 return ctx.Err()
440         case <-ready:
441                 // Unblock pipe in case PutReader did not consume it.
442                 io.Copy(ioutil.Discard, bufr)
443                 return v.translateError(err)
444         }
445 }
446
447 // Touch sets the timestamp for the given locator to the current time.
448 func (v *S3Volume) Touch(loc string) error {
449         if v.ReadOnly {
450                 return MethodDisabledError
451         }
452         _, err := v.bucket.Head(loc, nil)
453         err = v.translateError(err)
454         if os.IsNotExist(err) && v.fixRace(loc) {
455                 // The data object got trashed in a race, but fixRace
456                 // rescued it.
457         } else if err != nil {
458                 return err
459         }
460         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
461         return v.translateError(err)
462 }
463
464 // Mtime returns the stored timestamp for the given locator.
465 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
466         _, err := v.bucket.Head(loc, nil)
467         if err != nil {
468                 return zeroTime, v.translateError(err)
469         }
470         resp, err := v.bucket.Head("recent/"+loc, nil)
471         err = v.translateError(err)
472         if os.IsNotExist(err) {
473                 // The data object X exists, but recent/X is missing.
474                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
475                 if err != nil {
476                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
477                         return zeroTime, v.translateError(err)
478                 }
479                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
480                 resp, err = v.bucket.Head("recent/"+loc, nil)
481                 if err != nil {
482                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
483                         return zeroTime, v.translateError(err)
484                 }
485         } else if err != nil {
486                 // HEAD recent/X failed for some other reason.
487                 return zeroTime, err
488         }
489         return v.lastModified(resp)
490 }
491
492 // IndexTo writes a complete list of locators with the given prefix
493 // for which Get() can retrieve data.
494 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
495         // Use a merge sort to find matching sets of X and recent/X.
496         dataL := s3Lister{
497                 Bucket:   v.bucket.Bucket,
498                 Prefix:   prefix,
499                 PageSize: v.IndexPageSize,
500         }
501         recentL := s3Lister{
502                 Bucket:   v.bucket.Bucket,
503                 Prefix:   "recent/" + prefix,
504                 PageSize: v.IndexPageSize,
505         }
506         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
507         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
508         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
509                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
510                 if data.Key >= "g" {
511                         // Conveniently, "recent/*" and "trash/*" are
512                         // lexically greater than all hex-encoded data
513                         // hashes, so stopping here avoids iterating
514                         // over all of them needlessly with dataL.
515                         break
516                 }
517                 if !v.isKeepBlock(data.Key) {
518                         continue
519                 }
520
521                 // stamp is the list entry we should use to report the
522                 // last-modified time for this data block: it will be
523                 // the recent/X entry if one exists, otherwise the
524                 // entry for the data block itself.
525                 stamp := data
526
527                 // Advance to the corresponding recent/X marker, if any
528                 for recent != nil {
529                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
530                                 recent = recentL.Next()
531                                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
532                                 continue
533                         } else if cmp == 0 {
534                                 stamp = recent
535                                 recent = recentL.Next()
536                                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
537                                 break
538                         } else {
539                                 // recent/X marker is missing: we'll
540                                 // use the timestamp on the data
541                                 // object.
542                                 break
543                         }
544                 }
545                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
546                 if err != nil {
547                         return err
548                 }
549                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
550         }
551         return nil
552 }
553
554 // Trash a Keep block.
555 func (v *S3Volume) Trash(loc string) error {
556         if v.ReadOnly {
557                 return MethodDisabledError
558         }
559         if t, err := v.Mtime(loc); err != nil {
560                 return err
561         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
562                 return nil
563         }
564         if theConfig.TrashLifetime == 0 {
565                 if !s3UnsafeDelete {
566                         return ErrS3TrashDisabled
567                 }
568                 return v.translateError(v.bucket.Del(loc))
569         }
570         err := v.checkRaceWindow(loc)
571         if err != nil {
572                 return err
573         }
574         err = v.safeCopy("trash/"+loc, loc)
575         if err != nil {
576                 return err
577         }
578         return v.translateError(v.bucket.Del(loc))
579 }
580
581 // checkRaceWindow returns a non-nil error if trash/loc is, or might
582 // be, in the race window (i.e., it's not safe to trash loc).
583 func (v *S3Volume) checkRaceWindow(loc string) error {
584         resp, err := v.bucket.Head("trash/"+loc, nil)
585         err = v.translateError(err)
586         if os.IsNotExist(err) {
587                 // OK, trash/X doesn't exist so we're not in the race
588                 // window
589                 return nil
590         } else if err != nil {
591                 // Error looking up trash/X. We don't know whether
592                 // we're in the race window
593                 return err
594         }
595         t, err := v.lastModified(resp)
596         if err != nil {
597                 // Can't parse timestamp
598                 return err
599         }
600         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
601         if safeWindow <= 0 {
602                 // We can't count on "touch trash/X" to prolong
603                 // trash/X's lifetime. The new timestamp might not
604                 // become visible until now+raceWindow, and EmptyTrash
605                 // is allowed to delete trash/X before then.
606                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
607         }
608         // trash/X exists, but it won't be eligible for deletion until
609         // after now+raceWindow, so it's safe to overwrite it.
610         return nil
611 }
612
613 // safeCopy calls PutCopy, and checks the response to make sure the
614 // copy succeeded and updated the timestamp on the destination object
615 // (PutCopy returns 200 OK if the request was received, even if the
616 // copy failed).
617 func (v *S3Volume) safeCopy(dst, src string) error {
618         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
619                 ContentType:       "application/octet-stream",
620                 MetadataDirective: "REPLACE",
621         }, v.bucket.Name+"/"+src)
622         err = v.translateError(err)
623         if err != nil {
624                 return err
625         }
626         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
627                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
628         } else if time.Now().Sub(t) > maxClockSkew {
629                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
630         }
631         return nil
632 }
633
634 // Get the LastModified header from resp, and parse it as RFC1123 or
635 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
636 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
637         s := resp.Header.Get("Last-Modified")
638         t, err = time.Parse(time.RFC1123, s)
639         if err != nil && s != "" {
640                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
641                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
642                 // as required by HTTP spec. If it's not a valid HTTP
643                 // header value, it's probably AWS (or s3test) giving
644                 // us a nearly-RFC1123 timestamp.
645                 t, err = time.Parse(nearlyRFC1123, s)
646         }
647         return
648 }
649
650 // Untrash moves block from trash back into store
651 func (v *S3Volume) Untrash(loc string) error {
652         err := v.safeCopy(loc, "trash/"+loc)
653         if err != nil {
654                 return err
655         }
656         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
657         return v.translateError(err)
658 }
659
660 // Status returns a *VolumeStatus representing the current in-use
661 // storage capacity and a fake available capacity that doesn't make
662 // the volume seem full or nearly-full.
663 func (v *S3Volume) Status() *VolumeStatus {
664         return &VolumeStatus{
665                 DeviceNum: 1,
666                 BytesFree: BlockSize * 1000,
667                 BytesUsed: 1,
668         }
669 }
670
671 // InternalStats returns bucket I/O and API call counters.
672 func (v *S3Volume) InternalStats() interface{} {
673         return &v.bucket.stats
674 }
675
676 // String implements fmt.Stringer.
677 func (v *S3Volume) String() string {
678         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
679 }
680
681 // Writable returns false if all future Put, Mtime, and Delete calls
682 // are expected to fail.
683 func (v *S3Volume) Writable() bool {
684         return !v.ReadOnly
685 }
686
687 // Replication returns the storage redundancy of the underlying
688 // device. Configured via command line flag.
689 func (v *S3Volume) Replication() int {
690         return v.S3Replication
691 }
692
693 // GetStorageClasses implements Volume
694 func (v *S3Volume) GetStorageClasses() []string {
695         return v.StorageClasses
696 }
697
698 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
699
700 func (v *S3Volume) isKeepBlock(s string) bool {
701         return s3KeepBlockRegexp.MatchString(s)
702 }
703
704 // fixRace(X) is called when "recent/X" exists but "X" doesn't
705 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
706 // there was a race between Put and Trash, fixRace recovers from the
707 // race by Untrashing the block.
708 func (v *S3Volume) fixRace(loc string) bool {
709         trash, err := v.bucket.Head("trash/"+loc, nil)
710         if err != nil {
711                 if !os.IsNotExist(v.translateError(err)) {
712                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
713                 }
714                 return false
715         }
716         trashTime, err := v.lastModified(trash)
717         if err != nil {
718                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
719                 return false
720         }
721
722         recent, err := v.bucket.Head("recent/"+loc, nil)
723         if err != nil {
724                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
725                 return false
726         }
727         recentTime, err := v.lastModified(recent)
728         if err != nil {
729                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
730                 return false
731         }
732
733         ageWhenTrashed := trashTime.Sub(recentTime)
734         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
735                 // No evidence of a race: block hasn't been written
736                 // since it became eligible for Trash. No fix needed.
737                 return false
738         }
739
740         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
741         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
742         err = v.safeCopy(loc, "trash/"+loc)
743         if err != nil {
744                 log.Printf("error: fixRace: %s", err)
745                 return false
746         }
747         return true
748 }
749
750 func (v *S3Volume) translateError(err error) error {
751         switch err := err.(type) {
752         case *s3.Error:
753                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
754                         strings.Contains(err.Error(), "Not Found") {
755                         return os.ErrNotExist
756                 }
757                 // Other 404 errors like NoSuchVersion and
758                 // NoSuchBucket are different problems which should
759                 // get called out downstream, so we don't convert them
760                 // to os.ErrNotExist.
761         }
762         return err
763 }
764
765 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
766 // and deletes them from the volume.
767 func (v *S3Volume) EmptyTrash() {
768         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
769
770         // Define "ready to delete" as "...when EmptyTrash started".
771         startT := time.Now()
772
773         emptyOneKey := func(trash *s3.Key) {
774                 loc := trash.Key[6:]
775                 if !v.isKeepBlock(loc) {
776                         return
777                 }
778                 atomic.AddInt64(&bytesInTrash, trash.Size)
779                 atomic.AddInt64(&blocksInTrash, 1)
780
781                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
782                 if err != nil {
783                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
784                         return
785                 }
786                 recent, err := v.bucket.Head("recent/"+loc, nil)
787                 if err != nil && os.IsNotExist(v.translateError(err)) {
788                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
789                         err = v.Untrash(loc)
790                         if err != nil {
791                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
792                         }
793                         return
794                 } else if err != nil {
795                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
796                         return
797                 }
798                 recentT, err := v.lastModified(recent)
799                 if err != nil {
800                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
801                         return
802                 }
803                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
804                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
805                                 // recent/loc is too old to protect
806                                 // loc from being Trashed again during
807                                 // the raceWindow that starts if we
808                                 // delete trash/X now.
809                                 //
810                                 // Note this means (TrashCheckInterval
811                                 // < BlobSignatureTTL - raceWindow) is
812                                 // necessary to avoid starvation.
813                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
814                                 v.fixRace(loc)
815                                 v.Touch(loc)
816                                 return
817                         }
818                         _, err := v.bucket.Head(loc, nil)
819                         if os.IsNotExist(err) {
820                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
821                                 v.fixRace(loc)
822                                 return
823                         } else if err != nil {
824                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
825                                 return
826                         }
827                 }
828                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
829                         return
830                 }
831                 err = v.bucket.Del(trash.Key)
832                 if err != nil {
833                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
834                         return
835                 }
836                 atomic.AddInt64(&bytesDeleted, trash.Size)
837                 atomic.AddInt64(&blocksDeleted, 1)
838
839                 _, err = v.bucket.Head(loc, nil)
840                 if err == nil {
841                         log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
842                         return
843                 }
844                 if !os.IsNotExist(v.translateError(err)) {
845                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
846                         return
847                 }
848                 err = v.bucket.Del("recent/" + loc)
849                 if err != nil {
850                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
851                 }
852         }
853
854         var wg sync.WaitGroup
855         todo := make(chan *s3.Key, theConfig.EmptyTrashWorkers)
856         for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
857                 wg.Add(1)
858                 go func() {
859                         defer wg.Done()
860                         for key := range todo {
861                                 emptyOneKey(key)
862                         }
863                 }()
864         }
865
866         trashL := s3Lister{
867                 Bucket:   v.bucket.Bucket,
868                 Prefix:   "trash/",
869                 PageSize: v.IndexPageSize,
870         }
871         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
872                 todo <- trash
873         }
874         close(todo)
875         wg.Wait()
876
877         if err := trashL.Error(); err != nil {
878                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
879         }
880         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
881 }
882
883 type s3Lister struct {
884         Bucket     *s3.Bucket
885         Prefix     string
886         PageSize   int
887         nextMarker string
888         buf        []s3.Key
889         err        error
890 }
891
892 // First fetches the first page and returns the first item. It returns
893 // nil if the response is the empty set or an error occurs.
894 func (lister *s3Lister) First() *s3.Key {
895         lister.getPage()
896         return lister.pop()
897 }
898
899 // Next returns the next item, fetching the next page if necessary. It
900 // returns nil if the last available item has already been fetched, or
901 // an error occurs.
902 func (lister *s3Lister) Next() *s3.Key {
903         if len(lister.buf) == 0 && lister.nextMarker != "" {
904                 lister.getPage()
905         }
906         return lister.pop()
907 }
908
909 // Return the most recent error encountered by First or Next.
910 func (lister *s3Lister) Error() error {
911         return lister.err
912 }
913
914 func (lister *s3Lister) getPage() {
915         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
916         lister.nextMarker = ""
917         if err != nil {
918                 lister.err = err
919                 return
920         }
921         if resp.IsTruncated {
922                 lister.nextMarker = resp.NextMarker
923         }
924         lister.buf = make([]s3.Key, 0, len(resp.Contents))
925         for _, key := range resp.Contents {
926                 if !strings.HasPrefix(key.Key, lister.Prefix) {
927                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
928                         continue
929                 }
930                 lister.buf = append(lister.buf, key)
931         }
932 }
933
934 func (lister *s3Lister) pop() (k *s3.Key) {
935         if len(lister.buf) > 0 {
936                 k = &lister.buf[0]
937                 lister.buf = lister.buf[1:]
938         }
939         return
940 }
941
942 // s3bucket wraps s3.bucket and counts I/O and API usage stats.
943 type s3bucket struct {
944         *s3.Bucket
945         stats s3bucketStats
946 }
947
948 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
949         rdr, err := b.Bucket.GetReader(path)
950         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
951         b.stats.TickErr(err)
952         return NewCountingReader(rdr, b.stats.TickInBytes), err
953 }
954
955 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
956         resp, err := b.Bucket.Head(path, headers)
957         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
958         b.stats.TickErr(err)
959         return resp, err
960 }
961
962 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
963         if length == 0 {
964                 // goamz will only send Content-Length: 0 when reader
965                 // is nil due to net.http.Request.ContentLength
966                 // behavior.  Otherwise, Content-Length header is
967                 // omitted which will cause some S3 services
968                 // (including AWS and Ceph RadosGW) to fail to create
969                 // empty objects.
970                 r = nil
971         } else {
972                 r = NewCountingReader(r, b.stats.TickOutBytes)
973         }
974         err := b.Bucket.PutReader(path, r, length, contType, perm, options)
975         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
976         b.stats.TickErr(err)
977         return err
978 }
979
980 func (b *s3bucket) Del(path string) error {
981         err := b.Bucket.Del(path)
982         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
983         b.stats.TickErr(err)
984         return err
985 }
986
987 type s3bucketStats struct {
988         statsTicker
989         Ops     uint64
990         GetOps  uint64
991         PutOps  uint64
992         HeadOps uint64
993         DelOps  uint64
994         ListOps uint64
995 }
996
997 func (s *s3bucketStats) TickErr(err error) {
998         if err == nil {
999                 return
1000         }
1001         errType := fmt.Sprintf("%T", err)
1002         if err, ok := err.(*s3.Error); ok {
1003                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
1004         }
1005         s.statsTicker.TickErr(err, errType)
1006 }