15599: Improve error message when no role is defined.
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bufio"
9         "bytes"
10         "context"
11         "crypto/sha256"
12         "encoding/base64"
13         "encoding/hex"
14         "encoding/json"
15         "errors"
16         "fmt"
17         "io"
18         "io/ioutil"
19         "log"
20         "net/http"
21         "os"
22         "regexp"
23         "strings"
24         "sync"
25         "sync/atomic"
26         "time"
27
28         "git.curoverse.com/arvados.git/sdk/go/arvados"
29         "github.com/AdRoll/goamz/aws"
30         "github.com/AdRoll/goamz/s3"
31         "github.com/prometheus/client_golang/prometheus"
32         "github.com/sirupsen/logrus"
33 )
34
35 func init() {
36         driver["S3"] = newS3Volume
37 }
38
39 func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
40         v := &S3Volume{cluster: cluster, volume: volume, logger: logger, metrics: metrics}
41         err := json.Unmarshal(volume.DriverParameters, &v)
42         if err != nil {
43                 return nil, err
44         }
45         return v, v.check()
46 }
47
48 func (v *S3Volume) check() error {
49         if v.Bucket == "" {
50                 return errors.New("DriverParameters: Bucket must be provided")
51         }
52         if v.IndexPageSize == 0 {
53                 v.IndexPageSize = 1000
54         }
55         if v.RaceWindow < 0 {
56                 return errors.New("DriverParameters: RaceWindow must not be negative")
57         }
58
59         var ok bool
60         v.region, ok = aws.Regions[v.Region]
61         if v.Endpoint == "" {
62                 if !ok {
63                         return fmt.Errorf("unrecognized region %+q; try specifying endpoint instead", v.Region)
64                 }
65         } else if ok {
66                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
67                         "specify empty endpoint or use a different region name", v.Region, v.Endpoint)
68         } else {
69                 v.region = aws.Region{
70                         Name:                 v.Region,
71                         S3Endpoint:           v.Endpoint,
72                         S3LocationConstraint: v.LocationConstraint,
73                 }
74         }
75
76         // Zero timeouts mean "wait forever", which is a bad
77         // default. Default to long timeouts instead.
78         if v.ConnectTimeout == 0 {
79                 v.ConnectTimeout = s3DefaultConnectTimeout
80         }
81         if v.ReadTimeout == 0 {
82                 v.ReadTimeout = s3DefaultReadTimeout
83         }
84
85         v.bucket = &s3bucket{
86                 bucket: &s3.Bucket{
87                         S3:   v.newS3Client(),
88                         Name: v.Bucket,
89                 },
90         }
91         // Set up prometheus metrics
92         lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
93         v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
94
95         err := v.bootstrapIAMCredentials()
96         if err != nil {
97                 return fmt.Errorf("error getting IAM credentials: %s", err)
98         }
99
100         return nil
101 }
102
103 const (
104         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
105         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
106 )
107
108 var (
109         // ErrS3TrashDisabled is returned by Trash if that operation
110         // is impossible with the current config.
111         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
112
113         s3ACL = s3.Private
114
115         zeroTime time.Time
116 )
117
118 const (
119         maxClockSkew  = 600 * time.Second
120         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
121 )
122
123 func s3regions() (okList []string) {
124         for r := range aws.Regions {
125                 okList = append(okList, r)
126         }
127         return
128 }
129
130 // S3Volume implements Volume using an S3 bucket.
131 type S3Volume struct {
132         AccessKey          string
133         SecretKey          string
134         AuthToken          string    // populated automatically when IAMRole is used
135         AuthExpiration     time.Time // populated automatically when IAMRole is used
136         IAMRole            string
137         Endpoint           string
138         Region             string
139         Bucket             string
140         LocationConstraint bool
141         IndexPageSize      int
142         ConnectTimeout     arvados.Duration
143         ReadTimeout        arvados.Duration
144         RaceWindow         arvados.Duration
145         UnsafeDelete       bool
146
147         cluster   *arvados.Cluster
148         volume    arvados.Volume
149         logger    logrus.FieldLogger
150         metrics   *volumeMetricsVecs
151         bucket    *s3bucket
152         region    aws.Region
153         startOnce sync.Once
154 }
155
156 // GetDeviceID returns a globally unique ID for the storage bucket.
157 func (v *S3Volume) GetDeviceID() string {
158         return "s3://" + v.Endpoint + "/" + v.Bucket
159 }
160
161 func (v *S3Volume) bootstrapIAMCredentials() error {
162         if v.AccessKey != "" || v.SecretKey != "" {
163                 return nil
164         }
165         ttl, err := v.updateIAMCredentials()
166         if err != nil {
167                 return err
168         }
169         go func() {
170                 for {
171                         time.Sleep(ttl)
172                         ttl, err = v.updateIAMCredentials()
173                         if err != nil {
174                                 v.logger.WithError(err).Warnf("failed to update credentials for IAM role %q", v.IAMRole)
175                                 ttl = time.Second
176                         } else if ttl < time.Second {
177                                 v.logger.WithField("TTL", ttl).Warnf("received stale credentials for IAM role %q", v.IAMRole)
178                                 ttl = time.Second
179                         }
180                 }
181         }()
182         return nil
183 }
184
185 func (v *S3Volume) newS3Client() *s3.S3 {
186         auth := aws.NewAuth(v.AccessKey, v.SecretKey, v.AuthToken, v.AuthExpiration)
187         client := s3.New(*auth, v.region)
188         if v.region.EC2Endpoint.Signer == aws.V4Signature {
189                 // Currently affects only eu-central-1
190                 client.Signature = aws.V4Signature
191         }
192         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
193         client.ReadTimeout = time.Duration(v.ReadTimeout)
194         return client
195 }
196
197 // returned by AWS metadata endpoint .../security-credentials/${rolename}
198 type iamCredentials struct {
199         Code            string
200         LastUpdated     time.Time
201         Type            string
202         AccessKeyID     string
203         SecretAccessKey string
204         Token           string
205         Expiration      time.Time
206 }
207
208 // Returns TTL of updated credentials, i.e., time to sleep until next
209 // update.
210 func (v *S3Volume) updateIAMCredentials() (time.Duration, error) {
211         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
212         defer cancel()
213
214         metadataBaseURL := "http://169.254.169.254/latest/meta-data/iam/security-credentials/"
215
216         var url string
217         if strings.Contains(v.IAMRole, "://") {
218                 // Configuration provides complete URL (used by tests)
219                 url = v.IAMRole
220         } else if v.IAMRole != "" {
221                 // Configuration provides IAM role name and we use the
222                 // AWS metadata endpoint
223                 url = metadataBaseURL + v.IAMRole
224         } else {
225                 url = metadataBaseURL
226                 v.logger.WithField("URL", url).Debug("looking up IAM role name")
227                 req, err := http.NewRequest("GET", url, nil)
228                 if err != nil {
229                         return 0, fmt.Errorf("error setting up request %s: %s", url, err)
230                 }
231                 resp, err := http.DefaultClient.Do(req.WithContext(ctx))
232                 if err != nil {
233                         return 0, fmt.Errorf("error getting %s: %s", url, err)
234                 }
235                 defer resp.Body.Close()
236                 if resp.StatusCode == http.StatusNotFound {
237                         return 0, fmt.Errorf("this instance does not have an IAM role assigned -- either assign a role, or configure AccessKey and SecretKey explicitly in DriverParameters (error getting %s: HTTP status %s)", url, resp.Status)
238                 } else if resp.StatusCode != http.StatusOK {
239                         return 0, fmt.Errorf("error getting %s: HTTP status %s", url, resp.Status)
240                 }
241                 body := bufio.NewReader(resp.Body)
242                 var role string
243                 _, err = fmt.Fscanf(body, "%s\n", &role)
244                 if err != nil {
245                         return 0, fmt.Errorf("error reading response from %s: %s", url, err)
246                 }
247                 if n, _ := body.Read(make([]byte, 64)); n > 0 {
248                         v.logger.Warnf("ignoring additional data returned by metadata endpoint %s after the single role name that we expected", url)
249                 }
250                 v.logger.WithField("Role", role).Debug("looked up IAM role name")
251                 url = url + role
252         }
253
254         v.logger.WithField("URL", url).Debug("getting credentials")
255         req, err := http.NewRequest("GET", url, nil)
256         if err != nil {
257                 return 0, fmt.Errorf("error setting up request %s: %s", url, err)
258         }
259         resp, err := http.DefaultClient.Do(req.WithContext(ctx))
260         if err != nil {
261                 return 0, fmt.Errorf("error getting %s: %s", url, err)
262         }
263         defer resp.Body.Close()
264         if resp.StatusCode != http.StatusOK {
265                 return 0, fmt.Errorf("error getting %s: HTTP status %s", url, resp.Status)
266         }
267         var cred iamCredentials
268         err = json.NewDecoder(resp.Body).Decode(&cred)
269         if err != nil {
270                 return 0, fmt.Errorf("error decoding credentials from %s: %s", url, err)
271         }
272         v.AccessKey, v.SecretKey, v.AuthToken, v.AuthExpiration = cred.AccessKeyID, cred.SecretAccessKey, cred.Token, cred.Expiration
273         v.bucket.SetBucket(&s3.Bucket{
274                 S3:   v.newS3Client(),
275                 Name: v.Bucket,
276         })
277         // TTL is time from now to expiration, minus 5m.  "We make new
278         // credentials available at least five minutes before the
279         // expiration of the old credentials."  --
280         // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
281         // (If that's not true, the returned ttl might be zero or
282         // negative, which the caller can handle.)
283         ttl := cred.Expiration.Sub(time.Now()) - 5*time.Minute
284         v.logger.WithFields(logrus.Fields{
285                 "AccessKeyID": cred.AccessKeyID,
286                 "LastUpdated": cred.LastUpdated,
287                 "Expiration":  cred.Expiration,
288                 "TTL":         arvados.Duration(ttl),
289         }).Debug("updated credentials")
290         return ttl, nil
291 }
292
293 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
294         ready := make(chan bool)
295         go func() {
296                 rdr, err = v.getReader(loc)
297                 close(ready)
298         }()
299         select {
300         case <-ready:
301                 return
302         case <-ctx.Done():
303                 v.logger.Debugf("s3: abandoning getReader(): %s", ctx.Err())
304                 go func() {
305                         <-ready
306                         if err == nil {
307                                 rdr.Close()
308                         }
309                 }()
310                 return nil, ctx.Err()
311         }
312 }
313
314 // getReader wraps (Bucket)GetReader.
315 //
316 // In situations where (Bucket)GetReader would fail because the block
317 // disappeared in a Trash race, getReader calls fixRace to recover the
318 // data, and tries again.
319 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
320         rdr, err = v.bucket.GetReader(loc)
321         err = v.translateError(err)
322         if err == nil || !os.IsNotExist(err) {
323                 return
324         }
325
326         _, err = v.bucket.Head("recent/"+loc, nil)
327         err = v.translateError(err)
328         if err != nil {
329                 // If we can't read recent/X, there's no point in
330                 // trying fixRace. Give up.
331                 return
332         }
333         if !v.fixRace(loc) {
334                 err = os.ErrNotExist
335                 return
336         }
337
338         rdr, err = v.bucket.GetReader(loc)
339         if err != nil {
340                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
341                 err = v.translateError(err)
342         }
343         return
344 }
345
346 // Get a block: copy the block data into buf, and return the number of
347 // bytes copied.
348 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
349         rdr, err := v.getReaderWithContext(ctx, loc)
350         if err != nil {
351                 return 0, err
352         }
353
354         var n int
355         ready := make(chan bool)
356         go func() {
357                 defer close(ready)
358
359                 defer rdr.Close()
360                 n, err = io.ReadFull(rdr, buf)
361
362                 switch err {
363                 case nil, io.EOF, io.ErrUnexpectedEOF:
364                         err = nil
365                 default:
366                         err = v.translateError(err)
367                 }
368         }()
369         select {
370         case <-ctx.Done():
371                 v.logger.Debugf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
372                 rdr.Close()
373                 // Must wait for ReadFull to return, to ensure it
374                 // doesn't write to buf after we return.
375                 v.logger.Debug("s3: waiting for ReadFull() to fail")
376                 <-ready
377                 return 0, ctx.Err()
378         case <-ready:
379                 return n, err
380         }
381 }
382
383 // Compare the given data with the stored data.
384 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
385         errChan := make(chan error, 1)
386         go func() {
387                 _, err := v.bucket.Head("recent/"+loc, nil)
388                 errChan <- err
389         }()
390         var err error
391         select {
392         case <-ctx.Done():
393                 return ctx.Err()
394         case err = <-errChan:
395         }
396         if err != nil {
397                 // Checking for "loc" itself here would interfere with
398                 // future GET requests.
399                 //
400                 // On AWS, if X doesn't exist, a HEAD or GET request
401                 // for X causes X's non-existence to be cached. Thus,
402                 // if we test for X, then create X and return a
403                 // signature to our client, the client might still get
404                 // 404 from all keepstores when trying to read it.
405                 //
406                 // To avoid this, we avoid doing HEAD X or GET X until
407                 // we know X has been written.
408                 //
409                 // Note that X might exist even though recent/X
410                 // doesn't: for example, the response to HEAD recent/X
411                 // might itself come from a stale cache. In such
412                 // cases, we will return a false negative and
413                 // PutHandler might needlessly create another replica
414                 // on a different volume. That's not ideal, but it's
415                 // better than passing the eventually-consistent
416                 // problem on to our clients.
417                 return v.translateError(err)
418         }
419         rdr, err := v.getReaderWithContext(ctx, loc)
420         if err != nil {
421                 return err
422         }
423         defer rdr.Close()
424         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
425 }
426
427 // Put writes a block.
428 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
429         if v.volume.ReadOnly {
430                 return MethodDisabledError
431         }
432         var opts s3.Options
433         size := len(block)
434         if size > 0 {
435                 md5, err := hex.DecodeString(loc)
436                 if err != nil {
437                         return err
438                 }
439                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
440                 // In AWS regions that use V4 signatures, we need to
441                 // provide ContentSHA256 up front. Otherwise, the S3
442                 // library reads the request body (from our buffer)
443                 // into another new buffer in order to compute the
444                 // SHA256 before sending the request -- which would
445                 // mean consuming 128 MiB of memory for the duration
446                 // of a 64 MiB write.
447                 opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
448         }
449
450         // Send the block data through a pipe, so that (if we need to)
451         // we can close the pipe early and abandon our PutReader()
452         // goroutine, without worrying about PutReader() accessing our
453         // block buffer after we release it.
454         bufr, bufw := io.Pipe()
455         go func() {
456                 io.Copy(bufw, bytes.NewReader(block))
457                 bufw.Close()
458         }()
459
460         var err error
461         ready := make(chan bool)
462         go func() {
463                 defer func() {
464                         if ctx.Err() != nil {
465                                 v.logger.Debugf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
466                         }
467                 }()
468                 defer close(ready)
469                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
470                 if err != nil {
471                         return
472                 }
473                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
474         }()
475         select {
476         case <-ctx.Done():
477                 v.logger.Debugf("%s: taking PutReader's input away: %s", v, ctx.Err())
478                 // Our pipe might be stuck in Write(), waiting for
479                 // PutReader() to read. If so, un-stick it. This means
480                 // PutReader will get corrupt data, but that's OK: the
481                 // size and MD5 won't match, so the write will fail.
482                 go io.Copy(ioutil.Discard, bufr)
483                 // CloseWithError() will return once pending I/O is done.
484                 bufw.CloseWithError(ctx.Err())
485                 v.logger.Debugf("%s: abandoning PutReader goroutine", v)
486                 return ctx.Err()
487         case <-ready:
488                 // Unblock pipe in case PutReader did not consume it.
489                 io.Copy(ioutil.Discard, bufr)
490                 return v.translateError(err)
491         }
492 }
493
494 // Touch sets the timestamp for the given locator to the current time.
495 func (v *S3Volume) Touch(loc string) error {
496         if v.volume.ReadOnly {
497                 return MethodDisabledError
498         }
499         _, err := v.bucket.Head(loc, nil)
500         err = v.translateError(err)
501         if os.IsNotExist(err) && v.fixRace(loc) {
502                 // The data object got trashed in a race, but fixRace
503                 // rescued it.
504         } else if err != nil {
505                 return err
506         }
507         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
508         return v.translateError(err)
509 }
510
511 // Mtime returns the stored timestamp for the given locator.
512 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
513         _, err := v.bucket.Head(loc, nil)
514         if err != nil {
515                 return zeroTime, v.translateError(err)
516         }
517         resp, err := v.bucket.Head("recent/"+loc, nil)
518         err = v.translateError(err)
519         if os.IsNotExist(err) {
520                 // The data object X exists, but recent/X is missing.
521                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
522                 if err != nil {
523                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
524                         return zeroTime, v.translateError(err)
525                 }
526                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
527                 resp, err = v.bucket.Head("recent/"+loc, nil)
528                 if err != nil {
529                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
530                         return zeroTime, v.translateError(err)
531                 }
532         } else if err != nil {
533                 // HEAD recent/X failed for some other reason.
534                 return zeroTime, err
535         }
536         return v.lastModified(resp)
537 }
538
539 // IndexTo writes a complete list of locators with the given prefix
540 // for which Get() can retrieve data.
541 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
542         // Use a merge sort to find matching sets of X and recent/X.
543         dataL := s3Lister{
544                 Bucket:   v.bucket.Bucket(),
545                 Prefix:   prefix,
546                 PageSize: v.IndexPageSize,
547                 Stats:    &v.bucket.stats,
548         }
549         recentL := s3Lister{
550                 Bucket:   v.bucket.Bucket(),
551                 Prefix:   "recent/" + prefix,
552                 PageSize: v.IndexPageSize,
553                 Stats:    &v.bucket.stats,
554         }
555         for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
556                 if data.Key >= "g" {
557                         // Conveniently, "recent/*" and "trash/*" are
558                         // lexically greater than all hex-encoded data
559                         // hashes, so stopping here avoids iterating
560                         // over all of them needlessly with dataL.
561                         break
562                 }
563                 if !v.isKeepBlock(data.Key) {
564                         continue
565                 }
566
567                 // stamp is the list entry we should use to report the
568                 // last-modified time for this data block: it will be
569                 // the recent/X entry if one exists, otherwise the
570                 // entry for the data block itself.
571                 stamp := data
572
573                 // Advance to the corresponding recent/X marker, if any
574                 for recent != nil && recentL.Error() == nil {
575                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
576                                 recent = recentL.Next()
577                                 continue
578                         } else if cmp == 0 {
579                                 stamp = recent
580                                 recent = recentL.Next()
581                                 break
582                         } else {
583                                 // recent/X marker is missing: we'll
584                                 // use the timestamp on the data
585                                 // object.
586                                 break
587                         }
588                 }
589                 if err := recentL.Error(); err != nil {
590                         return err
591                 }
592                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
593                 if err != nil {
594                         return err
595                 }
596                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
597         }
598         return dataL.Error()
599 }
600
601 // Trash a Keep block.
602 func (v *S3Volume) Trash(loc string) error {
603         if v.volume.ReadOnly {
604                 return MethodDisabledError
605         }
606         if t, err := v.Mtime(loc); err != nil {
607                 return err
608         } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
609                 return nil
610         }
611         if v.cluster.Collections.BlobTrashLifetime == 0 {
612                 if !v.UnsafeDelete {
613                         return ErrS3TrashDisabled
614                 }
615                 return v.translateError(v.bucket.Del(loc))
616         }
617         err := v.checkRaceWindow(loc)
618         if err != nil {
619                 return err
620         }
621         err = v.safeCopy("trash/"+loc, loc)
622         if err != nil {
623                 return err
624         }
625         return v.translateError(v.bucket.Del(loc))
626 }
627
628 // checkRaceWindow returns a non-nil error if trash/loc is, or might
629 // be, in the race window (i.e., it's not safe to trash loc).
630 func (v *S3Volume) checkRaceWindow(loc string) error {
631         resp, err := v.bucket.Head("trash/"+loc, nil)
632         err = v.translateError(err)
633         if os.IsNotExist(err) {
634                 // OK, trash/X doesn't exist so we're not in the race
635                 // window
636                 return nil
637         } else if err != nil {
638                 // Error looking up trash/X. We don't know whether
639                 // we're in the race window
640                 return err
641         }
642         t, err := v.lastModified(resp)
643         if err != nil {
644                 // Can't parse timestamp
645                 return err
646         }
647         safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
648         if safeWindow <= 0 {
649                 // We can't count on "touch trash/X" to prolong
650                 // trash/X's lifetime. The new timestamp might not
651                 // become visible until now+raceWindow, and EmptyTrash
652                 // is allowed to delete trash/X before then.
653                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
654         }
655         // trash/X exists, but it won't be eligible for deletion until
656         // after now+raceWindow, so it's safe to overwrite it.
657         return nil
658 }
659
660 // safeCopy calls PutCopy, and checks the response to make sure the
661 // copy succeeded and updated the timestamp on the destination object
662 // (PutCopy returns 200 OK if the request was received, even if the
663 // copy failed).
664 func (v *S3Volume) safeCopy(dst, src string) error {
665         resp, err := v.bucket.Bucket().PutCopy(dst, s3ACL, s3.CopyOptions{
666                 ContentType:       "application/octet-stream",
667                 MetadataDirective: "REPLACE",
668         }, v.bucket.Bucket().Name+"/"+src)
669         err = v.translateError(err)
670         if os.IsNotExist(err) {
671                 return err
672         } else if err != nil {
673                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Bucket().Name+"/"+src, err)
674         }
675         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
676                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
677         } else if time.Now().Sub(t) > maxClockSkew {
678                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
679         }
680         return nil
681 }
682
683 // Get the LastModified header from resp, and parse it as RFC1123 or
684 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
685 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
686         s := resp.Header.Get("Last-Modified")
687         t, err = time.Parse(time.RFC1123, s)
688         if err != nil && s != "" {
689                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
690                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
691                 // as required by HTTP spec. If it's not a valid HTTP
692                 // header value, it's probably AWS (or s3test) giving
693                 // us a nearly-RFC1123 timestamp.
694                 t, err = time.Parse(nearlyRFC1123, s)
695         }
696         return
697 }
698
699 // Untrash moves block from trash back into store
700 func (v *S3Volume) Untrash(loc string) error {
701         err := v.safeCopy(loc, "trash/"+loc)
702         if err != nil {
703                 return err
704         }
705         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
706         return v.translateError(err)
707 }
708
709 // Status returns a *VolumeStatus representing the current in-use
710 // storage capacity and a fake available capacity that doesn't make
711 // the volume seem full or nearly-full.
712 func (v *S3Volume) Status() *VolumeStatus {
713         return &VolumeStatus{
714                 DeviceNum: 1,
715                 BytesFree: BlockSize * 1000,
716                 BytesUsed: 1,
717         }
718 }
719
720 // InternalStats returns bucket I/O and API call counters.
721 func (v *S3Volume) InternalStats() interface{} {
722         return &v.bucket.stats
723 }
724
725 // String implements fmt.Stringer.
726 func (v *S3Volume) String() string {
727         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
728 }
729
730 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
731
732 func (v *S3Volume) isKeepBlock(s string) bool {
733         return s3KeepBlockRegexp.MatchString(s)
734 }
735
736 // fixRace(X) is called when "recent/X" exists but "X" doesn't
737 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
738 // there was a race between Put and Trash, fixRace recovers from the
739 // race by Untrashing the block.
740 func (v *S3Volume) fixRace(loc string) bool {
741         trash, err := v.bucket.Head("trash/"+loc, nil)
742         if err != nil {
743                 if !os.IsNotExist(v.translateError(err)) {
744                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
745                 }
746                 return false
747         }
748         trashTime, err := v.lastModified(trash)
749         if err != nil {
750                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
751                 return false
752         }
753
754         recent, err := v.bucket.Head("recent/"+loc, nil)
755         if err != nil {
756                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
757                 return false
758         }
759         recentTime, err := v.lastModified(recent)
760         if err != nil {
761                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
762                 return false
763         }
764
765         ageWhenTrashed := trashTime.Sub(recentTime)
766         if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() {
767                 // No evidence of a race: block hasn't been written
768                 // since it became eligible for Trash. No fix needed.
769                 return false
770         }
771
772         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
773         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
774         err = v.safeCopy(loc, "trash/"+loc)
775         if err != nil {
776                 log.Printf("error: fixRace: %s", err)
777                 return false
778         }
779         return true
780 }
781
782 func (v *S3Volume) translateError(err error) error {
783         switch err := err.(type) {
784         case *s3.Error:
785                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
786                         strings.Contains(err.Error(), "Not Found") {
787                         return os.ErrNotExist
788                 }
789                 // Other 404 errors like NoSuchVersion and
790                 // NoSuchBucket are different problems which should
791                 // get called out downstream, so we don't convert them
792                 // to os.ErrNotExist.
793         }
794         return err
795 }
796
797 // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
798 // and deletes them from the volume.
799 func (v *S3Volume) EmptyTrash() {
800         if v.cluster.Collections.BlobDeleteConcurrency < 1 {
801                 return
802         }
803
804         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
805
806         // Define "ready to delete" as "...when EmptyTrash started".
807         startT := time.Now()
808
809         emptyOneKey := func(trash *s3.Key) {
810                 loc := trash.Key[6:]
811                 if !v.isKeepBlock(loc) {
812                         return
813                 }
814                 atomic.AddInt64(&bytesInTrash, trash.Size)
815                 atomic.AddInt64(&blocksInTrash, 1)
816
817                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
818                 if err != nil {
819                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
820                         return
821                 }
822                 recent, err := v.bucket.Head("recent/"+loc, nil)
823                 if err != nil && os.IsNotExist(v.translateError(err)) {
824                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
825                         err = v.Untrash(loc)
826                         if err != nil {
827                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
828                         }
829                         return
830                 } else if err != nil {
831                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
832                         return
833                 }
834                 recentT, err := v.lastModified(recent)
835                 if err != nil {
836                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
837                         return
838                 }
839                 if trashT.Sub(recentT) < v.cluster.Collections.BlobSigningTTL.Duration() {
840                         if age := startT.Sub(recentT); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
841                                 // recent/loc is too old to protect
842                                 // loc from being Trashed again during
843                                 // the raceWindow that starts if we
844                                 // delete trash/X now.
845                                 //
846                                 // Note this means (TrashSweepInterval
847                                 // < BlobSigningTTL - raceWindow) is
848                                 // necessary to avoid starvation.
849                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
850                                 v.fixRace(loc)
851                                 v.Touch(loc)
852                                 return
853                         }
854                         _, err := v.bucket.Head(loc, nil)
855                         if os.IsNotExist(err) {
856                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
857                                 v.fixRace(loc)
858                                 return
859                         } else if err != nil {
860                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
861                                 return
862                         }
863                 }
864                 if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
865                         return
866                 }
867                 err = v.bucket.Del(trash.Key)
868                 if err != nil {
869                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
870                         return
871                 }
872                 atomic.AddInt64(&bytesDeleted, trash.Size)
873                 atomic.AddInt64(&blocksDeleted, 1)
874
875                 _, err = v.bucket.Head(loc, nil)
876                 if err == nil {
877                         log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
878                         return
879                 }
880                 if !os.IsNotExist(v.translateError(err)) {
881                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
882                         return
883                 }
884                 err = v.bucket.Del("recent/" + loc)
885                 if err != nil {
886                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
887                 }
888         }
889
890         var wg sync.WaitGroup
891         todo := make(chan *s3.Key, v.cluster.Collections.BlobDeleteConcurrency)
892         for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
893                 wg.Add(1)
894                 go func() {
895                         defer wg.Done()
896                         for key := range todo {
897                                 emptyOneKey(key)
898                         }
899                 }()
900         }
901
902         trashL := s3Lister{
903                 Bucket:   v.bucket.Bucket(),
904                 Prefix:   "trash/",
905                 PageSize: v.IndexPageSize,
906                 Stats:    &v.bucket.stats,
907         }
908         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
909                 todo <- trash
910         }
911         close(todo)
912         wg.Wait()
913
914         if err := trashL.Error(); err != nil {
915                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
916         }
917         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
918 }
919
920 type s3Lister struct {
921         Bucket     *s3.Bucket
922         Prefix     string
923         PageSize   int
924         Stats      *s3bucketStats
925         nextMarker string
926         buf        []s3.Key
927         err        error
928 }
929
930 // First fetches the first page and returns the first item. It returns
931 // nil if the response is the empty set or an error occurs.
932 func (lister *s3Lister) First() *s3.Key {
933         lister.getPage()
934         return lister.pop()
935 }
936
937 // Next returns the next item, fetching the next page if necessary. It
938 // returns nil if the last available item has already been fetched, or
939 // an error occurs.
940 func (lister *s3Lister) Next() *s3.Key {
941         if len(lister.buf) == 0 && lister.nextMarker != "" {
942                 lister.getPage()
943         }
944         return lister.pop()
945 }
946
947 // Return the most recent error encountered by First or Next.
948 func (lister *s3Lister) Error() error {
949         return lister.err
950 }
951
952 func (lister *s3Lister) getPage() {
953         lister.Stats.TickOps("list")
954         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
955         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
956         lister.nextMarker = ""
957         if err != nil {
958                 lister.err = err
959                 return
960         }
961         if resp.IsTruncated {
962                 lister.nextMarker = resp.NextMarker
963         }
964         lister.buf = make([]s3.Key, 0, len(resp.Contents))
965         for _, key := range resp.Contents {
966                 if !strings.HasPrefix(key.Key, lister.Prefix) {
967                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
968                         continue
969                 }
970                 lister.buf = append(lister.buf, key)
971         }
972 }
973
974 func (lister *s3Lister) pop() (k *s3.Key) {
975         if len(lister.buf) > 0 {
976                 k = &lister.buf[0]
977                 lister.buf = lister.buf[1:]
978         }
979         return
980 }
981
982 // s3bucket wraps s3.bucket and counts I/O and API usage stats. The
983 // wrapped bucket can be replaced atomically with SetBucket in order
984 // to update credentials.
985 type s3bucket struct {
986         bucket *s3.Bucket
987         stats  s3bucketStats
988         mu     sync.Mutex
989 }
990
991 func (b *s3bucket) Bucket() *s3.Bucket {
992         b.mu.Lock()
993         defer b.mu.Unlock()
994         return b.bucket
995 }
996
997 func (b *s3bucket) SetBucket(bucket *s3.Bucket) {
998         b.mu.Lock()
999         defer b.mu.Unlock()
1000         b.bucket = bucket
1001 }
1002
1003 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
1004         rdr, err := b.Bucket().GetReader(path)
1005         b.stats.TickOps("get")
1006         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
1007         b.stats.TickErr(err)
1008         return NewCountingReader(rdr, b.stats.TickInBytes), err
1009 }
1010
1011 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
1012         resp, err := b.Bucket().Head(path, headers)
1013         b.stats.TickOps("head")
1014         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
1015         b.stats.TickErr(err)
1016         return resp, err
1017 }
1018
1019 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
1020         if length == 0 {
1021                 // goamz will only send Content-Length: 0 when reader
1022                 // is nil due to net.http.Request.ContentLength
1023                 // behavior.  Otherwise, Content-Length header is
1024                 // omitted which will cause some S3 services
1025                 // (including AWS and Ceph RadosGW) to fail to create
1026                 // empty objects.
1027                 r = nil
1028         } else {
1029                 r = NewCountingReader(r, b.stats.TickOutBytes)
1030         }
1031         err := b.Bucket().PutReader(path, r, length, contType, perm, options)
1032         b.stats.TickOps("put")
1033         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
1034         b.stats.TickErr(err)
1035         return err
1036 }
1037
1038 func (b *s3bucket) Del(path string) error {
1039         err := b.Bucket().Del(path)
1040         b.stats.TickOps("delete")
1041         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
1042         b.stats.TickErr(err)
1043         return err
1044 }
1045
1046 type s3bucketStats struct {
1047         statsTicker
1048         Ops     uint64
1049         GetOps  uint64
1050         PutOps  uint64
1051         HeadOps uint64
1052         DelOps  uint64
1053         ListOps uint64
1054 }
1055
1056 func (s *s3bucketStats) TickErr(err error) {
1057         if err == nil {
1058                 return
1059         }
1060         errType := fmt.Sprintf("%T", err)
1061         if err, ok := err.(*s3.Error); ok {
1062                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
1063         }
1064         s.statsTicker.TickErr(err, errType)
1065 }