20690: Remove obsolete docs/comments about wb1.
[arvados.git] / services / keepstore / s3aws_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "encoding/base64"
11         "encoding/hex"
12         "encoding/json"
13         "errors"
14         "fmt"
15         "io"
16         "os"
17         "regexp"
18         "strings"
19         "sync"
20         "sync/atomic"
21         "time"
22
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "github.com/aws/aws-sdk-go-v2/aws"
25         "github.com/aws/aws-sdk-go-v2/aws/awserr"
26         "github.com/aws/aws-sdk-go-v2/aws/defaults"
27         "github.com/aws/aws-sdk-go-v2/aws/ec2metadata"
28         "github.com/aws/aws-sdk-go-v2/aws/ec2rolecreds"
29         "github.com/aws/aws-sdk-go-v2/aws/endpoints"
30         "github.com/aws/aws-sdk-go-v2/service/s3"
31         "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
32         "github.com/prometheus/client_golang/prometheus"
33         "github.com/sirupsen/logrus"
34 )
35
36 func init() {
37         driver["S3"] = newS3AWSVolume
38 }
39
40 const (
41         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
42         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
43         maxClockSkew            = 600 * time.Second
44         nearlyRFC1123           = "Mon, 2 Jan 2006 15:04:05 GMT"
45 )
46
47 var (
48         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
49 )
50
51 // S3AWSVolume implements Volume using an S3 bucket.
52 type S3AWSVolume struct {
53         arvados.S3VolumeDriverParameters
54         AuthToken      string    // populated automatically when IAMRole is used
55         AuthExpiration time.Time // populated automatically when IAMRole is used
56
57         cluster   *arvados.Cluster
58         volume    arvados.Volume
59         logger    logrus.FieldLogger
60         metrics   *volumeMetricsVecs
61         bucket    *s3AWSbucket
62         region    string
63         startOnce sync.Once
64 }
65
66 // s3bucket wraps s3.bucket and counts I/O and API usage stats. The
67 // wrapped bucket can be replaced atomically with SetBucket in order
68 // to update credentials.
69 type s3AWSbucket struct {
70         bucket string
71         svc    *s3.Client
72         stats  s3awsbucketStats
73         mu     sync.Mutex
74 }
75
76 const (
77         PartSize         = 5 * 1024 * 1024
78         ReadConcurrency  = 13
79         WriteConcurrency = 5
80 )
81
82 var s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
83 var s3AWSZeroTime time.Time
84
85 func (v *S3AWSVolume) isKeepBlock(s string) (string, bool) {
86         if v.PrefixLength > 0 && len(s) == v.PrefixLength+33 && s[:v.PrefixLength] == s[v.PrefixLength+1:v.PrefixLength*2+1] {
87                 s = s[v.PrefixLength+1:]
88         }
89         return s, s3AWSKeepBlockRegexp.MatchString(s)
90 }
91
92 // Return the key used for a given loc. If PrefixLength==0 then
93 // key("abcdef0123") is "abcdef0123", if PrefixLength==3 then key is
94 // "abc/abcdef0123", etc.
95 func (v *S3AWSVolume) key(loc string) string {
96         if v.PrefixLength > 0 && v.PrefixLength < len(loc)-1 {
97                 return loc[:v.PrefixLength] + "/" + loc
98         } else {
99                 return loc
100         }
101 }
102
103 func newS3AWSVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
104         v := &S3AWSVolume{cluster: cluster, volume: volume, metrics: metrics}
105         err := json.Unmarshal(volume.DriverParameters, v)
106         if err != nil {
107                 return nil, err
108         }
109         v.logger = logger.WithField("Volume", v.String())
110         return v, v.check("")
111 }
112
113 func (v *S3AWSVolume) translateError(err error) error {
114         if _, ok := err.(*aws.RequestCanceledError); ok {
115                 return context.Canceled
116         } else if aerr, ok := err.(awserr.Error); ok {
117                 if aerr.Code() == "NotFound" {
118                         return os.ErrNotExist
119                 } else if aerr.Code() == "NoSuchKey" {
120                         return os.ErrNotExist
121                 }
122         }
123         return err
124 }
125
126 // safeCopy calls CopyObjectRequest, and checks the response to make
127 // sure the copy succeeded and updated the timestamp on the
128 // destination object
129 //
130 // (If something goes wrong during the copy, the error will be
131 // embedded in the 200 OK response)
132 func (v *S3AWSVolume) safeCopy(dst, src string) error {
133         input := &s3.CopyObjectInput{
134                 Bucket:      aws.String(v.bucket.bucket),
135                 ContentType: aws.String("application/octet-stream"),
136                 CopySource:  aws.String(v.bucket.bucket + "/" + src),
137                 Key:         aws.String(dst),
138         }
139
140         req := v.bucket.svc.CopyObjectRequest(input)
141         resp, err := req.Send(context.Background())
142
143         err = v.translateError(err)
144         if os.IsNotExist(err) {
145                 return err
146         } else if err != nil {
147                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.bucket+"/"+src, err)
148         }
149
150         if resp.CopyObjectResult.LastModified == nil {
151                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.CopyObjectResult.LastModified, err)
152         } else if time.Now().Sub(*resp.CopyObjectResult.LastModified) > maxClockSkew {
153                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.CopyObjectResult.LastModified, resp.CopyObjectResult.LastModified)
154         }
155         return nil
156 }
157
158 func (v *S3AWSVolume) check(ec2metadataHostname string) error {
159         if v.Bucket == "" {
160                 return errors.New("DriverParameters: Bucket must be provided")
161         }
162         if v.IndexPageSize == 0 {
163                 v.IndexPageSize = 1000
164         }
165         if v.RaceWindow < 0 {
166                 return errors.New("DriverParameters: RaceWindow must not be negative")
167         }
168
169         if v.V2Signature {
170                 return errors.New("DriverParameters: V2Signature is not supported")
171         }
172
173         defaultResolver := endpoints.NewDefaultResolver()
174
175         cfg := defaults.Config()
176
177         if v.Endpoint == "" && v.Region == "" {
178                 return fmt.Errorf("AWS region or endpoint must be specified")
179         } else if v.Endpoint != "" || ec2metadataHostname != "" {
180                 myCustomResolver := func(service, region string) (aws.Endpoint, error) {
181                         if v.Endpoint != "" && service == "s3" {
182                                 return aws.Endpoint{
183                                         URL:           v.Endpoint,
184                                         SigningRegion: region,
185                                 }, nil
186                         } else if service == "ec2metadata" && ec2metadataHostname != "" {
187                                 return aws.Endpoint{
188                                         URL: ec2metadataHostname,
189                                 }, nil
190                         } else {
191                                 return defaultResolver.ResolveEndpoint(service, region)
192                         }
193                 }
194                 cfg.EndpointResolver = aws.EndpointResolverFunc(myCustomResolver)
195         }
196         if v.Region == "" {
197                 // Endpoint is already specified (otherwise we would
198                 // have errored out above), but Region is also
199                 // required by the aws sdk, in order to determine
200                 // SignatureVersions.
201                 v.Region = "us-east-1"
202         }
203         cfg.Region = v.Region
204
205         // Zero timeouts mean "wait forever", which is a bad
206         // default. Default to long timeouts instead.
207         if v.ConnectTimeout == 0 {
208                 v.ConnectTimeout = s3DefaultConnectTimeout
209         }
210         if v.ReadTimeout == 0 {
211                 v.ReadTimeout = s3DefaultReadTimeout
212         }
213
214         creds := aws.NewChainProvider(
215                 []aws.CredentialsProvider{
216                         aws.NewStaticCredentialsProvider(v.AccessKeyID, v.SecretAccessKey, v.AuthToken),
217                         ec2rolecreds.New(ec2metadata.New(cfg)),
218                 })
219
220         cfg.Credentials = creds
221
222         v.bucket = &s3AWSbucket{
223                 bucket: v.Bucket,
224                 svc:    s3.New(cfg),
225         }
226
227         // Set up prometheus metrics
228         lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
229         v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
230
231         return nil
232 }
233
234 // String implements fmt.Stringer.
235 func (v *S3AWSVolume) String() string {
236         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
237 }
238
239 // GetDeviceID returns a globally unique ID for the storage bucket.
240 func (v *S3AWSVolume) GetDeviceID() string {
241         return "s3://" + v.Endpoint + "/" + v.Bucket
242 }
243
244 // Compare the given data with the stored data.
245 func (v *S3AWSVolume) Compare(ctx context.Context, loc string, expect []byte) error {
246         key := v.key(loc)
247         errChan := make(chan error, 1)
248         go func() {
249                 _, err := v.head("recent/" + key)
250                 errChan <- err
251         }()
252         var err error
253         select {
254         case <-ctx.Done():
255                 return ctx.Err()
256         case err = <-errChan:
257         }
258         if err != nil {
259                 // Checking for the key itself here would interfere
260                 // with future GET requests.
261                 //
262                 // On AWS, if X doesn't exist, a HEAD or GET request
263                 // for X causes X's non-existence to be cached. Thus,
264                 // if we test for X, then create X and return a
265                 // signature to our client, the client might still get
266                 // 404 from all keepstores when trying to read it.
267                 //
268                 // To avoid this, we avoid doing HEAD X or GET X until
269                 // we know X has been written.
270                 //
271                 // Note that X might exist even though recent/X
272                 // doesn't: for example, the response to HEAD recent/X
273                 // might itself come from a stale cache. In such
274                 // cases, we will return a false negative and
275                 // PutHandler might needlessly create another replica
276                 // on a different volume. That's not ideal, but it's
277                 // better than passing the eventually-consistent
278                 // problem on to our clients.
279                 return v.translateError(err)
280         }
281
282         input := &s3.GetObjectInput{
283                 Bucket: aws.String(v.bucket.bucket),
284                 Key:    aws.String(key),
285         }
286
287         req := v.bucket.svc.GetObjectRequest(input)
288         result, err := req.Send(ctx)
289         if err != nil {
290                 return v.translateError(err)
291         }
292         return v.translateError(compareReaderWithBuf(ctx, result.Body, expect, loc[:32]))
293 }
294
295 // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
296 // and deletes them from the volume.
297 func (v *S3AWSVolume) EmptyTrash() {
298         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
299
300         // Define "ready to delete" as "...when EmptyTrash started".
301         startT := time.Now()
302
303         emptyOneKey := func(trash *s3.Object) {
304                 key := strings.TrimPrefix(*trash.Key, "trash/")
305                 loc, isblk := v.isKeepBlock(key)
306                 if !isblk {
307                         return
308                 }
309                 atomic.AddInt64(&bytesInTrash, *trash.Size)
310                 atomic.AddInt64(&blocksInTrash, 1)
311
312                 trashT := *trash.LastModified
313                 recent, err := v.head("recent/" + key)
314                 if err != nil && os.IsNotExist(v.translateError(err)) {
315                         v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", *trash.Key, "recent/"+key, err)
316                         err = v.Untrash(loc)
317                         if err != nil {
318                                 v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
319                         }
320                         return
321                 } else if err != nil {
322                         v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+key)
323                         return
324                 }
325                 if trashT.Sub(*recent.LastModified) < v.cluster.Collections.BlobSigningTTL.Duration() {
326                         if age := startT.Sub(*recent.LastModified); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
327                                 // recent/key is too old to protect
328                                 // loc from being Trashed again during
329                                 // the raceWindow that starts if we
330                                 // delete trash/X now.
331                                 //
332                                 // Note this means (TrashSweepInterval
333                                 // < BlobSigningTTL - raceWindow) is
334                                 // necessary to avoid starvation.
335                                 v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
336                                 v.fixRace(key)
337                                 v.Touch(loc)
338                                 return
339                         }
340                         _, err := v.head(key)
341                         if os.IsNotExist(err) {
342                                 v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
343                                 v.fixRace(key)
344                                 return
345                         } else if err != nil {
346                                 v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
347                                 return
348                         }
349                 }
350                 if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
351                         return
352                 }
353                 err = v.bucket.Del(*trash.Key)
354                 if err != nil {
355                         v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", *trash.Key)
356                         return
357                 }
358                 atomic.AddInt64(&bytesDeleted, *trash.Size)
359                 atomic.AddInt64(&blocksDeleted, 1)
360
361                 _, err = v.head(*trash.Key)
362                 if err == nil {
363                         v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
364                         return
365                 }
366                 if !os.IsNotExist(v.translateError(err)) {
367                         v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", key)
368                         return
369                 }
370                 err = v.bucket.Del("recent/" + key)
371                 if err != nil {
372                         v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+key)
373                 }
374         }
375
376         var wg sync.WaitGroup
377         todo := make(chan *s3.Object, v.cluster.Collections.BlobDeleteConcurrency)
378         for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
379                 wg.Add(1)
380                 go func() {
381                         defer wg.Done()
382                         for key := range todo {
383                                 emptyOneKey(key)
384                         }
385                 }()
386         }
387
388         trashL := s3awsLister{
389                 Logger:   v.logger,
390                 Bucket:   v.bucket,
391                 Prefix:   "trash/",
392                 PageSize: v.IndexPageSize,
393                 Stats:    &v.bucket.stats,
394         }
395         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
396                 todo <- trash
397         }
398         close(todo)
399         wg.Wait()
400
401         if err := trashL.Error(); err != nil {
402                 v.logger.WithError(err).Error("EmptyTrash: lister failed")
403         }
404         v.logger.Infof("EmptyTrash: stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
405 }
406
407 // fixRace(X) is called when "recent/X" exists but "X" doesn't
408 // exist. If the timestamps on "recent/X" and "trash/X" indicate there
409 // was a race between Put and Trash, fixRace recovers from the race by
410 // Untrashing the block.
411 func (v *S3AWSVolume) fixRace(key string) bool {
412         trash, err := v.head("trash/" + key)
413         if err != nil {
414                 if !os.IsNotExist(v.translateError(err)) {
415                         v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+key)
416                 }
417                 return false
418         }
419
420         recent, err := v.head("recent/" + key)
421         if err != nil {
422                 v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+key)
423                 return false
424         }
425
426         recentTime := *recent.LastModified
427         trashTime := *trash.LastModified
428         ageWhenTrashed := trashTime.Sub(recentTime)
429         if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() {
430                 // No evidence of a race: block hasn't been written
431                 // since it became eligible for Trash. No fix needed.
432                 return false
433         }
434
435         v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", key, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
436         v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+key, key)
437         err = v.safeCopy(key, "trash/"+key)
438         if err != nil {
439                 v.logger.WithError(err).Error("fixRace: copy failed")
440                 return false
441         }
442         return true
443 }
444
445 func (v *S3AWSVolume) head(key string) (result *s3.HeadObjectOutput, err error) {
446         input := &s3.HeadObjectInput{
447                 Bucket: aws.String(v.bucket.bucket),
448                 Key:    aws.String(key),
449         }
450
451         req := v.bucket.svc.HeadObjectRequest(input)
452         res, err := req.Send(context.TODO())
453
454         v.bucket.stats.TickOps("head")
455         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.HeadOps)
456         v.bucket.stats.TickErr(err)
457
458         if err != nil {
459                 return nil, v.translateError(err)
460         }
461         result = res.HeadObjectOutput
462         return
463 }
464
465 // Get a block: copy the block data into buf, and return the number of
466 // bytes copied.
467 func (v *S3AWSVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
468         // Do not use getWithPipe here: the BlockReader interface does not pass
469         // through 'buf []byte', and we don't want to allocate two buffers for each
470         // read request. Instead, use a version of ReadBlock that accepts 'buf []byte'
471         // as an input.
472         key := v.key(loc)
473         count, err := v.readWorker(ctx, key, buf)
474         if err == nil {
475                 return count, err
476         }
477
478         err = v.translateError(err)
479         if !os.IsNotExist(err) {
480                 return 0, err
481         }
482
483         _, err = v.head("recent/" + key)
484         err = v.translateError(err)
485         if err != nil {
486                 // If we can't read recent/X, there's no point in
487                 // trying fixRace. Give up.
488                 return 0, err
489         }
490         if !v.fixRace(key) {
491                 err = os.ErrNotExist
492                 return 0, err
493         }
494
495         count, err = v.readWorker(ctx, key, buf)
496         if err != nil {
497                 v.logger.Warnf("reading %s after successful fixRace: %s", loc, err)
498                 err = v.translateError(err)
499                 return 0, err
500         }
501         return count, err
502 }
503
504 func (v *S3AWSVolume) readWorker(ctx context.Context, key string, buf []byte) (int, error) {
505         awsBuf := aws.NewWriteAtBuffer(buf)
506         downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) {
507                 u.PartSize = PartSize
508                 u.Concurrency = ReadConcurrency
509         })
510
511         v.logger.Debugf("Partsize: %d; Concurrency: %d\n", downloader.PartSize, downloader.Concurrency)
512
513         count, err := downloader.DownloadWithContext(ctx, awsBuf, &s3.GetObjectInput{
514                 Bucket: aws.String(v.bucket.bucket),
515                 Key:    aws.String(key),
516         })
517         v.bucket.stats.TickOps("get")
518         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.GetOps)
519         v.bucket.stats.TickErr(err)
520         v.bucket.stats.TickInBytes(uint64(count))
521         return int(count), v.translateError(err)
522 }
523
524 func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader) error {
525         if r == nil {
526                 // r == nil leads to a memory violation in func readFillBuf in
527                 // aws-sdk-go-v2@v0.23.0/service/s3/s3manager/upload.go
528                 r = bytes.NewReader(nil)
529         }
530
531         uploadInput := s3manager.UploadInput{
532                 Bucket: aws.String(v.bucket.bucket),
533                 Key:    aws.String(key),
534                 Body:   r,
535         }
536
537         if loc, ok := v.isKeepBlock(key); ok {
538                 var contentMD5 string
539                 md5, err := hex.DecodeString(loc)
540                 if err != nil {
541                         return v.translateError(err)
542                 }
543                 contentMD5 = base64.StdEncoding.EncodeToString(md5)
544                 uploadInput.ContentMD5 = &contentMD5
545         }
546
547         // Experimentation indicated that using concurrency 5 yields the best
548         // throughput, better than higher concurrency (10 or 13) by ~5%.
549         // Defining u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(64 * 1024 * 1024)
550         // is detrimental to througput (minus ~15%).
551         uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
552                 u.PartSize = PartSize
553                 u.Concurrency = WriteConcurrency
554         })
555
556         // Unlike the goamz S3 driver, we don't need to precompute ContentSHA256:
557         // the aws-sdk-go v2 SDK uses a ReadSeeker to avoid having to copy the
558         // block, so there is no extra memory use to be concerned about. See
559         // makeSha256Reader in aws/signer/v4/v4.go. In fact, we explicitly disable
560         // calculating the Sha-256 because we don't need it; we already use md5sum
561         // hashes that match the name of the block.
562         _, err := uploader.UploadWithContext(ctx, &uploadInput, s3manager.WithUploaderRequestOptions(func(r *aws.Request) {
563                 r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
564         }))
565
566         v.bucket.stats.TickOps("put")
567         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
568         v.bucket.stats.TickErr(err)
569
570         return v.translateError(err)
571 }
572
573 // Put writes a block.
574 func (v *S3AWSVolume) Put(ctx context.Context, loc string, block []byte) error {
575         // Do not use putWithPipe here; we want to pass an io.ReadSeeker to the S3
576         // sdk to avoid memory allocation there. See #17339 for more information.
577         if v.volume.ReadOnly {
578                 return MethodDisabledError
579         }
580
581         rdr := bytes.NewReader(block)
582         r := NewCountingReaderAtSeeker(rdr, v.bucket.stats.TickOutBytes)
583         key := v.key(loc)
584         err := v.writeObject(ctx, key, r)
585         if err != nil {
586                 return err
587         }
588         return v.writeObject(ctx, "recent/"+key, nil)
589 }
590
591 type s3awsLister struct {
592         Logger            logrus.FieldLogger
593         Bucket            *s3AWSbucket
594         Prefix            string
595         PageSize          int
596         Stats             *s3awsbucketStats
597         ContinuationToken string
598         buf               []s3.Object
599         err               error
600 }
601
602 // First fetches the first page and returns the first item. It returns
603 // nil if the response is the empty set or an error occurs.
604 func (lister *s3awsLister) First() *s3.Object {
605         lister.getPage()
606         return lister.pop()
607 }
608
609 // Next returns the next item, fetching the next page if necessary. It
610 // returns nil if the last available item has already been fetched, or
611 // an error occurs.
612 func (lister *s3awsLister) Next() *s3.Object {
613         if len(lister.buf) == 0 && lister.ContinuationToken != "" {
614                 lister.getPage()
615         }
616         return lister.pop()
617 }
618
619 // Return the most recent error encountered by First or Next.
620 func (lister *s3awsLister) Error() error {
621         return lister.err
622 }
623
624 func (lister *s3awsLister) getPage() {
625         lister.Stats.TickOps("list")
626         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
627
628         var input *s3.ListObjectsV2Input
629         if lister.ContinuationToken == "" {
630                 input = &s3.ListObjectsV2Input{
631                         Bucket:  aws.String(lister.Bucket.bucket),
632                         MaxKeys: aws.Int64(int64(lister.PageSize)),
633                         Prefix:  aws.String(lister.Prefix),
634                 }
635         } else {
636                 input = &s3.ListObjectsV2Input{
637                         Bucket:            aws.String(lister.Bucket.bucket),
638                         MaxKeys:           aws.Int64(int64(lister.PageSize)),
639                         Prefix:            aws.String(lister.Prefix),
640                         ContinuationToken: &lister.ContinuationToken,
641                 }
642         }
643
644         req := lister.Bucket.svc.ListObjectsV2Request(input)
645         resp, err := req.Send(context.Background())
646         if err != nil {
647                 if aerr, ok := err.(awserr.Error); ok {
648                         lister.err = aerr
649                 } else {
650                         lister.err = err
651                 }
652                 return
653         }
654
655         if *resp.IsTruncated {
656                 lister.ContinuationToken = *resp.NextContinuationToken
657         } else {
658                 lister.ContinuationToken = ""
659         }
660         lister.buf = make([]s3.Object, 0, len(resp.Contents))
661         for _, key := range resp.Contents {
662                 if !strings.HasPrefix(*key.Key, lister.Prefix) {
663                         lister.Logger.Warnf("s3awsLister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, *key.Key)
664                         continue
665                 }
666                 lister.buf = append(lister.buf, key)
667         }
668 }
669
670 func (lister *s3awsLister) pop() (k *s3.Object) {
671         if len(lister.buf) > 0 {
672                 k = &lister.buf[0]
673                 lister.buf = lister.buf[1:]
674         }
675         return
676 }
677
678 // IndexTo writes a complete list of locators with the given prefix
679 // for which Get() can retrieve data.
680 func (v *S3AWSVolume) IndexTo(prefix string, writer io.Writer) error {
681         prefix = v.key(prefix)
682         // Use a merge sort to find matching sets of X and recent/X.
683         dataL := s3awsLister{
684                 Logger:   v.logger,
685                 Bucket:   v.bucket,
686                 Prefix:   prefix,
687                 PageSize: v.IndexPageSize,
688                 Stats:    &v.bucket.stats,
689         }
690         recentL := s3awsLister{
691                 Logger:   v.logger,
692                 Bucket:   v.bucket,
693                 Prefix:   "recent/" + prefix,
694                 PageSize: v.IndexPageSize,
695                 Stats:    &v.bucket.stats,
696         }
697         for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
698                 if *data.Key >= "g" {
699                         // Conveniently, "recent/*" and "trash/*" are
700                         // lexically greater than all hex-encoded data
701                         // hashes, so stopping here avoids iterating
702                         // over all of them needlessly with dataL.
703                         break
704                 }
705                 loc, isblk := v.isKeepBlock(*data.Key)
706                 if !isblk {
707                         continue
708                 }
709
710                 // stamp is the list entry we should use to report the
711                 // last-modified time for this data block: it will be
712                 // the recent/X entry if one exists, otherwise the
713                 // entry for the data block itself.
714                 stamp := data
715
716                 // Advance to the corresponding recent/X marker, if any
717                 for recent != nil && recentL.Error() == nil {
718                         if cmp := strings.Compare((*recent.Key)[7:], *data.Key); cmp < 0 {
719                                 recent = recentL.Next()
720                                 continue
721                         } else if cmp == 0 {
722                                 stamp = recent
723                                 recent = recentL.Next()
724                                 break
725                         } else {
726                                 // recent/X marker is missing: we'll
727                                 // use the timestamp on the data
728                                 // object.
729                                 break
730                         }
731                 }
732                 if err := recentL.Error(); err != nil {
733                         return err
734                 }
735                 // We truncate sub-second precision here. Otherwise
736                 // timestamps will never match the RFC1123-formatted
737                 // Last-Modified values parsed by Mtime().
738                 fmt.Fprintf(writer, "%s+%d %d\n", loc, *data.Size, stamp.LastModified.Unix()*1000000000)
739         }
740         return dataL.Error()
741 }
742
743 // Mtime returns the stored timestamp for the given locator.
744 func (v *S3AWSVolume) Mtime(loc string) (time.Time, error) {
745         key := v.key(loc)
746         _, err := v.head(key)
747         if err != nil {
748                 return s3AWSZeroTime, v.translateError(err)
749         }
750         resp, err := v.head("recent/" + key)
751         err = v.translateError(err)
752         if os.IsNotExist(err) {
753                 // The data object X exists, but recent/X is missing.
754                 err = v.writeObject(context.Background(), "recent/"+key, nil)
755                 if err != nil {
756                         v.logger.WithError(err).Errorf("error creating %q", "recent/"+key)
757                         return s3AWSZeroTime, v.translateError(err)
758                 }
759                 v.logger.Infof("Mtime: created %q to migrate existing block to new storage scheme", "recent/"+key)
760                 resp, err = v.head("recent/" + key)
761                 if err != nil {
762                         v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+key)
763                         return s3AWSZeroTime, v.translateError(err)
764                 }
765         } else if err != nil {
766                 // HEAD recent/X failed for some other reason.
767                 return s3AWSZeroTime, err
768         }
769         return *resp.LastModified, err
770 }
771
772 // Status returns a *VolumeStatus representing the current in-use
773 // storage capacity and a fake available capacity that doesn't make
774 // the volume seem full or nearly-full.
775 func (v *S3AWSVolume) Status() *VolumeStatus {
776         return &VolumeStatus{
777                 DeviceNum: 1,
778                 BytesFree: BlockSize * 1000,
779                 BytesUsed: 1,
780         }
781 }
782
783 // InternalStats returns bucket I/O and API call counters.
784 func (v *S3AWSVolume) InternalStats() interface{} {
785         return &v.bucket.stats
786 }
787
788 // Touch sets the timestamp for the given locator to the current time.
789 func (v *S3AWSVolume) Touch(loc string) error {
790         if v.volume.ReadOnly {
791                 return MethodDisabledError
792         }
793         key := v.key(loc)
794         _, err := v.head(key)
795         err = v.translateError(err)
796         if os.IsNotExist(err) && v.fixRace(key) {
797                 // The data object got trashed in a race, but fixRace
798                 // rescued it.
799         } else if err != nil {
800                 return err
801         }
802         err = v.writeObject(context.Background(), "recent/"+key, nil)
803         return v.translateError(err)
804 }
805
806 // checkRaceWindow returns a non-nil error if trash/key is, or might
807 // be, in the race window (i.e., it's not safe to trash key).
808 func (v *S3AWSVolume) checkRaceWindow(key string) error {
809         resp, err := v.head("trash/" + key)
810         err = v.translateError(err)
811         if os.IsNotExist(err) {
812                 // OK, trash/X doesn't exist so we're not in the race
813                 // window
814                 return nil
815         } else if err != nil {
816                 // Error looking up trash/X. We don't know whether
817                 // we're in the race window
818                 return err
819         }
820         t := resp.LastModified
821         safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
822         if safeWindow <= 0 {
823                 // We can't count on "touch trash/X" to prolong
824                 // trash/X's lifetime. The new timestamp might not
825                 // become visible until now+raceWindow, and EmptyTrash
826                 // is allowed to delete trash/X before then.
827                 return fmt.Errorf("%s: same block is already in trash, and safe window ended %s ago", key, -safeWindow)
828         }
829         // trash/X exists, but it won't be eligible for deletion until
830         // after now+raceWindow, so it's safe to overwrite it.
831         return nil
832 }
833
834 func (b *s3AWSbucket) Del(path string) error {
835         input := &s3.DeleteObjectInput{
836                 Bucket: aws.String(b.bucket),
837                 Key:    aws.String(path),
838         }
839         req := b.svc.DeleteObjectRequest(input)
840         _, err := req.Send(context.Background())
841         b.stats.TickOps("delete")
842         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
843         b.stats.TickErr(err)
844         return err
845 }
846
847 // Trash a Keep block.
848 func (v *S3AWSVolume) Trash(loc string) error {
849         if v.volume.ReadOnly && !v.volume.AllowTrashWhenReadOnly {
850                 return MethodDisabledError
851         }
852         if t, err := v.Mtime(loc); err != nil {
853                 return err
854         } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
855                 return nil
856         }
857         key := v.key(loc)
858         if v.cluster.Collections.BlobTrashLifetime == 0 {
859                 if !v.UnsafeDelete {
860                         return ErrS3TrashDisabled
861                 }
862                 return v.translateError(v.bucket.Del(key))
863         }
864         err := v.checkRaceWindow(key)
865         if err != nil {
866                 return err
867         }
868         err = v.safeCopy("trash/"+key, key)
869         if err != nil {
870                 return err
871         }
872         return v.translateError(v.bucket.Del(key))
873 }
874
875 // Untrash moves block from trash back into store
876 func (v *S3AWSVolume) Untrash(loc string) error {
877         key := v.key(loc)
878         err := v.safeCopy(key, "trash/"+key)
879         if err != nil {
880                 return err
881         }
882         err = v.writeObject(context.Background(), "recent/"+key, nil)
883         return v.translateError(err)
884 }
885
886 type s3awsbucketStats struct {
887         statsTicker
888         Ops     uint64
889         GetOps  uint64
890         PutOps  uint64
891         HeadOps uint64
892         DelOps  uint64
893         ListOps uint64
894 }
895
896 func (s *s3awsbucketStats) TickErr(err error) {
897         if err == nil {
898                 return
899         }
900         errType := fmt.Sprintf("%T", err)
901         if aerr, ok := err.(awserr.Error); ok {
902                 if reqErr, ok := err.(awserr.RequestFailure); ok {
903                         // A service error occurred
904                         errType = errType + fmt.Sprintf(" %d %s", reqErr.StatusCode(), aerr.Code())
905                 } else {
906                         errType = errType + fmt.Sprintf(" 000 %s", aerr.Code())
907                 }
908         }
909         s.statsTicker.TickErr(err, errType)
910 }