14397: Fix S3 ListOps stats.
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "encoding/base64"
11         "encoding/hex"
12         "flag"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "net/http"
17         "os"
18         "regexp"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "time"
23
24         "git.curoverse.com/arvados.git/sdk/go/arvados"
25         "github.com/AdRoll/goamz/aws"
26         "github.com/AdRoll/goamz/s3"
27 )
28
29 const (
30         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
31         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
32 )
33
34 var (
35         // ErrS3TrashDisabled is returned by Trash if that operation
36         // is impossible with the current config.
37         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
38
39         s3AccessKeyFile string
40         s3SecretKeyFile string
41         s3RegionName    string
42         s3Endpoint      string
43         s3Replication   int
44         s3UnsafeDelete  bool
45         s3RaceWindow    time.Duration
46
47         s3ACL = s3.Private
48
49         zeroTime time.Time
50 )
51
52 const (
53         maxClockSkew  = 600 * time.Second
54         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
55 )
56
57 type s3VolumeAdder struct {
58         *Config
59 }
60
61 // String implements flag.Value
62 func (s *s3VolumeAdder) String() string {
63         return "-"
64 }
65
66 func (s *s3VolumeAdder) Set(bucketName string) error {
67         if bucketName == "" {
68                 return fmt.Errorf("no container name given")
69         }
70         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
71                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
72         }
73         if deprecated.flagSerializeIO {
74                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
75         }
76         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
77                 Bucket:        bucketName,
78                 AccessKeyFile: s3AccessKeyFile,
79                 SecretKeyFile: s3SecretKeyFile,
80                 Endpoint:      s3Endpoint,
81                 Region:        s3RegionName,
82                 RaceWindow:    arvados.Duration(s3RaceWindow),
83                 S3Replication: s3Replication,
84                 UnsafeDelete:  s3UnsafeDelete,
85                 ReadOnly:      deprecated.flagReadonly,
86                 IndexPageSize: 1000,
87         })
88         return nil
89 }
90
91 func s3regions() (okList []string) {
92         for r := range aws.Regions {
93                 okList = append(okList, r)
94         }
95         return
96 }
97
98 func init() {
99         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
100
101         flag.Var(&s3VolumeAdder{theConfig},
102                 "s3-bucket-volume",
103                 "Use the given bucket as a storage volume. Can be given multiple times.")
104         flag.StringVar(
105                 &s3RegionName,
106                 "s3-region",
107                 "",
108                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
109         flag.StringVar(
110                 &s3Endpoint,
111                 "s3-endpoint",
112                 "",
113                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
114         flag.StringVar(
115                 &s3AccessKeyFile,
116                 "s3-access-key-file",
117                 "",
118                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
119         flag.StringVar(
120                 &s3SecretKeyFile,
121                 "s3-secret-key-file",
122                 "",
123                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
124         flag.DurationVar(
125                 &s3RaceWindow,
126                 "s3-race-window",
127                 24*time.Hour,
128                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
129         flag.IntVar(
130                 &s3Replication,
131                 "s3-replication",
132                 2,
133                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
134         flag.BoolVar(
135                 &s3UnsafeDelete,
136                 "s3-unsafe-delete",
137                 false,
138                 "EXPERIMENTAL. Enable deletion (garbage collection) even when trash lifetime is zero, even though there are known race conditions that can cause data loss.")
139 }
140
141 // S3Volume implements Volume using an S3 bucket.
142 type S3Volume struct {
143         AccessKeyFile      string
144         SecretKeyFile      string
145         Endpoint           string
146         Region             string
147         Bucket             string
148         LocationConstraint bool
149         IndexPageSize      int
150         S3Replication      int
151         ConnectTimeout     arvados.Duration
152         ReadTimeout        arvados.Duration
153         RaceWindow         arvados.Duration
154         ReadOnly           bool
155         UnsafeDelete       bool
156         StorageClasses     []string
157
158         bucket *s3bucket
159
160         startOnce sync.Once
161 }
162
163 // Examples implements VolumeWithExamples.
164 func (*S3Volume) Examples() []Volume {
165         return []Volume{
166                 &S3Volume{
167                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
168                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
169                         Endpoint:       "",
170                         Region:         "us-east-1",
171                         Bucket:         "example-bucket-name",
172                         IndexPageSize:  1000,
173                         S3Replication:  2,
174                         RaceWindow:     arvados.Duration(24 * time.Hour),
175                         ConnectTimeout: arvados.Duration(time.Minute),
176                         ReadTimeout:    arvados.Duration(5 * time.Minute),
177                 },
178                 &S3Volume{
179                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
180                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
181                         Endpoint:       "https://storage.googleapis.com",
182                         Region:         "",
183                         Bucket:         "example-bucket-name",
184                         IndexPageSize:  1000,
185                         S3Replication:  2,
186                         RaceWindow:     arvados.Duration(24 * time.Hour),
187                         ConnectTimeout: arvados.Duration(time.Minute),
188                         ReadTimeout:    arvados.Duration(5 * time.Minute),
189                 },
190         }
191 }
192
193 // Type implements Volume.
194 func (*S3Volume) Type() string {
195         return "S3"
196 }
197
198 // Start populates private fields and verifies the configuration is
199 // valid.
200 func (v *S3Volume) Start() error {
201         region, ok := aws.Regions[v.Region]
202         if v.Endpoint == "" {
203                 if !ok {
204                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
205                 }
206         } else if ok {
207                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
208                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
209         } else {
210                 region = aws.Region{
211                         Name:                 v.Region,
212                         S3Endpoint:           v.Endpoint,
213                         S3LocationConstraint: v.LocationConstraint,
214                 }
215         }
216
217         var err error
218         var auth aws.Auth
219         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
220         if err != nil {
221                 return err
222         }
223         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
224         if err != nil {
225                 return err
226         }
227
228         // Zero timeouts mean "wait forever", which is a bad
229         // default. Default to long timeouts instead.
230         if v.ConnectTimeout == 0 {
231                 v.ConnectTimeout = s3DefaultConnectTimeout
232         }
233         if v.ReadTimeout == 0 {
234                 v.ReadTimeout = s3DefaultReadTimeout
235         }
236
237         client := s3.New(auth, region)
238         if region.EC2Endpoint.Signer == aws.V4Signature {
239                 // Currently affects only eu-central-1
240                 client.Signature = aws.V4Signature
241         }
242         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
243         client.ReadTimeout = time.Duration(v.ReadTimeout)
244         v.bucket = &s3bucket{
245                 Bucket: &s3.Bucket{
246                         S3:   client,
247                         Name: v.Bucket,
248                 },
249         }
250         return nil
251 }
252
253 // DeviceID returns a globally unique ID for the storage bucket.
254 func (v *S3Volume) DeviceID() string {
255         return "s3://" + v.Endpoint + "/" + v.Bucket
256 }
257
258 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
259         ready := make(chan bool)
260         go func() {
261                 rdr, err = v.getReader(loc)
262                 close(ready)
263         }()
264         select {
265         case <-ready:
266                 return
267         case <-ctx.Done():
268                 theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
269                 go func() {
270                         <-ready
271                         if err == nil {
272                                 rdr.Close()
273                         }
274                 }()
275                 return nil, ctx.Err()
276         }
277 }
278
279 // getReader wraps (Bucket)GetReader.
280 //
281 // In situations where (Bucket)GetReader would fail because the block
282 // disappeared in a Trash race, getReader calls fixRace to recover the
283 // data, and tries again.
284 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
285         rdr, err = v.bucket.GetReader(loc)
286         err = v.translateError(err)
287         if err == nil || !os.IsNotExist(err) {
288                 return
289         }
290
291         _, err = v.bucket.Head("recent/"+loc, nil)
292         err = v.translateError(err)
293         if err != nil {
294                 // If we can't read recent/X, there's no point in
295                 // trying fixRace. Give up.
296                 return
297         }
298         if !v.fixRace(loc) {
299                 err = os.ErrNotExist
300                 return
301         }
302
303         rdr, err = v.bucket.GetReader(loc)
304         if err != nil {
305                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
306                 err = v.translateError(err)
307         }
308         return
309 }
310
311 // Get a block: copy the block data into buf, and return the number of
312 // bytes copied.
313 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
314         rdr, err := v.getReaderWithContext(ctx, loc)
315         if err != nil {
316                 return 0, err
317         }
318
319         var n int
320         ready := make(chan bool)
321         go func() {
322                 defer close(ready)
323
324                 defer rdr.Close()
325                 n, err = io.ReadFull(rdr, buf)
326
327                 switch err {
328                 case nil, io.EOF, io.ErrUnexpectedEOF:
329                         err = nil
330                 default:
331                         err = v.translateError(err)
332                 }
333         }()
334         select {
335         case <-ctx.Done():
336                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
337                 rdr.Close()
338                 // Must wait for ReadFull to return, to ensure it
339                 // doesn't write to buf after we return.
340                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
341                 <-ready
342                 return 0, ctx.Err()
343         case <-ready:
344                 return n, err
345         }
346 }
347
348 // Compare the given data with the stored data.
349 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
350         errChan := make(chan error, 1)
351         go func() {
352                 _, err := v.bucket.Head("recent/"+loc, nil)
353                 errChan <- err
354         }()
355         var err error
356         select {
357         case <-ctx.Done():
358                 return ctx.Err()
359         case err = <-errChan:
360         }
361         if err != nil {
362                 // Checking for "loc" itself here would interfere with
363                 // future GET requests.
364                 //
365                 // On AWS, if X doesn't exist, a HEAD or GET request
366                 // for X causes X's non-existence to be cached. Thus,
367                 // if we test for X, then create X and return a
368                 // signature to our client, the client might still get
369                 // 404 from all keepstores when trying to read it.
370                 //
371                 // To avoid this, we avoid doing HEAD X or GET X until
372                 // we know X has been written.
373                 //
374                 // Note that X might exist even though recent/X
375                 // doesn't: for example, the response to HEAD recent/X
376                 // might itself come from a stale cache. In such
377                 // cases, we will return a false negative and
378                 // PutHandler might needlessly create another replica
379                 // on a different volume. That's not ideal, but it's
380                 // better than passing the eventually-consistent
381                 // problem on to our clients.
382                 return v.translateError(err)
383         }
384         rdr, err := v.getReaderWithContext(ctx, loc)
385         if err != nil {
386                 return err
387         }
388         defer rdr.Close()
389         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
390 }
391
392 // Put writes a block.
393 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
394         if v.ReadOnly {
395                 return MethodDisabledError
396         }
397         var opts s3.Options
398         size := len(block)
399         if size > 0 {
400                 md5, err := hex.DecodeString(loc)
401                 if err != nil {
402                         return err
403                 }
404                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
405         }
406
407         // Send the block data through a pipe, so that (if we need to)
408         // we can close the pipe early and abandon our PutReader()
409         // goroutine, without worrying about PutReader() accessing our
410         // block buffer after we release it.
411         bufr, bufw := io.Pipe()
412         go func() {
413                 io.Copy(bufw, bytes.NewReader(block))
414                 bufw.Close()
415         }()
416
417         var err error
418         ready := make(chan bool)
419         go func() {
420                 defer func() {
421                         if ctx.Err() != nil {
422                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
423                         }
424                 }()
425                 defer close(ready)
426                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
427                 if err != nil {
428                         return
429                 }
430                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
431         }()
432         select {
433         case <-ctx.Done():
434                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
435                 // Our pipe might be stuck in Write(), waiting for
436                 // PutReader() to read. If so, un-stick it. This means
437                 // PutReader will get corrupt data, but that's OK: the
438                 // size and MD5 won't match, so the write will fail.
439                 go io.Copy(ioutil.Discard, bufr)
440                 // CloseWithError() will return once pending I/O is done.
441                 bufw.CloseWithError(ctx.Err())
442                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
443                 return ctx.Err()
444         case <-ready:
445                 // Unblock pipe in case PutReader did not consume it.
446                 io.Copy(ioutil.Discard, bufr)
447                 return v.translateError(err)
448         }
449 }
450
451 // Touch sets the timestamp for the given locator to the current time.
452 func (v *S3Volume) Touch(loc string) error {
453         if v.ReadOnly {
454                 return MethodDisabledError
455         }
456         _, err := v.bucket.Head(loc, nil)
457         err = v.translateError(err)
458         if os.IsNotExist(err) && v.fixRace(loc) {
459                 // The data object got trashed in a race, but fixRace
460                 // rescued it.
461         } else if err != nil {
462                 return err
463         }
464         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
465         return v.translateError(err)
466 }
467
468 // Mtime returns the stored timestamp for the given locator.
469 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
470         _, err := v.bucket.Head(loc, nil)
471         if err != nil {
472                 return zeroTime, v.translateError(err)
473         }
474         resp, err := v.bucket.Head("recent/"+loc, nil)
475         err = v.translateError(err)
476         if os.IsNotExist(err) {
477                 // The data object X exists, but recent/X is missing.
478                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
479                 if err != nil {
480                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
481                         return zeroTime, v.translateError(err)
482                 }
483                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
484                 resp, err = v.bucket.Head("recent/"+loc, nil)
485                 if err != nil {
486                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
487                         return zeroTime, v.translateError(err)
488                 }
489         } else if err != nil {
490                 // HEAD recent/X failed for some other reason.
491                 return zeroTime, err
492         }
493         return v.lastModified(resp)
494 }
495
496 // IndexTo writes a complete list of locators with the given prefix
497 // for which Get() can retrieve data.
498 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
499         // Use a merge sort to find matching sets of X and recent/X.
500         dataL := s3Lister{
501                 Bucket:   v.bucket.Bucket,
502                 Prefix:   prefix,
503                 PageSize: v.IndexPageSize,
504                 Stats:    &v.bucket.stats,
505         }
506         recentL := s3Lister{
507                 Bucket:   v.bucket.Bucket,
508                 Prefix:   "recent/" + prefix,
509                 PageSize: v.IndexPageSize,
510                 Stats:    &v.bucket.stats,
511         }
512         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
513                 v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
514                 if data.Key >= "g" {
515                         // Conveniently, "recent/*" and "trash/*" are
516                         // lexically greater than all hex-encoded data
517                         // hashes, so stopping here avoids iterating
518                         // over all of them needlessly with dataL.
519                         break
520                 }
521                 if !v.isKeepBlock(data.Key) {
522                         continue
523                 }
524
525                 // stamp is the list entry we should use to report the
526                 // last-modified time for this data block: it will be
527                 // the recent/X entry if one exists, otherwise the
528                 // entry for the data block itself.
529                 stamp := data
530
531                 // Advance to the corresponding recent/X marker, if any
532                 for recent != nil {
533                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
534                                 recent = recentL.Next()
535                                 continue
536                         } else if cmp == 0 {
537                                 stamp = recent
538                                 recent = recentL.Next()
539                                 break
540                         } else {
541                                 // recent/X marker is missing: we'll
542                                 // use the timestamp on the data
543                                 // object.
544                                 break
545                         }
546                 }
547                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
548                 if err != nil {
549                         return err
550                 }
551                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
552         }
553         return nil
554 }
555
556 // Trash a Keep block.
557 func (v *S3Volume) Trash(loc string) error {
558         if v.ReadOnly {
559                 return MethodDisabledError
560         }
561         if t, err := v.Mtime(loc); err != nil {
562                 return err
563         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
564                 return nil
565         }
566         if theConfig.TrashLifetime == 0 {
567                 if !s3UnsafeDelete {
568                         return ErrS3TrashDisabled
569                 }
570                 return v.translateError(v.bucket.Del(loc))
571         }
572         err := v.checkRaceWindow(loc)
573         if err != nil {
574                 return err
575         }
576         err = v.safeCopy("trash/"+loc, loc)
577         if err != nil {
578                 return err
579         }
580         return v.translateError(v.bucket.Del(loc))
581 }
582
583 // checkRaceWindow returns a non-nil error if trash/loc is, or might
584 // be, in the race window (i.e., it's not safe to trash loc).
585 func (v *S3Volume) checkRaceWindow(loc string) error {
586         resp, err := v.bucket.Head("trash/"+loc, nil)
587         err = v.translateError(err)
588         if os.IsNotExist(err) {
589                 // OK, trash/X doesn't exist so we're not in the race
590                 // window
591                 return nil
592         } else if err != nil {
593                 // Error looking up trash/X. We don't know whether
594                 // we're in the race window
595                 return err
596         }
597         t, err := v.lastModified(resp)
598         if err != nil {
599                 // Can't parse timestamp
600                 return err
601         }
602         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
603         if safeWindow <= 0 {
604                 // We can't count on "touch trash/X" to prolong
605                 // trash/X's lifetime. The new timestamp might not
606                 // become visible until now+raceWindow, and EmptyTrash
607                 // is allowed to delete trash/X before then.
608                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
609         }
610         // trash/X exists, but it won't be eligible for deletion until
611         // after now+raceWindow, so it's safe to overwrite it.
612         return nil
613 }
614
615 // safeCopy calls PutCopy, and checks the response to make sure the
616 // copy succeeded and updated the timestamp on the destination object
617 // (PutCopy returns 200 OK if the request was received, even if the
618 // copy failed).
619 func (v *S3Volume) safeCopy(dst, src string) error {
620         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
621                 ContentType:       "application/octet-stream",
622                 MetadataDirective: "REPLACE",
623         }, v.bucket.Name+"/"+src)
624         err = v.translateError(err)
625         if os.IsNotExist(err) {
626                 return err
627         } else if err != nil {
628                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Name+"/"+src, err)
629         }
630         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
631                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
632         } else if time.Now().Sub(t) > maxClockSkew {
633                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
634         }
635         return nil
636 }
637
638 // Get the LastModified header from resp, and parse it as RFC1123 or
639 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
640 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
641         s := resp.Header.Get("Last-Modified")
642         t, err = time.Parse(time.RFC1123, s)
643         if err != nil && s != "" {
644                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
645                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
646                 // as required by HTTP spec. If it's not a valid HTTP
647                 // header value, it's probably AWS (or s3test) giving
648                 // us a nearly-RFC1123 timestamp.
649                 t, err = time.Parse(nearlyRFC1123, s)
650         }
651         return
652 }
653
654 // Untrash moves block from trash back into store
655 func (v *S3Volume) Untrash(loc string) error {
656         err := v.safeCopy(loc, "trash/"+loc)
657         if err != nil {
658                 return err
659         }
660         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
661         return v.translateError(err)
662 }
663
664 // Status returns a *VolumeStatus representing the current in-use
665 // storage capacity and a fake available capacity that doesn't make
666 // the volume seem full or nearly-full.
667 func (v *S3Volume) Status() *VolumeStatus {
668         return &VolumeStatus{
669                 DeviceNum: 1,
670                 BytesFree: BlockSize * 1000,
671                 BytesUsed: 1,
672         }
673 }
674
675 // InternalStats returns bucket I/O and API call counters.
676 func (v *S3Volume) InternalStats() interface{} {
677         return &v.bucket.stats
678 }
679
680 // String implements fmt.Stringer.
681 func (v *S3Volume) String() string {
682         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
683 }
684
685 // Writable returns false if all future Put, Mtime, and Delete calls
686 // are expected to fail.
687 func (v *S3Volume) Writable() bool {
688         return !v.ReadOnly
689 }
690
691 // Replication returns the storage redundancy of the underlying
692 // device. Configured via command line flag.
693 func (v *S3Volume) Replication() int {
694         return v.S3Replication
695 }
696
697 // GetStorageClasses implements Volume
698 func (v *S3Volume) GetStorageClasses() []string {
699         return v.StorageClasses
700 }
701
702 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
703
704 func (v *S3Volume) isKeepBlock(s string) bool {
705         return s3KeepBlockRegexp.MatchString(s)
706 }
707
708 // fixRace(X) is called when "recent/X" exists but "X" doesn't
709 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
710 // there was a race between Put and Trash, fixRace recovers from the
711 // race by Untrashing the block.
712 func (v *S3Volume) fixRace(loc string) bool {
713         trash, err := v.bucket.Head("trash/"+loc, nil)
714         if err != nil {
715                 if !os.IsNotExist(v.translateError(err)) {
716                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
717                 }
718                 return false
719         }
720         trashTime, err := v.lastModified(trash)
721         if err != nil {
722                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
723                 return false
724         }
725
726         recent, err := v.bucket.Head("recent/"+loc, nil)
727         if err != nil {
728                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
729                 return false
730         }
731         recentTime, err := v.lastModified(recent)
732         if err != nil {
733                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
734                 return false
735         }
736
737         ageWhenTrashed := trashTime.Sub(recentTime)
738         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
739                 // No evidence of a race: block hasn't been written
740                 // since it became eligible for Trash. No fix needed.
741                 return false
742         }
743
744         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
745         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
746         err = v.safeCopy(loc, "trash/"+loc)
747         if err != nil {
748                 log.Printf("error: fixRace: %s", err)
749                 return false
750         }
751         return true
752 }
753
754 func (v *S3Volume) translateError(err error) error {
755         switch err := err.(type) {
756         case *s3.Error:
757                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
758                         strings.Contains(err.Error(), "Not Found") {
759                         return os.ErrNotExist
760                 }
761                 // Other 404 errors like NoSuchVersion and
762                 // NoSuchBucket are different problems which should
763                 // get called out downstream, so we don't convert them
764                 // to os.ErrNotExist.
765         }
766         return err
767 }
768
769 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
770 // and deletes them from the volume.
771 func (v *S3Volume) EmptyTrash() {
772         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
773
774         // Define "ready to delete" as "...when EmptyTrash started".
775         startT := time.Now()
776
777         emptyOneKey := func(trash *s3.Key) {
778                 loc := trash.Key[6:]
779                 if !v.isKeepBlock(loc) {
780                         return
781                 }
782                 atomic.AddInt64(&bytesInTrash, trash.Size)
783                 atomic.AddInt64(&blocksInTrash, 1)
784
785                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
786                 if err != nil {
787                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
788                         return
789                 }
790                 recent, err := v.bucket.Head("recent/"+loc, nil)
791                 if err != nil && os.IsNotExist(v.translateError(err)) {
792                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
793                         err = v.Untrash(loc)
794                         if err != nil {
795                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
796                         }
797                         return
798                 } else if err != nil {
799                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
800                         return
801                 }
802                 recentT, err := v.lastModified(recent)
803                 if err != nil {
804                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
805                         return
806                 }
807                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
808                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
809                                 // recent/loc is too old to protect
810                                 // loc from being Trashed again during
811                                 // the raceWindow that starts if we
812                                 // delete trash/X now.
813                                 //
814                                 // Note this means (TrashCheckInterval
815                                 // < BlobSignatureTTL - raceWindow) is
816                                 // necessary to avoid starvation.
817                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
818                                 v.fixRace(loc)
819                                 v.Touch(loc)
820                                 return
821                         }
822                         _, err := v.bucket.Head(loc, nil)
823                         if os.IsNotExist(err) {
824                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
825                                 v.fixRace(loc)
826                                 return
827                         } else if err != nil {
828                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
829                                 return
830                         }
831                 }
832                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
833                         return
834                 }
835                 err = v.bucket.Del(trash.Key)
836                 if err != nil {
837                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
838                         return
839                 }
840                 atomic.AddInt64(&bytesDeleted, trash.Size)
841                 atomic.AddInt64(&blocksDeleted, 1)
842
843                 _, err = v.bucket.Head(loc, nil)
844                 if err == nil {
845                         log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
846                         return
847                 }
848                 if !os.IsNotExist(v.translateError(err)) {
849                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
850                         return
851                 }
852                 err = v.bucket.Del("recent/" + loc)
853                 if err != nil {
854                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
855                 }
856         }
857
858         var wg sync.WaitGroup
859         todo := make(chan *s3.Key, theConfig.EmptyTrashWorkers)
860         for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
861                 wg.Add(1)
862                 go func() {
863                         defer wg.Done()
864                         for key := range todo {
865                                 emptyOneKey(key)
866                         }
867                 }()
868         }
869
870         trashL := s3Lister{
871                 Bucket:   v.bucket.Bucket,
872                 Prefix:   "trash/",
873                 PageSize: v.IndexPageSize,
874                 Stats:    &v.bucket.stats,
875         }
876         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
877                 todo <- trash
878         }
879         close(todo)
880         wg.Wait()
881
882         if err := trashL.Error(); err != nil {
883                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
884         }
885         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
886 }
887
888 type s3Lister struct {
889         Bucket     *s3.Bucket
890         Prefix     string
891         PageSize   int
892         Stats      *s3bucketStats
893         nextMarker string
894         buf        []s3.Key
895         err        error
896 }
897
898 // First fetches the first page and returns the first item. It returns
899 // nil if the response is the empty set or an error occurs.
900 func (lister *s3Lister) First() *s3.Key {
901         lister.getPage()
902         return lister.pop()
903 }
904
905 // Next returns the next item, fetching the next page if necessary. It
906 // returns nil if the last available item has already been fetched, or
907 // an error occurs.
908 func (lister *s3Lister) Next() *s3.Key {
909         if len(lister.buf) == 0 && lister.nextMarker != "" {
910                 lister.getPage()
911         }
912         return lister.pop()
913 }
914
915 // Return the most recent error encountered by First or Next.
916 func (lister *s3Lister) Error() error {
917         return lister.err
918 }
919
920 func (lister *s3Lister) getPage() {
921         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
922         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
923         lister.nextMarker = ""
924         if err != nil {
925                 lister.err = err
926                 return
927         }
928         if resp.IsTruncated {
929                 lister.nextMarker = resp.NextMarker
930         }
931         lister.buf = make([]s3.Key, 0, len(resp.Contents))
932         for _, key := range resp.Contents {
933                 if !strings.HasPrefix(key.Key, lister.Prefix) {
934                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
935                         continue
936                 }
937                 lister.buf = append(lister.buf, key)
938         }
939 }
940
941 func (lister *s3Lister) pop() (k *s3.Key) {
942         if len(lister.buf) > 0 {
943                 k = &lister.buf[0]
944                 lister.buf = lister.buf[1:]
945         }
946         return
947 }
948
949 // s3bucket wraps s3.bucket and counts I/O and API usage stats.
950 type s3bucket struct {
951         *s3.Bucket
952         stats s3bucketStats
953 }
954
955 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
956         rdr, err := b.Bucket.GetReader(path)
957         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
958         b.stats.TickErr(err)
959         return NewCountingReader(rdr, b.stats.TickInBytes), err
960 }
961
962 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
963         resp, err := b.Bucket.Head(path, headers)
964         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
965         b.stats.TickErr(err)
966         return resp, err
967 }
968
969 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
970         if length == 0 {
971                 // goamz will only send Content-Length: 0 when reader
972                 // is nil due to net.http.Request.ContentLength
973                 // behavior.  Otherwise, Content-Length header is
974                 // omitted which will cause some S3 services
975                 // (including AWS and Ceph RadosGW) to fail to create
976                 // empty objects.
977                 r = nil
978         } else {
979                 r = NewCountingReader(r, b.stats.TickOutBytes)
980         }
981         err := b.Bucket.PutReader(path, r, length, contType, perm, options)
982         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
983         b.stats.TickErr(err)
984         return err
985 }
986
987 func (b *s3bucket) Del(path string) error {
988         err := b.Bucket.Del(path)
989         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
990         b.stats.TickErr(err)
991         return err
992 }
993
994 type s3bucketStats struct {
995         statsTicker
996         Ops     uint64
997         GetOps  uint64
998         PutOps  uint64
999         HeadOps uint64
1000         DelOps  uint64
1001         ListOps uint64
1002 }
1003
1004 func (s *s3bucketStats) TickErr(err error) {
1005         if err == nil {
1006                 return
1007         }
1008         errType := fmt.Sprintf("%T", err)
1009         if err, ok := err.(*s3.Error); ok {
1010                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
1011         }
1012         s.statsTicker.TickErr(err, errType)
1013 }