15599: Support IAM role credentials in keepstore.
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/sha256"
11         "encoding/base64"
12         "encoding/hex"
13         "encoding/json"
14         "errors"
15         "fmt"
16         "io"
17         "io/ioutil"
18         "log"
19         "net/http"
20         "os"
21         "regexp"
22         "strings"
23         "sync"
24         "sync/atomic"
25         "time"
26
27         "git.curoverse.com/arvados.git/sdk/go/arvados"
28         "github.com/AdRoll/goamz/aws"
29         "github.com/AdRoll/goamz/s3"
30         "github.com/prometheus/client_golang/prometheus"
31         "github.com/sirupsen/logrus"
32 )
33
34 func init() {
35         driver["S3"] = newS3Volume
36 }
37
38 func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
39         v := &S3Volume{cluster: cluster, volume: volume, logger: logger, metrics: metrics}
40         err := json.Unmarshal(volume.DriverParameters, &v)
41         if err != nil {
42                 return nil, err
43         }
44         return v, v.check()
45 }
46
47 func (v *S3Volume) check() error {
48         if v.Bucket == "" {
49                 return errors.New("DriverParameters: Bucket must be provided")
50         }
51         if v.IAMRole == "" && (v.AccessKey == "" || v.SecretKey == "") {
52                 return errors.New("DriverParameters: either IAMRole or literal credentials (AccessKey and SecretKey) must be provided")
53         }
54         if v.IndexPageSize == 0 {
55                 v.IndexPageSize = 1000
56         }
57         if v.RaceWindow < 0 {
58                 return errors.New("DriverParameters: RaceWindow must not be negative")
59         }
60
61         var ok bool
62         v.region, ok = aws.Regions[v.Region]
63         if v.Endpoint == "" {
64                 if !ok {
65                         return fmt.Errorf("unrecognized region %+q; try specifying endpoint instead", v.Region)
66                 }
67         } else if ok {
68                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
69                         "specify empty endpoint or use a different region name", v.Region, v.Endpoint)
70         } else {
71                 v.region = aws.Region{
72                         Name:                 v.Region,
73                         S3Endpoint:           v.Endpoint,
74                         S3LocationConstraint: v.LocationConstraint,
75                 }
76         }
77
78         // Zero timeouts mean "wait forever", which is a bad
79         // default. Default to long timeouts instead.
80         if v.ConnectTimeout == 0 {
81                 v.ConnectTimeout = s3DefaultConnectTimeout
82         }
83         if v.ReadTimeout == 0 {
84                 v.ReadTimeout = s3DefaultReadTimeout
85         }
86
87         v.bucket = &s3bucket{
88                 bucket: &s3.Bucket{
89                         S3:   v.newS3Client(),
90                         Name: v.Bucket,
91                 },
92         }
93         // Set up prometheus metrics
94         lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
95         v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
96
97         err := v.bootstrapIAMCredentials()
98         if err != nil {
99                 return fmt.Errorf("error getting IAM credentials: %s", err)
100         }
101
102         return nil
103 }
104
105 const (
106         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
107         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
108 )
109
110 var (
111         // ErrS3TrashDisabled is returned by Trash if that operation
112         // is impossible with the current config.
113         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
114
115         s3ACL = s3.Private
116
117         zeroTime time.Time
118 )
119
120 const (
121         maxClockSkew  = 600 * time.Second
122         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
123 )
124
125 func s3regions() (okList []string) {
126         for r := range aws.Regions {
127                 okList = append(okList, r)
128         }
129         return
130 }
131
132 // S3Volume implements Volume using an S3 bucket.
133 type S3Volume struct {
134         AccessKey          string
135         SecretKey          string
136         AuthToken          string    // populated automatically when IAMRole is used
137         AuthExpiration     time.Time // populated automatically when IAMRole is used
138         IAMRole            string
139         Endpoint           string
140         Region             string
141         Bucket             string
142         LocationConstraint bool
143         IndexPageSize      int
144         ConnectTimeout     arvados.Duration
145         ReadTimeout        arvados.Duration
146         RaceWindow         arvados.Duration
147         UnsafeDelete       bool
148
149         cluster   *arvados.Cluster
150         volume    arvados.Volume
151         logger    logrus.FieldLogger
152         metrics   *volumeMetricsVecs
153         bucket    *s3bucket
154         region    aws.Region
155         startOnce sync.Once
156 }
157
158 // GetDeviceID returns a globally unique ID for the storage bucket.
159 func (v *S3Volume) GetDeviceID() string {
160         return "s3://" + v.Endpoint + "/" + v.Bucket
161 }
162
163 func (v *S3Volume) bootstrapIAMCredentials() error {
164         if v.IAMRole == "" {
165                 return nil
166         }
167         ttl, err := v.updateIAMCredentials()
168         if err != nil {
169                 return err
170         }
171         go func() {
172                 for {
173                         time.Sleep(ttl)
174                         ttl, err = v.updateIAMCredentials()
175                         if err != nil {
176                                 v.logger.WithError(err).Warnf("failed to update credentials for IAM role %q", v.IAMRole)
177                                 ttl = time.Second
178                         } else if ttl < time.Second {
179                                 v.logger.WithField("TTL", ttl).Warnf("received stale credentials for IAM role %q", v.IAMRole)
180                                 ttl = time.Second
181                         }
182                 }
183         }()
184         return nil
185 }
186
187 func (v *S3Volume) newS3Client() *s3.S3 {
188         auth := aws.NewAuth(v.AccessKey, v.SecretKey, v.AuthToken, v.AuthExpiration)
189         client := s3.New(*auth, v.region)
190         if v.region.EC2Endpoint.Signer == aws.V4Signature {
191                 // Currently affects only eu-central-1
192                 client.Signature = aws.V4Signature
193         }
194         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
195         client.ReadTimeout = time.Duration(v.ReadTimeout)
196         return client
197 }
198
199 // returned by AWS metadata endpoint .../security-credentials/${rolename}
200 type iamCredentials struct {
201         Code            string
202         LastUpdated     time.Time
203         Type            string
204         AccessKeyID     string
205         SecretAccessKey string
206         Token           string
207         Expiration      time.Time
208 }
209
210 // Returns TTL of updated credentials, i.e., time to sleep until next
211 // update.
212 func (v *S3Volume) updateIAMCredentials() (time.Duration, error) {
213         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
214         defer cancel()
215
216         var url string
217         if strings.Contains(v.IAMRole, "://") {
218                 // Configuration provides complete URL (used by tests)
219                 url = v.IAMRole
220         } else {
221                 // Configuration provides IAM role name and we use the
222                 // AWS metadata endpoint
223                 url = "http://169.254.169.254/latest/meta-data/iam/security-credentials/" + v.IAMRole
224         }
225         v.logger.WithField("URL", url).Debug("getting credentials")
226         req, err := http.NewRequest("GET", url, nil)
227         if err != nil {
228                 return 0, fmt.Errorf("error setting up request %s: %s", url, err)
229         }
230         resp, err := http.DefaultClient.Do(req.WithContext(ctx))
231         if err != nil {
232                 return 0, fmt.Errorf("error getting %s: %s", url, err)
233         }
234         defer resp.Body.Close()
235         if resp.StatusCode != http.StatusOK {
236                 return 0, fmt.Errorf("error getting %s: HTTP status %s", url, resp.Status)
237         }
238         var cred iamCredentials
239         err = json.NewDecoder(resp.Body).Decode(&cred)
240         if err != nil {
241                 return 0, fmt.Errorf("error decoding credentials from %s: %s", url, err)
242         }
243         v.AccessKey, v.SecretKey, v.AuthToken, v.AuthExpiration = cred.AccessKeyID, cred.SecretAccessKey, cred.Token, cred.Expiration
244         v.bucket.SetBucket(&s3.Bucket{
245                 S3:   v.newS3Client(),
246                 Name: v.Bucket,
247         })
248         // TTL is time from now to expiration, minus 5m.  "We make new
249         // credentials available at least five minutes before the
250         // expiration of the old credentials."  --
251         // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
252         // (If that's not true, the returned ttl might be zero or
253         // negative, which the caller can handle.)
254         ttl := cred.Expiration.Sub(time.Now()) - 5*time.Minute
255         v.logger.WithFields(logrus.Fields{
256                 "AccessKeyID": cred.AccessKeyID,
257                 "LastUpdated": cred.LastUpdated,
258                 "Expiration":  cred.Expiration,
259                 "TTL":         arvados.Duration(ttl),
260         }).Debug("updated credentials")
261         return ttl, nil
262 }
263
264 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
265         ready := make(chan bool)
266         go func() {
267                 rdr, err = v.getReader(loc)
268                 close(ready)
269         }()
270         select {
271         case <-ready:
272                 return
273         case <-ctx.Done():
274                 v.logger.Debugf("s3: abandoning getReader(): %s", ctx.Err())
275                 go func() {
276                         <-ready
277                         if err == nil {
278                                 rdr.Close()
279                         }
280                 }()
281                 return nil, ctx.Err()
282         }
283 }
284
285 // getReader wraps (Bucket)GetReader.
286 //
287 // In situations where (Bucket)GetReader would fail because the block
288 // disappeared in a Trash race, getReader calls fixRace to recover the
289 // data, and tries again.
290 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
291         rdr, err = v.bucket.GetReader(loc)
292         err = v.translateError(err)
293         if err == nil || !os.IsNotExist(err) {
294                 return
295         }
296
297         _, err = v.bucket.Head("recent/"+loc, nil)
298         err = v.translateError(err)
299         if err != nil {
300                 // If we can't read recent/X, there's no point in
301                 // trying fixRace. Give up.
302                 return
303         }
304         if !v.fixRace(loc) {
305                 err = os.ErrNotExist
306                 return
307         }
308
309         rdr, err = v.bucket.GetReader(loc)
310         if err != nil {
311                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
312                 err = v.translateError(err)
313         }
314         return
315 }
316
317 // Get a block: copy the block data into buf, and return the number of
318 // bytes copied.
319 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
320         rdr, err := v.getReaderWithContext(ctx, loc)
321         if err != nil {
322                 return 0, err
323         }
324
325         var n int
326         ready := make(chan bool)
327         go func() {
328                 defer close(ready)
329
330                 defer rdr.Close()
331                 n, err = io.ReadFull(rdr, buf)
332
333                 switch err {
334                 case nil, io.EOF, io.ErrUnexpectedEOF:
335                         err = nil
336                 default:
337                         err = v.translateError(err)
338                 }
339         }()
340         select {
341         case <-ctx.Done():
342                 v.logger.Debugf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
343                 rdr.Close()
344                 // Must wait for ReadFull to return, to ensure it
345                 // doesn't write to buf after we return.
346                 v.logger.Debug("s3: waiting for ReadFull() to fail")
347                 <-ready
348                 return 0, ctx.Err()
349         case <-ready:
350                 return n, err
351         }
352 }
353
354 // Compare the given data with the stored data.
355 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
356         errChan := make(chan error, 1)
357         go func() {
358                 _, err := v.bucket.Head("recent/"+loc, nil)
359                 errChan <- err
360         }()
361         var err error
362         select {
363         case <-ctx.Done():
364                 return ctx.Err()
365         case err = <-errChan:
366         }
367         if err != nil {
368                 // Checking for "loc" itself here would interfere with
369                 // future GET requests.
370                 //
371                 // On AWS, if X doesn't exist, a HEAD or GET request
372                 // for X causes X's non-existence to be cached. Thus,
373                 // if we test for X, then create X and return a
374                 // signature to our client, the client might still get
375                 // 404 from all keepstores when trying to read it.
376                 //
377                 // To avoid this, we avoid doing HEAD X or GET X until
378                 // we know X has been written.
379                 //
380                 // Note that X might exist even though recent/X
381                 // doesn't: for example, the response to HEAD recent/X
382                 // might itself come from a stale cache. In such
383                 // cases, we will return a false negative and
384                 // PutHandler might needlessly create another replica
385                 // on a different volume. That's not ideal, but it's
386                 // better than passing the eventually-consistent
387                 // problem on to our clients.
388                 return v.translateError(err)
389         }
390         rdr, err := v.getReaderWithContext(ctx, loc)
391         if err != nil {
392                 return err
393         }
394         defer rdr.Close()
395         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
396 }
397
398 // Put writes a block.
399 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
400         if v.volume.ReadOnly {
401                 return MethodDisabledError
402         }
403         var opts s3.Options
404         size := len(block)
405         if size > 0 {
406                 md5, err := hex.DecodeString(loc)
407                 if err != nil {
408                         return err
409                 }
410                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
411                 // In AWS regions that use V4 signatures, we need to
412                 // provide ContentSHA256 up front. Otherwise, the S3
413                 // library reads the request body (from our buffer)
414                 // into another new buffer in order to compute the
415                 // SHA256 before sending the request -- which would
416                 // mean consuming 128 MiB of memory for the duration
417                 // of a 64 MiB write.
418                 opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
419         }
420
421         // Send the block data through a pipe, so that (if we need to)
422         // we can close the pipe early and abandon our PutReader()
423         // goroutine, without worrying about PutReader() accessing our
424         // block buffer after we release it.
425         bufr, bufw := io.Pipe()
426         go func() {
427                 io.Copy(bufw, bytes.NewReader(block))
428                 bufw.Close()
429         }()
430
431         var err error
432         ready := make(chan bool)
433         go func() {
434                 defer func() {
435                         if ctx.Err() != nil {
436                                 v.logger.Debugf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
437                         }
438                 }()
439                 defer close(ready)
440                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
441                 if err != nil {
442                         return
443                 }
444                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
445         }()
446         select {
447         case <-ctx.Done():
448                 v.logger.Debugf("%s: taking PutReader's input away: %s", v, ctx.Err())
449                 // Our pipe might be stuck in Write(), waiting for
450                 // PutReader() to read. If so, un-stick it. This means
451                 // PutReader will get corrupt data, but that's OK: the
452                 // size and MD5 won't match, so the write will fail.
453                 go io.Copy(ioutil.Discard, bufr)
454                 // CloseWithError() will return once pending I/O is done.
455                 bufw.CloseWithError(ctx.Err())
456                 v.logger.Debugf("%s: abandoning PutReader goroutine", v)
457                 return ctx.Err()
458         case <-ready:
459                 // Unblock pipe in case PutReader did not consume it.
460                 io.Copy(ioutil.Discard, bufr)
461                 return v.translateError(err)
462         }
463 }
464
465 // Touch sets the timestamp for the given locator to the current time.
466 func (v *S3Volume) Touch(loc string) error {
467         if v.volume.ReadOnly {
468                 return MethodDisabledError
469         }
470         _, err := v.bucket.Head(loc, nil)
471         err = v.translateError(err)
472         if os.IsNotExist(err) && v.fixRace(loc) {
473                 // The data object got trashed in a race, but fixRace
474                 // rescued it.
475         } else if err != nil {
476                 return err
477         }
478         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
479         return v.translateError(err)
480 }
481
482 // Mtime returns the stored timestamp for the given locator.
483 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
484         _, err := v.bucket.Head(loc, nil)
485         if err != nil {
486                 return zeroTime, v.translateError(err)
487         }
488         resp, err := v.bucket.Head("recent/"+loc, nil)
489         err = v.translateError(err)
490         if os.IsNotExist(err) {
491                 // The data object X exists, but recent/X is missing.
492                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
493                 if err != nil {
494                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
495                         return zeroTime, v.translateError(err)
496                 }
497                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
498                 resp, err = v.bucket.Head("recent/"+loc, nil)
499                 if err != nil {
500                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
501                         return zeroTime, v.translateError(err)
502                 }
503         } else if err != nil {
504                 // HEAD recent/X failed for some other reason.
505                 return zeroTime, err
506         }
507         return v.lastModified(resp)
508 }
509
510 // IndexTo writes a complete list of locators with the given prefix
511 // for which Get() can retrieve data.
512 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
513         // Use a merge sort to find matching sets of X and recent/X.
514         dataL := s3Lister{
515                 Bucket:   v.bucket.Bucket(),
516                 Prefix:   prefix,
517                 PageSize: v.IndexPageSize,
518                 Stats:    &v.bucket.stats,
519         }
520         recentL := s3Lister{
521                 Bucket:   v.bucket.Bucket(),
522                 Prefix:   "recent/" + prefix,
523                 PageSize: v.IndexPageSize,
524                 Stats:    &v.bucket.stats,
525         }
526         for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
527                 if data.Key >= "g" {
528                         // Conveniently, "recent/*" and "trash/*" are
529                         // lexically greater than all hex-encoded data
530                         // hashes, so stopping here avoids iterating
531                         // over all of them needlessly with dataL.
532                         break
533                 }
534                 if !v.isKeepBlock(data.Key) {
535                         continue
536                 }
537
538                 // stamp is the list entry we should use to report the
539                 // last-modified time for this data block: it will be
540                 // the recent/X entry if one exists, otherwise the
541                 // entry for the data block itself.
542                 stamp := data
543
544                 // Advance to the corresponding recent/X marker, if any
545                 for recent != nil && recentL.Error() == nil {
546                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
547                                 recent = recentL.Next()
548                                 continue
549                         } else if cmp == 0 {
550                                 stamp = recent
551                                 recent = recentL.Next()
552                                 break
553                         } else {
554                                 // recent/X marker is missing: we'll
555                                 // use the timestamp on the data
556                                 // object.
557                                 break
558                         }
559                 }
560                 if err := recentL.Error(); err != nil {
561                         return err
562                 }
563                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
564                 if err != nil {
565                         return err
566                 }
567                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
568         }
569         return dataL.Error()
570 }
571
572 // Trash a Keep block.
573 func (v *S3Volume) Trash(loc string) error {
574         if v.volume.ReadOnly {
575                 return MethodDisabledError
576         }
577         if t, err := v.Mtime(loc); err != nil {
578                 return err
579         } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
580                 return nil
581         }
582         if v.cluster.Collections.BlobTrashLifetime == 0 {
583                 if !v.UnsafeDelete {
584                         return ErrS3TrashDisabled
585                 }
586                 return v.translateError(v.bucket.Del(loc))
587         }
588         err := v.checkRaceWindow(loc)
589         if err != nil {
590                 return err
591         }
592         err = v.safeCopy("trash/"+loc, loc)
593         if err != nil {
594                 return err
595         }
596         return v.translateError(v.bucket.Del(loc))
597 }
598
599 // checkRaceWindow returns a non-nil error if trash/loc is, or might
600 // be, in the race window (i.e., it's not safe to trash loc).
601 func (v *S3Volume) checkRaceWindow(loc string) error {
602         resp, err := v.bucket.Head("trash/"+loc, nil)
603         err = v.translateError(err)
604         if os.IsNotExist(err) {
605                 // OK, trash/X doesn't exist so we're not in the race
606                 // window
607                 return nil
608         } else if err != nil {
609                 // Error looking up trash/X. We don't know whether
610                 // we're in the race window
611                 return err
612         }
613         t, err := v.lastModified(resp)
614         if err != nil {
615                 // Can't parse timestamp
616                 return err
617         }
618         safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
619         if safeWindow <= 0 {
620                 // We can't count on "touch trash/X" to prolong
621                 // trash/X's lifetime. The new timestamp might not
622                 // become visible until now+raceWindow, and EmptyTrash
623                 // is allowed to delete trash/X before then.
624                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
625         }
626         // trash/X exists, but it won't be eligible for deletion until
627         // after now+raceWindow, so it's safe to overwrite it.
628         return nil
629 }
630
631 // safeCopy calls PutCopy, and checks the response to make sure the
632 // copy succeeded and updated the timestamp on the destination object
633 // (PutCopy returns 200 OK if the request was received, even if the
634 // copy failed).
635 func (v *S3Volume) safeCopy(dst, src string) error {
636         resp, err := v.bucket.Bucket().PutCopy(dst, s3ACL, s3.CopyOptions{
637                 ContentType:       "application/octet-stream",
638                 MetadataDirective: "REPLACE",
639         }, v.bucket.Bucket().Name+"/"+src)
640         err = v.translateError(err)
641         if os.IsNotExist(err) {
642                 return err
643         } else if err != nil {
644                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Bucket().Name+"/"+src, err)
645         }
646         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
647                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
648         } else if time.Now().Sub(t) > maxClockSkew {
649                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
650         }
651         return nil
652 }
653
654 // Get the LastModified header from resp, and parse it as RFC1123 or
655 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
656 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
657         s := resp.Header.Get("Last-Modified")
658         t, err = time.Parse(time.RFC1123, s)
659         if err != nil && s != "" {
660                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
661                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
662                 // as required by HTTP spec. If it's not a valid HTTP
663                 // header value, it's probably AWS (or s3test) giving
664                 // us a nearly-RFC1123 timestamp.
665                 t, err = time.Parse(nearlyRFC1123, s)
666         }
667         return
668 }
669
670 // Untrash moves block from trash back into store
671 func (v *S3Volume) Untrash(loc string) error {
672         err := v.safeCopy(loc, "trash/"+loc)
673         if err != nil {
674                 return err
675         }
676         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
677         return v.translateError(err)
678 }
679
680 // Status returns a *VolumeStatus representing the current in-use
681 // storage capacity and a fake available capacity that doesn't make
682 // the volume seem full or nearly-full.
683 func (v *S3Volume) Status() *VolumeStatus {
684         return &VolumeStatus{
685                 DeviceNum: 1,
686                 BytesFree: BlockSize * 1000,
687                 BytesUsed: 1,
688         }
689 }
690
691 // InternalStats returns bucket I/O and API call counters.
692 func (v *S3Volume) InternalStats() interface{} {
693         return &v.bucket.stats
694 }
695
696 // String implements fmt.Stringer.
697 func (v *S3Volume) String() string {
698         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
699 }
700
701 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
702
703 func (v *S3Volume) isKeepBlock(s string) bool {
704         return s3KeepBlockRegexp.MatchString(s)
705 }
706
707 // fixRace(X) is called when "recent/X" exists but "X" doesn't
708 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
709 // there was a race between Put and Trash, fixRace recovers from the
710 // race by Untrashing the block.
711 func (v *S3Volume) fixRace(loc string) bool {
712         trash, err := v.bucket.Head("trash/"+loc, nil)
713         if err != nil {
714                 if !os.IsNotExist(v.translateError(err)) {
715                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
716                 }
717                 return false
718         }
719         trashTime, err := v.lastModified(trash)
720         if err != nil {
721                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
722                 return false
723         }
724
725         recent, err := v.bucket.Head("recent/"+loc, nil)
726         if err != nil {
727                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
728                 return false
729         }
730         recentTime, err := v.lastModified(recent)
731         if err != nil {
732                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
733                 return false
734         }
735
736         ageWhenTrashed := trashTime.Sub(recentTime)
737         if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() {
738                 // No evidence of a race: block hasn't been written
739                 // since it became eligible for Trash. No fix needed.
740                 return false
741         }
742
743         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
744         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
745         err = v.safeCopy(loc, "trash/"+loc)
746         if err != nil {
747                 log.Printf("error: fixRace: %s", err)
748                 return false
749         }
750         return true
751 }
752
753 func (v *S3Volume) translateError(err error) error {
754         switch err := err.(type) {
755         case *s3.Error:
756                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
757                         strings.Contains(err.Error(), "Not Found") {
758                         return os.ErrNotExist
759                 }
760                 // Other 404 errors like NoSuchVersion and
761                 // NoSuchBucket are different problems which should
762                 // get called out downstream, so we don't convert them
763                 // to os.ErrNotExist.
764         }
765         return err
766 }
767
768 // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
769 // and deletes them from the volume.
770 func (v *S3Volume) EmptyTrash() {
771         if v.cluster.Collections.BlobDeleteConcurrency < 1 {
772                 return
773         }
774
775         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
776
777         // Define "ready to delete" as "...when EmptyTrash started".
778         startT := time.Now()
779
780         emptyOneKey := func(trash *s3.Key) {
781                 loc := trash.Key[6:]
782                 if !v.isKeepBlock(loc) {
783                         return
784                 }
785                 atomic.AddInt64(&bytesInTrash, trash.Size)
786                 atomic.AddInt64(&blocksInTrash, 1)
787
788                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
789                 if err != nil {
790                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
791                         return
792                 }
793                 recent, err := v.bucket.Head("recent/"+loc, nil)
794                 if err != nil && os.IsNotExist(v.translateError(err)) {
795                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
796                         err = v.Untrash(loc)
797                         if err != nil {
798                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
799                         }
800                         return
801                 } else if err != nil {
802                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
803                         return
804                 }
805                 recentT, err := v.lastModified(recent)
806                 if err != nil {
807                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
808                         return
809                 }
810                 if trashT.Sub(recentT) < v.cluster.Collections.BlobSigningTTL.Duration() {
811                         if age := startT.Sub(recentT); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
812                                 // recent/loc is too old to protect
813                                 // loc from being Trashed again during
814                                 // the raceWindow that starts if we
815                                 // delete trash/X now.
816                                 //
817                                 // Note this means (TrashSweepInterval
818                                 // < BlobSigningTTL - raceWindow) is
819                                 // necessary to avoid starvation.
820                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
821                                 v.fixRace(loc)
822                                 v.Touch(loc)
823                                 return
824                         }
825                         _, err := v.bucket.Head(loc, nil)
826                         if os.IsNotExist(err) {
827                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
828                                 v.fixRace(loc)
829                                 return
830                         } else if err != nil {
831                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
832                                 return
833                         }
834                 }
835                 if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
836                         return
837                 }
838                 err = v.bucket.Del(trash.Key)
839                 if err != nil {
840                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
841                         return
842                 }
843                 atomic.AddInt64(&bytesDeleted, trash.Size)
844                 atomic.AddInt64(&blocksDeleted, 1)
845
846                 _, err = v.bucket.Head(loc, nil)
847                 if err == nil {
848                         log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
849                         return
850                 }
851                 if !os.IsNotExist(v.translateError(err)) {
852                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
853                         return
854                 }
855                 err = v.bucket.Del("recent/" + loc)
856                 if err != nil {
857                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
858                 }
859         }
860
861         var wg sync.WaitGroup
862         todo := make(chan *s3.Key, v.cluster.Collections.BlobDeleteConcurrency)
863         for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
864                 wg.Add(1)
865                 go func() {
866                         defer wg.Done()
867                         for key := range todo {
868                                 emptyOneKey(key)
869                         }
870                 }()
871         }
872
873         trashL := s3Lister{
874                 Bucket:   v.bucket.Bucket(),
875                 Prefix:   "trash/",
876                 PageSize: v.IndexPageSize,
877                 Stats:    &v.bucket.stats,
878         }
879         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
880                 todo <- trash
881         }
882         close(todo)
883         wg.Wait()
884
885         if err := trashL.Error(); err != nil {
886                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
887         }
888         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
889 }
890
891 type s3Lister struct {
892         Bucket     *s3.Bucket
893         Prefix     string
894         PageSize   int
895         Stats      *s3bucketStats
896         nextMarker string
897         buf        []s3.Key
898         err        error
899 }
900
901 // First fetches the first page and returns the first item. It returns
902 // nil if the response is the empty set or an error occurs.
903 func (lister *s3Lister) First() *s3.Key {
904         lister.getPage()
905         return lister.pop()
906 }
907
908 // Next returns the next item, fetching the next page if necessary. It
909 // returns nil if the last available item has already been fetched, or
910 // an error occurs.
911 func (lister *s3Lister) Next() *s3.Key {
912         if len(lister.buf) == 0 && lister.nextMarker != "" {
913                 lister.getPage()
914         }
915         return lister.pop()
916 }
917
918 // Return the most recent error encountered by First or Next.
919 func (lister *s3Lister) Error() error {
920         return lister.err
921 }
922
923 func (lister *s3Lister) getPage() {
924         lister.Stats.TickOps("list")
925         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
926         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
927         lister.nextMarker = ""
928         if err != nil {
929                 lister.err = err
930                 return
931         }
932         if resp.IsTruncated {
933                 lister.nextMarker = resp.NextMarker
934         }
935         lister.buf = make([]s3.Key, 0, len(resp.Contents))
936         for _, key := range resp.Contents {
937                 if !strings.HasPrefix(key.Key, lister.Prefix) {
938                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
939                         continue
940                 }
941                 lister.buf = append(lister.buf, key)
942         }
943 }
944
945 func (lister *s3Lister) pop() (k *s3.Key) {
946         if len(lister.buf) > 0 {
947                 k = &lister.buf[0]
948                 lister.buf = lister.buf[1:]
949         }
950         return
951 }
952
953 // s3bucket wraps s3.bucket and counts I/O and API usage stats. The
954 // wrapped bucket can be replaced atomically with SetBucket in order
955 // to update credentials.
956 type s3bucket struct {
957         bucket *s3.Bucket
958         stats  s3bucketStats
959         mu     sync.Mutex
960 }
961
962 func (b *s3bucket) Bucket() *s3.Bucket {
963         b.mu.Lock()
964         defer b.mu.Unlock()
965         return b.bucket
966 }
967
968 func (b *s3bucket) SetBucket(bucket *s3.Bucket) {
969         b.mu.Lock()
970         defer b.mu.Unlock()
971         b.bucket = bucket
972 }
973
974 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
975         rdr, err := b.Bucket().GetReader(path)
976         b.stats.TickOps("get")
977         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
978         b.stats.TickErr(err)
979         return NewCountingReader(rdr, b.stats.TickInBytes), err
980 }
981
982 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
983         resp, err := b.Bucket().Head(path, headers)
984         b.stats.TickOps("head")
985         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
986         b.stats.TickErr(err)
987         return resp, err
988 }
989
990 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
991         if length == 0 {
992                 // goamz will only send Content-Length: 0 when reader
993                 // is nil due to net.http.Request.ContentLength
994                 // behavior.  Otherwise, Content-Length header is
995                 // omitted which will cause some S3 services
996                 // (including AWS and Ceph RadosGW) to fail to create
997                 // empty objects.
998                 r = nil
999         } else {
1000                 r = NewCountingReader(r, b.stats.TickOutBytes)
1001         }
1002         err := b.Bucket().PutReader(path, r, length, contType, perm, options)
1003         b.stats.TickOps("put")
1004         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
1005         b.stats.TickErr(err)
1006         return err
1007 }
1008
1009 func (b *s3bucket) Del(path string) error {
1010         err := b.Bucket().Del(path)
1011         b.stats.TickOps("delete")
1012         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
1013         b.stats.TickErr(err)
1014         return err
1015 }
1016
1017 type s3bucketStats struct {
1018         statsTicker
1019         Ops     uint64
1020         GetOps  uint64
1021         PutOps  uint64
1022         HeadOps uint64
1023         DelOps  uint64
1024         ListOps uint64
1025 }
1026
1027 func (s *s3bucketStats) TickErr(err error) {
1028         if err == nil {
1029                 return
1030         }
1031         errType := fmt.Sprintf("%T", err)
1032         if err, ok := err.(*s3.Error); ok {
1033                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
1034         }
1035         s.statsTicker.TickErr(err, errType)
1036 }