14397: Fix unchecked errors in S3 block listing.
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "encoding/base64"
11         "encoding/hex"
12         "flag"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "net/http"
17         "os"
18         "regexp"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "time"
23
24         "git.curoverse.com/arvados.git/sdk/go/arvados"
25         "github.com/AdRoll/goamz/aws"
26         "github.com/AdRoll/goamz/s3"
27 )
28
29 const (
30         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
31         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
32 )
33
34 var (
35         // ErrS3TrashDisabled is returned by Trash if that operation
36         // is impossible with the current config.
37         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
38
39         s3AccessKeyFile string
40         s3SecretKeyFile string
41         s3RegionName    string
42         s3Endpoint      string
43         s3Replication   int
44         s3UnsafeDelete  bool
45         s3RaceWindow    time.Duration
46
47         s3ACL = s3.Private
48
49         zeroTime time.Time
50 )
51
52 const (
53         maxClockSkew  = 600 * time.Second
54         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
55 )
56
57 type s3VolumeAdder struct {
58         *Config
59 }
60
61 // String implements flag.Value
62 func (s *s3VolumeAdder) String() string {
63         return "-"
64 }
65
66 func (s *s3VolumeAdder) Set(bucketName string) error {
67         if bucketName == "" {
68                 return fmt.Errorf("no container name given")
69         }
70         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
71                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
72         }
73         if deprecated.flagSerializeIO {
74                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
75         }
76         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
77                 Bucket:        bucketName,
78                 AccessKeyFile: s3AccessKeyFile,
79                 SecretKeyFile: s3SecretKeyFile,
80                 Endpoint:      s3Endpoint,
81                 Region:        s3RegionName,
82                 RaceWindow:    arvados.Duration(s3RaceWindow),
83                 S3Replication: s3Replication,
84                 UnsafeDelete:  s3UnsafeDelete,
85                 ReadOnly:      deprecated.flagReadonly,
86                 IndexPageSize: 1000,
87         })
88         return nil
89 }
90
91 func s3regions() (okList []string) {
92         for r := range aws.Regions {
93                 okList = append(okList, r)
94         }
95         return
96 }
97
98 func init() {
99         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
100
101         flag.Var(&s3VolumeAdder{theConfig},
102                 "s3-bucket-volume",
103                 "Use the given bucket as a storage volume. Can be given multiple times.")
104         flag.StringVar(
105                 &s3RegionName,
106                 "s3-region",
107                 "",
108                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
109         flag.StringVar(
110                 &s3Endpoint,
111                 "s3-endpoint",
112                 "",
113                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
114         flag.StringVar(
115                 &s3AccessKeyFile,
116                 "s3-access-key-file",
117                 "",
118                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
119         flag.StringVar(
120                 &s3SecretKeyFile,
121                 "s3-secret-key-file",
122                 "",
123                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
124         flag.DurationVar(
125                 &s3RaceWindow,
126                 "s3-race-window",
127                 24*time.Hour,
128                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
129         flag.IntVar(
130                 &s3Replication,
131                 "s3-replication",
132                 2,
133                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
134         flag.BoolVar(
135                 &s3UnsafeDelete,
136                 "s3-unsafe-delete",
137                 false,
138                 "EXPERIMENTAL. Enable deletion (garbage collection) even when trash lifetime is zero, even though there are known race conditions that can cause data loss.")
139 }
140
141 // S3Volume implements Volume using an S3 bucket.
142 type S3Volume struct {
143         AccessKeyFile      string
144         SecretKeyFile      string
145         Endpoint           string
146         Region             string
147         Bucket             string
148         LocationConstraint bool
149         IndexPageSize      int
150         S3Replication      int
151         ConnectTimeout     arvados.Duration
152         ReadTimeout        arvados.Duration
153         RaceWindow         arvados.Duration
154         ReadOnly           bool
155         UnsafeDelete       bool
156         StorageClasses     []string
157
158         bucket *s3bucket
159
160         startOnce sync.Once
161 }
162
163 // Examples implements VolumeWithExamples.
164 func (*S3Volume) Examples() []Volume {
165         return []Volume{
166                 &S3Volume{
167                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
168                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
169                         Endpoint:       "",
170                         Region:         "us-east-1",
171                         Bucket:         "example-bucket-name",
172                         IndexPageSize:  1000,
173                         S3Replication:  2,
174                         RaceWindow:     arvados.Duration(24 * time.Hour),
175                         ConnectTimeout: arvados.Duration(time.Minute),
176                         ReadTimeout:    arvados.Duration(5 * time.Minute),
177                 },
178                 &S3Volume{
179                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
180                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
181                         Endpoint:       "https://storage.googleapis.com",
182                         Region:         "",
183                         Bucket:         "example-bucket-name",
184                         IndexPageSize:  1000,
185                         S3Replication:  2,
186                         RaceWindow:     arvados.Duration(24 * time.Hour),
187                         ConnectTimeout: arvados.Duration(time.Minute),
188                         ReadTimeout:    arvados.Duration(5 * time.Minute),
189                 },
190         }
191 }
192
193 // Type implements Volume.
194 func (*S3Volume) Type() string {
195         return "S3"
196 }
197
198 // Start populates private fields and verifies the configuration is
199 // valid.
200 func (v *S3Volume) Start() error {
201         region, ok := aws.Regions[v.Region]
202         if v.Endpoint == "" {
203                 if !ok {
204                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
205                 }
206         } else if ok {
207                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
208                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
209         } else {
210                 region = aws.Region{
211                         Name:                 v.Region,
212                         S3Endpoint:           v.Endpoint,
213                         S3LocationConstraint: v.LocationConstraint,
214                 }
215         }
216
217         var err error
218         var auth aws.Auth
219         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
220         if err != nil {
221                 return err
222         }
223         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
224         if err != nil {
225                 return err
226         }
227
228         // Zero timeouts mean "wait forever", which is a bad
229         // default. Default to long timeouts instead.
230         if v.ConnectTimeout == 0 {
231                 v.ConnectTimeout = s3DefaultConnectTimeout
232         }
233         if v.ReadTimeout == 0 {
234                 v.ReadTimeout = s3DefaultReadTimeout
235         }
236
237         client := s3.New(auth, region)
238         if region.EC2Endpoint.Signer == aws.V4Signature {
239                 // Currently affects only eu-central-1
240                 client.Signature = aws.V4Signature
241         }
242         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
243         client.ReadTimeout = time.Duration(v.ReadTimeout)
244         v.bucket = &s3bucket{
245                 Bucket: &s3.Bucket{
246                         S3:   client,
247                         Name: v.Bucket,
248                 },
249         }
250         return nil
251 }
252
253 // DeviceID returns a globally unique ID for the storage bucket.
254 func (v *S3Volume) DeviceID() string {
255         return "s3://" + v.Endpoint + "/" + v.Bucket
256 }
257
258 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
259         ready := make(chan bool)
260         go func() {
261                 rdr, err = v.getReader(loc)
262                 close(ready)
263         }()
264         select {
265         case <-ready:
266                 return
267         case <-ctx.Done():
268                 theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
269                 go func() {
270                         <-ready
271                         if err == nil {
272                                 rdr.Close()
273                         }
274                 }()
275                 return nil, ctx.Err()
276         }
277 }
278
279 // getReader wraps (Bucket)GetReader.
280 //
281 // In situations where (Bucket)GetReader would fail because the block
282 // disappeared in a Trash race, getReader calls fixRace to recover the
283 // data, and tries again.
284 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
285         rdr, err = v.bucket.GetReader(loc)
286         err = v.translateError(err)
287         if err == nil || !os.IsNotExist(err) {
288                 return
289         }
290
291         _, err = v.bucket.Head("recent/"+loc, nil)
292         err = v.translateError(err)
293         if err != nil {
294                 // If we can't read recent/X, there's no point in
295                 // trying fixRace. Give up.
296                 return
297         }
298         if !v.fixRace(loc) {
299                 err = os.ErrNotExist
300                 return
301         }
302
303         rdr, err = v.bucket.GetReader(loc)
304         if err != nil {
305                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
306                 err = v.translateError(err)
307         }
308         return
309 }
310
311 // Get a block: copy the block data into buf, and return the number of
312 // bytes copied.
313 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
314         rdr, err := v.getReaderWithContext(ctx, loc)
315         if err != nil {
316                 return 0, err
317         }
318
319         var n int
320         ready := make(chan bool)
321         go func() {
322                 defer close(ready)
323
324                 defer rdr.Close()
325                 n, err = io.ReadFull(rdr, buf)
326
327                 switch err {
328                 case nil, io.EOF, io.ErrUnexpectedEOF:
329                         err = nil
330                 default:
331                         err = v.translateError(err)
332                 }
333         }()
334         select {
335         case <-ctx.Done():
336                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
337                 rdr.Close()
338                 // Must wait for ReadFull to return, to ensure it
339                 // doesn't write to buf after we return.
340                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
341                 <-ready
342                 return 0, ctx.Err()
343         case <-ready:
344                 return n, err
345         }
346 }
347
348 // Compare the given data with the stored data.
349 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
350         errChan := make(chan error, 1)
351         go func() {
352                 _, err := v.bucket.Head("recent/"+loc, nil)
353                 errChan <- err
354         }()
355         var err error
356         select {
357         case <-ctx.Done():
358                 return ctx.Err()
359         case err = <-errChan:
360         }
361         if err != nil {
362                 // Checking for "loc" itself here would interfere with
363                 // future GET requests.
364                 //
365                 // On AWS, if X doesn't exist, a HEAD or GET request
366                 // for X causes X's non-existence to be cached. Thus,
367                 // if we test for X, then create X and return a
368                 // signature to our client, the client might still get
369                 // 404 from all keepstores when trying to read it.
370                 //
371                 // To avoid this, we avoid doing HEAD X or GET X until
372                 // we know X has been written.
373                 //
374                 // Note that X might exist even though recent/X
375                 // doesn't: for example, the response to HEAD recent/X
376                 // might itself come from a stale cache. In such
377                 // cases, we will return a false negative and
378                 // PutHandler might needlessly create another replica
379                 // on a different volume. That's not ideal, but it's
380                 // better than passing the eventually-consistent
381                 // problem on to our clients.
382                 return v.translateError(err)
383         }
384         rdr, err := v.getReaderWithContext(ctx, loc)
385         if err != nil {
386                 return err
387         }
388         defer rdr.Close()
389         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
390 }
391
392 // Put writes a block.
393 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
394         if v.ReadOnly {
395                 return MethodDisabledError
396         }
397         var opts s3.Options
398         size := len(block)
399         if size > 0 {
400                 md5, err := hex.DecodeString(loc)
401                 if err != nil {
402                         return err
403                 }
404                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
405         }
406
407         // Send the block data through a pipe, so that (if we need to)
408         // we can close the pipe early and abandon our PutReader()
409         // goroutine, without worrying about PutReader() accessing our
410         // block buffer after we release it.
411         bufr, bufw := io.Pipe()
412         go func() {
413                 io.Copy(bufw, bytes.NewReader(block))
414                 bufw.Close()
415         }()
416
417         var err error
418         ready := make(chan bool)
419         go func() {
420                 defer func() {
421                         if ctx.Err() != nil {
422                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
423                         }
424                 }()
425                 defer close(ready)
426                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
427                 if err != nil {
428                         return
429                 }
430                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
431         }()
432         select {
433         case <-ctx.Done():
434                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
435                 // Our pipe might be stuck in Write(), waiting for
436                 // PutReader() to read. If so, un-stick it. This means
437                 // PutReader will get corrupt data, but that's OK: the
438                 // size and MD5 won't match, so the write will fail.
439                 go io.Copy(ioutil.Discard, bufr)
440                 // CloseWithError() will return once pending I/O is done.
441                 bufw.CloseWithError(ctx.Err())
442                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
443                 return ctx.Err()
444         case <-ready:
445                 // Unblock pipe in case PutReader did not consume it.
446                 io.Copy(ioutil.Discard, bufr)
447                 return v.translateError(err)
448         }
449 }
450
451 // Touch sets the timestamp for the given locator to the current time.
452 func (v *S3Volume) Touch(loc string) error {
453         if v.ReadOnly {
454                 return MethodDisabledError
455         }
456         _, err := v.bucket.Head(loc, nil)
457         err = v.translateError(err)
458         if os.IsNotExist(err) && v.fixRace(loc) {
459                 // The data object got trashed in a race, but fixRace
460                 // rescued it.
461         } else if err != nil {
462                 return err
463         }
464         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
465         return v.translateError(err)
466 }
467
468 // Mtime returns the stored timestamp for the given locator.
469 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
470         _, err := v.bucket.Head(loc, nil)
471         if err != nil {
472                 return zeroTime, v.translateError(err)
473         }
474         resp, err := v.bucket.Head("recent/"+loc, nil)
475         err = v.translateError(err)
476         if os.IsNotExist(err) {
477                 // The data object X exists, but recent/X is missing.
478                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
479                 if err != nil {
480                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
481                         return zeroTime, v.translateError(err)
482                 }
483                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
484                 resp, err = v.bucket.Head("recent/"+loc, nil)
485                 if err != nil {
486                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
487                         return zeroTime, v.translateError(err)
488                 }
489         } else if err != nil {
490                 // HEAD recent/X failed for some other reason.
491                 return zeroTime, err
492         }
493         return v.lastModified(resp)
494 }
495
496 // IndexTo writes a complete list of locators with the given prefix
497 // for which Get() can retrieve data.
498 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
499         // Use a merge sort to find matching sets of X and recent/X.
500         dataL := s3Lister{
501                 Bucket:   v.bucket.Bucket,
502                 Prefix:   prefix,
503                 PageSize: v.IndexPageSize,
504                 Stats:    &v.bucket.stats,
505         }
506         recentL := s3Lister{
507                 Bucket:   v.bucket.Bucket,
508                 Prefix:   "recent/" + prefix,
509                 PageSize: v.IndexPageSize,
510                 Stats:    &v.bucket.stats,
511         }
512         for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
513                 if data.Key >= "g" {
514                         // Conveniently, "recent/*" and "trash/*" are
515                         // lexically greater than all hex-encoded data
516                         // hashes, so stopping here avoids iterating
517                         // over all of them needlessly with dataL.
518                         break
519                 }
520                 if !v.isKeepBlock(data.Key) {
521                         continue
522                 }
523
524                 // stamp is the list entry we should use to report the
525                 // last-modified time for this data block: it will be
526                 // the recent/X entry if one exists, otherwise the
527                 // entry for the data block itself.
528                 stamp := data
529
530                 // Advance to the corresponding recent/X marker, if any
531                 for recent != nil && recentL.Error() == nil {
532                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
533                                 recent = recentL.Next()
534                                 continue
535                         } else if cmp == 0 {
536                                 stamp = recent
537                                 recent = recentL.Next()
538                                 break
539                         } else {
540                                 // recent/X marker is missing: we'll
541                                 // use the timestamp on the data
542                                 // object.
543                                 break
544                         }
545                 }
546                 if err := recentL.Error(); err != nil {
547                         return err
548                 }
549                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
550                 if err != nil {
551                         return err
552                 }
553                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
554         }
555         return dataL.Error()
556 }
557
558 // Trash a Keep block.
559 func (v *S3Volume) Trash(loc string) error {
560         if v.ReadOnly {
561                 return MethodDisabledError
562         }
563         if t, err := v.Mtime(loc); err != nil {
564                 return err
565         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
566                 return nil
567         }
568         if theConfig.TrashLifetime == 0 {
569                 if !s3UnsafeDelete {
570                         return ErrS3TrashDisabled
571                 }
572                 return v.translateError(v.bucket.Del(loc))
573         }
574         err := v.checkRaceWindow(loc)
575         if err != nil {
576                 return err
577         }
578         err = v.safeCopy("trash/"+loc, loc)
579         if err != nil {
580                 return err
581         }
582         return v.translateError(v.bucket.Del(loc))
583 }
584
585 // checkRaceWindow returns a non-nil error if trash/loc is, or might
586 // be, in the race window (i.e., it's not safe to trash loc).
587 func (v *S3Volume) checkRaceWindow(loc string) error {
588         resp, err := v.bucket.Head("trash/"+loc, nil)
589         err = v.translateError(err)
590         if os.IsNotExist(err) {
591                 // OK, trash/X doesn't exist so we're not in the race
592                 // window
593                 return nil
594         } else if err != nil {
595                 // Error looking up trash/X. We don't know whether
596                 // we're in the race window
597                 return err
598         }
599         t, err := v.lastModified(resp)
600         if err != nil {
601                 // Can't parse timestamp
602                 return err
603         }
604         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
605         if safeWindow <= 0 {
606                 // We can't count on "touch trash/X" to prolong
607                 // trash/X's lifetime. The new timestamp might not
608                 // become visible until now+raceWindow, and EmptyTrash
609                 // is allowed to delete trash/X before then.
610                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
611         }
612         // trash/X exists, but it won't be eligible for deletion until
613         // after now+raceWindow, so it's safe to overwrite it.
614         return nil
615 }
616
617 // safeCopy calls PutCopy, and checks the response to make sure the
618 // copy succeeded and updated the timestamp on the destination object
619 // (PutCopy returns 200 OK if the request was received, even if the
620 // copy failed).
621 func (v *S3Volume) safeCopy(dst, src string) error {
622         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
623                 ContentType:       "application/octet-stream",
624                 MetadataDirective: "REPLACE",
625         }, v.bucket.Name+"/"+src)
626         err = v.translateError(err)
627         if os.IsNotExist(err) {
628                 return err
629         } else if err != nil {
630                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Name+"/"+src, err)
631         }
632         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
633                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
634         } else if time.Now().Sub(t) > maxClockSkew {
635                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
636         }
637         return nil
638 }
639
640 // Get the LastModified header from resp, and parse it as RFC1123 or
641 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
642 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
643         s := resp.Header.Get("Last-Modified")
644         t, err = time.Parse(time.RFC1123, s)
645         if err != nil && s != "" {
646                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
647                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
648                 // as required by HTTP spec. If it's not a valid HTTP
649                 // header value, it's probably AWS (or s3test) giving
650                 // us a nearly-RFC1123 timestamp.
651                 t, err = time.Parse(nearlyRFC1123, s)
652         }
653         return
654 }
655
656 // Untrash moves block from trash back into store
657 func (v *S3Volume) Untrash(loc string) error {
658         err := v.safeCopy(loc, "trash/"+loc)
659         if err != nil {
660                 return err
661         }
662         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
663         return v.translateError(err)
664 }
665
666 // Status returns a *VolumeStatus representing the current in-use
667 // storage capacity and a fake available capacity that doesn't make
668 // the volume seem full or nearly-full.
669 func (v *S3Volume) Status() *VolumeStatus {
670         return &VolumeStatus{
671                 DeviceNum: 1,
672                 BytesFree: BlockSize * 1000,
673                 BytesUsed: 1,
674         }
675 }
676
677 // InternalStats returns bucket I/O and API call counters.
678 func (v *S3Volume) InternalStats() interface{} {
679         return &v.bucket.stats
680 }
681
682 // String implements fmt.Stringer.
683 func (v *S3Volume) String() string {
684         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
685 }
686
687 // Writable returns false if all future Put, Mtime, and Delete calls
688 // are expected to fail.
689 func (v *S3Volume) Writable() bool {
690         return !v.ReadOnly
691 }
692
693 // Replication returns the storage redundancy of the underlying
694 // device. Configured via command line flag.
695 func (v *S3Volume) Replication() int {
696         return v.S3Replication
697 }
698
699 // GetStorageClasses implements Volume
700 func (v *S3Volume) GetStorageClasses() []string {
701         return v.StorageClasses
702 }
703
704 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
705
706 func (v *S3Volume) isKeepBlock(s string) bool {
707         return s3KeepBlockRegexp.MatchString(s)
708 }
709
710 // fixRace(X) is called when "recent/X" exists but "X" doesn't
711 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
712 // there was a race between Put and Trash, fixRace recovers from the
713 // race by Untrashing the block.
714 func (v *S3Volume) fixRace(loc string) bool {
715         trash, err := v.bucket.Head("trash/"+loc, nil)
716         if err != nil {
717                 if !os.IsNotExist(v.translateError(err)) {
718                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
719                 }
720                 return false
721         }
722         trashTime, err := v.lastModified(trash)
723         if err != nil {
724                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
725                 return false
726         }
727
728         recent, err := v.bucket.Head("recent/"+loc, nil)
729         if err != nil {
730                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
731                 return false
732         }
733         recentTime, err := v.lastModified(recent)
734         if err != nil {
735                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
736                 return false
737         }
738
739         ageWhenTrashed := trashTime.Sub(recentTime)
740         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
741                 // No evidence of a race: block hasn't been written
742                 // since it became eligible for Trash. No fix needed.
743                 return false
744         }
745
746         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
747         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
748         err = v.safeCopy(loc, "trash/"+loc)
749         if err != nil {
750                 log.Printf("error: fixRace: %s", err)
751                 return false
752         }
753         return true
754 }
755
756 func (v *S3Volume) translateError(err error) error {
757         switch err := err.(type) {
758         case *s3.Error:
759                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
760                         strings.Contains(err.Error(), "Not Found") {
761                         return os.ErrNotExist
762                 }
763                 // Other 404 errors like NoSuchVersion and
764                 // NoSuchBucket are different problems which should
765                 // get called out downstream, so we don't convert them
766                 // to os.ErrNotExist.
767         }
768         return err
769 }
770
771 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
772 // and deletes them from the volume.
773 func (v *S3Volume) EmptyTrash() {
774         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
775
776         // Define "ready to delete" as "...when EmptyTrash started".
777         startT := time.Now()
778
779         emptyOneKey := func(trash *s3.Key) {
780                 loc := trash.Key[6:]
781                 if !v.isKeepBlock(loc) {
782                         return
783                 }
784                 atomic.AddInt64(&bytesInTrash, trash.Size)
785                 atomic.AddInt64(&blocksInTrash, 1)
786
787                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
788                 if err != nil {
789                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
790                         return
791                 }
792                 recent, err := v.bucket.Head("recent/"+loc, nil)
793                 if err != nil && os.IsNotExist(v.translateError(err)) {
794                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
795                         err = v.Untrash(loc)
796                         if err != nil {
797                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
798                         }
799                         return
800                 } else if err != nil {
801                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
802                         return
803                 }
804                 recentT, err := v.lastModified(recent)
805                 if err != nil {
806                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
807                         return
808                 }
809                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
810                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
811                                 // recent/loc is too old to protect
812                                 // loc from being Trashed again during
813                                 // the raceWindow that starts if we
814                                 // delete trash/X now.
815                                 //
816                                 // Note this means (TrashCheckInterval
817                                 // < BlobSignatureTTL - raceWindow) is
818                                 // necessary to avoid starvation.
819                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
820                                 v.fixRace(loc)
821                                 v.Touch(loc)
822                                 return
823                         }
824                         _, err := v.bucket.Head(loc, nil)
825                         if os.IsNotExist(err) {
826                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
827                                 v.fixRace(loc)
828                                 return
829                         } else if err != nil {
830                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
831                                 return
832                         }
833                 }
834                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
835                         return
836                 }
837                 err = v.bucket.Del(trash.Key)
838                 if err != nil {
839                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
840                         return
841                 }
842                 atomic.AddInt64(&bytesDeleted, trash.Size)
843                 atomic.AddInt64(&blocksDeleted, 1)
844
845                 _, err = v.bucket.Head(loc, nil)
846                 if err == nil {
847                         log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
848                         return
849                 }
850                 if !os.IsNotExist(v.translateError(err)) {
851                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
852                         return
853                 }
854                 err = v.bucket.Del("recent/" + loc)
855                 if err != nil {
856                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
857                 }
858         }
859
860         var wg sync.WaitGroup
861         todo := make(chan *s3.Key, theConfig.EmptyTrashWorkers)
862         for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
863                 wg.Add(1)
864                 go func() {
865                         defer wg.Done()
866                         for key := range todo {
867                                 emptyOneKey(key)
868                         }
869                 }()
870         }
871
872         trashL := s3Lister{
873                 Bucket:   v.bucket.Bucket,
874                 Prefix:   "trash/",
875                 PageSize: v.IndexPageSize,
876                 Stats:    &v.bucket.stats,
877         }
878         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
879                 todo <- trash
880         }
881         close(todo)
882         wg.Wait()
883
884         if err := trashL.Error(); err != nil {
885                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
886         }
887         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
888 }
889
890 type s3Lister struct {
891         Bucket     *s3.Bucket
892         Prefix     string
893         PageSize   int
894         Stats      *s3bucketStats
895         nextMarker string
896         buf        []s3.Key
897         err        error
898 }
899
900 // First fetches the first page and returns the first item. It returns
901 // nil if the response is the empty set or an error occurs.
902 func (lister *s3Lister) First() *s3.Key {
903         lister.getPage()
904         return lister.pop()
905 }
906
907 // Next returns the next item, fetching the next page if necessary. It
908 // returns nil if the last available item has already been fetched, or
909 // an error occurs.
910 func (lister *s3Lister) Next() *s3.Key {
911         if len(lister.buf) == 0 && lister.nextMarker != "" {
912                 lister.getPage()
913         }
914         return lister.pop()
915 }
916
917 // Return the most recent error encountered by First or Next.
918 func (lister *s3Lister) Error() error {
919         return lister.err
920 }
921
922 func (lister *s3Lister) getPage() {
923         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
924         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
925         lister.nextMarker = ""
926         if err != nil {
927                 lister.err = err
928                 return
929         }
930         if resp.IsTruncated {
931                 lister.nextMarker = resp.NextMarker
932         }
933         lister.buf = make([]s3.Key, 0, len(resp.Contents))
934         for _, key := range resp.Contents {
935                 if !strings.HasPrefix(key.Key, lister.Prefix) {
936                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
937                         continue
938                 }
939                 lister.buf = append(lister.buf, key)
940         }
941 }
942
943 func (lister *s3Lister) pop() (k *s3.Key) {
944         if len(lister.buf) > 0 {
945                 k = &lister.buf[0]
946                 lister.buf = lister.buf[1:]
947         }
948         return
949 }
950
951 // s3bucket wraps s3.bucket and counts I/O and API usage stats.
952 type s3bucket struct {
953         *s3.Bucket
954         stats s3bucketStats
955 }
956
957 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
958         rdr, err := b.Bucket.GetReader(path)
959         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
960         b.stats.TickErr(err)
961         return NewCountingReader(rdr, b.stats.TickInBytes), err
962 }
963
964 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
965         resp, err := b.Bucket.Head(path, headers)
966         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
967         b.stats.TickErr(err)
968         return resp, err
969 }
970
971 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
972         if length == 0 {
973                 // goamz will only send Content-Length: 0 when reader
974                 // is nil due to net.http.Request.ContentLength
975                 // behavior.  Otherwise, Content-Length header is
976                 // omitted which will cause some S3 services
977                 // (including AWS and Ceph RadosGW) to fail to create
978                 // empty objects.
979                 r = nil
980         } else {
981                 r = NewCountingReader(r, b.stats.TickOutBytes)
982         }
983         err := b.Bucket.PutReader(path, r, length, contType, perm, options)
984         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
985         b.stats.TickErr(err)
986         return err
987 }
988
989 func (b *s3bucket) Del(path string) error {
990         err := b.Bucket.Del(path)
991         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
992         b.stats.TickErr(err)
993         return err
994 }
995
996 type s3bucketStats struct {
997         statsTicker
998         Ops     uint64
999         GetOps  uint64
1000         PutOps  uint64
1001         HeadOps uint64
1002         DelOps  uint64
1003         ListOps uint64
1004 }
1005
1006 func (s *s3bucketStats) TickErr(err error) {
1007         if err == nil {
1008                 return
1009         }
1010         errType := fmt.Sprintf("%T", err)
1011         if err, ok := err.(*s3.Error); ok {
1012                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
1013         }
1014         s.statsTicker.TickErr(err, errType)
1015 }