13937: Simplified volume specific metric handling (WIP)
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/sha256"
11         "encoding/base64"
12         "encoding/hex"
13         "flag"
14         "fmt"
15         "io"
16         "io/ioutil"
17         "net/http"
18         "os"
19         "regexp"
20         "strings"
21         "sync"
22         "sync/atomic"
23         "time"
24
25         "git.curoverse.com/arvados.git/sdk/go/arvados"
26         "github.com/AdRoll/goamz/aws"
27         "github.com/AdRoll/goamz/s3"
28         "github.com/prometheus/client_golang/prometheus"
29 )
30
31 const (
32         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
33         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
34 )
35
36 var (
37         // ErrS3TrashDisabled is returned by Trash if that operation
38         // is impossible with the current config.
39         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
40
41         s3AccessKeyFile string
42         s3SecretKeyFile string
43         s3RegionName    string
44         s3Endpoint      string
45         s3Replication   int
46         s3UnsafeDelete  bool
47         s3RaceWindow    time.Duration
48
49         s3ACL = s3.Private
50
51         zeroTime time.Time
52 )
53
54 const (
55         maxClockSkew  = 600 * time.Second
56         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
57 )
58
59 type s3VolumeAdder struct {
60         *Config
61 }
62
63 // String implements flag.Value
64 func (s *s3VolumeAdder) String() string {
65         return "-"
66 }
67
68 func (s *s3VolumeAdder) Set(bucketName string) error {
69         if bucketName == "" {
70                 return fmt.Errorf("no container name given")
71         }
72         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
73                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
74         }
75         if deprecated.flagSerializeIO {
76                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
77         }
78         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
79                 Bucket:        bucketName,
80                 AccessKeyFile: s3AccessKeyFile,
81                 SecretKeyFile: s3SecretKeyFile,
82                 Endpoint:      s3Endpoint,
83                 Region:        s3RegionName,
84                 RaceWindow:    arvados.Duration(s3RaceWindow),
85                 S3Replication: s3Replication,
86                 UnsafeDelete:  s3UnsafeDelete,
87                 ReadOnly:      deprecated.flagReadonly,
88                 IndexPageSize: 1000,
89         })
90         return nil
91 }
92
93 func s3regions() (okList []string) {
94         for r := range aws.Regions {
95                 okList = append(okList, r)
96         }
97         return
98 }
99
100 func init() {
101         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
102
103         flag.Var(&s3VolumeAdder{theConfig},
104                 "s3-bucket-volume",
105                 "Use the given bucket as a storage volume. Can be given multiple times.")
106         flag.StringVar(
107                 &s3RegionName,
108                 "s3-region",
109                 "",
110                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
111         flag.StringVar(
112                 &s3Endpoint,
113                 "s3-endpoint",
114                 "",
115                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
116         flag.StringVar(
117                 &s3AccessKeyFile,
118                 "s3-access-key-file",
119                 "",
120                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
121         flag.StringVar(
122                 &s3SecretKeyFile,
123                 "s3-secret-key-file",
124                 "",
125                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
126         flag.DurationVar(
127                 &s3RaceWindow,
128                 "s3-race-window",
129                 24*time.Hour,
130                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
131         flag.IntVar(
132                 &s3Replication,
133                 "s3-replication",
134                 2,
135                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
136         flag.BoolVar(
137                 &s3UnsafeDelete,
138                 "s3-unsafe-delete",
139                 false,
140                 "EXPERIMENTAL. Enable deletion (garbage collection) even when trash lifetime is zero, even though there are known race conditions that can cause data loss.")
141 }
142
143 // S3Volume implements Volume using an S3 bucket.
144 type S3Volume struct {
145         AccessKeyFile      string
146         SecretKeyFile      string
147         Endpoint           string
148         Region             string
149         Bucket             string
150         LocationConstraint bool
151         IndexPageSize      int
152         S3Replication      int
153         ConnectTimeout     arvados.Duration
154         ReadTimeout        arvados.Duration
155         RaceWindow         arvados.Duration
156         ReadOnly           bool
157         UnsafeDelete       bool
158         StorageClasses     []string
159
160         bucket *s3bucket
161
162         startOnce sync.Once
163 }
164
165 // Examples implements VolumeWithExamples.
166 func (*S3Volume) Examples() []Volume {
167         return []Volume{
168                 &S3Volume{
169                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
170                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
171                         Endpoint:       "",
172                         Region:         "us-east-1",
173                         Bucket:         "example-bucket-name",
174                         IndexPageSize:  1000,
175                         S3Replication:  2,
176                         RaceWindow:     arvados.Duration(24 * time.Hour),
177                         ConnectTimeout: arvados.Duration(time.Minute),
178                         ReadTimeout:    arvados.Duration(5 * time.Minute),
179                 },
180                 &S3Volume{
181                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
182                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
183                         Endpoint:       "https://storage.googleapis.com",
184                         Region:         "",
185                         Bucket:         "example-bucket-name",
186                         IndexPageSize:  1000,
187                         S3Replication:  2,
188                         RaceWindow:     arvados.Duration(24 * time.Hour),
189                         ConnectTimeout: arvados.Duration(time.Minute),
190                         ReadTimeout:    arvados.Duration(5 * time.Minute),
191                 },
192         }
193 }
194
195 // Type implements Volume.
196 func (*S3Volume) Type() string {
197         return "S3"
198 }
199
200 // Start populates private fields and verifies the configuration is
201 // valid.
202 func (v *S3Volume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error {
203         region, ok := aws.Regions[v.Region]
204         if v.Endpoint == "" {
205                 if !ok {
206                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
207                 }
208         } else if ok {
209                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
210                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
211         } else {
212                 region = aws.Region{
213                         Name:                 v.Region,
214                         S3Endpoint:           v.Endpoint,
215                         S3LocationConstraint: v.LocationConstraint,
216                 }
217         }
218
219         var err error
220         var auth aws.Auth
221         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
222         if err != nil {
223                 return err
224         }
225         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
226         if err != nil {
227                 return err
228         }
229
230         // Zero timeouts mean "wait forever", which is a bad
231         // default. Default to long timeouts instead.
232         if v.ConnectTimeout == 0 {
233                 v.ConnectTimeout = s3DefaultConnectTimeout
234         }
235         if v.ReadTimeout == 0 {
236                 v.ReadTimeout = s3DefaultReadTimeout
237         }
238
239         client := s3.New(auth, region)
240         if region.EC2Endpoint.Signer == aws.V4Signature {
241                 // Currently affects only eu-central-1
242                 client.Signature = aws.V4Signature
243         }
244         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
245         client.ReadTimeout = time.Duration(v.ReadTimeout)
246         v.bucket = &s3bucket{
247                 Bucket: &s3.Bucket{
248                         S3:   client,
249                         Name: v.Bucket,
250                 },
251         }
252         return nil
253 }
254
255 // DeviceID returns a globally unique ID for the storage bucket.
256 func (v *S3Volume) DeviceID() string {
257         return "s3://" + v.Endpoint + "/" + v.Bucket
258 }
259
260 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
261         ready := make(chan bool)
262         go func() {
263                 rdr, err = v.getReader(loc)
264                 close(ready)
265         }()
266         select {
267         case <-ready:
268                 return
269         case <-ctx.Done():
270                 theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
271                 go func() {
272                         <-ready
273                         if err == nil {
274                                 rdr.Close()
275                         }
276                 }()
277                 return nil, ctx.Err()
278         }
279 }
280
281 // getReader wraps (Bucket)GetReader.
282 //
283 // In situations where (Bucket)GetReader would fail because the block
284 // disappeared in a Trash race, getReader calls fixRace to recover the
285 // data, and tries again.
286 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
287         rdr, err = v.bucket.GetReader(loc)
288         err = v.translateError(err)
289         if err == nil || !os.IsNotExist(err) {
290                 return
291         }
292
293         _, err = v.bucket.Head("recent/"+loc, nil)
294         err = v.translateError(err)
295         if err != nil {
296                 // If we can't read recent/X, there's no point in
297                 // trying fixRace. Give up.
298                 return
299         }
300         if !v.fixRace(loc) {
301                 err = os.ErrNotExist
302                 return
303         }
304
305         rdr, err = v.bucket.GetReader(loc)
306         if err != nil {
307                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
308                 err = v.translateError(err)
309         }
310         return
311 }
312
313 // Get a block: copy the block data into buf, and return the number of
314 // bytes copied.
315 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
316         rdr, err := v.getReaderWithContext(ctx, loc)
317         if err != nil {
318                 return 0, err
319         }
320
321         var n int
322         ready := make(chan bool)
323         go func() {
324                 defer close(ready)
325
326                 defer rdr.Close()
327                 n, err = io.ReadFull(rdr, buf)
328
329                 switch err {
330                 case nil, io.EOF, io.ErrUnexpectedEOF:
331                         err = nil
332                 default:
333                         err = v.translateError(err)
334                 }
335         }()
336         select {
337         case <-ctx.Done():
338                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
339                 rdr.Close()
340                 // Must wait for ReadFull to return, to ensure it
341                 // doesn't write to buf after we return.
342                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
343                 <-ready
344                 return 0, ctx.Err()
345         case <-ready:
346                 return n, err
347         }
348 }
349
350 // Compare the given data with the stored data.
351 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
352         errChan := make(chan error, 1)
353         go func() {
354                 _, err := v.bucket.Head("recent/"+loc, nil)
355                 errChan <- err
356         }()
357         var err error
358         select {
359         case <-ctx.Done():
360                 return ctx.Err()
361         case err = <-errChan:
362         }
363         if err != nil {
364                 // Checking for "loc" itself here would interfere with
365                 // future GET requests.
366                 //
367                 // On AWS, if X doesn't exist, a HEAD or GET request
368                 // for X causes X's non-existence to be cached. Thus,
369                 // if we test for X, then create X and return a
370                 // signature to our client, the client might still get
371                 // 404 from all keepstores when trying to read it.
372                 //
373                 // To avoid this, we avoid doing HEAD X or GET X until
374                 // we know X has been written.
375                 //
376                 // Note that X might exist even though recent/X
377                 // doesn't: for example, the response to HEAD recent/X
378                 // might itself come from a stale cache. In such
379                 // cases, we will return a false negative and
380                 // PutHandler might needlessly create another replica
381                 // on a different volume. That's not ideal, but it's
382                 // better than passing the eventually-consistent
383                 // problem on to our clients.
384                 return v.translateError(err)
385         }
386         rdr, err := v.getReaderWithContext(ctx, loc)
387         if err != nil {
388                 return err
389         }
390         defer rdr.Close()
391         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
392 }
393
394 // Put writes a block.
395 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
396         if v.ReadOnly {
397                 return MethodDisabledError
398         }
399         var opts s3.Options
400         size := len(block)
401         if size > 0 {
402                 md5, err := hex.DecodeString(loc)
403                 if err != nil {
404                         return err
405                 }
406                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
407                 // In AWS regions that use V4 signatures, we need to
408                 // provide ContentSHA256 up front. Otherwise, the S3
409                 // library reads the request body (from our buffer)
410                 // into another new buffer in order to compute the
411                 // SHA256 before sending the request -- which would
412                 // mean consuming 128 MiB of memory for the duration
413                 // of a 64 MiB write.
414                 opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
415         }
416
417         // Send the block data through a pipe, so that (if we need to)
418         // we can close the pipe early and abandon our PutReader()
419         // goroutine, without worrying about PutReader() accessing our
420         // block buffer after we release it.
421         bufr, bufw := io.Pipe()
422         go func() {
423                 io.Copy(bufw, bytes.NewReader(block))
424                 bufw.Close()
425         }()
426
427         var err error
428         ready := make(chan bool)
429         go func() {
430                 defer func() {
431                         if ctx.Err() != nil {
432                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
433                         }
434                 }()
435                 defer close(ready)
436                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
437                 if err != nil {
438                         return
439                 }
440                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
441         }()
442         select {
443         case <-ctx.Done():
444                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
445                 // Our pipe might be stuck in Write(), waiting for
446                 // PutReader() to read. If so, un-stick it. This means
447                 // PutReader will get corrupt data, but that's OK: the
448                 // size and MD5 won't match, so the write will fail.
449                 go io.Copy(ioutil.Discard, bufr)
450                 // CloseWithError() will return once pending I/O is done.
451                 bufw.CloseWithError(ctx.Err())
452                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
453                 return ctx.Err()
454         case <-ready:
455                 // Unblock pipe in case PutReader did not consume it.
456                 io.Copy(ioutil.Discard, bufr)
457                 return v.translateError(err)
458         }
459 }
460
461 // Touch sets the timestamp for the given locator to the current time.
462 func (v *S3Volume) Touch(loc string) error {
463         if v.ReadOnly {
464                 return MethodDisabledError
465         }
466         _, err := v.bucket.Head(loc, nil)
467         err = v.translateError(err)
468         if os.IsNotExist(err) && v.fixRace(loc) {
469                 // The data object got trashed in a race, but fixRace
470                 // rescued it.
471         } else if err != nil {
472                 return err
473         }
474         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
475         return v.translateError(err)
476 }
477
478 // Mtime returns the stored timestamp for the given locator.
479 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
480         _, err := v.bucket.Head(loc, nil)
481         if err != nil {
482                 return zeroTime, v.translateError(err)
483         }
484         resp, err := v.bucket.Head("recent/"+loc, nil)
485         err = v.translateError(err)
486         if os.IsNotExist(err) {
487                 // The data object X exists, but recent/X is missing.
488                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
489                 if err != nil {
490                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
491                         return zeroTime, v.translateError(err)
492                 }
493                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
494                 resp, err = v.bucket.Head("recent/"+loc, nil)
495                 if err != nil {
496                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
497                         return zeroTime, v.translateError(err)
498                 }
499         } else if err != nil {
500                 // HEAD recent/X failed for some other reason.
501                 return zeroTime, err
502         }
503         return v.lastModified(resp)
504 }
505
506 // IndexTo writes a complete list of locators with the given prefix
507 // for which Get() can retrieve data.
508 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
509         // Use a merge sort to find matching sets of X and recent/X.
510         dataL := s3Lister{
511                 Bucket:   v.bucket.Bucket,
512                 Prefix:   prefix,
513                 PageSize: v.IndexPageSize,
514                 Stats:    &v.bucket.stats,
515         }
516         recentL := s3Lister{
517                 Bucket:   v.bucket.Bucket,
518                 Prefix:   "recent/" + prefix,
519                 PageSize: v.IndexPageSize,
520                 Stats:    &v.bucket.stats,
521         }
522         for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
523                 if data.Key >= "g" {
524                         // Conveniently, "recent/*" and "trash/*" are
525                         // lexically greater than all hex-encoded data
526                         // hashes, so stopping here avoids iterating
527                         // over all of them needlessly with dataL.
528                         break
529                 }
530                 if !v.isKeepBlock(data.Key) {
531                         continue
532                 }
533
534                 // stamp is the list entry we should use to report the
535                 // last-modified time for this data block: it will be
536                 // the recent/X entry if one exists, otherwise the
537                 // entry for the data block itself.
538                 stamp := data
539
540                 // Advance to the corresponding recent/X marker, if any
541                 for recent != nil && recentL.Error() == nil {
542                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
543                                 recent = recentL.Next()
544                                 continue
545                         } else if cmp == 0 {
546                                 stamp = recent
547                                 recent = recentL.Next()
548                                 break
549                         } else {
550                                 // recent/X marker is missing: we'll
551                                 // use the timestamp on the data
552                                 // object.
553                                 break
554                         }
555                 }
556                 if err := recentL.Error(); err != nil {
557                         return err
558                 }
559                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
560                 if err != nil {
561                         return err
562                 }
563                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
564         }
565         return dataL.Error()
566 }
567
568 // Trash a Keep block.
569 func (v *S3Volume) Trash(loc string) error {
570         if v.ReadOnly {
571                 return MethodDisabledError
572         }
573         if t, err := v.Mtime(loc); err != nil {
574                 return err
575         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
576                 return nil
577         }
578         if theConfig.TrashLifetime == 0 {
579                 if !s3UnsafeDelete {
580                         return ErrS3TrashDisabled
581                 }
582                 return v.translateError(v.bucket.Del(loc))
583         }
584         err := v.checkRaceWindow(loc)
585         if err != nil {
586                 return err
587         }
588         err = v.safeCopy("trash/"+loc, loc)
589         if err != nil {
590                 return err
591         }
592         return v.translateError(v.bucket.Del(loc))
593 }
594
595 // checkRaceWindow returns a non-nil error if trash/loc is, or might
596 // be, in the race window (i.e., it's not safe to trash loc).
597 func (v *S3Volume) checkRaceWindow(loc string) error {
598         resp, err := v.bucket.Head("trash/"+loc, nil)
599         err = v.translateError(err)
600         if os.IsNotExist(err) {
601                 // OK, trash/X doesn't exist so we're not in the race
602                 // window
603                 return nil
604         } else if err != nil {
605                 // Error looking up trash/X. We don't know whether
606                 // we're in the race window
607                 return err
608         }
609         t, err := v.lastModified(resp)
610         if err != nil {
611                 // Can't parse timestamp
612                 return err
613         }
614         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
615         if safeWindow <= 0 {
616                 // We can't count on "touch trash/X" to prolong
617                 // trash/X's lifetime. The new timestamp might not
618                 // become visible until now+raceWindow, and EmptyTrash
619                 // is allowed to delete trash/X before then.
620                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
621         }
622         // trash/X exists, but it won't be eligible for deletion until
623         // after now+raceWindow, so it's safe to overwrite it.
624         return nil
625 }
626
627 // safeCopy calls PutCopy, and checks the response to make sure the
628 // copy succeeded and updated the timestamp on the destination object
629 // (PutCopy returns 200 OK if the request was received, even if the
630 // copy failed).
631 func (v *S3Volume) safeCopy(dst, src string) error {
632         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
633                 ContentType:       "application/octet-stream",
634                 MetadataDirective: "REPLACE",
635         }, v.bucket.Name+"/"+src)
636         err = v.translateError(err)
637         if os.IsNotExist(err) {
638                 return err
639         } else if err != nil {
640                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Name+"/"+src, err)
641         }
642         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
643                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
644         } else if time.Now().Sub(t) > maxClockSkew {
645                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
646         }
647         return nil
648 }
649
650 // Get the LastModified header from resp, and parse it as RFC1123 or
651 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
652 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
653         s := resp.Header.Get("Last-Modified")
654         t, err = time.Parse(time.RFC1123, s)
655         if err != nil && s != "" {
656                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
657                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
658                 // as required by HTTP spec. If it's not a valid HTTP
659                 // header value, it's probably AWS (or s3test) giving
660                 // us a nearly-RFC1123 timestamp.
661                 t, err = time.Parse(nearlyRFC1123, s)
662         }
663         return
664 }
665
666 // Untrash moves block from trash back into store
667 func (v *S3Volume) Untrash(loc string) error {
668         err := v.safeCopy(loc, "trash/"+loc)
669         if err != nil {
670                 return err
671         }
672         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
673         return v.translateError(err)
674 }
675
676 // Status returns a *VolumeStatus representing the current in-use
677 // storage capacity and a fake available capacity that doesn't make
678 // the volume seem full or nearly-full.
679 func (v *S3Volume) Status() *VolumeStatus {
680         return &VolumeStatus{
681                 DeviceNum: 1,
682                 BytesFree: BlockSize * 1000,
683                 BytesUsed: 1,
684         }
685 }
686
687 // InternalStats returns bucket I/O and API call counters.
688 func (v *S3Volume) InternalStats() interface{} {
689         return &v.bucket.stats
690 }
691
692 // String implements fmt.Stringer.
693 func (v *S3Volume) String() string {
694         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
695 }
696
697 // Writable returns false if all future Put, Mtime, and Delete calls
698 // are expected to fail.
699 func (v *S3Volume) Writable() bool {
700         return !v.ReadOnly
701 }
702
703 // Replication returns the storage redundancy of the underlying
704 // device. Configured via command line flag.
705 func (v *S3Volume) Replication() int {
706         return v.S3Replication
707 }
708
709 // GetStorageClasses implements Volume
710 func (v *S3Volume) GetStorageClasses() []string {
711         return v.StorageClasses
712 }
713
714 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
715
716 func (v *S3Volume) isKeepBlock(s string) bool {
717         return s3KeepBlockRegexp.MatchString(s)
718 }
719
720 // fixRace(X) is called when "recent/X" exists but "X" doesn't
721 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
722 // there was a race between Put and Trash, fixRace recovers from the
723 // race by Untrashing the block.
724 func (v *S3Volume) fixRace(loc string) bool {
725         trash, err := v.bucket.Head("trash/"+loc, nil)
726         if err != nil {
727                 if !os.IsNotExist(v.translateError(err)) {
728                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
729                 }
730                 return false
731         }
732         trashTime, err := v.lastModified(trash)
733         if err != nil {
734                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
735                 return false
736         }
737
738         recent, err := v.bucket.Head("recent/"+loc, nil)
739         if err != nil {
740                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
741                 return false
742         }
743         recentTime, err := v.lastModified(recent)
744         if err != nil {
745                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
746                 return false
747         }
748
749         ageWhenTrashed := trashTime.Sub(recentTime)
750         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
751                 // No evidence of a race: block hasn't been written
752                 // since it became eligible for Trash. No fix needed.
753                 return false
754         }
755
756         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
757         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
758         err = v.safeCopy(loc, "trash/"+loc)
759         if err != nil {
760                 log.Printf("error: fixRace: %s", err)
761                 return false
762         }
763         return true
764 }
765
766 func (v *S3Volume) translateError(err error) error {
767         switch err := err.(type) {
768         case *s3.Error:
769                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
770                         strings.Contains(err.Error(), "Not Found") {
771                         return os.ErrNotExist
772                 }
773                 // Other 404 errors like NoSuchVersion and
774                 // NoSuchBucket are different problems which should
775                 // get called out downstream, so we don't convert them
776                 // to os.ErrNotExist.
777         }
778         return err
779 }
780
781 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
782 // and deletes them from the volume.
783 func (v *S3Volume) EmptyTrash() {
784         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
785
786         // Define "ready to delete" as "...when EmptyTrash started".
787         startT := time.Now()
788
789         emptyOneKey := func(trash *s3.Key) {
790                 loc := trash.Key[6:]
791                 if !v.isKeepBlock(loc) {
792                         return
793                 }
794                 atomic.AddInt64(&bytesInTrash, trash.Size)
795                 atomic.AddInt64(&blocksInTrash, 1)
796
797                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
798                 if err != nil {
799                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
800                         return
801                 }
802                 recent, err := v.bucket.Head("recent/"+loc, nil)
803                 if err != nil && os.IsNotExist(v.translateError(err)) {
804                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
805                         err = v.Untrash(loc)
806                         if err != nil {
807                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
808                         }
809                         return
810                 } else if err != nil {
811                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
812                         return
813                 }
814                 recentT, err := v.lastModified(recent)
815                 if err != nil {
816                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
817                         return
818                 }
819                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
820                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
821                                 // recent/loc is too old to protect
822                                 // loc from being Trashed again during
823                                 // the raceWindow that starts if we
824                                 // delete trash/X now.
825                                 //
826                                 // Note this means (TrashCheckInterval
827                                 // < BlobSignatureTTL - raceWindow) is
828                                 // necessary to avoid starvation.
829                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
830                                 v.fixRace(loc)
831                                 v.Touch(loc)
832                                 return
833                         }
834                         _, err := v.bucket.Head(loc, nil)
835                         if os.IsNotExist(err) {
836                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
837                                 v.fixRace(loc)
838                                 return
839                         } else if err != nil {
840                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
841                                 return
842                         }
843                 }
844                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
845                         return
846                 }
847                 err = v.bucket.Del(trash.Key)
848                 if err != nil {
849                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
850                         return
851                 }
852                 atomic.AddInt64(&bytesDeleted, trash.Size)
853                 atomic.AddInt64(&blocksDeleted, 1)
854
855                 _, err = v.bucket.Head(loc, nil)
856                 if err == nil {
857                         log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
858                         return
859                 }
860                 if !os.IsNotExist(v.translateError(err)) {
861                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
862                         return
863                 }
864                 err = v.bucket.Del("recent/" + loc)
865                 if err != nil {
866                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
867                 }
868         }
869
870         var wg sync.WaitGroup
871         todo := make(chan *s3.Key, theConfig.EmptyTrashWorkers)
872         for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
873                 wg.Add(1)
874                 go func() {
875                         defer wg.Done()
876                         for key := range todo {
877                                 emptyOneKey(key)
878                         }
879                 }()
880         }
881
882         trashL := s3Lister{
883                 Bucket:   v.bucket.Bucket,
884                 Prefix:   "trash/",
885                 PageSize: v.IndexPageSize,
886                 Stats:    &v.bucket.stats,
887         }
888         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
889                 todo <- trash
890         }
891         close(todo)
892         wg.Wait()
893
894         if err := trashL.Error(); err != nil {
895                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
896         }
897         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
898 }
899
900 type s3Lister struct {
901         Bucket     *s3.Bucket
902         Prefix     string
903         PageSize   int
904         Stats      *s3bucketStats
905         nextMarker string
906         buf        []s3.Key
907         err        error
908 }
909
910 // First fetches the first page and returns the first item. It returns
911 // nil if the response is the empty set or an error occurs.
912 func (lister *s3Lister) First() *s3.Key {
913         lister.getPage()
914         return lister.pop()
915 }
916
917 // Next returns the next item, fetching the next page if necessary. It
918 // returns nil if the last available item has already been fetched, or
919 // an error occurs.
920 func (lister *s3Lister) Next() *s3.Key {
921         if len(lister.buf) == 0 && lister.nextMarker != "" {
922                 lister.getPage()
923         }
924         return lister.pop()
925 }
926
927 // Return the most recent error encountered by First or Next.
928 func (lister *s3Lister) Error() error {
929         return lister.err
930 }
931
932 func (lister *s3Lister) getPage() {
933         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
934         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
935         lister.nextMarker = ""
936         if err != nil {
937                 lister.err = err
938                 return
939         }
940         if resp.IsTruncated {
941                 lister.nextMarker = resp.NextMarker
942         }
943         lister.buf = make([]s3.Key, 0, len(resp.Contents))
944         for _, key := range resp.Contents {
945                 if !strings.HasPrefix(key.Key, lister.Prefix) {
946                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
947                         continue
948                 }
949                 lister.buf = append(lister.buf, key)
950         }
951 }
952
953 func (lister *s3Lister) pop() (k *s3.Key) {
954         if len(lister.buf) > 0 {
955                 k = &lister.buf[0]
956                 lister.buf = lister.buf[1:]
957         }
958         return
959 }
960
961 // s3bucket wraps s3.bucket and counts I/O and API usage stats.
962 type s3bucket struct {
963         *s3.Bucket
964         stats s3bucketStats
965 }
966
967 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
968         rdr, err := b.Bucket.GetReader(path)
969         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
970         b.stats.TickErr(err)
971         return NewCountingReader(rdr, b.stats.TickInBytes), err
972 }
973
974 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
975         resp, err := b.Bucket.Head(path, headers)
976         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
977         b.stats.TickErr(err)
978         return resp, err
979 }
980
981 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
982         if length == 0 {
983                 // goamz will only send Content-Length: 0 when reader
984                 // is nil due to net.http.Request.ContentLength
985                 // behavior.  Otherwise, Content-Length header is
986                 // omitted which will cause some S3 services
987                 // (including AWS and Ceph RadosGW) to fail to create
988                 // empty objects.
989                 r = nil
990         } else {
991                 r = NewCountingReader(r, b.stats.TickOutBytes)
992         }
993         err := b.Bucket.PutReader(path, r, length, contType, perm, options)
994         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
995         b.stats.TickErr(err)
996         return err
997 }
998
999 func (b *s3bucket) Del(path string) error {
1000         err := b.Bucket.Del(path)
1001         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
1002         b.stats.TickErr(err)
1003         return err
1004 }
1005
1006 type s3bucketStats struct {
1007         statsTicker
1008         Ops     uint64
1009         GetOps  uint64
1010         PutOps  uint64
1011         HeadOps uint64
1012         DelOps  uint64
1013         ListOps uint64
1014 }
1015
1016 func (s *s3bucketStats) TickErr(err error) {
1017         if err == nil {
1018                 return
1019         }
1020         errType := fmt.Sprintf("%T", err)
1021         if err, ok := err.(*s3.Error); ok {
1022                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
1023         }
1024         s.statsTicker.TickErr(err, errType)
1025 }