CWL spec -> CWL standards
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bufio"
9         "bytes"
10         "context"
11         "crypto/sha256"
12         "encoding/base64"
13         "encoding/hex"
14         "encoding/json"
15         "errors"
16         "fmt"
17         "io"
18         "io/ioutil"
19         "net/http"
20         "os"
21         "regexp"
22         "strings"
23         "sync"
24         "sync/atomic"
25         "time"
26
27         "git.arvados.org/arvados.git/sdk/go/arvados"
28         "github.com/AdRoll/goamz/aws"
29         "github.com/AdRoll/goamz/s3"
30         "github.com/prometheus/client_golang/prometheus"
31         "github.com/sirupsen/logrus"
32 )
33
34 func init() {
35         driver["S3"] = newS3Volume
36 }
37
38 func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
39         v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics}
40         err := json.Unmarshal(volume.DriverParameters, &v)
41         if err != nil {
42                 return nil, err
43         }
44         v.logger = logger.WithField("Volume", v.String())
45         return v, v.check()
46 }
47
48 func (v *S3Volume) check() error {
49         if v.Bucket == "" {
50                 return errors.New("DriverParameters: Bucket must be provided")
51         }
52         if v.IndexPageSize == 0 {
53                 v.IndexPageSize = 1000
54         }
55         if v.RaceWindow < 0 {
56                 return errors.New("DriverParameters: RaceWindow must not be negative")
57         }
58
59         var ok bool
60         v.region, ok = aws.Regions[v.Region]
61         if v.Endpoint == "" {
62                 if !ok {
63                         return fmt.Errorf("unrecognized region %+q; try specifying endpoint instead", v.Region)
64                 }
65         } else if ok {
66                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
67                         "specify empty endpoint or use a different region name", v.Region, v.Endpoint)
68         } else {
69                 v.region = aws.Region{
70                         Name:                 v.Region,
71                         S3Endpoint:           v.Endpoint,
72                         S3LocationConstraint: v.LocationConstraint,
73                 }
74         }
75
76         // Zero timeouts mean "wait forever", which is a bad
77         // default. Default to long timeouts instead.
78         if v.ConnectTimeout == 0 {
79                 v.ConnectTimeout = s3DefaultConnectTimeout
80         }
81         if v.ReadTimeout == 0 {
82                 v.ReadTimeout = s3DefaultReadTimeout
83         }
84
85         v.bucket = &s3bucket{
86                 bucket: &s3.Bucket{
87                         S3:   v.newS3Client(),
88                         Name: v.Bucket,
89                 },
90         }
91         // Set up prometheus metrics
92         lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
93         v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
94
95         err := v.bootstrapIAMCredentials()
96         if err != nil {
97                 return fmt.Errorf("error getting IAM credentials: %s", err)
98         }
99
100         return nil
101 }
102
103 const (
104         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
105         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
106 )
107
108 var (
109         // ErrS3TrashDisabled is returned by Trash if that operation
110         // is impossible with the current config.
111         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
112
113         s3ACL = s3.Private
114
115         zeroTime time.Time
116 )
117
118 const (
119         maxClockSkew  = 600 * time.Second
120         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
121 )
122
123 func s3regions() (okList []string) {
124         for r := range aws.Regions {
125                 okList = append(okList, r)
126         }
127         return
128 }
129
130 // S3Volume implements Volume using an S3 bucket.
131 type S3Volume struct {
132         AccessKey          string
133         SecretKey          string
134         AuthToken          string    // populated automatically when IAMRole is used
135         AuthExpiration     time.Time // populated automatically when IAMRole is used
136         IAMRole            string
137         Endpoint           string
138         Region             string
139         Bucket             string
140         LocationConstraint bool
141         IndexPageSize      int
142         ConnectTimeout     arvados.Duration
143         ReadTimeout        arvados.Duration
144         RaceWindow         arvados.Duration
145         UnsafeDelete       bool
146
147         cluster   *arvados.Cluster
148         volume    arvados.Volume
149         logger    logrus.FieldLogger
150         metrics   *volumeMetricsVecs
151         bucket    *s3bucket
152         region    aws.Region
153         startOnce sync.Once
154 }
155
156 // GetDeviceID returns a globally unique ID for the storage bucket.
157 func (v *S3Volume) GetDeviceID() string {
158         return "s3://" + v.Endpoint + "/" + v.Bucket
159 }
160
161 func (v *S3Volume) bootstrapIAMCredentials() error {
162         if v.AccessKey != "" || v.SecretKey != "" {
163                 if v.IAMRole != "" {
164                         return errors.New("invalid DriverParameters: AccessKey and SecretKey must be blank if IAMRole is specified")
165                 }
166                 return nil
167         }
168         ttl, err := v.updateIAMCredentials()
169         if err != nil {
170                 return err
171         }
172         go func() {
173                 for {
174                         time.Sleep(ttl)
175                         ttl, err = v.updateIAMCredentials()
176                         if err != nil {
177                                 v.logger.WithError(err).Warnf("failed to update credentials for IAM role %q", v.IAMRole)
178                                 ttl = time.Second
179                         } else if ttl < time.Second {
180                                 v.logger.WithField("TTL", ttl).Warnf("received stale credentials for IAM role %q", v.IAMRole)
181                                 ttl = time.Second
182                         }
183                 }
184         }()
185         return nil
186 }
187
188 func (v *S3Volume) newS3Client() *s3.S3 {
189         auth := aws.NewAuth(v.AccessKey, v.SecretKey, v.AuthToken, v.AuthExpiration)
190         client := s3.New(*auth, v.region)
191         if v.region.EC2Endpoint.Signer == aws.V4Signature {
192                 // Currently affects only eu-central-1
193                 client.Signature = aws.V4Signature
194         }
195         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
196         client.ReadTimeout = time.Duration(v.ReadTimeout)
197         return client
198 }
199
200 // returned by AWS metadata endpoint .../security-credentials/${rolename}
201 type iamCredentials struct {
202         Code            string
203         LastUpdated     time.Time
204         Type            string
205         AccessKeyID     string
206         SecretAccessKey string
207         Token           string
208         Expiration      time.Time
209 }
210
211 // Returns TTL of updated credentials, i.e., time to sleep until next
212 // update.
213 func (v *S3Volume) updateIAMCredentials() (time.Duration, error) {
214         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
215         defer cancel()
216
217         metadataBaseURL := "http://169.254.169.254/latest/meta-data/iam/security-credentials/"
218
219         var url string
220         if strings.Contains(v.IAMRole, "://") {
221                 // Configuration provides complete URL (used by tests)
222                 url = v.IAMRole
223         } else if v.IAMRole != "" {
224                 // Configuration provides IAM role name and we use the
225                 // AWS metadata endpoint
226                 url = metadataBaseURL + v.IAMRole
227         } else {
228                 url = metadataBaseURL
229                 v.logger.WithField("URL", url).Debug("looking up IAM role name")
230                 req, err := http.NewRequest("GET", url, nil)
231                 if err != nil {
232                         return 0, fmt.Errorf("error setting up request %s: %s", url, err)
233                 }
234                 resp, err := http.DefaultClient.Do(req.WithContext(ctx))
235                 if err != nil {
236                         return 0, fmt.Errorf("error getting %s: %s", url, err)
237                 }
238                 defer resp.Body.Close()
239                 if resp.StatusCode == http.StatusNotFound {
240                         return 0, fmt.Errorf("this instance does not have an IAM role assigned -- either assign a role, or configure AccessKey and SecretKey explicitly in DriverParameters (error getting %s: HTTP status %s)", url, resp.Status)
241                 } else if resp.StatusCode != http.StatusOK {
242                         return 0, fmt.Errorf("error getting %s: HTTP status %s", url, resp.Status)
243                 }
244                 body := bufio.NewReader(resp.Body)
245                 var role string
246                 _, err = fmt.Fscanf(body, "%s\n", &role)
247                 if err != nil {
248                         return 0, fmt.Errorf("error reading response from %s: %s", url, err)
249                 }
250                 if n, _ := body.Read(make([]byte, 64)); n > 0 {
251                         v.logger.Warnf("ignoring additional data returned by metadata endpoint %s after the single role name that we expected", url)
252                 }
253                 v.logger.WithField("Role", role).Debug("looked up IAM role name")
254                 url = url + role
255         }
256
257         v.logger.WithField("URL", url).Debug("getting credentials")
258         req, err := http.NewRequest("GET", url, nil)
259         if err != nil {
260                 return 0, fmt.Errorf("error setting up request %s: %s", url, err)
261         }
262         resp, err := http.DefaultClient.Do(req.WithContext(ctx))
263         if err != nil {
264                 return 0, fmt.Errorf("error getting %s: %s", url, err)
265         }
266         defer resp.Body.Close()
267         if resp.StatusCode != http.StatusOK {
268                 return 0, fmt.Errorf("error getting %s: HTTP status %s", url, resp.Status)
269         }
270         var cred iamCredentials
271         err = json.NewDecoder(resp.Body).Decode(&cred)
272         if err != nil {
273                 return 0, fmt.Errorf("error decoding credentials from %s: %s", url, err)
274         }
275         v.AccessKey, v.SecretKey, v.AuthToken, v.AuthExpiration = cred.AccessKeyID, cred.SecretAccessKey, cred.Token, cred.Expiration
276         v.bucket.SetBucket(&s3.Bucket{
277                 S3:   v.newS3Client(),
278                 Name: v.Bucket,
279         })
280         // TTL is time from now to expiration, minus 5m.  "We make new
281         // credentials available at least five minutes before the
282         // expiration of the old credentials."  --
283         // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
284         // (If that's not true, the returned ttl might be zero or
285         // negative, which the caller can handle.)
286         ttl := cred.Expiration.Sub(time.Now()) - 5*time.Minute
287         v.logger.WithFields(logrus.Fields{
288                 "AccessKeyID": cred.AccessKeyID,
289                 "LastUpdated": cred.LastUpdated,
290                 "Expiration":  cred.Expiration,
291                 "TTL":         arvados.Duration(ttl),
292         }).Debug("updated credentials")
293         return ttl, nil
294 }
295
296 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
297         ready := make(chan bool)
298         go func() {
299                 rdr, err = v.getReader(loc)
300                 close(ready)
301         }()
302         select {
303         case <-ready:
304                 return
305         case <-ctx.Done():
306                 v.logger.Debugf("s3: abandoning getReader(): %s", ctx.Err())
307                 go func() {
308                         <-ready
309                         if err == nil {
310                                 rdr.Close()
311                         }
312                 }()
313                 return nil, ctx.Err()
314         }
315 }
316
317 // getReader wraps (Bucket)GetReader.
318 //
319 // In situations where (Bucket)GetReader would fail because the block
320 // disappeared in a Trash race, getReader calls fixRace to recover the
321 // data, and tries again.
322 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
323         rdr, err = v.bucket.GetReader(loc)
324         err = v.translateError(err)
325         if err == nil || !os.IsNotExist(err) {
326                 return
327         }
328
329         _, err = v.bucket.Head("recent/"+loc, nil)
330         err = v.translateError(err)
331         if err != nil {
332                 // If we can't read recent/X, there's no point in
333                 // trying fixRace. Give up.
334                 return
335         }
336         if !v.fixRace(loc) {
337                 err = os.ErrNotExist
338                 return
339         }
340
341         rdr, err = v.bucket.GetReader(loc)
342         if err != nil {
343                 v.logger.Warnf("reading %s after successful fixRace: %s", loc, err)
344                 err = v.translateError(err)
345         }
346         return
347 }
348
349 // Get a block: copy the block data into buf, and return the number of
350 // bytes copied.
351 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
352         rdr, err := v.getReaderWithContext(ctx, loc)
353         if err != nil {
354                 return 0, err
355         }
356
357         var n int
358         ready := make(chan bool)
359         go func() {
360                 defer close(ready)
361
362                 defer rdr.Close()
363                 n, err = io.ReadFull(rdr, buf)
364
365                 switch err {
366                 case nil, io.EOF, io.ErrUnexpectedEOF:
367                         err = nil
368                 default:
369                         err = v.translateError(err)
370                 }
371         }()
372         select {
373         case <-ctx.Done():
374                 v.logger.Debugf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
375                 rdr.Close()
376                 // Must wait for ReadFull to return, to ensure it
377                 // doesn't write to buf after we return.
378                 v.logger.Debug("s3: waiting for ReadFull() to fail")
379                 <-ready
380                 return 0, ctx.Err()
381         case <-ready:
382                 return n, err
383         }
384 }
385
386 // Compare the given data with the stored data.
387 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
388         errChan := make(chan error, 1)
389         go func() {
390                 _, err := v.bucket.Head("recent/"+loc, nil)
391                 errChan <- err
392         }()
393         var err error
394         select {
395         case <-ctx.Done():
396                 return ctx.Err()
397         case err = <-errChan:
398         }
399         if err != nil {
400                 // Checking for "loc" itself here would interfere with
401                 // future GET requests.
402                 //
403                 // On AWS, if X doesn't exist, a HEAD or GET request
404                 // for X causes X's non-existence to be cached. Thus,
405                 // if we test for X, then create X and return a
406                 // signature to our client, the client might still get
407                 // 404 from all keepstores when trying to read it.
408                 //
409                 // To avoid this, we avoid doing HEAD X or GET X until
410                 // we know X has been written.
411                 //
412                 // Note that X might exist even though recent/X
413                 // doesn't: for example, the response to HEAD recent/X
414                 // might itself come from a stale cache. In such
415                 // cases, we will return a false negative and
416                 // PutHandler might needlessly create another replica
417                 // on a different volume. That's not ideal, but it's
418                 // better than passing the eventually-consistent
419                 // problem on to our clients.
420                 return v.translateError(err)
421         }
422         rdr, err := v.getReaderWithContext(ctx, loc)
423         if err != nil {
424                 return err
425         }
426         defer rdr.Close()
427         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
428 }
429
430 // Put writes a block.
431 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
432         if v.volume.ReadOnly {
433                 return MethodDisabledError
434         }
435         var opts s3.Options
436         size := len(block)
437         if size > 0 {
438                 md5, err := hex.DecodeString(loc)
439                 if err != nil {
440                         return err
441                 }
442                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
443                 // In AWS regions that use V4 signatures, we need to
444                 // provide ContentSHA256 up front. Otherwise, the S3
445                 // library reads the request body (from our buffer)
446                 // into another new buffer in order to compute the
447                 // SHA256 before sending the request -- which would
448                 // mean consuming 128 MiB of memory for the duration
449                 // of a 64 MiB write.
450                 opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
451         }
452
453         // Send the block data through a pipe, so that (if we need to)
454         // we can close the pipe early and abandon our PutReader()
455         // goroutine, without worrying about PutReader() accessing our
456         // block buffer after we release it.
457         bufr, bufw := io.Pipe()
458         go func() {
459                 io.Copy(bufw, bytes.NewReader(block))
460                 bufw.Close()
461         }()
462
463         var err error
464         ready := make(chan bool)
465         go func() {
466                 defer func() {
467                         if ctx.Err() != nil {
468                                 v.logger.Debugf("abandoned PutReader goroutine finished with err: %s", err)
469                         }
470                 }()
471                 defer close(ready)
472                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
473                 if err != nil {
474                         return
475                 }
476                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
477         }()
478         select {
479         case <-ctx.Done():
480                 v.logger.Debugf("taking PutReader's input away: %s", ctx.Err())
481                 // Our pipe might be stuck in Write(), waiting for
482                 // PutReader() to read. If so, un-stick it. This means
483                 // PutReader will get corrupt data, but that's OK: the
484                 // size and MD5 won't match, so the write will fail.
485                 go io.Copy(ioutil.Discard, bufr)
486                 // CloseWithError() will return once pending I/O is done.
487                 bufw.CloseWithError(ctx.Err())
488                 v.logger.Debugf("abandoning PutReader goroutine")
489                 return ctx.Err()
490         case <-ready:
491                 // Unblock pipe in case PutReader did not consume it.
492                 io.Copy(ioutil.Discard, bufr)
493                 return v.translateError(err)
494         }
495 }
496
497 // Touch sets the timestamp for the given locator to the current time.
498 func (v *S3Volume) Touch(loc string) error {
499         if v.volume.ReadOnly {
500                 return MethodDisabledError
501         }
502         _, err := v.bucket.Head(loc, nil)
503         err = v.translateError(err)
504         if os.IsNotExist(err) && v.fixRace(loc) {
505                 // The data object got trashed in a race, but fixRace
506                 // rescued it.
507         } else if err != nil {
508                 return err
509         }
510         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
511         return v.translateError(err)
512 }
513
514 // Mtime returns the stored timestamp for the given locator.
515 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
516         _, err := v.bucket.Head(loc, nil)
517         if err != nil {
518                 return zeroTime, v.translateError(err)
519         }
520         resp, err := v.bucket.Head("recent/"+loc, nil)
521         err = v.translateError(err)
522         if os.IsNotExist(err) {
523                 // The data object X exists, but recent/X is missing.
524                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
525                 if err != nil {
526                         v.logger.WithError(err).Errorf("error creating %q", "recent/"+loc)
527                         return zeroTime, v.translateError(err)
528                 }
529                 v.logger.Infof("created %q to migrate existing block to new storage scheme", "recent/"+loc)
530                 resp, err = v.bucket.Head("recent/"+loc, nil)
531                 if err != nil {
532                         v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+loc)
533                         return zeroTime, v.translateError(err)
534                 }
535         } else if err != nil {
536                 // HEAD recent/X failed for some other reason.
537                 return zeroTime, err
538         }
539         return v.lastModified(resp)
540 }
541
542 // IndexTo writes a complete list of locators with the given prefix
543 // for which Get() can retrieve data.
544 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
545         // Use a merge sort to find matching sets of X and recent/X.
546         dataL := s3Lister{
547                 Logger:   v.logger,
548                 Bucket:   v.bucket.Bucket(),
549                 Prefix:   prefix,
550                 PageSize: v.IndexPageSize,
551                 Stats:    &v.bucket.stats,
552         }
553         recentL := s3Lister{
554                 Logger:   v.logger,
555                 Bucket:   v.bucket.Bucket(),
556                 Prefix:   "recent/" + prefix,
557                 PageSize: v.IndexPageSize,
558                 Stats:    &v.bucket.stats,
559         }
560         for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
561                 if data.Key >= "g" {
562                         // Conveniently, "recent/*" and "trash/*" are
563                         // lexically greater than all hex-encoded data
564                         // hashes, so stopping here avoids iterating
565                         // over all of them needlessly with dataL.
566                         break
567                 }
568                 if !v.isKeepBlock(data.Key) {
569                         continue
570                 }
571
572                 // stamp is the list entry we should use to report the
573                 // last-modified time for this data block: it will be
574                 // the recent/X entry if one exists, otherwise the
575                 // entry for the data block itself.
576                 stamp := data
577
578                 // Advance to the corresponding recent/X marker, if any
579                 for recent != nil && recentL.Error() == nil {
580                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
581                                 recent = recentL.Next()
582                                 continue
583                         } else if cmp == 0 {
584                                 stamp = recent
585                                 recent = recentL.Next()
586                                 break
587                         } else {
588                                 // recent/X marker is missing: we'll
589                                 // use the timestamp on the data
590                                 // object.
591                                 break
592                         }
593                 }
594                 if err := recentL.Error(); err != nil {
595                         return err
596                 }
597                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
598                 if err != nil {
599                         return err
600                 }
601                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
602         }
603         return dataL.Error()
604 }
605
606 // Trash a Keep block.
607 func (v *S3Volume) Trash(loc string) error {
608         if v.volume.ReadOnly {
609                 return MethodDisabledError
610         }
611         if t, err := v.Mtime(loc); err != nil {
612                 return err
613         } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
614                 return nil
615         }
616         if v.cluster.Collections.BlobTrashLifetime == 0 {
617                 if !v.UnsafeDelete {
618                         return ErrS3TrashDisabled
619                 }
620                 return v.translateError(v.bucket.Del(loc))
621         }
622         err := v.checkRaceWindow(loc)
623         if err != nil {
624                 return err
625         }
626         err = v.safeCopy("trash/"+loc, loc)
627         if err != nil {
628                 return err
629         }
630         return v.translateError(v.bucket.Del(loc))
631 }
632
633 // checkRaceWindow returns a non-nil error if trash/loc is, or might
634 // be, in the race window (i.e., it's not safe to trash loc).
635 func (v *S3Volume) checkRaceWindow(loc string) error {
636         resp, err := v.bucket.Head("trash/"+loc, nil)
637         err = v.translateError(err)
638         if os.IsNotExist(err) {
639                 // OK, trash/X doesn't exist so we're not in the race
640                 // window
641                 return nil
642         } else if err != nil {
643                 // Error looking up trash/X. We don't know whether
644                 // we're in the race window
645                 return err
646         }
647         t, err := v.lastModified(resp)
648         if err != nil {
649                 // Can't parse timestamp
650                 return err
651         }
652         safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
653         if safeWindow <= 0 {
654                 // We can't count on "touch trash/X" to prolong
655                 // trash/X's lifetime. The new timestamp might not
656                 // become visible until now+raceWindow, and EmptyTrash
657                 // is allowed to delete trash/X before then.
658                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
659         }
660         // trash/X exists, but it won't be eligible for deletion until
661         // after now+raceWindow, so it's safe to overwrite it.
662         return nil
663 }
664
665 // safeCopy calls PutCopy, and checks the response to make sure the
666 // copy succeeded and updated the timestamp on the destination object
667 // (PutCopy returns 200 OK if the request was received, even if the
668 // copy failed).
669 func (v *S3Volume) safeCopy(dst, src string) error {
670         resp, err := v.bucket.Bucket().PutCopy(dst, s3ACL, s3.CopyOptions{
671                 ContentType:       "application/octet-stream",
672                 MetadataDirective: "REPLACE",
673         }, v.bucket.Bucket().Name+"/"+src)
674         err = v.translateError(err)
675         if os.IsNotExist(err) {
676                 return err
677         } else if err != nil {
678                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Bucket().Name+"/"+src, err)
679         }
680         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
681                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
682         } else if time.Now().Sub(t) > maxClockSkew {
683                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
684         }
685         return nil
686 }
687
688 // Get the LastModified header from resp, and parse it as RFC1123 or
689 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
690 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
691         s := resp.Header.Get("Last-Modified")
692         t, err = time.Parse(time.RFC1123, s)
693         if err != nil && s != "" {
694                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
695                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
696                 // as required by HTTP spec. If it's not a valid HTTP
697                 // header value, it's probably AWS (or s3test) giving
698                 // us a nearly-RFC1123 timestamp.
699                 t, err = time.Parse(nearlyRFC1123, s)
700         }
701         return
702 }
703
704 // Untrash moves block from trash back into store
705 func (v *S3Volume) Untrash(loc string) error {
706         err := v.safeCopy(loc, "trash/"+loc)
707         if err != nil {
708                 return err
709         }
710         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
711         return v.translateError(err)
712 }
713
714 // Status returns a *VolumeStatus representing the current in-use
715 // storage capacity and a fake available capacity that doesn't make
716 // the volume seem full or nearly-full.
717 func (v *S3Volume) Status() *VolumeStatus {
718         return &VolumeStatus{
719                 DeviceNum: 1,
720                 BytesFree: BlockSize * 1000,
721                 BytesUsed: 1,
722         }
723 }
724
725 // InternalStats returns bucket I/O and API call counters.
726 func (v *S3Volume) InternalStats() interface{} {
727         return &v.bucket.stats
728 }
729
730 // String implements fmt.Stringer.
731 func (v *S3Volume) String() string {
732         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
733 }
734
735 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
736
737 func (v *S3Volume) isKeepBlock(s string) bool {
738         return s3KeepBlockRegexp.MatchString(s)
739 }
740
741 // fixRace(X) is called when "recent/X" exists but "X" doesn't
742 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
743 // there was a race between Put and Trash, fixRace recovers from the
744 // race by Untrashing the block.
745 func (v *S3Volume) fixRace(loc string) bool {
746         trash, err := v.bucket.Head("trash/"+loc, nil)
747         if err != nil {
748                 if !os.IsNotExist(v.translateError(err)) {
749                         v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+loc)
750                 }
751                 return false
752         }
753         trashTime, err := v.lastModified(trash)
754         if err != nil {
755                 v.logger.WithError(err).Errorf("fixRace: error parsing time %q", trash.Header.Get("Last-Modified"))
756                 return false
757         }
758
759         recent, err := v.bucket.Head("recent/"+loc, nil)
760         if err != nil {
761                 v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+loc)
762                 return false
763         }
764         recentTime, err := v.lastModified(recent)
765         if err != nil {
766                 v.logger.WithError(err).Errorf("fixRace: error parsing time %q", recent.Header.Get("Last-Modified"))
767                 return false
768         }
769
770         ageWhenTrashed := trashTime.Sub(recentTime)
771         if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() {
772                 // No evidence of a race: block hasn't been written
773                 // since it became eligible for Trash. No fix needed.
774                 return false
775         }
776
777         v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
778         v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
779         err = v.safeCopy(loc, "trash/"+loc)
780         if err != nil {
781                 v.logger.WithError(err).Error("fixRace: copy failed")
782                 return false
783         }
784         return true
785 }
786
787 func (v *S3Volume) translateError(err error) error {
788         switch err := err.(type) {
789         case *s3.Error:
790                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
791                         strings.Contains(err.Error(), "Not Found") {
792                         return os.ErrNotExist
793                 }
794                 // Other 404 errors like NoSuchVersion and
795                 // NoSuchBucket are different problems which should
796                 // get called out downstream, so we don't convert them
797                 // to os.ErrNotExist.
798         }
799         return err
800 }
801
802 // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
803 // and deletes them from the volume.
804 func (v *S3Volume) EmptyTrash() {
805         if v.cluster.Collections.BlobDeleteConcurrency < 1 {
806                 return
807         }
808
809         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
810
811         // Define "ready to delete" as "...when EmptyTrash started".
812         startT := time.Now()
813
814         emptyOneKey := func(trash *s3.Key) {
815                 loc := trash.Key[6:]
816                 if !v.isKeepBlock(loc) {
817                         return
818                 }
819                 atomic.AddInt64(&bytesInTrash, trash.Size)
820                 atomic.AddInt64(&blocksInTrash, 1)
821
822                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
823                 if err != nil {
824                         v.logger.Warnf("EmptyTrash: %q: parse %q: %s", trash.Key, trash.LastModified, err)
825                         return
826                 }
827                 recent, err := v.bucket.Head("recent/"+loc, nil)
828                 if err != nil && os.IsNotExist(v.translateError(err)) {
829                         v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", trash.Key, "recent/"+loc, err)
830                         err = v.Untrash(loc)
831                         if err != nil {
832                                 v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
833                         }
834                         return
835                 } else if err != nil {
836                         v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+loc)
837                         return
838                 }
839                 recentT, err := v.lastModified(recent)
840                 if err != nil {
841                         v.logger.WithError(err).Warnf("EmptyTrash: %q: error parsing %q", "recent/"+loc, recent.Header.Get("Last-Modified"))
842                         return
843                 }
844                 if trashT.Sub(recentT) < v.cluster.Collections.BlobSigningTTL.Duration() {
845                         if age := startT.Sub(recentT); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
846                                 // recent/loc is too old to protect
847                                 // loc from being Trashed again during
848                                 // the raceWindow that starts if we
849                                 // delete trash/X now.
850                                 //
851                                 // Note this means (TrashSweepInterval
852                                 // < BlobSigningTTL - raceWindow) is
853                                 // necessary to avoid starvation.
854                                 v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
855                                 v.fixRace(loc)
856                                 v.Touch(loc)
857                                 return
858                         }
859                         _, err := v.bucket.Head(loc, nil)
860                         if os.IsNotExist(err) {
861                                 v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
862                                 v.fixRace(loc)
863                                 return
864                         } else if err != nil {
865                                 v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
866                                 return
867                         }
868                 }
869                 if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
870                         return
871                 }
872                 err = v.bucket.Del(trash.Key)
873                 if err != nil {
874                         v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", trash.Key)
875                         return
876                 }
877                 atomic.AddInt64(&bytesDeleted, trash.Size)
878                 atomic.AddInt64(&blocksDeleted, 1)
879
880                 _, err = v.bucket.Head(loc, nil)
881                 if err == nil {
882                         v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
883                         return
884                 }
885                 if !os.IsNotExist(v.translateError(err)) {
886                         v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
887                         return
888                 }
889                 err = v.bucket.Del("recent/" + loc)
890                 if err != nil {
891                         v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+loc)
892                 }
893         }
894
895         var wg sync.WaitGroup
896         todo := make(chan *s3.Key, v.cluster.Collections.BlobDeleteConcurrency)
897         for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
898                 wg.Add(1)
899                 go func() {
900                         defer wg.Done()
901                         for key := range todo {
902                                 emptyOneKey(key)
903                         }
904                 }()
905         }
906
907         trashL := s3Lister{
908                 Logger:   v.logger,
909                 Bucket:   v.bucket.Bucket(),
910                 Prefix:   "trash/",
911                 PageSize: v.IndexPageSize,
912                 Stats:    &v.bucket.stats,
913         }
914         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
915                 todo <- trash
916         }
917         close(todo)
918         wg.Wait()
919
920         if err := trashL.Error(); err != nil {
921                 v.logger.WithError(err).Error("EmptyTrash: lister failed")
922         }
923         v.logger.Infof("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
924 }
925
926 type s3Lister struct {
927         Logger     logrus.FieldLogger
928         Bucket     *s3.Bucket
929         Prefix     string
930         PageSize   int
931         Stats      *s3bucketStats
932         nextMarker string
933         buf        []s3.Key
934         err        error
935 }
936
937 // First fetches the first page and returns the first item. It returns
938 // nil if the response is the empty set or an error occurs.
939 func (lister *s3Lister) First() *s3.Key {
940         lister.getPage()
941         return lister.pop()
942 }
943
944 // Next returns the next item, fetching the next page if necessary. It
945 // returns nil if the last available item has already been fetched, or
946 // an error occurs.
947 func (lister *s3Lister) Next() *s3.Key {
948         if len(lister.buf) == 0 && lister.nextMarker != "" {
949                 lister.getPage()
950         }
951         return lister.pop()
952 }
953
954 // Return the most recent error encountered by First or Next.
955 func (lister *s3Lister) Error() error {
956         return lister.err
957 }
958
959 func (lister *s3Lister) getPage() {
960         lister.Stats.TickOps("list")
961         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
962         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
963         lister.nextMarker = ""
964         if err != nil {
965                 lister.err = err
966                 return
967         }
968         if resp.IsTruncated {
969                 lister.nextMarker = resp.NextMarker
970         }
971         lister.buf = make([]s3.Key, 0, len(resp.Contents))
972         for _, key := range resp.Contents {
973                 if !strings.HasPrefix(key.Key, lister.Prefix) {
974                         lister.Logger.Warnf("s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
975                         continue
976                 }
977                 lister.buf = append(lister.buf, key)
978         }
979 }
980
981 func (lister *s3Lister) pop() (k *s3.Key) {
982         if len(lister.buf) > 0 {
983                 k = &lister.buf[0]
984                 lister.buf = lister.buf[1:]
985         }
986         return
987 }
988
989 // s3bucket wraps s3.bucket and counts I/O and API usage stats. The
990 // wrapped bucket can be replaced atomically with SetBucket in order
991 // to update credentials.
992 type s3bucket struct {
993         bucket *s3.Bucket
994         stats  s3bucketStats
995         mu     sync.Mutex
996 }
997
998 func (b *s3bucket) Bucket() *s3.Bucket {
999         b.mu.Lock()
1000         defer b.mu.Unlock()
1001         return b.bucket
1002 }
1003
1004 func (b *s3bucket) SetBucket(bucket *s3.Bucket) {
1005         b.mu.Lock()
1006         defer b.mu.Unlock()
1007         b.bucket = bucket
1008 }
1009
1010 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
1011         rdr, err := b.Bucket().GetReader(path)
1012         b.stats.TickOps("get")
1013         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
1014         b.stats.TickErr(err)
1015         return NewCountingReader(rdr, b.stats.TickInBytes), err
1016 }
1017
1018 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
1019         resp, err := b.Bucket().Head(path, headers)
1020         b.stats.TickOps("head")
1021         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
1022         b.stats.TickErr(err)
1023         return resp, err
1024 }
1025
1026 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
1027         if length == 0 {
1028                 // goamz will only send Content-Length: 0 when reader
1029                 // is nil due to net.http.Request.ContentLength
1030                 // behavior.  Otherwise, Content-Length header is
1031                 // omitted which will cause some S3 services
1032                 // (including AWS and Ceph RadosGW) to fail to create
1033                 // empty objects.
1034                 r = nil
1035         } else {
1036                 r = NewCountingReader(r, b.stats.TickOutBytes)
1037         }
1038         err := b.Bucket().PutReader(path, r, length, contType, perm, options)
1039         b.stats.TickOps("put")
1040         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
1041         b.stats.TickErr(err)
1042         return err
1043 }
1044
1045 func (b *s3bucket) Del(path string) error {
1046         err := b.Bucket().Del(path)
1047         b.stats.TickOps("delete")
1048         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
1049         b.stats.TickErr(err)
1050         return err
1051 }
1052
1053 type s3bucketStats struct {
1054         statsTicker
1055         Ops     uint64
1056         GetOps  uint64
1057         PutOps  uint64
1058         HeadOps uint64
1059         DelOps  uint64
1060         ListOps uint64
1061 }
1062
1063 func (s *s3bucketStats) TickErr(err error) {
1064         if err == nil {
1065                 return
1066         }
1067         errType := fmt.Sprintf("%T", err)
1068         if err, ok := err.(*s3.Error); ok {
1069                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
1070         }
1071         s.statsTicker.TickErr(err, errType)
1072 }