15599: Look up IAM role name from metadata if not configured.
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/sha256"
11         "encoding/base64"
12         "encoding/hex"
13         "encoding/json"
14         "errors"
15         "fmt"
16         "io"
17         "io/ioutil"
18         "log"
19         "net/http"
20         "os"
21         "regexp"
22         "strings"
23         "sync"
24         "sync/atomic"
25         "time"
26
27         "git.curoverse.com/arvados.git/sdk/go/arvados"
28         "github.com/AdRoll/goamz/aws"
29         "github.com/AdRoll/goamz/s3"
30         "github.com/prometheus/client_golang/prometheus"
31         "github.com/sirupsen/logrus"
32 )
33
34 func init() {
35         driver["S3"] = newS3Volume
36 }
37
38 func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
39         v := &S3Volume{cluster: cluster, volume: volume, logger: logger, metrics: metrics}
40         err := json.Unmarshal(volume.DriverParameters, &v)
41         if err != nil {
42                 return nil, err
43         }
44         return v, v.check()
45 }
46
47 func (v *S3Volume) check() error {
48         if v.Bucket == "" {
49                 return errors.New("DriverParameters: Bucket must be provided")
50         }
51         if v.IndexPageSize == 0 {
52                 v.IndexPageSize = 1000
53         }
54         if v.RaceWindow < 0 {
55                 return errors.New("DriverParameters: RaceWindow must not be negative")
56         }
57
58         var ok bool
59         v.region, ok = aws.Regions[v.Region]
60         if v.Endpoint == "" {
61                 if !ok {
62                         return fmt.Errorf("unrecognized region %+q; try specifying endpoint instead", v.Region)
63                 }
64         } else if ok {
65                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
66                         "specify empty endpoint or use a different region name", v.Region, v.Endpoint)
67         } else {
68                 v.region = aws.Region{
69                         Name:                 v.Region,
70                         S3Endpoint:           v.Endpoint,
71                         S3LocationConstraint: v.LocationConstraint,
72                 }
73         }
74
75         // Zero timeouts mean "wait forever", which is a bad
76         // default. Default to long timeouts instead.
77         if v.ConnectTimeout == 0 {
78                 v.ConnectTimeout = s3DefaultConnectTimeout
79         }
80         if v.ReadTimeout == 0 {
81                 v.ReadTimeout = s3DefaultReadTimeout
82         }
83
84         v.bucket = &s3bucket{
85                 bucket: &s3.Bucket{
86                         S3:   v.newS3Client(),
87                         Name: v.Bucket,
88                 },
89         }
90         // Set up prometheus metrics
91         lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
92         v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
93
94         err := v.bootstrapIAMCredentials()
95         if err != nil {
96                 return fmt.Errorf("error getting IAM credentials: %s", err)
97         }
98
99         return nil
100 }
101
102 const (
103         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
104         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
105 )
106
107 var (
108         // ErrS3TrashDisabled is returned by Trash if that operation
109         // is impossible with the current config.
110         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
111
112         s3ACL = s3.Private
113
114         zeroTime time.Time
115 )
116
117 const (
118         maxClockSkew  = 600 * time.Second
119         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
120 )
121
122 func s3regions() (okList []string) {
123         for r := range aws.Regions {
124                 okList = append(okList, r)
125         }
126         return
127 }
128
129 // S3Volume implements Volume using an S3 bucket.
130 type S3Volume struct {
131         AccessKey          string
132         SecretKey          string
133         AuthToken          string    // populated automatically when IAMRole is used
134         AuthExpiration     time.Time // populated automatically when IAMRole is used
135         IAMRole            string
136         Endpoint           string
137         Region             string
138         Bucket             string
139         LocationConstraint bool
140         IndexPageSize      int
141         ConnectTimeout     arvados.Duration
142         ReadTimeout        arvados.Duration
143         RaceWindow         arvados.Duration
144         UnsafeDelete       bool
145
146         cluster   *arvados.Cluster
147         volume    arvados.Volume
148         logger    logrus.FieldLogger
149         metrics   *volumeMetricsVecs
150         bucket    *s3bucket
151         region    aws.Region
152         startOnce sync.Once
153 }
154
155 // GetDeviceID returns a globally unique ID for the storage bucket.
156 func (v *S3Volume) GetDeviceID() string {
157         return "s3://" + v.Endpoint + "/" + v.Bucket
158 }
159
160 func (v *S3Volume) bootstrapIAMCredentials() error {
161         if v.AccessKey != "" || v.SecretKey != "" {
162                 return nil
163         }
164         ttl, err := v.updateIAMCredentials()
165         if err != nil {
166                 return err
167         }
168         go func() {
169                 for {
170                         time.Sleep(ttl)
171                         ttl, err = v.updateIAMCredentials()
172                         if err != nil {
173                                 v.logger.WithError(err).Warnf("failed to update credentials for IAM role %q", v.IAMRole)
174                                 ttl = time.Second
175                         } else if ttl < time.Second {
176                                 v.logger.WithField("TTL", ttl).Warnf("received stale credentials for IAM role %q", v.IAMRole)
177                                 ttl = time.Second
178                         }
179                 }
180         }()
181         return nil
182 }
183
184 func (v *S3Volume) newS3Client() *s3.S3 {
185         auth := aws.NewAuth(v.AccessKey, v.SecretKey, v.AuthToken, v.AuthExpiration)
186         client := s3.New(*auth, v.region)
187         if v.region.EC2Endpoint.Signer == aws.V4Signature {
188                 // Currently affects only eu-central-1
189                 client.Signature = aws.V4Signature
190         }
191         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
192         client.ReadTimeout = time.Duration(v.ReadTimeout)
193         return client
194 }
195
196 // returned by AWS metadata endpoint .../security-credentials/${rolename}
197 type iamCredentials struct {
198         Code            string
199         LastUpdated     time.Time
200         Type            string
201         AccessKeyID     string
202         SecretAccessKey string
203         Token           string
204         Expiration      time.Time
205 }
206
207 // Returns TTL of updated credentials, i.e., time to sleep until next
208 // update.
209 func (v *S3Volume) updateIAMCredentials() (time.Duration, error) {
210         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
211         defer cancel()
212
213         metadataBaseURL := "http://169.254.169.254/latest/meta-data/iam/security-credentials/"
214
215         var url string
216         if strings.Contains(v.IAMRole, "://") {
217                 // Configuration provides complete URL (used by tests)
218                 url = v.IAMRole
219         } else if v.IAMRole != "" {
220                 // Configuration provides IAM role name and we use the
221                 // AWS metadata endpoint
222                 url = metadataBaseURL + v.IAMRole
223         } else {
224                 url = metadataBaseURL
225                 v.logger.WithField("URL", url).Debug("looking up IAM role name")
226                 req, err := http.NewRequest("GET", url, nil)
227                 if err != nil {
228                         return 0, fmt.Errorf("error setting up request %s: %s", url, err)
229                 }
230                 resp, err := http.DefaultClient.Do(req.WithContext(ctx))
231                 if err != nil {
232                         return 0, fmt.Errorf("error getting %s: %s", url, err)
233                 }
234                 defer resp.Body.Close()
235                 if resp.StatusCode != http.StatusOK {
236                         return 0, fmt.Errorf("error getting %s: HTTP status %s", url, resp.Status)
237                 }
238                 var role string
239                 _, err = fmt.Fscanf(resp.Body, "%s\n", &role)
240                 if err != nil {
241                         return 0, fmt.Errorf("error reading response from %s: %s", url, err)
242                 }
243                 v.logger.WithField("Role", role).Debug("looked up IAM role name")
244                 url = url + role
245         }
246
247         v.logger.WithField("URL", url).Debug("getting credentials")
248         req, err := http.NewRequest("GET", url, nil)
249         if err != nil {
250                 return 0, fmt.Errorf("error setting up request %s: %s", url, err)
251         }
252         resp, err := http.DefaultClient.Do(req.WithContext(ctx))
253         if err != nil {
254                 return 0, fmt.Errorf("error getting %s: %s", url, err)
255         }
256         defer resp.Body.Close()
257         if resp.StatusCode != http.StatusOK {
258                 return 0, fmt.Errorf("error getting %s: HTTP status %s", url, resp.Status)
259         }
260         var cred iamCredentials
261         err = json.NewDecoder(resp.Body).Decode(&cred)
262         if err != nil {
263                 return 0, fmt.Errorf("error decoding credentials from %s: %s", url, err)
264         }
265         v.AccessKey, v.SecretKey, v.AuthToken, v.AuthExpiration = cred.AccessKeyID, cred.SecretAccessKey, cred.Token, cred.Expiration
266         v.bucket.SetBucket(&s3.Bucket{
267                 S3:   v.newS3Client(),
268                 Name: v.Bucket,
269         })
270         // TTL is time from now to expiration, minus 5m.  "We make new
271         // credentials available at least five minutes before the
272         // expiration of the old credentials."  --
273         // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
274         // (If that's not true, the returned ttl might be zero or
275         // negative, which the caller can handle.)
276         ttl := cred.Expiration.Sub(time.Now()) - 5*time.Minute
277         v.logger.WithFields(logrus.Fields{
278                 "AccessKeyID": cred.AccessKeyID,
279                 "LastUpdated": cred.LastUpdated,
280                 "Expiration":  cred.Expiration,
281                 "TTL":         arvados.Duration(ttl),
282         }).Debug("updated credentials")
283         return ttl, nil
284 }
285
286 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
287         ready := make(chan bool)
288         go func() {
289                 rdr, err = v.getReader(loc)
290                 close(ready)
291         }()
292         select {
293         case <-ready:
294                 return
295         case <-ctx.Done():
296                 v.logger.Debugf("s3: abandoning getReader(): %s", ctx.Err())
297                 go func() {
298                         <-ready
299                         if err == nil {
300                                 rdr.Close()
301                         }
302                 }()
303                 return nil, ctx.Err()
304         }
305 }
306
307 // getReader wraps (Bucket)GetReader.
308 //
309 // In situations where (Bucket)GetReader would fail because the block
310 // disappeared in a Trash race, getReader calls fixRace to recover the
311 // data, and tries again.
312 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
313         rdr, err = v.bucket.GetReader(loc)
314         err = v.translateError(err)
315         if err == nil || !os.IsNotExist(err) {
316                 return
317         }
318
319         _, err = v.bucket.Head("recent/"+loc, nil)
320         err = v.translateError(err)
321         if err != nil {
322                 // If we can't read recent/X, there's no point in
323                 // trying fixRace. Give up.
324                 return
325         }
326         if !v.fixRace(loc) {
327                 err = os.ErrNotExist
328                 return
329         }
330
331         rdr, err = v.bucket.GetReader(loc)
332         if err != nil {
333                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
334                 err = v.translateError(err)
335         }
336         return
337 }
338
339 // Get a block: copy the block data into buf, and return the number of
340 // bytes copied.
341 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
342         rdr, err := v.getReaderWithContext(ctx, loc)
343         if err != nil {
344                 return 0, err
345         }
346
347         var n int
348         ready := make(chan bool)
349         go func() {
350                 defer close(ready)
351
352                 defer rdr.Close()
353                 n, err = io.ReadFull(rdr, buf)
354
355                 switch err {
356                 case nil, io.EOF, io.ErrUnexpectedEOF:
357                         err = nil
358                 default:
359                         err = v.translateError(err)
360                 }
361         }()
362         select {
363         case <-ctx.Done():
364                 v.logger.Debugf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
365                 rdr.Close()
366                 // Must wait for ReadFull to return, to ensure it
367                 // doesn't write to buf after we return.
368                 v.logger.Debug("s3: waiting for ReadFull() to fail")
369                 <-ready
370                 return 0, ctx.Err()
371         case <-ready:
372                 return n, err
373         }
374 }
375
376 // Compare the given data with the stored data.
377 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
378         errChan := make(chan error, 1)
379         go func() {
380                 _, err := v.bucket.Head("recent/"+loc, nil)
381                 errChan <- err
382         }()
383         var err error
384         select {
385         case <-ctx.Done():
386                 return ctx.Err()
387         case err = <-errChan:
388         }
389         if err != nil {
390                 // Checking for "loc" itself here would interfere with
391                 // future GET requests.
392                 //
393                 // On AWS, if X doesn't exist, a HEAD or GET request
394                 // for X causes X's non-existence to be cached. Thus,
395                 // if we test for X, then create X and return a
396                 // signature to our client, the client might still get
397                 // 404 from all keepstores when trying to read it.
398                 //
399                 // To avoid this, we avoid doing HEAD X or GET X until
400                 // we know X has been written.
401                 //
402                 // Note that X might exist even though recent/X
403                 // doesn't: for example, the response to HEAD recent/X
404                 // might itself come from a stale cache. In such
405                 // cases, we will return a false negative and
406                 // PutHandler might needlessly create another replica
407                 // on a different volume. That's not ideal, but it's
408                 // better than passing the eventually-consistent
409                 // problem on to our clients.
410                 return v.translateError(err)
411         }
412         rdr, err := v.getReaderWithContext(ctx, loc)
413         if err != nil {
414                 return err
415         }
416         defer rdr.Close()
417         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
418 }
419
420 // Put writes a block.
421 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
422         if v.volume.ReadOnly {
423                 return MethodDisabledError
424         }
425         var opts s3.Options
426         size := len(block)
427         if size > 0 {
428                 md5, err := hex.DecodeString(loc)
429                 if err != nil {
430                         return err
431                 }
432                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
433                 // In AWS regions that use V4 signatures, we need to
434                 // provide ContentSHA256 up front. Otherwise, the S3
435                 // library reads the request body (from our buffer)
436                 // into another new buffer in order to compute the
437                 // SHA256 before sending the request -- which would
438                 // mean consuming 128 MiB of memory for the duration
439                 // of a 64 MiB write.
440                 opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
441         }
442
443         // Send the block data through a pipe, so that (if we need to)
444         // we can close the pipe early and abandon our PutReader()
445         // goroutine, without worrying about PutReader() accessing our
446         // block buffer after we release it.
447         bufr, bufw := io.Pipe()
448         go func() {
449                 io.Copy(bufw, bytes.NewReader(block))
450                 bufw.Close()
451         }()
452
453         var err error
454         ready := make(chan bool)
455         go func() {
456                 defer func() {
457                         if ctx.Err() != nil {
458                                 v.logger.Debugf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
459                         }
460                 }()
461                 defer close(ready)
462                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
463                 if err != nil {
464                         return
465                 }
466                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
467         }()
468         select {
469         case <-ctx.Done():
470                 v.logger.Debugf("%s: taking PutReader's input away: %s", v, ctx.Err())
471                 // Our pipe might be stuck in Write(), waiting for
472                 // PutReader() to read. If so, un-stick it. This means
473                 // PutReader will get corrupt data, but that's OK: the
474                 // size and MD5 won't match, so the write will fail.
475                 go io.Copy(ioutil.Discard, bufr)
476                 // CloseWithError() will return once pending I/O is done.
477                 bufw.CloseWithError(ctx.Err())
478                 v.logger.Debugf("%s: abandoning PutReader goroutine", v)
479                 return ctx.Err()
480         case <-ready:
481                 // Unblock pipe in case PutReader did not consume it.
482                 io.Copy(ioutil.Discard, bufr)
483                 return v.translateError(err)
484         }
485 }
486
487 // Touch sets the timestamp for the given locator to the current time.
488 func (v *S3Volume) Touch(loc string) error {
489         if v.volume.ReadOnly {
490                 return MethodDisabledError
491         }
492         _, err := v.bucket.Head(loc, nil)
493         err = v.translateError(err)
494         if os.IsNotExist(err) && v.fixRace(loc) {
495                 // The data object got trashed in a race, but fixRace
496                 // rescued it.
497         } else if err != nil {
498                 return err
499         }
500         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
501         return v.translateError(err)
502 }
503
504 // Mtime returns the stored timestamp for the given locator.
505 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
506         _, err := v.bucket.Head(loc, nil)
507         if err != nil {
508                 return zeroTime, v.translateError(err)
509         }
510         resp, err := v.bucket.Head("recent/"+loc, nil)
511         err = v.translateError(err)
512         if os.IsNotExist(err) {
513                 // The data object X exists, but recent/X is missing.
514                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
515                 if err != nil {
516                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
517                         return zeroTime, v.translateError(err)
518                 }
519                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
520                 resp, err = v.bucket.Head("recent/"+loc, nil)
521                 if err != nil {
522                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
523                         return zeroTime, v.translateError(err)
524                 }
525         } else if err != nil {
526                 // HEAD recent/X failed for some other reason.
527                 return zeroTime, err
528         }
529         return v.lastModified(resp)
530 }
531
532 // IndexTo writes a complete list of locators with the given prefix
533 // for which Get() can retrieve data.
534 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
535         // Use a merge sort to find matching sets of X and recent/X.
536         dataL := s3Lister{
537                 Bucket:   v.bucket.Bucket(),
538                 Prefix:   prefix,
539                 PageSize: v.IndexPageSize,
540                 Stats:    &v.bucket.stats,
541         }
542         recentL := s3Lister{
543                 Bucket:   v.bucket.Bucket(),
544                 Prefix:   "recent/" + prefix,
545                 PageSize: v.IndexPageSize,
546                 Stats:    &v.bucket.stats,
547         }
548         for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
549                 if data.Key >= "g" {
550                         // Conveniently, "recent/*" and "trash/*" are
551                         // lexically greater than all hex-encoded data
552                         // hashes, so stopping here avoids iterating
553                         // over all of them needlessly with dataL.
554                         break
555                 }
556                 if !v.isKeepBlock(data.Key) {
557                         continue
558                 }
559
560                 // stamp is the list entry we should use to report the
561                 // last-modified time for this data block: it will be
562                 // the recent/X entry if one exists, otherwise the
563                 // entry for the data block itself.
564                 stamp := data
565
566                 // Advance to the corresponding recent/X marker, if any
567                 for recent != nil && recentL.Error() == nil {
568                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
569                                 recent = recentL.Next()
570                                 continue
571                         } else if cmp == 0 {
572                                 stamp = recent
573                                 recent = recentL.Next()
574                                 break
575                         } else {
576                                 // recent/X marker is missing: we'll
577                                 // use the timestamp on the data
578                                 // object.
579                                 break
580                         }
581                 }
582                 if err := recentL.Error(); err != nil {
583                         return err
584                 }
585                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
586                 if err != nil {
587                         return err
588                 }
589                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
590         }
591         return dataL.Error()
592 }
593
594 // Trash a Keep block.
595 func (v *S3Volume) Trash(loc string) error {
596         if v.volume.ReadOnly {
597                 return MethodDisabledError
598         }
599         if t, err := v.Mtime(loc); err != nil {
600                 return err
601         } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
602                 return nil
603         }
604         if v.cluster.Collections.BlobTrashLifetime == 0 {
605                 if !v.UnsafeDelete {
606                         return ErrS3TrashDisabled
607                 }
608                 return v.translateError(v.bucket.Del(loc))
609         }
610         err := v.checkRaceWindow(loc)
611         if err != nil {
612                 return err
613         }
614         err = v.safeCopy("trash/"+loc, loc)
615         if err != nil {
616                 return err
617         }
618         return v.translateError(v.bucket.Del(loc))
619 }
620
621 // checkRaceWindow returns a non-nil error if trash/loc is, or might
622 // be, in the race window (i.e., it's not safe to trash loc).
623 func (v *S3Volume) checkRaceWindow(loc string) error {
624         resp, err := v.bucket.Head("trash/"+loc, nil)
625         err = v.translateError(err)
626         if os.IsNotExist(err) {
627                 // OK, trash/X doesn't exist so we're not in the race
628                 // window
629                 return nil
630         } else if err != nil {
631                 // Error looking up trash/X. We don't know whether
632                 // we're in the race window
633                 return err
634         }
635         t, err := v.lastModified(resp)
636         if err != nil {
637                 // Can't parse timestamp
638                 return err
639         }
640         safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
641         if safeWindow <= 0 {
642                 // We can't count on "touch trash/X" to prolong
643                 // trash/X's lifetime. The new timestamp might not
644                 // become visible until now+raceWindow, and EmptyTrash
645                 // is allowed to delete trash/X before then.
646                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
647         }
648         // trash/X exists, but it won't be eligible for deletion until
649         // after now+raceWindow, so it's safe to overwrite it.
650         return nil
651 }
652
653 // safeCopy calls PutCopy, and checks the response to make sure the
654 // copy succeeded and updated the timestamp on the destination object
655 // (PutCopy returns 200 OK if the request was received, even if the
656 // copy failed).
657 func (v *S3Volume) safeCopy(dst, src string) error {
658         resp, err := v.bucket.Bucket().PutCopy(dst, s3ACL, s3.CopyOptions{
659                 ContentType:       "application/octet-stream",
660                 MetadataDirective: "REPLACE",
661         }, v.bucket.Bucket().Name+"/"+src)
662         err = v.translateError(err)
663         if os.IsNotExist(err) {
664                 return err
665         } else if err != nil {
666                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Bucket().Name+"/"+src, err)
667         }
668         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
669                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
670         } else if time.Now().Sub(t) > maxClockSkew {
671                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
672         }
673         return nil
674 }
675
676 // Get the LastModified header from resp, and parse it as RFC1123 or
677 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
678 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
679         s := resp.Header.Get("Last-Modified")
680         t, err = time.Parse(time.RFC1123, s)
681         if err != nil && s != "" {
682                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
683                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
684                 // as required by HTTP spec. If it's not a valid HTTP
685                 // header value, it's probably AWS (or s3test) giving
686                 // us a nearly-RFC1123 timestamp.
687                 t, err = time.Parse(nearlyRFC1123, s)
688         }
689         return
690 }
691
692 // Untrash moves block from trash back into store
693 func (v *S3Volume) Untrash(loc string) error {
694         err := v.safeCopy(loc, "trash/"+loc)
695         if err != nil {
696                 return err
697         }
698         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
699         return v.translateError(err)
700 }
701
702 // Status returns a *VolumeStatus representing the current in-use
703 // storage capacity and a fake available capacity that doesn't make
704 // the volume seem full or nearly-full.
705 func (v *S3Volume) Status() *VolumeStatus {
706         return &VolumeStatus{
707                 DeviceNum: 1,
708                 BytesFree: BlockSize * 1000,
709                 BytesUsed: 1,
710         }
711 }
712
713 // InternalStats returns bucket I/O and API call counters.
714 func (v *S3Volume) InternalStats() interface{} {
715         return &v.bucket.stats
716 }
717
718 // String implements fmt.Stringer.
719 func (v *S3Volume) String() string {
720         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
721 }
722
723 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
724
725 func (v *S3Volume) isKeepBlock(s string) bool {
726         return s3KeepBlockRegexp.MatchString(s)
727 }
728
729 // fixRace(X) is called when "recent/X" exists but "X" doesn't
730 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
731 // there was a race between Put and Trash, fixRace recovers from the
732 // race by Untrashing the block.
733 func (v *S3Volume) fixRace(loc string) bool {
734         trash, err := v.bucket.Head("trash/"+loc, nil)
735         if err != nil {
736                 if !os.IsNotExist(v.translateError(err)) {
737                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
738                 }
739                 return false
740         }
741         trashTime, err := v.lastModified(trash)
742         if err != nil {
743                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
744                 return false
745         }
746
747         recent, err := v.bucket.Head("recent/"+loc, nil)
748         if err != nil {
749                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
750                 return false
751         }
752         recentTime, err := v.lastModified(recent)
753         if err != nil {
754                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
755                 return false
756         }
757
758         ageWhenTrashed := trashTime.Sub(recentTime)
759         if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() {
760                 // No evidence of a race: block hasn't been written
761                 // since it became eligible for Trash. No fix needed.
762                 return false
763         }
764
765         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
766         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
767         err = v.safeCopy(loc, "trash/"+loc)
768         if err != nil {
769                 log.Printf("error: fixRace: %s", err)
770                 return false
771         }
772         return true
773 }
774
775 func (v *S3Volume) translateError(err error) error {
776         switch err := err.(type) {
777         case *s3.Error:
778                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
779                         strings.Contains(err.Error(), "Not Found") {
780                         return os.ErrNotExist
781                 }
782                 // Other 404 errors like NoSuchVersion and
783                 // NoSuchBucket are different problems which should
784                 // get called out downstream, so we don't convert them
785                 // to os.ErrNotExist.
786         }
787         return err
788 }
789
790 // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
791 // and deletes them from the volume.
792 func (v *S3Volume) EmptyTrash() {
793         if v.cluster.Collections.BlobDeleteConcurrency < 1 {
794                 return
795         }
796
797         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
798
799         // Define "ready to delete" as "...when EmptyTrash started".
800         startT := time.Now()
801
802         emptyOneKey := func(trash *s3.Key) {
803                 loc := trash.Key[6:]
804                 if !v.isKeepBlock(loc) {
805                         return
806                 }
807                 atomic.AddInt64(&bytesInTrash, trash.Size)
808                 atomic.AddInt64(&blocksInTrash, 1)
809
810                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
811                 if err != nil {
812                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
813                         return
814                 }
815                 recent, err := v.bucket.Head("recent/"+loc, nil)
816                 if err != nil && os.IsNotExist(v.translateError(err)) {
817                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
818                         err = v.Untrash(loc)
819                         if err != nil {
820                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
821                         }
822                         return
823                 } else if err != nil {
824                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
825                         return
826                 }
827                 recentT, err := v.lastModified(recent)
828                 if err != nil {
829                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
830                         return
831                 }
832                 if trashT.Sub(recentT) < v.cluster.Collections.BlobSigningTTL.Duration() {
833                         if age := startT.Sub(recentT); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
834                                 // recent/loc is too old to protect
835                                 // loc from being Trashed again during
836                                 // the raceWindow that starts if we
837                                 // delete trash/X now.
838                                 //
839                                 // Note this means (TrashSweepInterval
840                                 // < BlobSigningTTL - raceWindow) is
841                                 // necessary to avoid starvation.
842                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
843                                 v.fixRace(loc)
844                                 v.Touch(loc)
845                                 return
846                         }
847                         _, err := v.bucket.Head(loc, nil)
848                         if os.IsNotExist(err) {
849                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
850                                 v.fixRace(loc)
851                                 return
852                         } else if err != nil {
853                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
854                                 return
855                         }
856                 }
857                 if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
858                         return
859                 }
860                 err = v.bucket.Del(trash.Key)
861                 if err != nil {
862                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
863                         return
864                 }
865                 atomic.AddInt64(&bytesDeleted, trash.Size)
866                 atomic.AddInt64(&blocksDeleted, 1)
867
868                 _, err = v.bucket.Head(loc, nil)
869                 if err == nil {
870                         log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
871                         return
872                 }
873                 if !os.IsNotExist(v.translateError(err)) {
874                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
875                         return
876                 }
877                 err = v.bucket.Del("recent/" + loc)
878                 if err != nil {
879                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
880                 }
881         }
882
883         var wg sync.WaitGroup
884         todo := make(chan *s3.Key, v.cluster.Collections.BlobDeleteConcurrency)
885         for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
886                 wg.Add(1)
887                 go func() {
888                         defer wg.Done()
889                         for key := range todo {
890                                 emptyOneKey(key)
891                         }
892                 }()
893         }
894
895         trashL := s3Lister{
896                 Bucket:   v.bucket.Bucket(),
897                 Prefix:   "trash/",
898                 PageSize: v.IndexPageSize,
899                 Stats:    &v.bucket.stats,
900         }
901         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
902                 todo <- trash
903         }
904         close(todo)
905         wg.Wait()
906
907         if err := trashL.Error(); err != nil {
908                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
909         }
910         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
911 }
912
913 type s3Lister struct {
914         Bucket     *s3.Bucket
915         Prefix     string
916         PageSize   int
917         Stats      *s3bucketStats
918         nextMarker string
919         buf        []s3.Key
920         err        error
921 }
922
923 // First fetches the first page and returns the first item. It returns
924 // nil if the response is the empty set or an error occurs.
925 func (lister *s3Lister) First() *s3.Key {
926         lister.getPage()
927         return lister.pop()
928 }
929
930 // Next returns the next item, fetching the next page if necessary. It
931 // returns nil if the last available item has already been fetched, or
932 // an error occurs.
933 func (lister *s3Lister) Next() *s3.Key {
934         if len(lister.buf) == 0 && lister.nextMarker != "" {
935                 lister.getPage()
936         }
937         return lister.pop()
938 }
939
940 // Return the most recent error encountered by First or Next.
941 func (lister *s3Lister) Error() error {
942         return lister.err
943 }
944
945 func (lister *s3Lister) getPage() {
946         lister.Stats.TickOps("list")
947         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
948         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
949         lister.nextMarker = ""
950         if err != nil {
951                 lister.err = err
952                 return
953         }
954         if resp.IsTruncated {
955                 lister.nextMarker = resp.NextMarker
956         }
957         lister.buf = make([]s3.Key, 0, len(resp.Contents))
958         for _, key := range resp.Contents {
959                 if !strings.HasPrefix(key.Key, lister.Prefix) {
960                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
961                         continue
962                 }
963                 lister.buf = append(lister.buf, key)
964         }
965 }
966
967 func (lister *s3Lister) pop() (k *s3.Key) {
968         if len(lister.buf) > 0 {
969                 k = &lister.buf[0]
970                 lister.buf = lister.buf[1:]
971         }
972         return
973 }
974
975 // s3bucket wraps s3.bucket and counts I/O and API usage stats. The
976 // wrapped bucket can be replaced atomically with SetBucket in order
977 // to update credentials.
978 type s3bucket struct {
979         bucket *s3.Bucket
980         stats  s3bucketStats
981         mu     sync.Mutex
982 }
983
984 func (b *s3bucket) Bucket() *s3.Bucket {
985         b.mu.Lock()
986         defer b.mu.Unlock()
987         return b.bucket
988 }
989
990 func (b *s3bucket) SetBucket(bucket *s3.Bucket) {
991         b.mu.Lock()
992         defer b.mu.Unlock()
993         b.bucket = bucket
994 }
995
996 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
997         rdr, err := b.Bucket().GetReader(path)
998         b.stats.TickOps("get")
999         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
1000         b.stats.TickErr(err)
1001         return NewCountingReader(rdr, b.stats.TickInBytes), err
1002 }
1003
1004 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
1005         resp, err := b.Bucket().Head(path, headers)
1006         b.stats.TickOps("head")
1007         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
1008         b.stats.TickErr(err)
1009         return resp, err
1010 }
1011
1012 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
1013         if length == 0 {
1014                 // goamz will only send Content-Length: 0 when reader
1015                 // is nil due to net.http.Request.ContentLength
1016                 // behavior.  Otherwise, Content-Length header is
1017                 // omitted which will cause some S3 services
1018                 // (including AWS and Ceph RadosGW) to fail to create
1019                 // empty objects.
1020                 r = nil
1021         } else {
1022                 r = NewCountingReader(r, b.stats.TickOutBytes)
1023         }
1024         err := b.Bucket().PutReader(path, r, length, contType, perm, options)
1025         b.stats.TickOps("put")
1026         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
1027         b.stats.TickErr(err)
1028         return err
1029 }
1030
1031 func (b *s3bucket) Del(path string) error {
1032         err := b.Bucket().Del(path)
1033         b.stats.TickOps("delete")
1034         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
1035         b.stats.TickErr(err)
1036         return err
1037 }
1038
1039 type s3bucketStats struct {
1040         statsTicker
1041         Ops     uint64
1042         GetOps  uint64
1043         PutOps  uint64
1044         HeadOps uint64
1045         DelOps  uint64
1046         ListOps uint64
1047 }
1048
1049 func (s *s3bucketStats) TickErr(err error) {
1050         if err == nil {
1051                 return
1052         }
1053         errType := fmt.Sprintf("%T", err)
1054         if err, ok := err.(*s3.Error); ok {
1055                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
1056         }
1057         s.statsTicker.TickErr(err, errType)
1058 }