Merge branch '13593-async-perm-graph-update'
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/sha256"
11         "encoding/base64"
12         "encoding/hex"
13         "flag"
14         "fmt"
15         "io"
16         "io/ioutil"
17         "net/http"
18         "os"
19         "regexp"
20         "strings"
21         "sync"
22         "sync/atomic"
23         "time"
24
25         "git.curoverse.com/arvados.git/sdk/go/arvados"
26         "github.com/AdRoll/goamz/aws"
27         "github.com/AdRoll/goamz/s3"
28         "github.com/prometheus/client_golang/prometheus"
29 )
30
31 const (
32         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
33         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
34 )
35
36 var (
37         // ErrS3TrashDisabled is returned by Trash if that operation
38         // is impossible with the current config.
39         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
40
41         s3AccessKeyFile string
42         s3SecretKeyFile string
43         s3RegionName    string
44         s3Endpoint      string
45         s3Replication   int
46         s3UnsafeDelete  bool
47         s3RaceWindow    time.Duration
48
49         s3ACL = s3.Private
50
51         zeroTime time.Time
52 )
53
54 const (
55         maxClockSkew  = 600 * time.Second
56         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
57 )
58
59 type s3VolumeAdder struct {
60         *Config
61 }
62
63 // String implements flag.Value
64 func (s *s3VolumeAdder) String() string {
65         return "-"
66 }
67
68 func (s *s3VolumeAdder) Set(bucketName string) error {
69         if bucketName == "" {
70                 return fmt.Errorf("no container name given")
71         }
72         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
73                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
74         }
75         if deprecated.flagSerializeIO {
76                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
77         }
78         s.Config.Volumes = append(s.Config.Volumes, &S3Volume{
79                 Bucket:        bucketName,
80                 AccessKeyFile: s3AccessKeyFile,
81                 SecretKeyFile: s3SecretKeyFile,
82                 Endpoint:      s3Endpoint,
83                 Region:        s3RegionName,
84                 RaceWindow:    arvados.Duration(s3RaceWindow),
85                 S3Replication: s3Replication,
86                 UnsafeDelete:  s3UnsafeDelete,
87                 ReadOnly:      deprecated.flagReadonly,
88                 IndexPageSize: 1000,
89         })
90         return nil
91 }
92
93 func s3regions() (okList []string) {
94         for r := range aws.Regions {
95                 okList = append(okList, r)
96         }
97         return
98 }
99
100 func init() {
101         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &S3Volume{} })
102
103         flag.Var(&s3VolumeAdder{theConfig},
104                 "s3-bucket-volume",
105                 "Use the given bucket as a storage volume. Can be given multiple times.")
106         flag.StringVar(
107                 &s3RegionName,
108                 "s3-region",
109                 "",
110                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
111         flag.StringVar(
112                 &s3Endpoint,
113                 "s3-endpoint",
114                 "",
115                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
116         flag.StringVar(
117                 &s3AccessKeyFile,
118                 "s3-access-key-file",
119                 "",
120                 "`File` containing the access key used for subsequent -s3-bucket-volume arguments.")
121         flag.StringVar(
122                 &s3SecretKeyFile,
123                 "s3-secret-key-file",
124                 "",
125                 "`File` containing the secret key used for subsequent -s3-bucket-volume arguments.")
126         flag.DurationVar(
127                 &s3RaceWindow,
128                 "s3-race-window",
129                 24*time.Hour,
130                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
131         flag.IntVar(
132                 &s3Replication,
133                 "s3-replication",
134                 2,
135                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
136         flag.BoolVar(
137                 &s3UnsafeDelete,
138                 "s3-unsafe-delete",
139                 false,
140                 "EXPERIMENTAL. Enable deletion (garbage collection) even when trash lifetime is zero, even though there are known race conditions that can cause data loss.")
141 }
142
143 // S3Volume implements Volume using an S3 bucket.
144 type S3Volume struct {
145         AccessKeyFile      string
146         SecretKeyFile      string
147         Endpoint           string
148         Region             string
149         Bucket             string
150         LocationConstraint bool
151         IndexPageSize      int
152         S3Replication      int
153         ConnectTimeout     arvados.Duration
154         ReadTimeout        arvados.Duration
155         RaceWindow         arvados.Duration
156         ReadOnly           bool
157         UnsafeDelete       bool
158         StorageClasses     []string
159
160         bucket *s3bucket
161
162         startOnce sync.Once
163 }
164
165 // Examples implements VolumeWithExamples.
166 func (*S3Volume) Examples() []Volume {
167         return []Volume{
168                 &S3Volume{
169                         AccessKeyFile:  "/etc/aws_s3_access_key.txt",
170                         SecretKeyFile:  "/etc/aws_s3_secret_key.txt",
171                         Endpoint:       "",
172                         Region:         "us-east-1",
173                         Bucket:         "example-bucket-name",
174                         IndexPageSize:  1000,
175                         S3Replication:  2,
176                         RaceWindow:     arvados.Duration(24 * time.Hour),
177                         ConnectTimeout: arvados.Duration(time.Minute),
178                         ReadTimeout:    arvados.Duration(5 * time.Minute),
179                 },
180                 &S3Volume{
181                         AccessKeyFile:  "/etc/gce_s3_access_key.txt",
182                         SecretKeyFile:  "/etc/gce_s3_secret_key.txt",
183                         Endpoint:       "https://storage.googleapis.com",
184                         Region:         "",
185                         Bucket:         "example-bucket-name",
186                         IndexPageSize:  1000,
187                         S3Replication:  2,
188                         RaceWindow:     arvados.Duration(24 * time.Hour),
189                         ConnectTimeout: arvados.Duration(time.Minute),
190                         ReadTimeout:    arvados.Duration(5 * time.Minute),
191                 },
192         }
193 }
194
195 // Type implements Volume.
196 func (*S3Volume) Type() string {
197         return "S3"
198 }
199
200 // Start populates private fields and verifies the configuration is
201 // valid.
202 func (v *S3Volume) Start(vm *volumeMetricsVecs) error {
203         region, ok := aws.Regions[v.Region]
204         if v.Endpoint == "" {
205                 if !ok {
206                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", v.Region)
207                 }
208         } else if ok {
209                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
210                         "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", v.Region, v.Endpoint)
211         } else {
212                 region = aws.Region{
213                         Name:                 v.Region,
214                         S3Endpoint:           v.Endpoint,
215                         S3LocationConstraint: v.LocationConstraint,
216                 }
217         }
218
219         var err error
220         var auth aws.Auth
221         auth.AccessKey, err = readKeyFromFile(v.AccessKeyFile)
222         if err != nil {
223                 return err
224         }
225         auth.SecretKey, err = readKeyFromFile(v.SecretKeyFile)
226         if err != nil {
227                 return err
228         }
229
230         // Zero timeouts mean "wait forever", which is a bad
231         // default. Default to long timeouts instead.
232         if v.ConnectTimeout == 0 {
233                 v.ConnectTimeout = s3DefaultConnectTimeout
234         }
235         if v.ReadTimeout == 0 {
236                 v.ReadTimeout = s3DefaultReadTimeout
237         }
238
239         client := s3.New(auth, region)
240         if region.EC2Endpoint.Signer == aws.V4Signature {
241                 // Currently affects only eu-central-1
242                 client.Signature = aws.V4Signature
243         }
244         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
245         client.ReadTimeout = time.Duration(v.ReadTimeout)
246         v.bucket = &s3bucket{
247                 Bucket: &s3.Bucket{
248                         S3:   client,
249                         Name: v.Bucket,
250                 },
251         }
252         // Set up prometheus metrics
253         lbls := prometheus.Labels{"device_id": v.DeviceID()}
254         v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = vm.getCounterVecsFor(lbls)
255
256         return nil
257 }
258
259 // DeviceID returns a globally unique ID for the storage bucket.
260 func (v *S3Volume) DeviceID() string {
261         return "s3://" + v.Endpoint + "/" + v.Bucket
262 }
263
264 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
265         ready := make(chan bool)
266         go func() {
267                 rdr, err = v.getReader(loc)
268                 close(ready)
269         }()
270         select {
271         case <-ready:
272                 return
273         case <-ctx.Done():
274                 theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
275                 go func() {
276                         <-ready
277                         if err == nil {
278                                 rdr.Close()
279                         }
280                 }()
281                 return nil, ctx.Err()
282         }
283 }
284
285 // getReader wraps (Bucket)GetReader.
286 //
287 // In situations where (Bucket)GetReader would fail because the block
288 // disappeared in a Trash race, getReader calls fixRace to recover the
289 // data, and tries again.
290 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
291         rdr, err = v.bucket.GetReader(loc)
292         err = v.translateError(err)
293         if err == nil || !os.IsNotExist(err) {
294                 return
295         }
296
297         _, err = v.bucket.Head("recent/"+loc, nil)
298         err = v.translateError(err)
299         if err != nil {
300                 // If we can't read recent/X, there's no point in
301                 // trying fixRace. Give up.
302                 return
303         }
304         if !v.fixRace(loc) {
305                 err = os.ErrNotExist
306                 return
307         }
308
309         rdr, err = v.bucket.GetReader(loc)
310         if err != nil {
311                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
312                 err = v.translateError(err)
313         }
314         return
315 }
316
317 // Get a block: copy the block data into buf, and return the number of
318 // bytes copied.
319 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
320         rdr, err := v.getReaderWithContext(ctx, loc)
321         if err != nil {
322                 return 0, err
323         }
324
325         var n int
326         ready := make(chan bool)
327         go func() {
328                 defer close(ready)
329
330                 defer rdr.Close()
331                 n, err = io.ReadFull(rdr, buf)
332
333                 switch err {
334                 case nil, io.EOF, io.ErrUnexpectedEOF:
335                         err = nil
336                 default:
337                         err = v.translateError(err)
338                 }
339         }()
340         select {
341         case <-ctx.Done():
342                 theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
343                 rdr.Close()
344                 // Must wait for ReadFull to return, to ensure it
345                 // doesn't write to buf after we return.
346                 theConfig.debugLogf("s3: waiting for ReadFull() to fail")
347                 <-ready
348                 return 0, ctx.Err()
349         case <-ready:
350                 return n, err
351         }
352 }
353
354 // Compare the given data with the stored data.
355 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
356         errChan := make(chan error, 1)
357         go func() {
358                 _, err := v.bucket.Head("recent/"+loc, nil)
359                 errChan <- err
360         }()
361         var err error
362         select {
363         case <-ctx.Done():
364                 return ctx.Err()
365         case err = <-errChan:
366         }
367         if err != nil {
368                 // Checking for "loc" itself here would interfere with
369                 // future GET requests.
370                 //
371                 // On AWS, if X doesn't exist, a HEAD or GET request
372                 // for X causes X's non-existence to be cached. Thus,
373                 // if we test for X, then create X and return a
374                 // signature to our client, the client might still get
375                 // 404 from all keepstores when trying to read it.
376                 //
377                 // To avoid this, we avoid doing HEAD X or GET X until
378                 // we know X has been written.
379                 //
380                 // Note that X might exist even though recent/X
381                 // doesn't: for example, the response to HEAD recent/X
382                 // might itself come from a stale cache. In such
383                 // cases, we will return a false negative and
384                 // PutHandler might needlessly create another replica
385                 // on a different volume. That's not ideal, but it's
386                 // better than passing the eventually-consistent
387                 // problem on to our clients.
388                 return v.translateError(err)
389         }
390         rdr, err := v.getReaderWithContext(ctx, loc)
391         if err != nil {
392                 return err
393         }
394         defer rdr.Close()
395         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
396 }
397
398 // Put writes a block.
399 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
400         if v.ReadOnly {
401                 return MethodDisabledError
402         }
403         var opts s3.Options
404         size := len(block)
405         if size > 0 {
406                 md5, err := hex.DecodeString(loc)
407                 if err != nil {
408                         return err
409                 }
410                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
411                 // In AWS regions that use V4 signatures, we need to
412                 // provide ContentSHA256 up front. Otherwise, the S3
413                 // library reads the request body (from our buffer)
414                 // into another new buffer in order to compute the
415                 // SHA256 before sending the request -- which would
416                 // mean consuming 128 MiB of memory for the duration
417                 // of a 64 MiB write.
418                 opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
419         }
420
421         // Send the block data through a pipe, so that (if we need to)
422         // we can close the pipe early and abandon our PutReader()
423         // goroutine, without worrying about PutReader() accessing our
424         // block buffer after we release it.
425         bufr, bufw := io.Pipe()
426         go func() {
427                 io.Copy(bufw, bytes.NewReader(block))
428                 bufw.Close()
429         }()
430
431         var err error
432         ready := make(chan bool)
433         go func() {
434                 defer func() {
435                         if ctx.Err() != nil {
436                                 theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
437                         }
438                 }()
439                 defer close(ready)
440                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
441                 if err != nil {
442                         return
443                 }
444                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
445         }()
446         select {
447         case <-ctx.Done():
448                 theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
449                 // Our pipe might be stuck in Write(), waiting for
450                 // PutReader() to read. If so, un-stick it. This means
451                 // PutReader will get corrupt data, but that's OK: the
452                 // size and MD5 won't match, so the write will fail.
453                 go io.Copy(ioutil.Discard, bufr)
454                 // CloseWithError() will return once pending I/O is done.
455                 bufw.CloseWithError(ctx.Err())
456                 theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
457                 return ctx.Err()
458         case <-ready:
459                 // Unblock pipe in case PutReader did not consume it.
460                 io.Copy(ioutil.Discard, bufr)
461                 return v.translateError(err)
462         }
463 }
464
465 // Touch sets the timestamp for the given locator to the current time.
466 func (v *S3Volume) Touch(loc string) error {
467         if v.ReadOnly {
468                 return MethodDisabledError
469         }
470         _, err := v.bucket.Head(loc, nil)
471         err = v.translateError(err)
472         if os.IsNotExist(err) && v.fixRace(loc) {
473                 // The data object got trashed in a race, but fixRace
474                 // rescued it.
475         } else if err != nil {
476                 return err
477         }
478         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
479         return v.translateError(err)
480 }
481
482 // Mtime returns the stored timestamp for the given locator.
483 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
484         _, err := v.bucket.Head(loc, nil)
485         if err != nil {
486                 return zeroTime, v.translateError(err)
487         }
488         resp, err := v.bucket.Head("recent/"+loc, nil)
489         err = v.translateError(err)
490         if os.IsNotExist(err) {
491                 // The data object X exists, but recent/X is missing.
492                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
493                 if err != nil {
494                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
495                         return zeroTime, v.translateError(err)
496                 }
497                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
498                 resp, err = v.bucket.Head("recent/"+loc, nil)
499                 if err != nil {
500                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
501                         return zeroTime, v.translateError(err)
502                 }
503         } else if err != nil {
504                 // HEAD recent/X failed for some other reason.
505                 return zeroTime, err
506         }
507         return v.lastModified(resp)
508 }
509
510 // IndexTo writes a complete list of locators with the given prefix
511 // for which Get() can retrieve data.
512 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
513         // Use a merge sort to find matching sets of X and recent/X.
514         dataL := s3Lister{
515                 Bucket:   v.bucket.Bucket,
516                 Prefix:   prefix,
517                 PageSize: v.IndexPageSize,
518                 Stats:    &v.bucket.stats,
519         }
520         recentL := s3Lister{
521                 Bucket:   v.bucket.Bucket,
522                 Prefix:   "recent/" + prefix,
523                 PageSize: v.IndexPageSize,
524                 Stats:    &v.bucket.stats,
525         }
526         for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
527                 if data.Key >= "g" {
528                         // Conveniently, "recent/*" and "trash/*" are
529                         // lexically greater than all hex-encoded data
530                         // hashes, so stopping here avoids iterating
531                         // over all of them needlessly with dataL.
532                         break
533                 }
534                 if !v.isKeepBlock(data.Key) {
535                         continue
536                 }
537
538                 // stamp is the list entry we should use to report the
539                 // last-modified time for this data block: it will be
540                 // the recent/X entry if one exists, otherwise the
541                 // entry for the data block itself.
542                 stamp := data
543
544                 // Advance to the corresponding recent/X marker, if any
545                 for recent != nil && recentL.Error() == nil {
546                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
547                                 recent = recentL.Next()
548                                 continue
549                         } else if cmp == 0 {
550                                 stamp = recent
551                                 recent = recentL.Next()
552                                 break
553                         } else {
554                                 // recent/X marker is missing: we'll
555                                 // use the timestamp on the data
556                                 // object.
557                                 break
558                         }
559                 }
560                 if err := recentL.Error(); err != nil {
561                         return err
562                 }
563                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
564                 if err != nil {
565                         return err
566                 }
567                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
568         }
569         return dataL.Error()
570 }
571
572 // Trash a Keep block.
573 func (v *S3Volume) Trash(loc string) error {
574         if v.ReadOnly {
575                 return MethodDisabledError
576         }
577         if t, err := v.Mtime(loc); err != nil {
578                 return err
579         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
580                 return nil
581         }
582         if theConfig.TrashLifetime == 0 {
583                 if !s3UnsafeDelete {
584                         return ErrS3TrashDisabled
585                 }
586                 return v.translateError(v.bucket.Del(loc))
587         }
588         err := v.checkRaceWindow(loc)
589         if err != nil {
590                 return err
591         }
592         err = v.safeCopy("trash/"+loc, loc)
593         if err != nil {
594                 return err
595         }
596         return v.translateError(v.bucket.Del(loc))
597 }
598
599 // checkRaceWindow returns a non-nil error if trash/loc is, or might
600 // be, in the race window (i.e., it's not safe to trash loc).
601 func (v *S3Volume) checkRaceWindow(loc string) error {
602         resp, err := v.bucket.Head("trash/"+loc, nil)
603         err = v.translateError(err)
604         if os.IsNotExist(err) {
605                 // OK, trash/X doesn't exist so we're not in the race
606                 // window
607                 return nil
608         } else if err != nil {
609                 // Error looking up trash/X. We don't know whether
610                 // we're in the race window
611                 return err
612         }
613         t, err := v.lastModified(resp)
614         if err != nil {
615                 // Can't parse timestamp
616                 return err
617         }
618         safeWindow := t.Add(theConfig.TrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
619         if safeWindow <= 0 {
620                 // We can't count on "touch trash/X" to prolong
621                 // trash/X's lifetime. The new timestamp might not
622                 // become visible until now+raceWindow, and EmptyTrash
623                 // is allowed to delete trash/X before then.
624                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
625         }
626         // trash/X exists, but it won't be eligible for deletion until
627         // after now+raceWindow, so it's safe to overwrite it.
628         return nil
629 }
630
631 // safeCopy calls PutCopy, and checks the response to make sure the
632 // copy succeeded and updated the timestamp on the destination object
633 // (PutCopy returns 200 OK if the request was received, even if the
634 // copy failed).
635 func (v *S3Volume) safeCopy(dst, src string) error {
636         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
637                 ContentType:       "application/octet-stream",
638                 MetadataDirective: "REPLACE",
639         }, v.bucket.Name+"/"+src)
640         err = v.translateError(err)
641         if os.IsNotExist(err) {
642                 return err
643         } else if err != nil {
644                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Name+"/"+src, err)
645         }
646         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
647                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
648         } else if time.Now().Sub(t) > maxClockSkew {
649                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
650         }
651         return nil
652 }
653
654 // Get the LastModified header from resp, and parse it as RFC1123 or
655 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
656 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
657         s := resp.Header.Get("Last-Modified")
658         t, err = time.Parse(time.RFC1123, s)
659         if err != nil && s != "" {
660                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
661                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
662                 // as required by HTTP spec. If it's not a valid HTTP
663                 // header value, it's probably AWS (or s3test) giving
664                 // us a nearly-RFC1123 timestamp.
665                 t, err = time.Parse(nearlyRFC1123, s)
666         }
667         return
668 }
669
670 // Untrash moves block from trash back into store
671 func (v *S3Volume) Untrash(loc string) error {
672         err := v.safeCopy(loc, "trash/"+loc)
673         if err != nil {
674                 return err
675         }
676         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
677         return v.translateError(err)
678 }
679
680 // Status returns a *VolumeStatus representing the current in-use
681 // storage capacity and a fake available capacity that doesn't make
682 // the volume seem full or nearly-full.
683 func (v *S3Volume) Status() *VolumeStatus {
684         return &VolumeStatus{
685                 DeviceNum: 1,
686                 BytesFree: BlockSize * 1000,
687                 BytesUsed: 1,
688         }
689 }
690
691 // InternalStats returns bucket I/O and API call counters.
692 func (v *S3Volume) InternalStats() interface{} {
693         return &v.bucket.stats
694 }
695
696 // String implements fmt.Stringer.
697 func (v *S3Volume) String() string {
698         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
699 }
700
701 // Writable returns false if all future Put, Mtime, and Delete calls
702 // are expected to fail.
703 func (v *S3Volume) Writable() bool {
704         return !v.ReadOnly
705 }
706
707 // Replication returns the storage redundancy of the underlying
708 // device. Configured via command line flag.
709 func (v *S3Volume) Replication() int {
710         return v.S3Replication
711 }
712
713 // GetStorageClasses implements Volume
714 func (v *S3Volume) GetStorageClasses() []string {
715         return v.StorageClasses
716 }
717
718 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
719
720 func (v *S3Volume) isKeepBlock(s string) bool {
721         return s3KeepBlockRegexp.MatchString(s)
722 }
723
724 // fixRace(X) is called when "recent/X" exists but "X" doesn't
725 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
726 // there was a race between Put and Trash, fixRace recovers from the
727 // race by Untrashing the block.
728 func (v *S3Volume) fixRace(loc string) bool {
729         trash, err := v.bucket.Head("trash/"+loc, nil)
730         if err != nil {
731                 if !os.IsNotExist(v.translateError(err)) {
732                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
733                 }
734                 return false
735         }
736         trashTime, err := v.lastModified(trash)
737         if err != nil {
738                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
739                 return false
740         }
741
742         recent, err := v.bucket.Head("recent/"+loc, nil)
743         if err != nil {
744                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
745                 return false
746         }
747         recentTime, err := v.lastModified(recent)
748         if err != nil {
749                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
750                 return false
751         }
752
753         ageWhenTrashed := trashTime.Sub(recentTime)
754         if ageWhenTrashed >= theConfig.BlobSignatureTTL.Duration() {
755                 // No evidence of a race: block hasn't been written
756                 // since it became eligible for Trash. No fix needed.
757                 return false
758         }
759
760         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, theConfig.BlobSignatureTTL)
761         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
762         err = v.safeCopy(loc, "trash/"+loc)
763         if err != nil {
764                 log.Printf("error: fixRace: %s", err)
765                 return false
766         }
767         return true
768 }
769
770 func (v *S3Volume) translateError(err error) error {
771         switch err := err.(type) {
772         case *s3.Error:
773                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
774                         strings.Contains(err.Error(), "Not Found") {
775                         return os.ErrNotExist
776                 }
777                 // Other 404 errors like NoSuchVersion and
778                 // NoSuchBucket are different problems which should
779                 // get called out downstream, so we don't convert them
780                 // to os.ErrNotExist.
781         }
782         return err
783 }
784
785 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
786 // and deletes them from the volume.
787 func (v *S3Volume) EmptyTrash() {
788         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
789
790         // Define "ready to delete" as "...when EmptyTrash started".
791         startT := time.Now()
792
793         emptyOneKey := func(trash *s3.Key) {
794                 loc := trash.Key[6:]
795                 if !v.isKeepBlock(loc) {
796                         return
797                 }
798                 atomic.AddInt64(&bytesInTrash, trash.Size)
799                 atomic.AddInt64(&blocksInTrash, 1)
800
801                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
802                 if err != nil {
803                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
804                         return
805                 }
806                 recent, err := v.bucket.Head("recent/"+loc, nil)
807                 if err != nil && os.IsNotExist(v.translateError(err)) {
808                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
809                         err = v.Untrash(loc)
810                         if err != nil {
811                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
812                         }
813                         return
814                 } else if err != nil {
815                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
816                         return
817                 }
818                 recentT, err := v.lastModified(recent)
819                 if err != nil {
820                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
821                         return
822                 }
823                 if trashT.Sub(recentT) < theConfig.BlobSignatureTTL.Duration() {
824                         if age := startT.Sub(recentT); age >= theConfig.BlobSignatureTTL.Duration()-time.Duration(v.RaceWindow) {
825                                 // recent/loc is too old to protect
826                                 // loc from being Trashed again during
827                                 // the raceWindow that starts if we
828                                 // delete trash/X now.
829                                 //
830                                 // Note this means (TrashCheckInterval
831                                 // < BlobSignatureTTL - raceWindow) is
832                                 // necessary to avoid starvation.
833                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
834                                 v.fixRace(loc)
835                                 v.Touch(loc)
836                                 return
837                         }
838                         _, err := v.bucket.Head(loc, nil)
839                         if os.IsNotExist(err) {
840                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
841                                 v.fixRace(loc)
842                                 return
843                         } else if err != nil {
844                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
845                                 return
846                         }
847                 }
848                 if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() {
849                         return
850                 }
851                 err = v.bucket.Del(trash.Key)
852                 if err != nil {
853                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
854                         return
855                 }
856                 atomic.AddInt64(&bytesDeleted, trash.Size)
857                 atomic.AddInt64(&blocksDeleted, 1)
858
859                 _, err = v.bucket.Head(loc, nil)
860                 if err == nil {
861                         log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
862                         return
863                 }
864                 if !os.IsNotExist(v.translateError(err)) {
865                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
866                         return
867                 }
868                 err = v.bucket.Del("recent/" + loc)
869                 if err != nil {
870                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
871                 }
872         }
873
874         var wg sync.WaitGroup
875         todo := make(chan *s3.Key, theConfig.EmptyTrashWorkers)
876         for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
877                 wg.Add(1)
878                 go func() {
879                         defer wg.Done()
880                         for key := range todo {
881                                 emptyOneKey(key)
882                         }
883                 }()
884         }
885
886         trashL := s3Lister{
887                 Bucket:   v.bucket.Bucket,
888                 Prefix:   "trash/",
889                 PageSize: v.IndexPageSize,
890                 Stats:    &v.bucket.stats,
891         }
892         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
893                 todo <- trash
894         }
895         close(todo)
896         wg.Wait()
897
898         if err := trashL.Error(); err != nil {
899                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
900         }
901         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
902 }
903
904 type s3Lister struct {
905         Bucket     *s3.Bucket
906         Prefix     string
907         PageSize   int
908         Stats      *s3bucketStats
909         nextMarker string
910         buf        []s3.Key
911         err        error
912 }
913
914 // First fetches the first page and returns the first item. It returns
915 // nil if the response is the empty set or an error occurs.
916 func (lister *s3Lister) First() *s3.Key {
917         lister.getPage()
918         return lister.pop()
919 }
920
921 // Next returns the next item, fetching the next page if necessary. It
922 // returns nil if the last available item has already been fetched, or
923 // an error occurs.
924 func (lister *s3Lister) Next() *s3.Key {
925         if len(lister.buf) == 0 && lister.nextMarker != "" {
926                 lister.getPage()
927         }
928         return lister.pop()
929 }
930
931 // Return the most recent error encountered by First or Next.
932 func (lister *s3Lister) Error() error {
933         return lister.err
934 }
935
936 func (lister *s3Lister) getPage() {
937         lister.Stats.TickOps("list")
938         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
939         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
940         lister.nextMarker = ""
941         if err != nil {
942                 lister.err = err
943                 return
944         }
945         if resp.IsTruncated {
946                 lister.nextMarker = resp.NextMarker
947         }
948         lister.buf = make([]s3.Key, 0, len(resp.Contents))
949         for _, key := range resp.Contents {
950                 if !strings.HasPrefix(key.Key, lister.Prefix) {
951                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
952                         continue
953                 }
954                 lister.buf = append(lister.buf, key)
955         }
956 }
957
958 func (lister *s3Lister) pop() (k *s3.Key) {
959         if len(lister.buf) > 0 {
960                 k = &lister.buf[0]
961                 lister.buf = lister.buf[1:]
962         }
963         return
964 }
965
966 // s3bucket wraps s3.bucket and counts I/O and API usage stats.
967 type s3bucket struct {
968         *s3.Bucket
969         stats s3bucketStats
970 }
971
972 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
973         rdr, err := b.Bucket.GetReader(path)
974         b.stats.TickOps("get")
975         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
976         b.stats.TickErr(err)
977         return NewCountingReader(rdr, b.stats.TickInBytes), err
978 }
979
980 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
981         resp, err := b.Bucket.Head(path, headers)
982         b.stats.TickOps("head")
983         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
984         b.stats.TickErr(err)
985         return resp, err
986 }
987
988 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
989         if length == 0 {
990                 // goamz will only send Content-Length: 0 when reader
991                 // is nil due to net.http.Request.ContentLength
992                 // behavior.  Otherwise, Content-Length header is
993                 // omitted which will cause some S3 services
994                 // (including AWS and Ceph RadosGW) to fail to create
995                 // empty objects.
996                 r = nil
997         } else {
998                 r = NewCountingReader(r, b.stats.TickOutBytes)
999         }
1000         err := b.Bucket.PutReader(path, r, length, contType, perm, options)
1001         b.stats.TickOps("put")
1002         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
1003         b.stats.TickErr(err)
1004         return err
1005 }
1006
1007 func (b *s3bucket) Del(path string) error {
1008         err := b.Bucket.Del(path)
1009         b.stats.TickOps("delete")
1010         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
1011         b.stats.TickErr(err)
1012         return err
1013 }
1014
1015 type s3bucketStats struct {
1016         statsTicker
1017         Ops     uint64
1018         GetOps  uint64
1019         PutOps  uint64
1020         HeadOps uint64
1021         DelOps  uint64
1022         ListOps uint64
1023 }
1024
1025 func (s *s3bucketStats) TickErr(err error) {
1026         if err == nil {
1027                 return
1028         }
1029         errType := fmt.Sprintf("%T", err)
1030         if err, ok := err.(*s3.Error); ok {
1031                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
1032         }
1033         s.statsTicker.TickErr(err, errType)
1034 }