Merge branch '14714-keep-balance-config'
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bufio"
9         "bytes"
10         "context"
11         "crypto/sha256"
12         "encoding/base64"
13         "encoding/hex"
14         "encoding/json"
15         "errors"
16         "fmt"
17         "io"
18         "io/ioutil"
19         "log"
20         "net/http"
21         "os"
22         "regexp"
23         "strings"
24         "sync"
25         "sync/atomic"
26         "time"
27
28         "git.curoverse.com/arvados.git/sdk/go/arvados"
29         "github.com/AdRoll/goamz/aws"
30         "github.com/AdRoll/goamz/s3"
31         "github.com/prometheus/client_golang/prometheus"
32         "github.com/sirupsen/logrus"
33 )
34
35 func init() {
36         driver["S3"] = newS3Volume
37 }
38
39 func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
40         v := &S3Volume{cluster: cluster, volume: volume, logger: logger, metrics: metrics}
41         err := json.Unmarshal(volume.DriverParameters, &v)
42         if err != nil {
43                 return nil, err
44         }
45         return v, v.check()
46 }
47
48 func (v *S3Volume) check() error {
49         if v.Bucket == "" {
50                 return errors.New("DriverParameters: Bucket must be provided")
51         }
52         if v.IndexPageSize == 0 {
53                 v.IndexPageSize = 1000
54         }
55         if v.RaceWindow < 0 {
56                 return errors.New("DriverParameters: RaceWindow must not be negative")
57         }
58
59         var ok bool
60         v.region, ok = aws.Regions[v.Region]
61         if v.Endpoint == "" {
62                 if !ok {
63                         return fmt.Errorf("unrecognized region %+q; try specifying endpoint instead", v.Region)
64                 }
65         } else if ok {
66                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
67                         "specify empty endpoint or use a different region name", v.Region, v.Endpoint)
68         } else {
69                 v.region = aws.Region{
70                         Name:                 v.Region,
71                         S3Endpoint:           v.Endpoint,
72                         S3LocationConstraint: v.LocationConstraint,
73                 }
74         }
75
76         // Zero timeouts mean "wait forever", which is a bad
77         // default. Default to long timeouts instead.
78         if v.ConnectTimeout == 0 {
79                 v.ConnectTimeout = s3DefaultConnectTimeout
80         }
81         if v.ReadTimeout == 0 {
82                 v.ReadTimeout = s3DefaultReadTimeout
83         }
84
85         v.bucket = &s3bucket{
86                 bucket: &s3.Bucket{
87                         S3:   v.newS3Client(),
88                         Name: v.Bucket,
89                 },
90         }
91         // Set up prometheus metrics
92         lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
93         v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
94
95         err := v.bootstrapIAMCredentials()
96         if err != nil {
97                 return fmt.Errorf("error getting IAM credentials: %s", err)
98         }
99
100         return nil
101 }
102
103 const (
104         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
105         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
106 )
107
108 var (
109         // ErrS3TrashDisabled is returned by Trash if that operation
110         // is impossible with the current config.
111         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
112
113         s3ACL = s3.Private
114
115         zeroTime time.Time
116 )
117
118 const (
119         maxClockSkew  = 600 * time.Second
120         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
121 )
122
123 func s3regions() (okList []string) {
124         for r := range aws.Regions {
125                 okList = append(okList, r)
126         }
127         return
128 }
129
130 // S3Volume implements Volume using an S3 bucket.
131 type S3Volume struct {
132         AccessKey          string
133         SecretKey          string
134         AuthToken          string    // populated automatically when IAMRole is used
135         AuthExpiration     time.Time // populated automatically when IAMRole is used
136         IAMRole            string
137         Endpoint           string
138         Region             string
139         Bucket             string
140         LocationConstraint bool
141         IndexPageSize      int
142         ConnectTimeout     arvados.Duration
143         ReadTimeout        arvados.Duration
144         RaceWindow         arvados.Duration
145         UnsafeDelete       bool
146
147         cluster   *arvados.Cluster
148         volume    arvados.Volume
149         logger    logrus.FieldLogger
150         metrics   *volumeMetricsVecs
151         bucket    *s3bucket
152         region    aws.Region
153         startOnce sync.Once
154 }
155
156 // GetDeviceID returns a globally unique ID for the storage bucket.
157 func (v *S3Volume) GetDeviceID() string {
158         return "s3://" + v.Endpoint + "/" + v.Bucket
159 }
160
161 func (v *S3Volume) bootstrapIAMCredentials() error {
162         if v.AccessKey != "" || v.SecretKey != "" {
163                 if v.IAMRole != "" {
164                         return errors.New("invalid DriverParameters: AccessKey and SecretKey must be blank if IAMRole is specified")
165                 }
166                 return nil
167         }
168         ttl, err := v.updateIAMCredentials()
169         if err != nil {
170                 return err
171         }
172         go func() {
173                 for {
174                         time.Sleep(ttl)
175                         ttl, err = v.updateIAMCredentials()
176                         if err != nil {
177                                 v.logger.WithError(err).Warnf("failed to update credentials for IAM role %q", v.IAMRole)
178                                 ttl = time.Second
179                         } else if ttl < time.Second {
180                                 v.logger.WithField("TTL", ttl).Warnf("received stale credentials for IAM role %q", v.IAMRole)
181                                 ttl = time.Second
182                         }
183                 }
184         }()
185         return nil
186 }
187
188 func (v *S3Volume) newS3Client() *s3.S3 {
189         auth := aws.NewAuth(v.AccessKey, v.SecretKey, v.AuthToken, v.AuthExpiration)
190         client := s3.New(*auth, v.region)
191         if v.region.EC2Endpoint.Signer == aws.V4Signature {
192                 // Currently affects only eu-central-1
193                 client.Signature = aws.V4Signature
194         }
195         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
196         client.ReadTimeout = time.Duration(v.ReadTimeout)
197         return client
198 }
199
200 // returned by AWS metadata endpoint .../security-credentials/${rolename}
201 type iamCredentials struct {
202         Code            string
203         LastUpdated     time.Time
204         Type            string
205         AccessKeyID     string
206         SecretAccessKey string
207         Token           string
208         Expiration      time.Time
209 }
210
211 // Returns TTL of updated credentials, i.e., time to sleep until next
212 // update.
213 func (v *S3Volume) updateIAMCredentials() (time.Duration, error) {
214         ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
215         defer cancel()
216
217         metadataBaseURL := "http://169.254.169.254/latest/meta-data/iam/security-credentials/"
218
219         var url string
220         if strings.Contains(v.IAMRole, "://") {
221                 // Configuration provides complete URL (used by tests)
222                 url = v.IAMRole
223         } else if v.IAMRole != "" {
224                 // Configuration provides IAM role name and we use the
225                 // AWS metadata endpoint
226                 url = metadataBaseURL + v.IAMRole
227         } else {
228                 url = metadataBaseURL
229                 v.logger.WithField("URL", url).Debug("looking up IAM role name")
230                 req, err := http.NewRequest("GET", url, nil)
231                 if err != nil {
232                         return 0, fmt.Errorf("error setting up request %s: %s", url, err)
233                 }
234                 resp, err := http.DefaultClient.Do(req.WithContext(ctx))
235                 if err != nil {
236                         return 0, fmt.Errorf("error getting %s: %s", url, err)
237                 }
238                 defer resp.Body.Close()
239                 if resp.StatusCode == http.StatusNotFound {
240                         return 0, fmt.Errorf("this instance does not have an IAM role assigned -- either assign a role, or configure AccessKey and SecretKey explicitly in DriverParameters (error getting %s: HTTP status %s)", url, resp.Status)
241                 } else if resp.StatusCode != http.StatusOK {
242                         return 0, fmt.Errorf("error getting %s: HTTP status %s", url, resp.Status)
243                 }
244                 body := bufio.NewReader(resp.Body)
245                 var role string
246                 _, err = fmt.Fscanf(body, "%s\n", &role)
247                 if err != nil {
248                         return 0, fmt.Errorf("error reading response from %s: %s", url, err)
249                 }
250                 if n, _ := body.Read(make([]byte, 64)); n > 0 {
251                         v.logger.Warnf("ignoring additional data returned by metadata endpoint %s after the single role name that we expected", url)
252                 }
253                 v.logger.WithField("Role", role).Debug("looked up IAM role name")
254                 url = url + role
255         }
256
257         v.logger.WithField("URL", url).Debug("getting credentials")
258         req, err := http.NewRequest("GET", url, nil)
259         if err != nil {
260                 return 0, fmt.Errorf("error setting up request %s: %s", url, err)
261         }
262         resp, err := http.DefaultClient.Do(req.WithContext(ctx))
263         if err != nil {
264                 return 0, fmt.Errorf("error getting %s: %s", url, err)
265         }
266         defer resp.Body.Close()
267         if resp.StatusCode != http.StatusOK {
268                 return 0, fmt.Errorf("error getting %s: HTTP status %s", url, resp.Status)
269         }
270         var cred iamCredentials
271         err = json.NewDecoder(resp.Body).Decode(&cred)
272         if err != nil {
273                 return 0, fmt.Errorf("error decoding credentials from %s: %s", url, err)
274         }
275         v.AccessKey, v.SecretKey, v.AuthToken, v.AuthExpiration = cred.AccessKeyID, cred.SecretAccessKey, cred.Token, cred.Expiration
276         v.bucket.SetBucket(&s3.Bucket{
277                 S3:   v.newS3Client(),
278                 Name: v.Bucket,
279         })
280         // TTL is time from now to expiration, minus 5m.  "We make new
281         // credentials available at least five minutes before the
282         // expiration of the old credentials."  --
283         // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
284         // (If that's not true, the returned ttl might be zero or
285         // negative, which the caller can handle.)
286         ttl := cred.Expiration.Sub(time.Now()) - 5*time.Minute
287         v.logger.WithFields(logrus.Fields{
288                 "AccessKeyID": cred.AccessKeyID,
289                 "LastUpdated": cred.LastUpdated,
290                 "Expiration":  cred.Expiration,
291                 "TTL":         arvados.Duration(ttl),
292         }).Debug("updated credentials")
293         return ttl, nil
294 }
295
296 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
297         ready := make(chan bool)
298         go func() {
299                 rdr, err = v.getReader(loc)
300                 close(ready)
301         }()
302         select {
303         case <-ready:
304                 return
305         case <-ctx.Done():
306                 v.logger.Debugf("s3: abandoning getReader(): %s", ctx.Err())
307                 go func() {
308                         <-ready
309                         if err == nil {
310                                 rdr.Close()
311                         }
312                 }()
313                 return nil, ctx.Err()
314         }
315 }
316
317 // getReader wraps (Bucket)GetReader.
318 //
319 // In situations where (Bucket)GetReader would fail because the block
320 // disappeared in a Trash race, getReader calls fixRace to recover the
321 // data, and tries again.
322 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
323         rdr, err = v.bucket.GetReader(loc)
324         err = v.translateError(err)
325         if err == nil || !os.IsNotExist(err) {
326                 return
327         }
328
329         _, err = v.bucket.Head("recent/"+loc, nil)
330         err = v.translateError(err)
331         if err != nil {
332                 // If we can't read recent/X, there's no point in
333                 // trying fixRace. Give up.
334                 return
335         }
336         if !v.fixRace(loc) {
337                 err = os.ErrNotExist
338                 return
339         }
340
341         rdr, err = v.bucket.GetReader(loc)
342         if err != nil {
343                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
344                 err = v.translateError(err)
345         }
346         return
347 }
348
349 // Get a block: copy the block data into buf, and return the number of
350 // bytes copied.
351 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
352         rdr, err := v.getReaderWithContext(ctx, loc)
353         if err != nil {
354                 return 0, err
355         }
356
357         var n int
358         ready := make(chan bool)
359         go func() {
360                 defer close(ready)
361
362                 defer rdr.Close()
363                 n, err = io.ReadFull(rdr, buf)
364
365                 switch err {
366                 case nil, io.EOF, io.ErrUnexpectedEOF:
367                         err = nil
368                 default:
369                         err = v.translateError(err)
370                 }
371         }()
372         select {
373         case <-ctx.Done():
374                 v.logger.Debugf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
375                 rdr.Close()
376                 // Must wait for ReadFull to return, to ensure it
377                 // doesn't write to buf after we return.
378                 v.logger.Debug("s3: waiting for ReadFull() to fail")
379                 <-ready
380                 return 0, ctx.Err()
381         case <-ready:
382                 return n, err
383         }
384 }
385
386 // Compare the given data with the stored data.
387 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
388         errChan := make(chan error, 1)
389         go func() {
390                 _, err := v.bucket.Head("recent/"+loc, nil)
391                 errChan <- err
392         }()
393         var err error
394         select {
395         case <-ctx.Done():
396                 return ctx.Err()
397         case err = <-errChan:
398         }
399         if err != nil {
400                 // Checking for "loc" itself here would interfere with
401                 // future GET requests.
402                 //
403                 // On AWS, if X doesn't exist, a HEAD or GET request
404                 // for X causes X's non-existence to be cached. Thus,
405                 // if we test for X, then create X and return a
406                 // signature to our client, the client might still get
407                 // 404 from all keepstores when trying to read it.
408                 //
409                 // To avoid this, we avoid doing HEAD X or GET X until
410                 // we know X has been written.
411                 //
412                 // Note that X might exist even though recent/X
413                 // doesn't: for example, the response to HEAD recent/X
414                 // might itself come from a stale cache. In such
415                 // cases, we will return a false negative and
416                 // PutHandler might needlessly create another replica
417                 // on a different volume. That's not ideal, but it's
418                 // better than passing the eventually-consistent
419                 // problem on to our clients.
420                 return v.translateError(err)
421         }
422         rdr, err := v.getReaderWithContext(ctx, loc)
423         if err != nil {
424                 return err
425         }
426         defer rdr.Close()
427         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
428 }
429
430 // Put writes a block.
431 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
432         if v.volume.ReadOnly {
433                 return MethodDisabledError
434         }
435         var opts s3.Options
436         size := len(block)
437         if size > 0 {
438                 md5, err := hex.DecodeString(loc)
439                 if err != nil {
440                         return err
441                 }
442                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
443                 // In AWS regions that use V4 signatures, we need to
444                 // provide ContentSHA256 up front. Otherwise, the S3
445                 // library reads the request body (from our buffer)
446                 // into another new buffer in order to compute the
447                 // SHA256 before sending the request -- which would
448                 // mean consuming 128 MiB of memory for the duration
449                 // of a 64 MiB write.
450                 opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
451         }
452
453         // Send the block data through a pipe, so that (if we need to)
454         // we can close the pipe early and abandon our PutReader()
455         // goroutine, without worrying about PutReader() accessing our
456         // block buffer after we release it.
457         bufr, bufw := io.Pipe()
458         go func() {
459                 io.Copy(bufw, bytes.NewReader(block))
460                 bufw.Close()
461         }()
462
463         var err error
464         ready := make(chan bool)
465         go func() {
466                 defer func() {
467                         if ctx.Err() != nil {
468                                 v.logger.Debugf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
469                         }
470                 }()
471                 defer close(ready)
472                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
473                 if err != nil {
474                         return
475                 }
476                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
477         }()
478         select {
479         case <-ctx.Done():
480                 v.logger.Debugf("%s: taking PutReader's input away: %s", v, ctx.Err())
481                 // Our pipe might be stuck in Write(), waiting for
482                 // PutReader() to read. If so, un-stick it. This means
483                 // PutReader will get corrupt data, but that's OK: the
484                 // size and MD5 won't match, so the write will fail.
485                 go io.Copy(ioutil.Discard, bufr)
486                 // CloseWithError() will return once pending I/O is done.
487                 bufw.CloseWithError(ctx.Err())
488                 v.logger.Debugf("%s: abandoning PutReader goroutine", v)
489                 return ctx.Err()
490         case <-ready:
491                 // Unblock pipe in case PutReader did not consume it.
492                 io.Copy(ioutil.Discard, bufr)
493                 return v.translateError(err)
494         }
495 }
496
497 // Touch sets the timestamp for the given locator to the current time.
498 func (v *S3Volume) Touch(loc string) error {
499         if v.volume.ReadOnly {
500                 return MethodDisabledError
501         }
502         _, err := v.bucket.Head(loc, nil)
503         err = v.translateError(err)
504         if os.IsNotExist(err) && v.fixRace(loc) {
505                 // The data object got trashed in a race, but fixRace
506                 // rescued it.
507         } else if err != nil {
508                 return err
509         }
510         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
511         return v.translateError(err)
512 }
513
514 // Mtime returns the stored timestamp for the given locator.
515 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
516         _, err := v.bucket.Head(loc, nil)
517         if err != nil {
518                 return zeroTime, v.translateError(err)
519         }
520         resp, err := v.bucket.Head("recent/"+loc, nil)
521         err = v.translateError(err)
522         if os.IsNotExist(err) {
523                 // The data object X exists, but recent/X is missing.
524                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
525                 if err != nil {
526                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
527                         return zeroTime, v.translateError(err)
528                 }
529                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
530                 resp, err = v.bucket.Head("recent/"+loc, nil)
531                 if err != nil {
532                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
533                         return zeroTime, v.translateError(err)
534                 }
535         } else if err != nil {
536                 // HEAD recent/X failed for some other reason.
537                 return zeroTime, err
538         }
539         return v.lastModified(resp)
540 }
541
542 // IndexTo writes a complete list of locators with the given prefix
543 // for which Get() can retrieve data.
544 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
545         // Use a merge sort to find matching sets of X and recent/X.
546         dataL := s3Lister{
547                 Bucket:   v.bucket.Bucket(),
548                 Prefix:   prefix,
549                 PageSize: v.IndexPageSize,
550                 Stats:    &v.bucket.stats,
551         }
552         recentL := s3Lister{
553                 Bucket:   v.bucket.Bucket(),
554                 Prefix:   "recent/" + prefix,
555                 PageSize: v.IndexPageSize,
556                 Stats:    &v.bucket.stats,
557         }
558         for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
559                 if data.Key >= "g" {
560                         // Conveniently, "recent/*" and "trash/*" are
561                         // lexically greater than all hex-encoded data
562                         // hashes, so stopping here avoids iterating
563                         // over all of them needlessly with dataL.
564                         break
565                 }
566                 if !v.isKeepBlock(data.Key) {
567                         continue
568                 }
569
570                 // stamp is the list entry we should use to report the
571                 // last-modified time for this data block: it will be
572                 // the recent/X entry if one exists, otherwise the
573                 // entry for the data block itself.
574                 stamp := data
575
576                 // Advance to the corresponding recent/X marker, if any
577                 for recent != nil && recentL.Error() == nil {
578                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
579                                 recent = recentL.Next()
580                                 continue
581                         } else if cmp == 0 {
582                                 stamp = recent
583                                 recent = recentL.Next()
584                                 break
585                         } else {
586                                 // recent/X marker is missing: we'll
587                                 // use the timestamp on the data
588                                 // object.
589                                 break
590                         }
591                 }
592                 if err := recentL.Error(); err != nil {
593                         return err
594                 }
595                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
596                 if err != nil {
597                         return err
598                 }
599                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
600         }
601         return dataL.Error()
602 }
603
604 // Trash a Keep block.
605 func (v *S3Volume) Trash(loc string) error {
606         if v.volume.ReadOnly {
607                 return MethodDisabledError
608         }
609         if t, err := v.Mtime(loc); err != nil {
610                 return err
611         } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
612                 return nil
613         }
614         if v.cluster.Collections.BlobTrashLifetime == 0 {
615                 if !v.UnsafeDelete {
616                         return ErrS3TrashDisabled
617                 }
618                 return v.translateError(v.bucket.Del(loc))
619         }
620         err := v.checkRaceWindow(loc)
621         if err != nil {
622                 return err
623         }
624         err = v.safeCopy("trash/"+loc, loc)
625         if err != nil {
626                 return err
627         }
628         return v.translateError(v.bucket.Del(loc))
629 }
630
631 // checkRaceWindow returns a non-nil error if trash/loc is, or might
632 // be, in the race window (i.e., it's not safe to trash loc).
633 func (v *S3Volume) checkRaceWindow(loc string) error {
634         resp, err := v.bucket.Head("trash/"+loc, nil)
635         err = v.translateError(err)
636         if os.IsNotExist(err) {
637                 // OK, trash/X doesn't exist so we're not in the race
638                 // window
639                 return nil
640         } else if err != nil {
641                 // Error looking up trash/X. We don't know whether
642                 // we're in the race window
643                 return err
644         }
645         t, err := v.lastModified(resp)
646         if err != nil {
647                 // Can't parse timestamp
648                 return err
649         }
650         safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
651         if safeWindow <= 0 {
652                 // We can't count on "touch trash/X" to prolong
653                 // trash/X's lifetime. The new timestamp might not
654                 // become visible until now+raceWindow, and EmptyTrash
655                 // is allowed to delete trash/X before then.
656                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
657         }
658         // trash/X exists, but it won't be eligible for deletion until
659         // after now+raceWindow, so it's safe to overwrite it.
660         return nil
661 }
662
663 // safeCopy calls PutCopy, and checks the response to make sure the
664 // copy succeeded and updated the timestamp on the destination object
665 // (PutCopy returns 200 OK if the request was received, even if the
666 // copy failed).
667 func (v *S3Volume) safeCopy(dst, src string) error {
668         resp, err := v.bucket.Bucket().PutCopy(dst, s3ACL, s3.CopyOptions{
669                 ContentType:       "application/octet-stream",
670                 MetadataDirective: "REPLACE",
671         }, v.bucket.Bucket().Name+"/"+src)
672         err = v.translateError(err)
673         if os.IsNotExist(err) {
674                 return err
675         } else if err != nil {
676                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Bucket().Name+"/"+src, err)
677         }
678         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
679                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
680         } else if time.Now().Sub(t) > maxClockSkew {
681                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
682         }
683         return nil
684 }
685
686 // Get the LastModified header from resp, and parse it as RFC1123 or
687 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
688 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
689         s := resp.Header.Get("Last-Modified")
690         t, err = time.Parse(time.RFC1123, s)
691         if err != nil && s != "" {
692                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
693                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
694                 // as required by HTTP spec. If it's not a valid HTTP
695                 // header value, it's probably AWS (or s3test) giving
696                 // us a nearly-RFC1123 timestamp.
697                 t, err = time.Parse(nearlyRFC1123, s)
698         }
699         return
700 }
701
702 // Untrash moves block from trash back into store
703 func (v *S3Volume) Untrash(loc string) error {
704         err := v.safeCopy(loc, "trash/"+loc)
705         if err != nil {
706                 return err
707         }
708         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
709         return v.translateError(err)
710 }
711
712 // Status returns a *VolumeStatus representing the current in-use
713 // storage capacity and a fake available capacity that doesn't make
714 // the volume seem full or nearly-full.
715 func (v *S3Volume) Status() *VolumeStatus {
716         return &VolumeStatus{
717                 DeviceNum: 1,
718                 BytesFree: BlockSize * 1000,
719                 BytesUsed: 1,
720         }
721 }
722
723 // InternalStats returns bucket I/O and API call counters.
724 func (v *S3Volume) InternalStats() interface{} {
725         return &v.bucket.stats
726 }
727
728 // String implements fmt.Stringer.
729 func (v *S3Volume) String() string {
730         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
731 }
732
733 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
734
735 func (v *S3Volume) isKeepBlock(s string) bool {
736         return s3KeepBlockRegexp.MatchString(s)
737 }
738
739 // fixRace(X) is called when "recent/X" exists but "X" doesn't
740 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
741 // there was a race between Put and Trash, fixRace recovers from the
742 // race by Untrashing the block.
743 func (v *S3Volume) fixRace(loc string) bool {
744         trash, err := v.bucket.Head("trash/"+loc, nil)
745         if err != nil {
746                 if !os.IsNotExist(v.translateError(err)) {
747                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
748                 }
749                 return false
750         }
751         trashTime, err := v.lastModified(trash)
752         if err != nil {
753                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
754                 return false
755         }
756
757         recent, err := v.bucket.Head("recent/"+loc, nil)
758         if err != nil {
759                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
760                 return false
761         }
762         recentTime, err := v.lastModified(recent)
763         if err != nil {
764                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
765                 return false
766         }
767
768         ageWhenTrashed := trashTime.Sub(recentTime)
769         if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() {
770                 // No evidence of a race: block hasn't been written
771                 // since it became eligible for Trash. No fix needed.
772                 return false
773         }
774
775         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
776         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
777         err = v.safeCopy(loc, "trash/"+loc)
778         if err != nil {
779                 log.Printf("error: fixRace: %s", err)
780                 return false
781         }
782         return true
783 }
784
785 func (v *S3Volume) translateError(err error) error {
786         switch err := err.(type) {
787         case *s3.Error:
788                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
789                         strings.Contains(err.Error(), "Not Found") {
790                         return os.ErrNotExist
791                 }
792                 // Other 404 errors like NoSuchVersion and
793                 // NoSuchBucket are different problems which should
794                 // get called out downstream, so we don't convert them
795                 // to os.ErrNotExist.
796         }
797         return err
798 }
799
800 // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
801 // and deletes them from the volume.
802 func (v *S3Volume) EmptyTrash() {
803         if v.cluster.Collections.BlobDeleteConcurrency < 1 {
804                 return
805         }
806
807         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
808
809         // Define "ready to delete" as "...when EmptyTrash started".
810         startT := time.Now()
811
812         emptyOneKey := func(trash *s3.Key) {
813                 loc := trash.Key[6:]
814                 if !v.isKeepBlock(loc) {
815                         return
816                 }
817                 atomic.AddInt64(&bytesInTrash, trash.Size)
818                 atomic.AddInt64(&blocksInTrash, 1)
819
820                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
821                 if err != nil {
822                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
823                         return
824                 }
825                 recent, err := v.bucket.Head("recent/"+loc, nil)
826                 if err != nil && os.IsNotExist(v.translateError(err)) {
827                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
828                         err = v.Untrash(loc)
829                         if err != nil {
830                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
831                         }
832                         return
833                 } else if err != nil {
834                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
835                         return
836                 }
837                 recentT, err := v.lastModified(recent)
838                 if err != nil {
839                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
840                         return
841                 }
842                 if trashT.Sub(recentT) < v.cluster.Collections.BlobSigningTTL.Duration() {
843                         if age := startT.Sub(recentT); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
844                                 // recent/loc is too old to protect
845                                 // loc from being Trashed again during
846                                 // the raceWindow that starts if we
847                                 // delete trash/X now.
848                                 //
849                                 // Note this means (TrashSweepInterval
850                                 // < BlobSigningTTL - raceWindow) is
851                                 // necessary to avoid starvation.
852                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
853                                 v.fixRace(loc)
854                                 v.Touch(loc)
855                                 return
856                         }
857                         _, err := v.bucket.Head(loc, nil)
858                         if os.IsNotExist(err) {
859                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
860                                 v.fixRace(loc)
861                                 return
862                         } else if err != nil {
863                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
864                                 return
865                         }
866                 }
867                 if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
868                         return
869                 }
870                 err = v.bucket.Del(trash.Key)
871                 if err != nil {
872                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
873                         return
874                 }
875                 atomic.AddInt64(&bytesDeleted, trash.Size)
876                 atomic.AddInt64(&blocksDeleted, 1)
877
878                 _, err = v.bucket.Head(loc, nil)
879                 if err == nil {
880                         log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
881                         return
882                 }
883                 if !os.IsNotExist(v.translateError(err)) {
884                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
885                         return
886                 }
887                 err = v.bucket.Del("recent/" + loc)
888                 if err != nil {
889                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
890                 }
891         }
892
893         var wg sync.WaitGroup
894         todo := make(chan *s3.Key, v.cluster.Collections.BlobDeleteConcurrency)
895         for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
896                 wg.Add(1)
897                 go func() {
898                         defer wg.Done()
899                         for key := range todo {
900                                 emptyOneKey(key)
901                         }
902                 }()
903         }
904
905         trashL := s3Lister{
906                 Bucket:   v.bucket.Bucket(),
907                 Prefix:   "trash/",
908                 PageSize: v.IndexPageSize,
909                 Stats:    &v.bucket.stats,
910         }
911         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
912                 todo <- trash
913         }
914         close(todo)
915         wg.Wait()
916
917         if err := trashL.Error(); err != nil {
918                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
919         }
920         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
921 }
922
923 type s3Lister struct {
924         Bucket     *s3.Bucket
925         Prefix     string
926         PageSize   int
927         Stats      *s3bucketStats
928         nextMarker string
929         buf        []s3.Key
930         err        error
931 }
932
933 // First fetches the first page and returns the first item. It returns
934 // nil if the response is the empty set or an error occurs.
935 func (lister *s3Lister) First() *s3.Key {
936         lister.getPage()
937         return lister.pop()
938 }
939
940 // Next returns the next item, fetching the next page if necessary. It
941 // returns nil if the last available item has already been fetched, or
942 // an error occurs.
943 func (lister *s3Lister) Next() *s3.Key {
944         if len(lister.buf) == 0 && lister.nextMarker != "" {
945                 lister.getPage()
946         }
947         return lister.pop()
948 }
949
950 // Return the most recent error encountered by First or Next.
951 func (lister *s3Lister) Error() error {
952         return lister.err
953 }
954
955 func (lister *s3Lister) getPage() {
956         lister.Stats.TickOps("list")
957         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
958         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
959         lister.nextMarker = ""
960         if err != nil {
961                 lister.err = err
962                 return
963         }
964         if resp.IsTruncated {
965                 lister.nextMarker = resp.NextMarker
966         }
967         lister.buf = make([]s3.Key, 0, len(resp.Contents))
968         for _, key := range resp.Contents {
969                 if !strings.HasPrefix(key.Key, lister.Prefix) {
970                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
971                         continue
972                 }
973                 lister.buf = append(lister.buf, key)
974         }
975 }
976
977 func (lister *s3Lister) pop() (k *s3.Key) {
978         if len(lister.buf) > 0 {
979                 k = &lister.buf[0]
980                 lister.buf = lister.buf[1:]
981         }
982         return
983 }
984
985 // s3bucket wraps s3.bucket and counts I/O and API usage stats. The
986 // wrapped bucket can be replaced atomically with SetBucket in order
987 // to update credentials.
988 type s3bucket struct {
989         bucket *s3.Bucket
990         stats  s3bucketStats
991         mu     sync.Mutex
992 }
993
994 func (b *s3bucket) Bucket() *s3.Bucket {
995         b.mu.Lock()
996         defer b.mu.Unlock()
997         return b.bucket
998 }
999
1000 func (b *s3bucket) SetBucket(bucket *s3.Bucket) {
1001         b.mu.Lock()
1002         defer b.mu.Unlock()
1003         b.bucket = bucket
1004 }
1005
1006 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
1007         rdr, err := b.Bucket().GetReader(path)
1008         b.stats.TickOps("get")
1009         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
1010         b.stats.TickErr(err)
1011         return NewCountingReader(rdr, b.stats.TickInBytes), err
1012 }
1013
1014 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
1015         resp, err := b.Bucket().Head(path, headers)
1016         b.stats.TickOps("head")
1017         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
1018         b.stats.TickErr(err)
1019         return resp, err
1020 }
1021
1022 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
1023         if length == 0 {
1024                 // goamz will only send Content-Length: 0 when reader
1025                 // is nil due to net.http.Request.ContentLength
1026                 // behavior.  Otherwise, Content-Length header is
1027                 // omitted which will cause some S3 services
1028                 // (including AWS and Ceph RadosGW) to fail to create
1029                 // empty objects.
1030                 r = nil
1031         } else {
1032                 r = NewCountingReader(r, b.stats.TickOutBytes)
1033         }
1034         err := b.Bucket().PutReader(path, r, length, contType, perm, options)
1035         b.stats.TickOps("put")
1036         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
1037         b.stats.TickErr(err)
1038         return err
1039 }
1040
1041 func (b *s3bucket) Del(path string) error {
1042         err := b.Bucket().Del(path)
1043         b.stats.TickOps("delete")
1044         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
1045         b.stats.TickErr(err)
1046         return err
1047 }
1048
1049 type s3bucketStats struct {
1050         statsTicker
1051         Ops     uint64
1052         GetOps  uint64
1053         PutOps  uint64
1054         HeadOps uint64
1055         DelOps  uint64
1056         ListOps uint64
1057 }
1058
1059 func (s *s3bucketStats) TickErr(err error) {
1060         if err == nil {
1061                 return
1062         }
1063         errType := fmt.Sprintf("%T", err)
1064         if err, ok := err.(*s3.Error); ok {
1065                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
1066         }
1067         s.statsTicker.TickErr(err, errType)
1068 }