Merge branch '14397-keepstore-v4-signature'
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/sha256"
11         "encoding/base64"
12         "encoding/hex"
13         "flag"
14         "fmt"
15         "io"
16         "io/ioutil"
17         "net/http"
18         "os"
19         "regexp"
20         "strings"
21         "sync"
22         "sync/atomic"
23         "time"
24
25         "git.curoverse.com/arvados.git/sdk/go/arvados"
26         "github.com/AdRoll/goamz/aws"
27         "github.com/AdRoll/goamz/s3"
28 )
29
30 const (
31         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
32         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
33 )
34
35 var (
36         // ErrS3TrashDisabled is returned by Trash if that operation
37         // is impossible with the current config.
38         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
39
40         s3AccessKeyFile string
41         s3SecretKeyFile string
42         s3RegionName    string
43         s3Endpoint      string
44         s3Replication   int
45         s3UnsafeDelete  bool
46         s3RaceWindow    time.Duration
47
48         s3ACL = s3.Private
49
50         zeroTime time.Time
51 )
52
53 const (
54         maxClockSkew  = 600 * time.Second
55         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
56 )
57
58 type s3VolumeAdder struct {
59         *Config
60 }
61
62 // String implements flag.Value
63 func (s *s3VolumeAdder) String() string {
64         return "-"
65 }
66
67 func (s *s3VolumeAdder) Set(bucketName string) error {
68         if bucketName == "" {
69                 return fmt.Errorf("no container name given")
70         }
71         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
72                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
73         }
74         if deprecated.flagSerializeIO {
75                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
76         }
77         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
78                 Bucket:        bucketName,
79                 AccessKeyFile: s3AccessKeyFile,
80                 SecretKeyFile: s3SecretKeyFile,
81                 Endpoint:      s3Endpoint,
82                 Region:        s3RegionName,
83                 RaceWindow:    arvados.Duration(s3RaceWindow),
84                 S3Replication: s3Replication,
85                 UnsafeDelete:  s3UnsafeDelete,
86                 ReadOnly:      deprecated.flagReadonly,
87                 IndexPageSize: 1000,
88         })
89         return nil
90 }
91
92 func s3regions() (okList []string) {
93         for r := range aws.Regions {
94                 okList = append(okList, r)
95         }
96         return
97 }
98
99 func init() {
100         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
101
102         flag.Var(&s3VolumeAdder{theConfig},
103                 "s3-bucket-volume",
104                 "Use the given bucket as a storage volume. Can be given multiple times.")
105         flag.StringVar(
106                 &s3RegionName,
107                 "s3-region",
108                 "",
109                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
110         flag.StringVar(
111                 &s3Endpoint,
112                 "s3-endpoint",
113                 "",
114                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
115         flag.StringVar(
116                 &s3AccessKeyFile,
117                 "s3-access-key-file",
118                 "",
119                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
120         flag.StringVar(
121                 &s3SecretKeyFile,
122                 "s3-secret-key-file",
123                 "",
124                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
125         flag.DurationVar(
126                 &s3RaceWindow,
127                 "s3-race-window",
128                 24*time.Hour,
129                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
130         flag.IntVar(
131                 &s3Replication,
132                 "s3-replication",
133                 2,
134                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
135         flag.BoolVar(
136                 &s3UnsafeDelete,
137                 "s3-unsafe-delete",
138                 false,
139                 "EXPERIMENTAL. Enable deletion (garbage collection) even when trash lifetime is zero, even though there are known race conditions that can cause data loss.")
140 }
141
142 // S3Volume implements Volume using an S3 bucket.
143 type S3Volume struct {
144         AccessKeyFile      string
145         SecretKeyFile      string
146         Endpoint           string
147         Region             string
148         Bucket             string
149         LocationConstraint bool
150         IndexPageSize      int
151         S3Replication      int
152         ConnectTimeout     arvados.Duration
153         ReadTimeout        arvados.Duration
154         RaceWindow         arvados.Duration
155         ReadOnly           bool
156         UnsafeDelete       bool
157         StorageClasses     []string
158
159         bucket *s3bucket
160
161         startOnce sync.Once
162 }
163
164 // Examples implements VolumeWithExamples.
165 func (*S3Volume) Examples() []Volume {
166         return []Volume{
167                 &S3Volume{
168                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
169                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
170                         Endpoint:       "",
171                         Region:         "us-east-1",
172                         Bucket:         "example-bucket-name",
173                         IndexPageSize:  1000,
174                         S3Replication:  2,
175                         RaceWindow:     arvados.Duration(24 * time.Hour),
176                         ConnectTimeout: arvados.Duration(time.Minute),
177                         ReadTimeout:    arvados.Duration(5 * time.Minute),
178                 },
179                 &S3Volume{
180                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
181                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
182                         Endpoint:       "https://storage.googleapis.com",
183                         Region:         "",
184                         Bucket:         "example-bucket-name",
185                         IndexPageSize:  1000,
186                         S3Replication:  2,
187                         RaceWindow:     arvados.Duration(24 * time.Hour),
188                         ConnectTimeout: arvados.Duration(time.Minute),
189                         ReadTimeout:    arvados.Duration(5 * time.Minute),
190                 },
191         }
192 }
193
194 // Type implements Volume.
195 func (*S3Volume) Type() string {
196         return "S3"
197 }
198
199 // Start populates private fields and verifies the configuration is
200 // valid.
201 func (v *S3Volume) Start() error {
202         region, ok := aws.Regions[v.Region]
203         if v.Endpoint == "" {
204                 if !ok {
205                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
206                 }
207         } else if ok {
208                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
209                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
210         } else {
211                 region = aws.Region{
212                         Name:                 v.Region,
213                         S3Endpoint:           v.Endpoint,
214                         S3LocationConstraint: v.LocationConstraint,
215                 }
216         }
217
218         var err error
219         var auth aws.Auth
220         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
221         if err != nil {
222                 return err
223         }
224         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
225         if err != nil {
226                 return err
227         }
228
229         // Zero timeouts mean "wait forever", which is a bad
230         // default. Default to long timeouts instead.
231         if v.ConnectTimeout == 0 {
232                 v.ConnectTimeout = s3DefaultConnectTimeout
233         }
234         if v.ReadTimeout == 0 {
235                 v.ReadTimeout = s3DefaultReadTimeout
236         }
237
238         client := s3.New(auth, region)
239         if region.EC2Endpoint.Signer == aws.V4Signature {
240                 // Currently affects only eu-central-1
241                 client.Signature = aws.V4Signature
242         }
243         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
244         client.ReadTimeout = time.Duration(v.ReadTimeout)
245         v.bucket = &s3bucket{
246                 Bucket: &s3.Bucket{
247                         S3:   client,
248                         Name: v.Bucket,
249                 },
250         }
251         return nil
252 }
253
254 // DeviceID returns a globally unique ID for the storage bucket.
255 func (v *S3Volume) DeviceID() string {
256         return "s3://" + v.Endpoint + "/" + v.Bucket
257 }
258
259 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
260         ready := make(chan bool)
261         go func() {
262                 rdr, err = v.getReader(loc)
263                 close(ready)
264         }()
265         select {
266         case <-ready:
267                 return
268         case <-ctx.Done():
269                 theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
270                 go func() {
271                         <-ready
272                         if err == nil {
273                                 rdr.Close()
274                         }
275                 }()
276                 return nil, ctx.Err()
277         }
278 }
279
280 // getReader wraps (Bucket)GetReader.
281 //
282 // In situations where (Bucket)GetReader would fail because the block
283 // disappeared in a Trash race, getReader calls fixRace to recover the
284 // data, and tries again.
285 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
286         rdr, err = v.bucket.GetReader(loc)
287         err = v.translateError(err)
288         if err == nil || !os.IsNotExist(err) {
289                 return
290         }
291
292         _, err = v.bucket.Head("recent/"+loc, nil)
293         err = v.translateError(err)
294         if err != nil {
295                 // If we can't read recent/X, there's no point in
296                 // trying fixRace. Give up.
297                 return
298         }
299         if !v.fixRace(loc) {
300                 err = os.ErrNotExist
301                 return
302         }
303
304         rdr, err = v.bucket.GetReader(loc)
305         if err != nil {
306                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
307                 err = v.translateError(err)
308         }
309         return
310 }
311
312 // Get a block: copy the block data into buf, and return the number of
313 // bytes copied.
314 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
315         rdr, err := v.getReaderWithContext(ctx, loc)
316         if err != nil {
317                 return 0, err
318         }
319
320         var n int
321         ready := make(chan bool)
322         go func() {
323                 defer close(ready)
324
325                 defer rdr.Close()
326                 n, err = io.ReadFull(rdr, buf)
327
328                 switch err {
329                 case nil, io.EOF, io.ErrUnexpectedEOF:
330                         err = nil
331                 default:
332                         err = v.translateError(err)
333                 }
334         }()
335         select {
336         case <-ctx.Done():
337                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
338                 rdr.Close()
339                 // Must wait for ReadFull to return, to ensure it
340                 // doesn't write to buf after we return.
341                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
342                 <-ready
343                 return 0, ctx.Err()
344         case <-ready:
345                 return n, err
346         }
347 }
348
349 // Compare the given data with the stored data.
350 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
351         errChan := make(chan error, 1)
352         go func() {
353                 _, err := v.bucket.Head("recent/"+loc, nil)
354                 errChan <- err
355         }()
356         var err error
357         select {
358         case <-ctx.Done():
359                 return ctx.Err()
360         case err = <-errChan:
361         }
362         if err != nil {
363                 // Checking for "loc" itself here would interfere with
364                 // future GET requests.
365                 //
366                 // On AWS, if X doesn't exist, a HEAD or GET request
367                 // for X causes X's non-existence to be cached. Thus,
368                 // if we test for X, then create X and return a
369                 // signature to our client, the client might still get
370                 // 404 from all keepstores when trying to read it.
371                 //
372                 // To avoid this, we avoid doing HEAD X or GET X until
373                 // we know X has been written.
374                 //
375                 // Note that X might exist even though recent/X
376                 // doesn't: for example, the response to HEAD recent/X
377                 // might itself come from a stale cache. In such
378                 // cases, we will return a false negative and
379                 // PutHandler might needlessly create another replica
380                 // on a different volume. That's not ideal, but it's
381                 // better than passing the eventually-consistent
382                 // problem on to our clients.
383                 return v.translateError(err)
384         }
385         rdr, err := v.getReaderWithContext(ctx, loc)
386         if err != nil {
387                 return err
388         }
389         defer rdr.Close()
390         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
391 }
392
393 // Put writes a block.
394 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
395         if v.ReadOnly {
396                 return MethodDisabledError
397         }
398         var opts s3.Options
399         size := len(block)
400         if size > 0 {
401                 md5, err := hex.DecodeString(loc)
402                 if err != nil {
403                         return err
404                 }
405                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
406                 // In AWS regions that use V4 signatures, we need to
407                 // provide ContentSHA256 up front. Otherwise, the S3
408                 // library reads the request body (from our buffer)
409                 // into another new buffer in order to compute the
410                 // SHA256 before sending the request -- which would
411                 // mean consuming 128 MiB of memory for the duration
412                 // of a 64 MiB write.
413                 opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
414         }
415
416         // Send the block data through a pipe, so that (if we need to)
417         // we can close the pipe early and abandon our PutReader()
418         // goroutine, without worrying about PutReader() accessing our
419         // block buffer after we release it.
420         bufr, bufw := io.Pipe()
421         go func() {
422                 io.Copy(bufw, bytes.NewReader(block))
423                 bufw.Close()
424         }()
425
426         var err error
427         ready := make(chan bool)
428         go func() {
429                 defer func() {
430                         if ctx.Err() != nil {
431                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
432                         }
433                 }()
434                 defer close(ready)
435                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
436                 if err != nil {
437                         return
438                 }
439                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
440         }()
441         select {
442         case <-ctx.Done():
443                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
444                 // Our pipe might be stuck in Write(), waiting for
445                 // PutReader() to read. If so, un-stick it. This means
446                 // PutReader will get corrupt data, but that's OK: the
447                 // size and MD5 won't match, so the write will fail.
448                 go io.Copy(ioutil.Discard, bufr)
449                 // CloseWithError() will return once pending I/O is done.
450                 bufw.CloseWithError(ctx.Err())
451                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
452                 return ctx.Err()
453         case <-ready:
454                 // Unblock pipe in case PutReader did not consume it.
455                 io.Copy(ioutil.Discard, bufr)
456                 return v.translateError(err)
457         }
458 }
459
460 // Touch sets the timestamp for the given locator to the current time.
461 func (v *S3Volume) Touch(loc string) error {
462         if v.ReadOnly {
463                 return MethodDisabledError
464         }
465         _, err := v.bucket.Head(loc, nil)
466         err = v.translateError(err)
467         if os.IsNotExist(err) && v.fixRace(loc) {
468                 // The data object got trashed in a race, but fixRace
469                 // rescued it.
470         } else if err != nil {
471                 return err
472         }
473         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
474         return v.translateError(err)
475 }
476
477 // Mtime returns the stored timestamp for the given locator.
478 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
479         _, err := v.bucket.Head(loc, nil)
480         if err != nil {
481                 return zeroTime, v.translateError(err)
482         }
483         resp, err := v.bucket.Head("recent/"+loc, nil)
484         err = v.translateError(err)
485         if os.IsNotExist(err) {
486                 // The data object X exists, but recent/X is missing.
487                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
488                 if err != nil {
489                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
490                         return zeroTime, v.translateError(err)
491                 }
492                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
493                 resp, err = v.bucket.Head("recent/"+loc, nil)
494                 if err != nil {
495                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
496                         return zeroTime, v.translateError(err)
497                 }
498         } else if err != nil {
499                 // HEAD recent/X failed for some other reason.
500                 return zeroTime, err
501         }
502         return v.lastModified(resp)
503 }
504
505 // IndexTo writes a complete list of locators with the given prefix
506 // for which Get() can retrieve data.
507 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
508         // Use a merge sort to find matching sets of X and recent/X.
509         dataL := s3Lister{
510                 Bucket:   v.bucket.Bucket,
511                 Prefix:   prefix,
512                 PageSize: v.IndexPageSize,
513                 Stats:    &v.bucket.stats,
514         }
515         recentL := s3Lister{
516                 Bucket:   v.bucket.Bucket,
517                 Prefix:   "recent/" + prefix,
518                 PageSize: v.IndexPageSize,
519                 Stats:    &v.bucket.stats,
520         }
521         for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
522                 if data.Key >= "g" {
523                         // Conveniently, "recent/*" and "trash/*" are
524                         // lexically greater than all hex-encoded data
525                         // hashes, so stopping here avoids iterating
526                         // over all of them needlessly with dataL.
527                         break
528                 }
529                 if !v.isKeepBlock(data.Key) {
530                         continue
531                 }
532
533                 // stamp is the list entry we should use to report the
534                 // last-modified time for this data block: it will be
535                 // the recent/X entry if one exists, otherwise the
536                 // entry for the data block itself.
537                 stamp := data
538
539                 // Advance to the corresponding recent/X marker, if any
540                 for recent != nil && recentL.Error() == nil {
541                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
542                                 recent = recentL.Next()
543                                 continue
544                         } else if cmp == 0 {
545                                 stamp = recent
546                                 recent = recentL.Next()
547                                 break
548                         } else {
549                                 // recent/X marker is missing: we'll
550                                 // use the timestamp on the data
551                                 // object.
552                                 break
553                         }
554                 }
555                 if err := recentL.Error(); err != nil {
556                         return err
557                 }
558                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
559                 if err != nil {
560                         return err
561                 }
562                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
563         }
564         return dataL.Error()
565 }
566
567 // Trash a Keep block.
568 func (v *S3Volume) Trash(loc string) error {
569         if v.ReadOnly {
570                 return MethodDisabledError
571         }
572         if t, err := v.Mtime(loc); err != nil {
573                 return err
574         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
575                 return nil
576         }
577         if theConfig.TrashLifetime == 0 {
578                 if !s3UnsafeDelete {
579                         return ErrS3TrashDisabled
580                 }
581                 return v.translateError(v.bucket.Del(loc))
582         }
583         err := v.checkRaceWindow(loc)
584         if err != nil {
585                 return err
586         }
587         err = v.safeCopy("trash/"+loc, loc)
588         if err != nil {
589                 return err
590         }
591         return v.translateError(v.bucket.Del(loc))
592 }
593
594 // checkRaceWindow returns a non-nil error if trash/loc is, or might
595 // be, in the race window (i.e., it's not safe to trash loc).
596 func (v *S3Volume) checkRaceWindow(loc string) error {
597         resp, err := v.bucket.Head("trash/"+loc, nil)
598         err = v.translateError(err)
599         if os.IsNotExist(err) {
600                 // OK, trash/X doesn't exist so we're not in the race
601                 // window
602                 return nil
603         } else if err != nil {
604                 // Error looking up trash/X. We don't know whether
605                 // we're in the race window
606                 return err
607         }
608         t, err := v.lastModified(resp)
609         if err != nil {
610                 // Can't parse timestamp
611                 return err
612         }
613         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
614         if safeWindow <= 0 {
615                 // We can't count on "touch trash/X" to prolong
616                 // trash/X's lifetime. The new timestamp might not
617                 // become visible until now+raceWindow, and EmptyTrash
618                 // is allowed to delete trash/X before then.
619                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
620         }
621         // trash/X exists, but it won't be eligible for deletion until
622         // after now+raceWindow, so it's safe to overwrite it.
623         return nil
624 }
625
626 // safeCopy calls PutCopy, and checks the response to make sure the
627 // copy succeeded and updated the timestamp on the destination object
628 // (PutCopy returns 200 OK if the request was received, even if the
629 // copy failed).
630 func (v *S3Volume) safeCopy(dst, src string) error {
631         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
632                 ContentType:       "application/octet-stream",
633                 MetadataDirective: "REPLACE",
634         }, v.bucket.Name+"/"+src)
635         err = v.translateError(err)
636         if os.IsNotExist(err) {
637                 return err
638         } else if err != nil {
639                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Name+"/"+src, err)
640         }
641         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
642                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
643         } else if time.Now().Sub(t) > maxClockSkew {
644                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
645         }
646         return nil
647 }
648
649 // Get the LastModified header from resp, and parse it as RFC1123 or
650 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
651 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
652         s := resp.Header.Get("Last-Modified")
653         t, err = time.Parse(time.RFC1123, s)
654         if err != nil && s != "" {
655                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
656                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
657                 // as required by HTTP spec. If it's not a valid HTTP
658                 // header value, it's probably AWS (or s3test) giving
659                 // us a nearly-RFC1123 timestamp.
660                 t, err = time.Parse(nearlyRFC1123, s)
661         }
662         return
663 }
664
665 // Untrash moves block from trash back into store
666 func (v *S3Volume) Untrash(loc string) error {
667         err := v.safeCopy(loc, "trash/"+loc)
668         if err != nil {
669                 return err
670         }
671         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
672         return v.translateError(err)
673 }
674
675 // Status returns a *VolumeStatus representing the current in-use
676 // storage capacity and a fake available capacity that doesn't make
677 // the volume seem full or nearly-full.
678 func (v *S3Volume) Status() *VolumeStatus {
679         return &VolumeStatus{
680                 DeviceNum: 1,
681                 BytesFree: BlockSize * 1000,
682                 BytesUsed: 1,
683         }
684 }
685
686 // InternalStats returns bucket I/O and API call counters.
687 func (v *S3Volume) InternalStats() interface{} {
688         return &v.bucket.stats
689 }
690
691 // String implements fmt.Stringer.
692 func (v *S3Volume) String() string {
693         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
694 }
695
696 // Writable returns false if all future Put, Mtime, and Delete calls
697 // are expected to fail.
698 func (v *S3Volume) Writable() bool {
699         return !v.ReadOnly
700 }
701
702 // Replication returns the storage redundancy of the underlying
703 // device. Configured via command line flag.
704 func (v *S3Volume) Replication() int {
705         return v.S3Replication
706 }
707
708 // GetStorageClasses implements Volume
709 func (v *S3Volume) GetStorageClasses() []string {
710         return v.StorageClasses
711 }
712
713 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
714
715 func (v *S3Volume) isKeepBlock(s string) bool {
716         return s3KeepBlockRegexp.MatchString(s)
717 }
718
719 // fixRace(X) is called when "recent/X" exists but "X" doesn't
720 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
721 // there was a race between Put and Trash, fixRace recovers from the
722 // race by Untrashing the block.
723 func (v *S3Volume) fixRace(loc string) bool {
724         trash, err := v.bucket.Head("trash/"+loc, nil)
725         if err != nil {
726                 if !os.IsNotExist(v.translateError(err)) {
727                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
728                 }
729                 return false
730         }
731         trashTime, err := v.lastModified(trash)
732         if err != nil {
733                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
734                 return false
735         }
736
737         recent, err := v.bucket.Head("recent/"+loc, nil)
738         if err != nil {
739                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
740                 return false
741         }
742         recentTime, err := v.lastModified(recent)
743         if err != nil {
744                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
745                 return false
746         }
747
748         ageWhenTrashed := trashTime.Sub(recentTime)
749         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
750                 // No evidence of a race: block hasn't been written
751                 // since it became eligible for Trash. No fix needed.
752                 return false
753         }
754
755         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
756         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
757         err = v.safeCopy(loc, "trash/"+loc)
758         if err != nil {
759                 log.Printf("error: fixRace: %s", err)
760                 return false
761         }
762         return true
763 }
764
765 func (v *S3Volume) translateError(err error) error {
766         switch err := err.(type) {
767         case *s3.Error:
768                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
769                         strings.Contains(err.Error(), "Not Found") {
770                         return os.ErrNotExist
771                 }
772                 // Other 404 errors like NoSuchVersion and
773                 // NoSuchBucket are different problems which should
774                 // get called out downstream, so we don't convert them
775                 // to os.ErrNotExist.
776         }
777         return err
778 }
779
780 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
781 // and deletes them from the volume.
782 func (v *S3Volume) EmptyTrash() {
783         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
784
785         // Define "ready to delete" as "...when EmptyTrash started".
786         startT := time.Now()
787
788         emptyOneKey := func(trash *s3.Key) {
789                 loc := trash.Key[6:]
790                 if !v.isKeepBlock(loc) {
791                         return
792                 }
793                 atomic.AddInt64(&bytesInTrash, trash.Size)
794                 atomic.AddInt64(&blocksInTrash, 1)
795
796                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
797                 if err != nil {
798                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
799                         return
800                 }
801                 recent, err := v.bucket.Head("recent/"+loc, nil)
802                 if err != nil && os.IsNotExist(v.translateError(err)) {
803                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
804                         err = v.Untrash(loc)
805                         if err != nil {
806                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
807                         }
808                         return
809                 } else if err != nil {
810                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
811                         return
812                 }
813                 recentT, err := v.lastModified(recent)
814                 if err != nil {
815                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
816                         return
817                 }
818                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
819                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
820                                 // recent/loc is too old to protect
821                                 // loc from being Trashed again during
822                                 // the raceWindow that starts if we
823                                 // delete trash/X now.
824                                 //
825                                 // Note this means (TrashCheckInterval
826                                 // < BlobSignatureTTL - raceWindow) is
827                                 // necessary to avoid starvation.
828                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
829                                 v.fixRace(loc)
830                                 v.Touch(loc)
831                                 return
832                         }
833                         _, err := v.bucket.Head(loc, nil)
834                         if os.IsNotExist(err) {
835                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
836                                 v.fixRace(loc)
837                                 return
838                         } else if err != nil {
839                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
840                                 return
841                         }
842                 }
843                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
844                         return
845                 }
846                 err = v.bucket.Del(trash.Key)
847                 if err != nil {
848                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
849                         return
850                 }
851                 atomic.AddInt64(&bytesDeleted, trash.Size)
852                 atomic.AddInt64(&blocksDeleted, 1)
853
854                 _, err = v.bucket.Head(loc, nil)
855                 if err == nil {
856                         log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
857                         return
858                 }
859                 if !os.IsNotExist(v.translateError(err)) {
860                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
861                         return
862                 }
863                 err = v.bucket.Del("recent/" + loc)
864                 if err != nil {
865                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
866                 }
867         }
868
869         var wg sync.WaitGroup
870         todo := make(chan *s3.Key, theConfig.EmptyTrashWorkers)
871         for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
872                 wg.Add(1)
873                 go func() {
874                         defer wg.Done()
875                         for key := range todo {
876                                 emptyOneKey(key)
877                         }
878                 }()
879         }
880
881         trashL := s3Lister{
882                 Bucket:   v.bucket.Bucket,
883                 Prefix:   "trash/",
884                 PageSize: v.IndexPageSize,
885                 Stats:    &v.bucket.stats,
886         }
887         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
888                 todo <- trash
889         }
890         close(todo)
891         wg.Wait()
892
893         if err := trashL.Error(); err != nil {
894                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
895         }
896         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
897 }
898
899 type s3Lister struct {
900         Bucket     *s3.Bucket
901         Prefix     string
902         PageSize   int
903         Stats      *s3bucketStats
904         nextMarker string
905         buf        []s3.Key
906         err        error
907 }
908
909 // First fetches the first page and returns the first item. It returns
910 // nil if the response is the empty set or an error occurs.
911 func (lister *s3Lister) First() *s3.Key {
912         lister.getPage()
913         return lister.pop()
914 }
915
916 // Next returns the next item, fetching the next page if necessary. It
917 // returns nil if the last available item has already been fetched, or
918 // an error occurs.
919 func (lister *s3Lister) Next() *s3.Key {
920         if len(lister.buf) == 0 && lister.nextMarker != "" {
921                 lister.getPage()
922         }
923         return lister.pop()
924 }
925
926 // Return the most recent error encountered by First or Next.
927 func (lister *s3Lister) Error() error {
928         return lister.err
929 }
930
931 func (lister *s3Lister) getPage() {
932         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
933         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
934         lister.nextMarker = ""
935         if err != nil {
936                 lister.err = err
937                 return
938         }
939         if resp.IsTruncated {
940                 lister.nextMarker = resp.NextMarker
941         }
942         lister.buf = make([]s3.Key, 0, len(resp.Contents))
943         for _, key := range resp.Contents {
944                 if !strings.HasPrefix(key.Key, lister.Prefix) {
945                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
946                         continue
947                 }
948                 lister.buf = append(lister.buf, key)
949         }
950 }
951
952 func (lister *s3Lister) pop() (k *s3.Key) {
953         if len(lister.buf) > 0 {
954                 k = &lister.buf[0]
955                 lister.buf = lister.buf[1:]
956         }
957         return
958 }
959
960 // s3bucket wraps s3.bucket and counts I/O and API usage stats.
961 type s3bucket struct {
962         *s3.Bucket
963         stats s3bucketStats
964 }
965
966 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
967         rdr, err := b.Bucket.GetReader(path)
968         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
969         b.stats.TickErr(err)
970         return NewCountingReader(rdr, b.stats.TickInBytes), err
971 }
972
973 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
974         resp, err := b.Bucket.Head(path, headers)
975         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
976         b.stats.TickErr(err)
977         return resp, err
978 }
979
980 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
981         if length == 0 {
982                 // goamz will only send Content-Length: 0 when reader
983                 // is nil due to net.http.Request.ContentLength
984                 // behavior.  Otherwise, Content-Length header is
985                 // omitted which will cause some S3 services
986                 // (including AWS and Ceph RadosGW) to fail to create
987                 // empty objects.
988                 r = nil
989         } else {
990                 r = NewCountingReader(r, b.stats.TickOutBytes)
991         }
992         err := b.Bucket.PutReader(path, r, length, contType, perm, options)
993         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
994         b.stats.TickErr(err)
995         return err
996 }
997
998 func (b *s3bucket) Del(path string) error {
999         err := b.Bucket.Del(path)
1000         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
1001         b.stats.TickErr(err)
1002         return err
1003 }
1004
1005 type s3bucketStats struct {
1006         statsTicker
1007         Ops     uint64
1008         GetOps  uint64
1009         PutOps  uint64
1010         HeadOps uint64
1011         DelOps  uint64
1012         ListOps uint64
1013 }
1014
1015 func (s *s3bucketStats) TickErr(err error) {
1016         if err == nil {
1017                 return
1018         }
1019         errType := fmt.Sprintf("%T", err)
1020         if err, ok := err.(*s3.Error); ok {
1021                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
1022         }
1023         s.statsTicker.TickErr(err, errType)
1024 }