15599: Warn if multiple roles are assigned.
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bufio"
9         "bytes"
10         "context"
11         "crypto/sha256"
12         "encoding/base64"
13         "encoding/hex"
14         "encoding/json"
15         "errors"
16         "fmt"
17         "io"
18         "io/ioutil"
19         "log"
20         "net/http"
21         "os"
22         "regexp"
23         "strings"
24         "sync"
25         "sync/atomic"
26         "time"
27
28         "git.curoverse.com/arvados.git/sdk/go/arvados"
29         "github.com/AdRoll/goamz/aws"
30         "github.com/AdRoll/goamz/s3"
31         "github.com/prometheus/client_golang/prometheus"
32         "github.com/sirupsen/logrus"
33 )
34
35 func init() {
36         driver["S3"] = newS3Volume
37 }
38
39 func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
40         v := &S3Volume{cluster: cluster, volume: volume, logger: logger, metrics: metrics}
41         err := json.Unmarshal(volume.DriverParameters, &v)
42         if err != nil {
43                 return nil, err
44         }
45         return v, v.check()
46 }
47
48 func (v *S3Volume) check() error {
49         if v.Bucket == "" {
50                 return errors.New("DriverParameters: Bucket must be provided")
51         }
52         if v.IndexPageSize == 0 {
53                 v.IndexPageSize = 1000
54         }
55         if v.RaceWindow < 0 {
56                 return errors.New("DriverParameters: RaceWindow must not be negative")
57         }
58
59         var ok bool
60         v.region, ok = aws.Regions[v.Region]
61         if v.Endpoint == "" {
62                 if !ok {
63                         return fmt.Errorf("unrecognized region %+q; try specifying endpoint instead", v.Region)
64                 }
65         } else if ok {
66                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
67                         "specify empty endpoint or use a different region name", v.Region, v.Endpoint)
68         } else {
69                 v.region = aws.Region{
70                         Name:                 v.Region,
71                         S3Endpoint:           v.Endpoint,
72                         S3LocationConstraint: v.LocationConstraint,
73                 }
74         }
75
76         // Zero timeouts mean "wait forever", which is a bad
77         // default. Default to long timeouts instead.
78         if v.ConnectTimeout == 0 {
79                 v.ConnectTimeout = s3DefaultConnectTimeout
80         }
81         if v.ReadTimeout == 0 {
82                 v.ReadTimeout = s3DefaultReadTimeout
83         }
84
85         v.bucket = &s3bucket{
86                 bucket: &s3.Bucket{
87                         S3:   v.newS3Client(),
88                         Name: v.Bucket,
89                 },
90         }
91         // Set up prometheus metrics
92         lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
93         v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
94
95         err := v.bootstrapIAMCredentials()
96         if err != nil {
97                 return fmt.Errorf("error getting IAM credentials: %s", err)
98         }
99
100         return nil
101 }
102
103 const (
104         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
105         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
106 )
107
108 var (
109         // ErrS3TrashDisabled is returned by Trash if that operation
110         // is impossible with the current config.
111         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
112
113         s3ACL = s3.Private
114
115         zeroTime time.Time
116 )
117
118 const (
119         maxClockSkew  = 600 * time.Second
120         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
121 )
122
123 func s3regions() (okList []string) {
124         for r := range aws.Regions {
125                 okList = append(okList, r)
126         }
127         return
128 }
129
130 // S3Volume implements Volume using an S3 bucket.
131 type S3Volume struct {
132         AccessKey          string
133         SecretKey          string
134         AuthToken          string    // populated automatically when IAMRole is used
135         AuthExpiration     time.Time // populated automatically when IAMRole is used
136         IAMRole            string
137         Endpoint           string
138         Region             string
139         Bucket             string
140         LocationConstraint bool
141         IndexPageSize      int
142         ConnectTimeout     arvados.Duration
143         ReadTimeout        arvados.Duration
144         RaceWindow         arvados.Duration
145         UnsafeDelete       bool
146
147         cluster   *arvados.Cluster
148         volume    arvados.Volume
149         logger    logrus.FieldLogger
150         metrics   *volumeMetricsVecs
151         bucket    *s3bucket
152         region    aws.Region
153         startOnce sync.Once
154 }
155
156 // GetDeviceID returns a globally unique ID for the storage bucket.
157 func (v *S3Volume) GetDeviceID() string {
158         return "s3://" + v.Endpoint + "/" + v.Bucket
159 }
160
161 func (v *S3Volume) bootstrapIAMCredentials() error {
162         if v.AccessKey != "" || v.SecretKey != "" {
163                 return nil
164         }
165         ttl, err := v.updateIAMCredentials()
166         if err != nil {
167                 return err
168         }
169         go func() {
170                 for {
171                         time.Sleep(ttl)
172                         ttl, err = v.updateIAMCredentials()
173                         if err != nil {
174                                 v.logger.WithError(err).Warnf("failed to update credentials for IAM role %q", v.IAMRole)
175                                 ttl = time.Second
176                         } else if ttl < time.Second {
177                                 v.logger.WithField("TTL", ttl).Warnf("received stale credentials for IAM role %q", v.IAMRole)
178                                 ttl = time.Second
179                         }
180                 }
181         }()
182         return nil
183 }
184
185 func (v *S3Volume) newS3Client() *s3.S3 {
186         auth := aws.NewAuth(v.AccessKey, v.SecretKey, v.AuthToken, v.AuthExpiration)
187         client := s3.New(*auth, v.region)
188         if v.region.EC2Endpoint.Signer == aws.V4Signature {
189                 // Currently affects only eu-central-1
190                 client.Signature = aws.V4Signature
191         }
192         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
193         client.ReadTimeout = time.Duration(v.ReadTimeout)
194         return client
195 }
196
197 // returned by AWS metadata endpoint .../security-credentials/${rolename}
198 type iamCredentials struct {
199         Code            string
200         LastUpdated     time.Time
201         Type            string
202         AccessKeyID     string
203         SecretAccessKey string
204         Token           string
205         Expiration      time.Time
206 }
207
208 // Returns TTL of updated credentials, i.e., time to sleep until next
209 // update.
210 func (v *S3Volume) updateIAMCredentials() (time.Duration, error) {
211         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
212         defer cancel()
213
214         metadataBaseURL := "http://169.254.169.254/latest/meta-data/iam/security-credentials/"
215
216         var url string
217         if strings.Contains(v.IAMRole, "://") {
218                 // Configuration provides complete URL (used by tests)
219                 url = v.IAMRole
220         } else if v.IAMRole != "" {
221                 // Configuration provides IAM role name and we use the
222                 // AWS metadata endpoint
223                 url = metadataBaseURL + v.IAMRole
224         } else {
225                 url = metadataBaseURL
226                 v.logger.WithField("URL", url).Debug("looking up IAM role name")
227                 req, err := http.NewRequest("GET", url, nil)
228                 if err != nil {
229                         return 0, fmt.Errorf("error setting up request %s: %s", url, err)
230                 }
231                 resp, err := http.DefaultClient.Do(req.WithContext(ctx))
232                 if err != nil {
233                         return 0, fmt.Errorf("error getting %s: %s", url, err)
234                 }
235                 defer resp.Body.Close()
236                 if resp.StatusCode != http.StatusOK {
237                         return 0, fmt.Errorf("error getting %s: HTTP status %s", url, resp.Status)
238                 }
239                 body := bufio.NewReader(resp.Body)
240                 var role string
241                 _, err = fmt.Fscanf(body, "%s\n", &role)
242                 if err != nil {
243                         return 0, fmt.Errorf("error reading response from %s: %s", url, err)
244                 }
245                 if n, _ := body.Read(make([]byte, 64)); n > 0 {
246                         v.logger.Warnf("ignoring additional data returned by metadata endpoint %s after the single role name that we expected", url)
247                 }
248                 v.logger.WithField("Role", role).Debug("looked up IAM role name")
249                 url = url + role
250         }
251
252         v.logger.WithField("URL", url).Debug("getting credentials")
253         req, err := http.NewRequest("GET", url, nil)
254         if err != nil {
255                 return 0, fmt.Errorf("error setting up request %s: %s", url, err)
256         }
257         resp, err := http.DefaultClient.Do(req.WithContext(ctx))
258         if err != nil {
259                 return 0, fmt.Errorf("error getting %s: %s", url, err)
260         }
261         defer resp.Body.Close()
262         if resp.StatusCode != http.StatusOK {
263                 return 0, fmt.Errorf("error getting %s: HTTP status %s", url, resp.Status)
264         }
265         var cred iamCredentials
266         err = json.NewDecoder(resp.Body).Decode(&cred)
267         if err != nil {
268                 return 0, fmt.Errorf("error decoding credentials from %s: %s", url, err)
269         }
270         v.AccessKey, v.SecretKey, v.AuthToken, v.AuthExpiration = cred.AccessKeyID, cred.SecretAccessKey, cred.Token, cred.Expiration
271         v.bucket.SetBucket(&s3.Bucket{
272                 S3:   v.newS3Client(),
273                 Name: v.Bucket,
274         })
275         // TTL is time from now to expiration, minus 5m.  "We make new
276         // credentials available at least five minutes before the
277         // expiration of the old credentials."  --
278         // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
279         // (If that's not true, the returned ttl might be zero or
280         // negative, which the caller can handle.)
281         ttl := cred.Expiration.Sub(time.Now()) - 5*time.Minute
282         v.logger.WithFields(logrus.Fields{
283                 "AccessKeyID": cred.AccessKeyID,
284                 "LastUpdated": cred.LastUpdated,
285                 "Expiration":  cred.Expiration,
286                 "TTL":         arvados.Duration(ttl),
287         }).Debug("updated credentials")
288         return ttl, nil
289 }
290
291 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
292         ready := make(chan bool)
293         go func() {
294                 rdr, err = v.getReader(loc)
295                 close(ready)
296         }()
297         select {
298         case <-ready:
299                 return
300         case <-ctx.Done():
301                 v.logger.Debugf("s3: abandoning getReader(): %s", ctx.Err())
302                 go func() {
303                         <-ready
304                         if err == nil {
305                                 rdr.Close()
306                         }
307                 }()
308                 return nil, ctx.Err()
309         }
310 }
311
312 // getReader wraps (Bucket)GetReader.
313 //
314 // In situations where (Bucket)GetReader would fail because the block
315 // disappeared in a Trash race, getReader calls fixRace to recover the
316 // data, and tries again.
317 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
318         rdr, err = v.bucket.GetReader(loc)
319         err = v.translateError(err)
320         if err == nil || !os.IsNotExist(err) {
321                 return
322         }
323
324         _, err = v.bucket.Head("recent/"+loc, nil)
325         err = v.translateError(err)
326         if err != nil {
327                 // If we can't read recent/X, there's no point in
328                 // trying fixRace. Give up.
329                 return
330         }
331         if !v.fixRace(loc) {
332                 err = os.ErrNotExist
333                 return
334         }
335
336         rdr, err = v.bucket.GetReader(loc)
337         if err != nil {
338                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
339                 err = v.translateError(err)
340         }
341         return
342 }
343
344 // Get a block: copy the block data into buf, and return the number of
345 // bytes copied.
346 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
347         rdr, err := v.getReaderWithContext(ctx, loc)
348         if err != nil {
349                 return 0, err
350         }
351
352         var n int
353         ready := make(chan bool)
354         go func() {
355                 defer close(ready)
356
357                 defer rdr.Close()
358                 n, err = io.ReadFull(rdr, buf)
359
360                 switch err {
361                 case nil, io.EOF, io.ErrUnexpectedEOF:
362                         err = nil
363                 default:
364                         err = v.translateError(err)
365                 }
366         }()
367         select {
368         case <-ctx.Done():
369                 v.logger.Debugf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
370                 rdr.Close()
371                 // Must wait for ReadFull to return, to ensure it
372                 // doesn't write to buf after we return.
373                 v.logger.Debug("s3: waiting for ReadFull() to fail")
374                 <-ready
375                 return 0, ctx.Err()
376         case <-ready:
377                 return n, err
378         }
379 }
380
381 // Compare the given data with the stored data.
382 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
383         errChan := make(chan error, 1)
384         go func() {
385                 _, err := v.bucket.Head("recent/"+loc, nil)
386                 errChan <- err
387         }()
388         var err error
389         select {
390         case <-ctx.Done():
391                 return ctx.Err()
392         case err = <-errChan:
393         }
394         if err != nil {
395                 // Checking for "loc" itself here would interfere with
396                 // future GET requests.
397                 //
398                 // On AWS, if X doesn't exist, a HEAD or GET request
399                 // for X causes X's non-existence to be cached. Thus,
400                 // if we test for X, then create X and return a
401                 // signature to our client, the client might still get
402                 // 404 from all keepstores when trying to read it.
403                 //
404                 // To avoid this, we avoid doing HEAD X or GET X until
405                 // we know X has been written.
406                 //
407                 // Note that X might exist even though recent/X
408                 // doesn't: for example, the response to HEAD recent/X
409                 // might itself come from a stale cache. In such
410                 // cases, we will return a false negative and
411                 // PutHandler might needlessly create another replica
412                 // on a different volume. That's not ideal, but it's
413                 // better than passing the eventually-consistent
414                 // problem on to our clients.
415                 return v.translateError(err)
416         }
417         rdr, err := v.getReaderWithContext(ctx, loc)
418         if err != nil {
419                 return err
420         }
421         defer rdr.Close()
422         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
423 }
424
425 // Put writes a block.
426 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
427         if v.volume.ReadOnly {
428                 return MethodDisabledError
429         }
430         var opts s3.Options
431         size := len(block)
432         if size > 0 {
433                 md5, err := hex.DecodeString(loc)
434                 if err != nil {
435                         return err
436                 }
437                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
438                 // In AWS regions that use V4 signatures, we need to
439                 // provide ContentSHA256 up front. Otherwise, the S3
440                 // library reads the request body (from our buffer)
441                 // into another new buffer in order to compute the
442                 // SHA256 before sending the request -- which would
443                 // mean consuming 128 MiB of memory for the duration
444                 // of a 64 MiB write.
445                 opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
446         }
447
448         // Send the block data through a pipe, so that (if we need to)
449         // we can close the pipe early and abandon our PutReader()
450         // goroutine, without worrying about PutReader() accessing our
451         // block buffer after we release it.
452         bufr, bufw := io.Pipe()
453         go func() {
454                 io.Copy(bufw, bytes.NewReader(block))
455                 bufw.Close()
456         }()
457
458         var err error
459         ready := make(chan bool)
460         go func() {
461                 defer func() {
462                         if ctx.Err() != nil {
463                                 v.logger.Debugf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
464                         }
465                 }()
466                 defer close(ready)
467                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
468                 if err != nil {
469                         return
470                 }
471                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
472         }()
473         select {
474         case <-ctx.Done():
475                 v.logger.Debugf("%s: taking PutReader's input away: %s", v, ctx.Err())
476                 // Our pipe might be stuck in Write(), waiting for
477                 // PutReader() to read. If so, un-stick it. This means
478                 // PutReader will get corrupt data, but that's OK: the
479                 // size and MD5 won't match, so the write will fail.
480                 go io.Copy(ioutil.Discard, bufr)
481                 // CloseWithError() will return once pending I/O is done.
482                 bufw.CloseWithError(ctx.Err())
483                 v.logger.Debugf("%s: abandoning PutReader goroutine", v)
484                 return ctx.Err()
485         case <-ready:
486                 // Unblock pipe in case PutReader did not consume it.
487                 io.Copy(ioutil.Discard, bufr)
488                 return v.translateError(err)
489         }
490 }
491
492 // Touch sets the timestamp for the given locator to the current time.
493 func (v *S3Volume) Touch(loc string) error {
494         if v.volume.ReadOnly {
495                 return MethodDisabledError
496         }
497         _, err := v.bucket.Head(loc, nil)
498         err = v.translateError(err)
499         if os.IsNotExist(err) && v.fixRace(loc) {
500                 // The data object got trashed in a race, but fixRace
501                 // rescued it.
502         } else if err != nil {
503                 return err
504         }
505         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
506         return v.translateError(err)
507 }
508
509 // Mtime returns the stored timestamp for the given locator.
510 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
511         _, err := v.bucket.Head(loc, nil)
512         if err != nil {
513                 return zeroTime, v.translateError(err)
514         }
515         resp, err := v.bucket.Head("recent/"+loc, nil)
516         err = v.translateError(err)
517         if os.IsNotExist(err) {
518                 // The data object X exists, but recent/X is missing.
519                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
520                 if err != nil {
521                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
522                         return zeroTime, v.translateError(err)
523                 }
524                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
525                 resp, err = v.bucket.Head("recent/"+loc, nil)
526                 if err != nil {
527                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
528                         return zeroTime, v.translateError(err)
529                 }
530         } else if err != nil {
531                 // HEAD recent/X failed for some other reason.
532                 return zeroTime, err
533         }
534         return v.lastModified(resp)
535 }
536
537 // IndexTo writes a complete list of locators with the given prefix
538 // for which Get() can retrieve data.
539 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
540         // Use a merge sort to find matching sets of X and recent/X.
541         dataL := s3Lister{
542                 Bucket:   v.bucket.Bucket(),
543                 Prefix:   prefix,
544                 PageSize: v.IndexPageSize,
545                 Stats:    &v.bucket.stats,
546         }
547         recentL := s3Lister{
548                 Bucket:   v.bucket.Bucket(),
549                 Prefix:   "recent/" + prefix,
550                 PageSize: v.IndexPageSize,
551                 Stats:    &v.bucket.stats,
552         }
553         for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
554                 if data.Key >= "g" {
555                         // Conveniently, "recent/*" and "trash/*" are
556                         // lexically greater than all hex-encoded data
557                         // hashes, so stopping here avoids iterating
558                         // over all of them needlessly with dataL.
559                         break
560                 }
561                 if !v.isKeepBlock(data.Key) {
562                         continue
563                 }
564
565                 // stamp is the list entry we should use to report the
566                 // last-modified time for this data block: it will be
567                 // the recent/X entry if one exists, otherwise the
568                 // entry for the data block itself.
569                 stamp := data
570
571                 // Advance to the corresponding recent/X marker, if any
572                 for recent != nil && recentL.Error() == nil {
573                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
574                                 recent = recentL.Next()
575                                 continue
576                         } else if cmp == 0 {
577                                 stamp = recent
578                                 recent = recentL.Next()
579                                 break
580                         } else {
581                                 // recent/X marker is missing: we'll
582                                 // use the timestamp on the data
583                                 // object.
584                                 break
585                         }
586                 }
587                 if err := recentL.Error(); err != nil {
588                         return err
589                 }
590                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
591                 if err != nil {
592                         return err
593                 }
594                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
595         }
596         return dataL.Error()
597 }
598
599 // Trash a Keep block.
600 func (v *S3Volume) Trash(loc string) error {
601         if v.volume.ReadOnly {
602                 return MethodDisabledError
603         }
604         if t, err := v.Mtime(loc); err != nil {
605                 return err
606         } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
607                 return nil
608         }
609         if v.cluster.Collections.BlobTrashLifetime == 0 {
610                 if !v.UnsafeDelete {
611                         return ErrS3TrashDisabled
612                 }
613                 return v.translateError(v.bucket.Del(loc))
614         }
615         err := v.checkRaceWindow(loc)
616         if err != nil {
617                 return err
618         }
619         err = v.safeCopy("trash/"+loc, loc)
620         if err != nil {
621                 return err
622         }
623         return v.translateError(v.bucket.Del(loc))
624 }
625
626 // checkRaceWindow returns a non-nil error if trash/loc is, or might
627 // be, in the race window (i.e., it's not safe to trash loc).
628 func (v *S3Volume) checkRaceWindow(loc string) error {
629         resp, err := v.bucket.Head("trash/"+loc, nil)
630         err = v.translateError(err)
631         if os.IsNotExist(err) {
632                 // OK, trash/X doesn't exist so we're not in the race
633                 // window
634                 return nil
635         } else if err != nil {
636                 // Error looking up trash/X. We don't know whether
637                 // we're in the race window
638                 return err
639         }
640         t, err := v.lastModified(resp)
641         if err != nil {
642                 // Can't parse timestamp
643                 return err
644         }
645         safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
646         if safeWindow <= 0 {
647                 // We can't count on "touch trash/X" to prolong
648                 // trash/X's lifetime. The new timestamp might not
649                 // become visible until now+raceWindow, and EmptyTrash
650                 // is allowed to delete trash/X before then.
651                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
652         }
653         // trash/X exists, but it won't be eligible for deletion until
654         // after now+raceWindow, so it's safe to overwrite it.
655         return nil
656 }
657
658 // safeCopy calls PutCopy, and checks the response to make sure the
659 // copy succeeded and updated the timestamp on the destination object
660 // (PutCopy returns 200 OK if the request was received, even if the
661 // copy failed).
662 func (v *S3Volume) safeCopy(dst, src string) error {
663         resp, err := v.bucket.Bucket().PutCopy(dst, s3ACL, s3.CopyOptions{
664                 ContentType:       "application/octet-stream",
665                 MetadataDirective: "REPLACE",
666         }, v.bucket.Bucket().Name+"/"+src)
667         err = v.translateError(err)
668         if os.IsNotExist(err) {
669                 return err
670         } else if err != nil {
671                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Bucket().Name+"/"+src, err)
672         }
673         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
674                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
675         } else if time.Now().Sub(t) > maxClockSkew {
676                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
677         }
678         return nil
679 }
680
681 // Get the LastModified header from resp, and parse it as RFC1123 or
682 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
683 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
684         s := resp.Header.Get("Last-Modified")
685         t, err = time.Parse(time.RFC1123, s)
686         if err != nil && s != "" {
687                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
688                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
689                 // as required by HTTP spec. If it's not a valid HTTP
690                 // header value, it's probably AWS (or s3test) giving
691                 // us a nearly-RFC1123 timestamp.
692                 t, err = time.Parse(nearlyRFC1123, s)
693         }
694         return
695 }
696
697 // Untrash moves block from trash back into store
698 func (v *S3Volume) Untrash(loc string) error {
699         err := v.safeCopy(loc, "trash/"+loc)
700         if err != nil {
701                 return err
702         }
703         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
704         return v.translateError(err)
705 }
706
707 // Status returns a *VolumeStatus representing the current in-use
708 // storage capacity and a fake available capacity that doesn't make
709 // the volume seem full or nearly-full.
710 func (v *S3Volume) Status() *VolumeStatus {
711         return &VolumeStatus{
712                 DeviceNum: 1,
713                 BytesFree: BlockSize * 1000,
714                 BytesUsed: 1,
715         }
716 }
717
718 // InternalStats returns bucket I/O and API call counters.
719 func (v *S3Volume) InternalStats() interface{} {
720         return &v.bucket.stats
721 }
722
723 // String implements fmt.Stringer.
724 func (v *S3Volume) String() string {
725         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
726 }
727
728 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
729
730 func (v *S3Volume) isKeepBlock(s string) bool {
731         return s3KeepBlockRegexp.MatchString(s)
732 }
733
734 // fixRace(X) is called when "recent/X" exists but "X" doesn't
735 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
736 // there was a race between Put and Trash, fixRace recovers from the
737 // race by Untrashing the block.
738 func (v *S3Volume) fixRace(loc string) bool {
739         trash, err := v.bucket.Head("trash/"+loc, nil)
740         if err != nil {
741                 if !os.IsNotExist(v.translateError(err)) {
742                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
743                 }
744                 return false
745         }
746         trashTime, err := v.lastModified(trash)
747         if err != nil {
748                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
749                 return false
750         }
751
752         recent, err := v.bucket.Head("recent/"+loc, nil)
753         if err != nil {
754                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
755                 return false
756         }
757         recentTime, err := v.lastModified(recent)
758         if err != nil {
759                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
760                 return false
761         }
762
763         ageWhenTrashed := trashTime.Sub(recentTime)
764         if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() {
765                 // No evidence of a race: block hasn't been written
766                 // since it became eligible for Trash. No fix needed.
767                 return false
768         }
769
770         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
771         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
772         err = v.safeCopy(loc, "trash/"+loc)
773         if err != nil {
774                 log.Printf("error: fixRace: %s", err)
775                 return false
776         }
777         return true
778 }
779
780 func (v *S3Volume) translateError(err error) error {
781         switch err := err.(type) {
782         case *s3.Error:
783                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
784                         strings.Contains(err.Error(), "Not Found") {
785                         return os.ErrNotExist
786                 }
787                 // Other 404 errors like NoSuchVersion and
788                 // NoSuchBucket are different problems which should
789                 // get called out downstream, so we don't convert them
790                 // to os.ErrNotExist.
791         }
792         return err
793 }
794
795 // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
796 // and deletes them from the volume.
797 func (v *S3Volume) EmptyTrash() {
798         if v.cluster.Collections.BlobDeleteConcurrency < 1 {
799                 return
800         }
801
802         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
803
804         // Define "ready to delete" as "...when EmptyTrash started".
805         startT := time.Now()
806
807         emptyOneKey := func(trash *s3.Key) {
808                 loc := trash.Key[6:]
809                 if !v.isKeepBlock(loc) {
810                         return
811                 }
812                 atomic.AddInt64(&bytesInTrash, trash.Size)
813                 atomic.AddInt64(&blocksInTrash, 1)
814
815                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
816                 if err != nil {
817                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
818                         return
819                 }
820                 recent, err := v.bucket.Head("recent/"+loc, nil)
821                 if err != nil && os.IsNotExist(v.translateError(err)) {
822                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
823                         err = v.Untrash(loc)
824                         if err != nil {
825                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
826                         }
827                         return
828                 } else if err != nil {
829                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
830                         return
831                 }
832                 recentT, err := v.lastModified(recent)
833                 if err != nil {
834                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
835                         return
836                 }
837                 if trashT.Sub(recentT) < v.cluster.Collections.BlobSigningTTL.Duration() {
838                         if age := startT.Sub(recentT); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
839                                 // recent/loc is too old to protect
840                                 // loc from being Trashed again during
841                                 // the raceWindow that starts if we
842                                 // delete trash/X now.
843                                 //
844                                 // Note this means (TrashSweepInterval
845                                 // < BlobSigningTTL - raceWindow) is
846                                 // necessary to avoid starvation.
847                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
848                                 v.fixRace(loc)
849                                 v.Touch(loc)
850                                 return
851                         }
852                         _, err := v.bucket.Head(loc, nil)
853                         if os.IsNotExist(err) {
854                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
855                                 v.fixRace(loc)
856                                 return
857                         } else if err != nil {
858                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
859                                 return
860                         }
861                 }
862                 if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
863                         return
864                 }
865                 err = v.bucket.Del(trash.Key)
866                 if err != nil {
867                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
868                         return
869                 }
870                 atomic.AddInt64(&bytesDeleted, trash.Size)
871                 atomic.AddInt64(&blocksDeleted, 1)
872
873                 _, err = v.bucket.Head(loc, nil)
874                 if err == nil {
875                         log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
876                         return
877                 }
878                 if !os.IsNotExist(v.translateError(err)) {
879                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
880                         return
881                 }
882                 err = v.bucket.Del("recent/" + loc)
883                 if err != nil {
884                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
885                 }
886         }
887
888         var wg sync.WaitGroup
889         todo := make(chan *s3.Key, v.cluster.Collections.BlobDeleteConcurrency)
890         for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
891                 wg.Add(1)
892                 go func() {
893                         defer wg.Done()
894                         for key := range todo {
895                                 emptyOneKey(key)
896                         }
897                 }()
898         }
899
900         trashL := s3Lister{
901                 Bucket:   v.bucket.Bucket(),
902                 Prefix:   "trash/",
903                 PageSize: v.IndexPageSize,
904                 Stats:    &v.bucket.stats,
905         }
906         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
907                 todo <- trash
908         }
909         close(todo)
910         wg.Wait()
911
912         if err := trashL.Error(); err != nil {
913                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
914         }
915         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
916 }
917
918 type s3Lister struct {
919         Bucket     *s3.Bucket
920         Prefix     string
921         PageSize   int
922         Stats      *s3bucketStats
923         nextMarker string
924         buf        []s3.Key
925         err        error
926 }
927
928 // First fetches the first page and returns the first item. It returns
929 // nil if the response is the empty set or an error occurs.
930 func (lister *s3Lister) First() *s3.Key {
931         lister.getPage()
932         return lister.pop()
933 }
934
935 // Next returns the next item, fetching the next page if necessary. It
936 // returns nil if the last available item has already been fetched, or
937 // an error occurs.
938 func (lister *s3Lister) Next() *s3.Key {
939         if len(lister.buf) == 0 && lister.nextMarker != "" {
940                 lister.getPage()
941         }
942         return lister.pop()
943 }
944
945 // Return the most recent error encountered by First or Next.
946 func (lister *s3Lister) Error() error {
947         return lister.err
948 }
949
950 func (lister *s3Lister) getPage() {
951         lister.Stats.TickOps("list")
952         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
953         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
954         lister.nextMarker = ""
955         if err != nil {
956                 lister.err = err
957                 return
958         }
959         if resp.IsTruncated {
960                 lister.nextMarker = resp.NextMarker
961         }
962         lister.buf = make([]s3.Key, 0, len(resp.Contents))
963         for _, key := range resp.Contents {
964                 if !strings.HasPrefix(key.Key, lister.Prefix) {
965                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
966                         continue
967                 }
968                 lister.buf = append(lister.buf, key)
969         }
970 }
971
972 func (lister *s3Lister) pop() (k *s3.Key) {
973         if len(lister.buf) > 0 {
974                 k = &lister.buf[0]
975                 lister.buf = lister.buf[1:]
976         }
977         return
978 }
979
980 // s3bucket wraps s3.bucket and counts I/O and API usage stats. The
981 // wrapped bucket can be replaced atomically with SetBucket in order
982 // to update credentials.
983 type s3bucket struct {
984         bucket *s3.Bucket
985         stats  s3bucketStats
986         mu     sync.Mutex
987 }
988
989 func (b *s3bucket) Bucket() *s3.Bucket {
990         b.mu.Lock()
991         defer b.mu.Unlock()
992         return b.bucket
993 }
994
995 func (b *s3bucket) SetBucket(bucket *s3.Bucket) {
996         b.mu.Lock()
997         defer b.mu.Unlock()
998         b.bucket = bucket
999 }
1000
1001 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
1002         rdr, err := b.Bucket().GetReader(path)
1003         b.stats.TickOps("get")
1004         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
1005         b.stats.TickErr(err)
1006         return NewCountingReader(rdr, b.stats.TickInBytes), err
1007 }
1008
1009 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
1010         resp, err := b.Bucket().Head(path, headers)
1011         b.stats.TickOps("head")
1012         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
1013         b.stats.TickErr(err)
1014         return resp, err
1015 }
1016
1017 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
1018         if length == 0 {
1019                 // goamz will only send Content-Length: 0 when reader
1020                 // is nil due to net.http.Request.ContentLength
1021                 // behavior.  Otherwise, Content-Length header is
1022                 // omitted which will cause some S3 services
1023                 // (including AWS and Ceph RadosGW) to fail to create
1024                 // empty objects.
1025                 r = nil
1026         } else {
1027                 r = NewCountingReader(r, b.stats.TickOutBytes)
1028         }
1029         err := b.Bucket().PutReader(path, r, length, contType, perm, options)
1030         b.stats.TickOps("put")
1031         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
1032         b.stats.TickErr(err)
1033         return err
1034 }
1035
1036 func (b *s3bucket) Del(path string) error {
1037         err := b.Bucket().Del(path)
1038         b.stats.TickOps("delete")
1039         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
1040         b.stats.TickErr(err)
1041         return err
1042 }
1043
1044 type s3bucketStats struct {
1045         statsTicker
1046         Ops     uint64
1047         GetOps  uint64
1048         PutOps  uint64
1049         HeadOps uint64
1050         DelOps  uint64
1051         ListOps uint64
1052 }
1053
1054 func (s *s3bucketStats) TickErr(err error) {
1055         if err == nil {
1056                 return
1057         }
1058         errType := fmt.Sprintf("%T", err)
1059         if err, ok := err.(*s3.Error); ok {
1060                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
1061         }
1062         s.statsTicker.TickErr(err, errType)
1063 }