17339: fix tests.
[arvados.git] / services / keepstore / s3aws_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "encoding/base64"
11         "encoding/hex"
12         "encoding/json"
13         "errors"
14         "fmt"
15         "io"
16         "os"
17         "regexp"
18         "strings"
19         "sync"
20         "sync/atomic"
21         "time"
22
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "github.com/aws/aws-sdk-go-v2/aws"
25         "github.com/aws/aws-sdk-go-v2/aws/awserr"
26         "github.com/aws/aws-sdk-go-v2/aws/defaults"
27         "github.com/aws/aws-sdk-go-v2/aws/ec2metadata"
28         "github.com/aws/aws-sdk-go-v2/aws/ec2rolecreds"
29         "github.com/aws/aws-sdk-go-v2/aws/endpoints"
30         "github.com/aws/aws-sdk-go-v2/service/s3"
31         "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
32         "github.com/prometheus/client_golang/prometheus"
33         "github.com/sirupsen/logrus"
34 )
35
36 // S3AWSVolume implements Volume using an S3 bucket.
37 type S3AWSVolume struct {
38         arvados.S3VolumeDriverParameters
39         AuthToken      string    // populated automatically when IAMRole is used
40         AuthExpiration time.Time // populated automatically when IAMRole is used
41
42         cluster   *arvados.Cluster
43         volume    arvados.Volume
44         logger    logrus.FieldLogger
45         metrics   *volumeMetricsVecs
46         bucket    *s3AWSbucket
47         region    string
48         startOnce sync.Once
49 }
50
51 // s3bucket wraps s3.bucket and counts I/O and API usage stats. The
52 // wrapped bucket can be replaced atomically with SetBucket in order
53 // to update credentials.
54 type s3AWSbucket struct {
55         bucket string
56         svc    *s3.Client
57         stats  s3awsbucketStats
58         mu     sync.Mutex
59 }
60
61 // chooseS3VolumeDriver distinguishes between the old goamz driver and
62 // aws-sdk-go based on the UseAWSS3v2Driver feature flag
63 func chooseS3VolumeDriver(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
64         v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics}
65         err := json.Unmarshal(volume.DriverParameters, v)
66         if err != nil {
67                 return nil, err
68         }
69         if v.UseAWSS3v2Driver {
70                 logger.Debugln("Using AWS S3 v2 driver")
71                 return newS3AWSVolume(cluster, volume, logger, metrics)
72         }
73         logger.Debugln("Using goamz S3 driver")
74         return newS3Volume(cluster, volume, logger, metrics)
75 }
76
77 const (
78         PartSize         = 5 * 1024 * 1024
79         ReadConcurrency  = 13
80         WriteConcurrency = 5
81 )
82
83 var s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
84 var s3AWSZeroTime time.Time
85
86 func (v *S3AWSVolume) isKeepBlock(s string) (string, bool) {
87         if v.PrefixLength > 0 && len(s) == v.PrefixLength+33 && s[:v.PrefixLength] == s[v.PrefixLength+1:v.PrefixLength*2+1] {
88                 s = s[v.PrefixLength+1:]
89         }
90         return s, s3AWSKeepBlockRegexp.MatchString(s)
91 }
92
93 // Return the key used for a given loc. If PrefixLength==0 then
94 // key("abcdef0123") is "abcdef0123", if PrefixLength==3 then key is
95 // "abc/abcdef0123", etc.
96 func (v *S3AWSVolume) key(loc string) string {
97         if v.PrefixLength > 0 && v.PrefixLength < len(loc)-1 {
98                 return loc[:v.PrefixLength] + "/" + loc
99         } else {
100                 return loc
101         }
102 }
103
104 func newS3AWSVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
105         v := &S3AWSVolume{cluster: cluster, volume: volume, metrics: metrics}
106         err := json.Unmarshal(volume.DriverParameters, v)
107         if err != nil {
108                 return nil, err
109         }
110         v.logger = logger.WithField("Volume", v.String())
111         return v, v.check("")
112 }
113
114 func (v *S3AWSVolume) translateError(err error) error {
115         if aerr, ok := err.(awserr.Error); ok {
116                 switch aerr.Code() {
117                 case "NotFound":
118                         return os.ErrNotExist
119                 case "NoSuchKey":
120                         return os.ErrNotExist
121                 }
122         } else {
123                 switch err.(type) {
124                 case *aws.RequestCanceledError:
125                         return context.Canceled
126                 }
127         }
128         return err
129 }
130
131 // safeCopy calls CopyObjectRequest, and checks the response to make
132 // sure the copy succeeded and updated the timestamp on the
133 // destination object
134 //
135 // (If something goes wrong during the copy, the error will be
136 // embedded in the 200 OK response)
137 func (v *S3AWSVolume) safeCopy(dst, src string) error {
138         input := &s3.CopyObjectInput{
139                 Bucket:      aws.String(v.bucket.bucket),
140                 ContentType: aws.String("application/octet-stream"),
141                 CopySource:  aws.String(v.bucket.bucket + "/" + src),
142                 Key:         aws.String(dst),
143         }
144
145         req := v.bucket.svc.CopyObjectRequest(input)
146         resp, err := req.Send(context.Background())
147
148         err = v.translateError(err)
149         if os.IsNotExist(err) {
150                 return err
151         } else if err != nil {
152                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.bucket+"/"+src, err)
153         }
154
155         if resp.CopyObjectResult.LastModified == nil {
156                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.CopyObjectResult.LastModified, err)
157         } else if time.Now().Sub(*resp.CopyObjectResult.LastModified) > maxClockSkew {
158                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.CopyObjectResult.LastModified, resp.CopyObjectResult.LastModified)
159         }
160         return nil
161 }
162
163 func (v *S3AWSVolume) check(ec2metadataHostname string) error {
164         if v.Bucket == "" {
165                 return errors.New("DriverParameters: Bucket must be provided")
166         }
167         if v.IndexPageSize == 0 {
168                 v.IndexPageSize = 1000
169         }
170         if v.RaceWindow < 0 {
171                 return errors.New("DriverParameters: RaceWindow must not be negative")
172         }
173
174         if v.V2Signature {
175                 return errors.New("DriverParameters: V2Signature is not supported")
176         }
177
178         defaultResolver := endpoints.NewDefaultResolver()
179
180         cfg := defaults.Config()
181
182         if v.Endpoint == "" && v.Region == "" {
183                 return fmt.Errorf("AWS region or endpoint must be specified")
184         } else if v.Endpoint != "" || ec2metadataHostname != "" {
185                 myCustomResolver := func(service, region string) (aws.Endpoint, error) {
186                         if v.Endpoint != "" && service == "s3" {
187                                 return aws.Endpoint{
188                                         URL:           v.Endpoint,
189                                         SigningRegion: v.Region,
190                                 }, nil
191                         } else if service == "ec2metadata" && ec2metadataHostname != "" {
192                                 return aws.Endpoint{
193                                         URL: ec2metadataHostname,
194                                 }, nil
195                         }
196
197                         return defaultResolver.ResolveEndpoint(service, region)
198                 }
199                 cfg.EndpointResolver = aws.EndpointResolverFunc(myCustomResolver)
200         }
201
202         cfg.Region = v.Region
203
204         // Zero timeouts mean "wait forever", which is a bad
205         // default. Default to long timeouts instead.
206         if v.ConnectTimeout == 0 {
207                 v.ConnectTimeout = s3DefaultConnectTimeout
208         }
209         if v.ReadTimeout == 0 {
210                 v.ReadTimeout = s3DefaultReadTimeout
211         }
212
213         creds := aws.NewChainProvider(
214                 []aws.CredentialsProvider{
215                         aws.NewStaticCredentialsProvider(v.AccessKeyID, v.SecretAccessKey, v.AuthToken),
216                         ec2rolecreds.New(ec2metadata.New(cfg)),
217                 })
218
219         cfg.Credentials = creds
220
221         v.bucket = &s3AWSbucket{
222                 bucket: v.Bucket,
223                 svc:    s3.New(cfg),
224         }
225
226         // Set up prometheus metrics
227         lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
228         v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
229
230         return nil
231 }
232
233 // String implements fmt.Stringer.
234 func (v *S3AWSVolume) String() string {
235         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
236 }
237
238 // GetDeviceID returns a globally unique ID for the storage bucket.
239 func (v *S3AWSVolume) GetDeviceID() string {
240         return "s3://" + v.Endpoint + "/" + v.Bucket
241 }
242
243 // Compare the given data with the stored data.
244 func (v *S3AWSVolume) Compare(ctx context.Context, loc string, expect []byte) error {
245         key := v.key(loc)
246         errChan := make(chan error, 1)
247         go func() {
248                 _, err := v.head("recent/" + key)
249                 errChan <- err
250         }()
251         var err error
252         select {
253         case <-ctx.Done():
254                 return ctx.Err()
255         case err = <-errChan:
256         }
257         if err != nil {
258                 // Checking for the key itself here would interfere
259                 // with future GET requests.
260                 //
261                 // On AWS, if X doesn't exist, a HEAD or GET request
262                 // for X causes X's non-existence to be cached. Thus,
263                 // if we test for X, then create X and return a
264                 // signature to our client, the client might still get
265                 // 404 from all keepstores when trying to read it.
266                 //
267                 // To avoid this, we avoid doing HEAD X or GET X until
268                 // we know X has been written.
269                 //
270                 // Note that X might exist even though recent/X
271                 // doesn't: for example, the response to HEAD recent/X
272                 // might itself come from a stale cache. In such
273                 // cases, we will return a false negative and
274                 // PutHandler might needlessly create another replica
275                 // on a different volume. That's not ideal, but it's
276                 // better than passing the eventually-consistent
277                 // problem on to our clients.
278                 return v.translateError(err)
279         }
280
281         input := &s3.GetObjectInput{
282                 Bucket: aws.String(v.bucket.bucket),
283                 Key:    aws.String(key),
284         }
285
286         req := v.bucket.svc.GetObjectRequest(input)
287         result, err := req.Send(ctx)
288         if err != nil {
289                 return v.translateError(err)
290         }
291         return v.translateError(compareReaderWithBuf(ctx, result.Body, expect, loc[:32]))
292 }
293
294 // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
295 // and deletes them from the volume.
296 func (v *S3AWSVolume) EmptyTrash() {
297         if v.cluster.Collections.BlobDeleteConcurrency < 1 {
298                 return
299         }
300
301         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
302
303         // Define "ready to delete" as "...when EmptyTrash started".
304         startT := time.Now()
305
306         emptyOneKey := func(trash *s3.Object) {
307                 key := strings.TrimPrefix(*trash.Key, "trash/")
308                 loc, isblk := v.isKeepBlock(key)
309                 if !isblk {
310                         return
311                 }
312                 atomic.AddInt64(&bytesInTrash, *trash.Size)
313                 atomic.AddInt64(&blocksInTrash, 1)
314
315                 trashT := *trash.LastModified
316                 recent, err := v.head("recent/" + key)
317                 if err != nil && os.IsNotExist(v.translateError(err)) {
318                         v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", *trash.Key, "recent/"+key, err)
319                         err = v.Untrash(loc)
320                         if err != nil {
321                                 v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
322                         }
323                         return
324                 } else if err != nil {
325                         v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+key)
326                         return
327                 }
328                 if trashT.Sub(*recent.LastModified) < v.cluster.Collections.BlobSigningTTL.Duration() {
329                         if age := startT.Sub(*recent.LastModified); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
330                                 // recent/key is too old to protect
331                                 // loc from being Trashed again during
332                                 // the raceWindow that starts if we
333                                 // delete trash/X now.
334                                 //
335                                 // Note this means (TrashSweepInterval
336                                 // < BlobSigningTTL - raceWindow) is
337                                 // necessary to avoid starvation.
338                                 v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
339                                 v.fixRace(key)
340                                 v.Touch(loc)
341                                 return
342                         }
343                         _, err := v.head(key)
344                         if os.IsNotExist(err) {
345                                 v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
346                                 v.fixRace(key)
347                                 return
348                         } else if err != nil {
349                                 v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
350                                 return
351                         }
352                 }
353                 if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
354                         return
355                 }
356                 err = v.bucket.Del(*trash.Key)
357                 if err != nil {
358                         v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", *trash.Key)
359                         return
360                 }
361                 atomic.AddInt64(&bytesDeleted, *trash.Size)
362                 atomic.AddInt64(&blocksDeleted, 1)
363
364                 _, err = v.head(*trash.Key)
365                 if err == nil {
366                         v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
367                         return
368                 }
369                 if !os.IsNotExist(v.translateError(err)) {
370                         v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", key)
371                         return
372                 }
373                 err = v.bucket.Del("recent/" + key)
374                 if err != nil {
375                         v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+key)
376                 }
377         }
378
379         var wg sync.WaitGroup
380         todo := make(chan *s3.Object, v.cluster.Collections.BlobDeleteConcurrency)
381         for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
382                 wg.Add(1)
383                 go func() {
384                         defer wg.Done()
385                         for key := range todo {
386                                 emptyOneKey(key)
387                         }
388                 }()
389         }
390
391         trashL := s3awsLister{
392                 Logger:   v.logger,
393                 Bucket:   v.bucket,
394                 Prefix:   "trash/",
395                 PageSize: v.IndexPageSize,
396                 Stats:    &v.bucket.stats,
397         }
398         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
399                 todo <- trash
400         }
401         close(todo)
402         wg.Wait()
403
404         if err := trashL.Error(); err != nil {
405                 v.logger.WithError(err).Error("EmptyTrash: lister failed")
406         }
407         v.logger.Infof("EmptyTrash: stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
408 }
409
410 // fixRace(X) is called when "recent/X" exists but "X" doesn't
411 // exist. If the timestamps on "recent/X" and "trash/X" indicate there
412 // was a race between Put and Trash, fixRace recovers from the race by
413 // Untrashing the block.
414 func (v *S3AWSVolume) fixRace(key string) bool {
415         trash, err := v.head("trash/" + key)
416         if err != nil {
417                 if !os.IsNotExist(v.translateError(err)) {
418                         v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+key)
419                 }
420                 return false
421         }
422
423         recent, err := v.head("recent/" + key)
424         if err != nil {
425                 v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+key)
426                 return false
427         }
428
429         recentTime := *recent.LastModified
430         trashTime := *trash.LastModified
431         ageWhenTrashed := trashTime.Sub(recentTime)
432         if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() {
433                 // No evidence of a race: block hasn't been written
434                 // since it became eligible for Trash. No fix needed.
435                 return false
436         }
437
438         v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", key, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
439         v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+key, key)
440         err = v.safeCopy(key, "trash/"+key)
441         if err != nil {
442                 v.logger.WithError(err).Error("fixRace: copy failed")
443                 return false
444         }
445         return true
446 }
447
448 func (v *S3AWSVolume) head(key string) (result *s3.HeadObjectOutput, err error) {
449         input := &s3.HeadObjectInput{
450                 Bucket: aws.String(v.bucket.bucket),
451                 Key:    aws.String(key),
452         }
453
454         req := v.bucket.svc.HeadObjectRequest(input)
455         res, err := req.Send(context.TODO())
456
457         v.bucket.stats.TickOps("head")
458         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.HeadOps)
459         v.bucket.stats.TickErr(err)
460
461         if err != nil {
462                 return nil, v.translateError(err)
463         }
464         result = res.HeadObjectOutput
465         return
466 }
467
468 // Get a block: copy the block data into buf, and return the number of
469 // bytes copied.
470 func (v *S3AWSVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
471         // Do not use getWithPipe here: the BlockReader interface does not pass
472         // through 'buf []byte', and we don't want to allocate two buffers for each
473         // read request. Instead, use a version of ReadBlock that accepts 'buf []byte'
474         // as an input.
475         return v.ReadBlock(ctx, loc, buf)
476 }
477
478 func (v *S3AWSVolume) ReadBlock(ctx context.Context, loc string, buf []byte) (int, error) {
479         key := v.key(loc)
480         count, err := v.readWorker(ctx, key, buf)
481         if err == nil {
482                 v.bucket.stats.TickInBytes(uint64(count))
483                 return count, err
484         }
485
486         err = v.translateError(err)
487         if !os.IsNotExist(err) {
488                 return 0, err
489         }
490
491         _, err = v.head("recent/" + key)
492         err = v.translateError(err)
493         if err != nil {
494                 // If we can't read recent/X, there's no point in
495                 // trying fixRace. Give up.
496                 return 0, err
497         }
498         if !v.fixRace(key) {
499                 err = os.ErrNotExist
500                 return 0, err
501         }
502
503         count, err = v.readWorker(ctx, key, buf)
504         if err != nil {
505                 v.logger.Warnf("reading %s after successful fixRace: %s", loc, err)
506                 err = v.translateError(err)
507                 return 0, err
508         }
509         v.bucket.stats.TickInBytes(uint64(count))
510         return count, err
511 }
512
513 func (v *S3AWSVolume) readWorker(ctx context.Context, key string, buf []byte) (int, error) {
514         awsBuf := aws.NewWriteAtBuffer(buf)
515         downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) {
516                 u.PartSize = PartSize
517                 u.Concurrency = ReadConcurrency
518         })
519
520         v.logger.Debugf("Partsize: %d; Concurrency: %d\n", downloader.PartSize, downloader.Concurrency)
521
522         count, err := downloader.DownloadWithContext(ctx, awsBuf, &s3.GetObjectInput{
523                 Bucket: aws.String(v.bucket.bucket),
524                 Key:    aws.String(key),
525         })
526         v.bucket.stats.TickOps("get")
527         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.GetOps)
528         v.bucket.stats.TickErr(err)
529         if err != nil {
530                 return 0, v.translateError(err)
531         }
532         buf = awsBuf.Bytes()
533
534         return int(count), err
535 }
536
537 func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader) error {
538         if r == nil {
539                 // r == nil leads to a memory violation in func readFillBuf in
540                 // aws-sdk-go-v2@v0.23.0/service/s3/s3manager/upload.go
541                 r = bytes.NewReader(nil)
542         }
543
544         uploadInput := s3manager.UploadInput{
545                 Bucket: aws.String(v.bucket.bucket),
546                 Key:    aws.String(key),
547                 Body:   r,
548         }
549
550         if loc, ok := v.isKeepBlock(key); ok {
551                 var contentMD5 string
552                 md5, err := hex.DecodeString(loc)
553                 if err != nil {
554                         return err
555                 }
556                 contentMD5 = base64.StdEncoding.EncodeToString(md5)
557                 uploadInput.ContentMD5 = &contentMD5
558         }
559
560         // Experimentation indicated that using concurrency 5 yields the best
561         // throughput, better than higher concurrency (10 or 13) by ~5%.
562         // Defining u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(64 * 1024 * 1024)
563         // is detrimental to througput (minus ~15%).
564         uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
565                 u.PartSize = PartSize
566                 u.Concurrency = WriteConcurrency
567         })
568
569         // Unlike the goamz S3 driver, we don't need to precompute ContentSHA256:
570         // the aws-sdk-go v2 SDK uses a ReadSeeker to avoid having to copy the
571         // block, so there is no extra memory use to be concerned about. See
572         // makeSha256Reader in aws/signer/v4/v4.go. In fact, we explicitly disable
573         // calculating the Sha-256 because we don't need it; we already use md5sum
574         // hashes that match the name of the block.
575         _, err := uploader.UploadWithContext(ctx, &uploadInput, s3manager.WithUploaderRequestOptions(func(r *aws.Request) {
576                 r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
577         }))
578
579         v.bucket.stats.TickOps("put")
580         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
581         v.bucket.stats.TickErr(err)
582
583         return err
584 }
585
586 // Put writes a block.
587 func (v *S3AWSVolume) Put(ctx context.Context, loc string, block []byte) error {
588         // Do not use putWithPipe here; we want to pass an io.ReadSeeker to the S3
589         // sdk to avoid memory allocation there. See #17339 for more information.
590         return v.translateError(v.WriteBlock(ctx, loc, bytes.NewReader(block)))
591 }
592
593 // WriteBlock implements BlockWriter.
594 func (v *S3AWSVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
595         if v.volume.ReadOnly {
596                 return MethodDisabledError
597         }
598
599         r := NewCountingReaderAtSeeker(rdr, v.bucket.stats.TickOutBytes)
600         key := v.key(loc)
601         err := v.writeObject(ctx, key, r)
602         if err != nil {
603                 return err
604         }
605         return v.writeObject(ctx, "recent/"+key, nil)
606 }
607
608 type s3awsLister struct {
609         Logger            logrus.FieldLogger
610         Bucket            *s3AWSbucket
611         Prefix            string
612         PageSize          int
613         Stats             *s3awsbucketStats
614         ContinuationToken string
615         buf               []s3.Object
616         err               error
617 }
618
619 // First fetches the first page and returns the first item. It returns
620 // nil if the response is the empty set or an error occurs.
621 func (lister *s3awsLister) First() *s3.Object {
622         lister.getPage()
623         return lister.pop()
624 }
625
626 // Next returns the next item, fetching the next page if necessary. It
627 // returns nil if the last available item has already been fetched, or
628 // an error occurs.
629 func (lister *s3awsLister) Next() *s3.Object {
630         if len(lister.buf) == 0 && lister.ContinuationToken != "" {
631                 lister.getPage()
632         }
633         return lister.pop()
634 }
635
636 // Return the most recent error encountered by First or Next.
637 func (lister *s3awsLister) Error() error {
638         return lister.err
639 }
640
641 func (lister *s3awsLister) getPage() {
642         lister.Stats.TickOps("list")
643         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
644
645         var input *s3.ListObjectsV2Input
646         if lister.ContinuationToken == "" {
647                 input = &s3.ListObjectsV2Input{
648                         Bucket:  aws.String(lister.Bucket.bucket),
649                         MaxKeys: aws.Int64(int64(lister.PageSize)),
650                         Prefix:  aws.String(lister.Prefix),
651                 }
652         } else {
653                 input = &s3.ListObjectsV2Input{
654                         Bucket:            aws.String(lister.Bucket.bucket),
655                         MaxKeys:           aws.Int64(int64(lister.PageSize)),
656                         Prefix:            aws.String(lister.Prefix),
657                         ContinuationToken: &lister.ContinuationToken,
658                 }
659         }
660
661         req := lister.Bucket.svc.ListObjectsV2Request(input)
662         resp, err := req.Send(context.Background())
663         if err != nil {
664                 if aerr, ok := err.(awserr.Error); ok {
665                         lister.err = aerr
666                 } else {
667                         lister.err = err
668                 }
669                 return
670         }
671
672         if *resp.IsTruncated {
673                 lister.ContinuationToken = *resp.NextContinuationToken
674         } else {
675                 lister.ContinuationToken = ""
676         }
677         lister.buf = make([]s3.Object, 0, len(resp.Contents))
678         for _, key := range resp.Contents {
679                 if !strings.HasPrefix(*key.Key, lister.Prefix) {
680                         lister.Logger.Warnf("s3awsLister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, *key.Key)
681                         continue
682                 }
683                 lister.buf = append(lister.buf, key)
684         }
685 }
686
687 func (lister *s3awsLister) pop() (k *s3.Object) {
688         if len(lister.buf) > 0 {
689                 k = &lister.buf[0]
690                 lister.buf = lister.buf[1:]
691         }
692         return
693 }
694
695 // IndexTo writes a complete list of locators with the given prefix
696 // for which Get() can retrieve data.
697 func (v *S3AWSVolume) IndexTo(prefix string, writer io.Writer) error {
698         prefix = v.key(prefix)
699         // Use a merge sort to find matching sets of X and recent/X.
700         dataL := s3awsLister{
701                 Logger:   v.logger,
702                 Bucket:   v.bucket,
703                 Prefix:   prefix,
704                 PageSize: v.IndexPageSize,
705                 Stats:    &v.bucket.stats,
706         }
707         recentL := s3awsLister{
708                 Logger:   v.logger,
709                 Bucket:   v.bucket,
710                 Prefix:   "recent/" + prefix,
711                 PageSize: v.IndexPageSize,
712                 Stats:    &v.bucket.stats,
713         }
714         for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
715                 if *data.Key >= "g" {
716                         // Conveniently, "recent/*" and "trash/*" are
717                         // lexically greater than all hex-encoded data
718                         // hashes, so stopping here avoids iterating
719                         // over all of them needlessly with dataL.
720                         break
721                 }
722                 loc, isblk := v.isKeepBlock(*data.Key)
723                 if !isblk {
724                         continue
725                 }
726
727                 // stamp is the list entry we should use to report the
728                 // last-modified time for this data block: it will be
729                 // the recent/X entry if one exists, otherwise the
730                 // entry for the data block itself.
731                 stamp := data
732
733                 // Advance to the corresponding recent/X marker, if any
734                 for recent != nil && recentL.Error() == nil {
735                         if cmp := strings.Compare((*recent.Key)[7:], *data.Key); cmp < 0 {
736                                 recent = recentL.Next()
737                                 continue
738                         } else if cmp == 0 {
739                                 stamp = recent
740                                 recent = recentL.Next()
741                                 break
742                         } else {
743                                 // recent/X marker is missing: we'll
744                                 // use the timestamp on the data
745                                 // object.
746                                 break
747                         }
748                 }
749                 if err := recentL.Error(); err != nil {
750                         return err
751                 }
752                 // We truncate sub-second precision here. Otherwise
753                 // timestamps will never match the RFC1123-formatted
754                 // Last-Modified values parsed by Mtime().
755                 fmt.Fprintf(writer, "%s+%d %d\n", loc, *data.Size, stamp.LastModified.Unix()*1000000000)
756         }
757         return dataL.Error()
758 }
759
760 // Mtime returns the stored timestamp for the given locator.
761 func (v *S3AWSVolume) Mtime(loc string) (time.Time, error) {
762         key := v.key(loc)
763         _, err := v.head(key)
764         if err != nil {
765                 return s3AWSZeroTime, v.translateError(err)
766         }
767         resp, err := v.head("recent/" + key)
768         err = v.translateError(err)
769         if os.IsNotExist(err) {
770                 // The data object X exists, but recent/X is missing.
771                 err = v.writeObject(context.Background(), "recent/"+key, nil)
772                 if err != nil {
773                         v.logger.WithError(err).Errorf("error creating %q", "recent/"+key)
774                         return s3AWSZeroTime, v.translateError(err)
775                 }
776                 v.logger.Infof("Mtime: created %q to migrate existing block to new storage scheme", "recent/"+key)
777                 resp, err = v.head("recent/" + key)
778                 if err != nil {
779                         v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+key)
780                         return s3AWSZeroTime, v.translateError(err)
781                 }
782         } else if err != nil {
783                 // HEAD recent/X failed for some other reason.
784                 return s3AWSZeroTime, err
785         }
786         return *resp.LastModified, err
787 }
788
789 // Status returns a *VolumeStatus representing the current in-use
790 // storage capacity and a fake available capacity that doesn't make
791 // the volume seem full or nearly-full.
792 func (v *S3AWSVolume) Status() *VolumeStatus {
793         return &VolumeStatus{
794                 DeviceNum: 1,
795                 BytesFree: BlockSize * 1000,
796                 BytesUsed: 1,
797         }
798 }
799
800 // InternalStats returns bucket I/O and API call counters.
801 func (v *S3AWSVolume) InternalStats() interface{} {
802         return &v.bucket.stats
803 }
804
805 // Touch sets the timestamp for the given locator to the current time.
806 func (v *S3AWSVolume) Touch(loc string) error {
807         if v.volume.ReadOnly {
808                 return MethodDisabledError
809         }
810         key := v.key(loc)
811         _, err := v.head(key)
812         err = v.translateError(err)
813         if os.IsNotExist(err) && v.fixRace(key) {
814                 // The data object got trashed in a race, but fixRace
815                 // rescued it.
816         } else if err != nil {
817                 return err
818         }
819         err = v.writeObject(context.Background(), "recent/"+key, nil)
820         return v.translateError(err)
821 }
822
823 // checkRaceWindow returns a non-nil error if trash/key is, or might
824 // be, in the race window (i.e., it's not safe to trash key).
825 func (v *S3AWSVolume) checkRaceWindow(key string) error {
826         resp, err := v.head("trash/" + key)
827         err = v.translateError(err)
828         if os.IsNotExist(err) {
829                 // OK, trash/X doesn't exist so we're not in the race
830                 // window
831                 return nil
832         } else if err != nil {
833                 // Error looking up trash/X. We don't know whether
834                 // we're in the race window
835                 return err
836         }
837         t := resp.LastModified
838         safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
839         if safeWindow <= 0 {
840                 // We can't count on "touch trash/X" to prolong
841                 // trash/X's lifetime. The new timestamp might not
842                 // become visible until now+raceWindow, and EmptyTrash
843                 // is allowed to delete trash/X before then.
844                 return fmt.Errorf("%s: same block is already in trash, and safe window ended %s ago", key, -safeWindow)
845         }
846         // trash/X exists, but it won't be eligible for deletion until
847         // after now+raceWindow, so it's safe to overwrite it.
848         return nil
849 }
850
851 func (b *s3AWSbucket) Del(path string) error {
852         input := &s3.DeleteObjectInput{
853                 Bucket: aws.String(b.bucket),
854                 Key:    aws.String(path),
855         }
856         req := b.svc.DeleteObjectRequest(input)
857         _, err := req.Send(context.Background())
858         b.stats.TickOps("delete")
859         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
860         b.stats.TickErr(err)
861         return err
862 }
863
864 // Trash a Keep block.
865 func (v *S3AWSVolume) Trash(loc string) error {
866         if v.volume.ReadOnly {
867                 return MethodDisabledError
868         }
869         if t, err := v.Mtime(loc); err != nil {
870                 return err
871         } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
872                 return nil
873         }
874         key := v.key(loc)
875         if v.cluster.Collections.BlobTrashLifetime == 0 {
876                 if !v.UnsafeDelete {
877                         return ErrS3TrashDisabled
878                 }
879                 return v.translateError(v.bucket.Del(key))
880         }
881         err := v.checkRaceWindow(key)
882         if err != nil {
883                 return err
884         }
885         err = v.safeCopy("trash/"+key, key)
886         if err != nil {
887                 return err
888         }
889         return v.translateError(v.bucket.Del(key))
890 }
891
892 // Untrash moves block from trash back into store
893 func (v *S3AWSVolume) Untrash(loc string) error {
894         key := v.key(loc)
895         err := v.safeCopy(key, "trash/"+key)
896         if err != nil {
897                 return err
898         }
899         err = v.writeObject(context.Background(), "recent/"+key, nil)
900         return v.translateError(err)
901 }
902
903 type s3awsbucketStats struct {
904         statsTicker
905         Ops     uint64
906         GetOps  uint64
907         PutOps  uint64
908         HeadOps uint64
909         DelOps  uint64
910         ListOps uint64
911 }
912
913 func (s *s3awsbucketStats) TickErr(err error) {
914         if err == nil {
915                 return
916         }
917         errType := fmt.Sprintf("%T", err)
918         if aerr, ok := err.(awserr.Error); ok {
919                 if reqErr, ok := err.(awserr.RequestFailure); ok {
920                         // A service error occurred
921                         errType = errType + fmt.Sprintf(" %d %s", reqErr.StatusCode(), aerr.Code())
922                 } else {
923                         errType = errType + fmt.Sprintf(" 000 %s", aerr.Code())
924                 }
925         }
926         s.statsTicker.TickErr(err, errType)
927 }