Merge branch '18830-fix-centos-7-provisioning'
[arvados.git] / services / keepstore / s3aws_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "encoding/base64"
11         "encoding/hex"
12         "encoding/json"
13         "errors"
14         "fmt"
15         "io"
16         "os"
17         "regexp"
18         "strings"
19         "sync"
20         "sync/atomic"
21         "time"
22
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "github.com/aws/aws-sdk-go-v2/aws"
25         "github.com/aws/aws-sdk-go-v2/aws/awserr"
26         "github.com/aws/aws-sdk-go-v2/aws/defaults"
27         "github.com/aws/aws-sdk-go-v2/aws/ec2metadata"
28         "github.com/aws/aws-sdk-go-v2/aws/ec2rolecreds"
29         "github.com/aws/aws-sdk-go-v2/aws/endpoints"
30         "github.com/aws/aws-sdk-go-v2/service/s3"
31         "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
32         "github.com/prometheus/client_golang/prometheus"
33         "github.com/sirupsen/logrus"
34 )
35
36 // S3AWSVolume implements Volume using an S3 bucket.
37 type S3AWSVolume struct {
38         arvados.S3VolumeDriverParameters
39         AuthToken      string    // populated automatically when IAMRole is used
40         AuthExpiration time.Time // populated automatically when IAMRole is used
41
42         cluster   *arvados.Cluster
43         volume    arvados.Volume
44         logger    logrus.FieldLogger
45         metrics   *volumeMetricsVecs
46         bucket    *s3AWSbucket
47         region    string
48         startOnce sync.Once
49 }
50
51 // s3bucket wraps s3.bucket and counts I/O and API usage stats. The
52 // wrapped bucket can be replaced atomically with SetBucket in order
53 // to update credentials.
54 type s3AWSbucket struct {
55         bucket string
56         svc    *s3.Client
57         stats  s3awsbucketStats
58         mu     sync.Mutex
59 }
60
61 // chooseS3VolumeDriver distinguishes between the old goamz driver and
62 // aws-sdk-go based on the UseAWSS3v2Driver feature flag
63 func chooseS3VolumeDriver(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
64         v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics}
65         err := json.Unmarshal(volume.DriverParameters, v)
66         if err != nil {
67                 return nil, err
68         }
69         if v.UseAWSS3v2Driver {
70                 logger.Debugln("Using AWS S3 v2 driver")
71                 return newS3AWSVolume(cluster, volume, logger, metrics)
72         }
73         logger.Debugln("Using goamz S3 driver")
74         return newS3Volume(cluster, volume, logger, metrics)
75 }
76
77 const (
78         PartSize         = 5 * 1024 * 1024
79         ReadConcurrency  = 13
80         WriteConcurrency = 5
81 )
82
83 var s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
84 var s3AWSZeroTime time.Time
85
86 func (v *S3AWSVolume) isKeepBlock(s string) (string, bool) {
87         if v.PrefixLength > 0 && len(s) == v.PrefixLength+33 && s[:v.PrefixLength] == s[v.PrefixLength+1:v.PrefixLength*2+1] {
88                 s = s[v.PrefixLength+1:]
89         }
90         return s, s3AWSKeepBlockRegexp.MatchString(s)
91 }
92
93 // Return the key used for a given loc. If PrefixLength==0 then
94 // key("abcdef0123") is "abcdef0123", if PrefixLength==3 then key is
95 // "abc/abcdef0123", etc.
96 func (v *S3AWSVolume) key(loc string) string {
97         if v.PrefixLength > 0 && v.PrefixLength < len(loc)-1 {
98                 return loc[:v.PrefixLength] + "/" + loc
99         } else {
100                 return loc
101         }
102 }
103
104 func newS3AWSVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
105         v := &S3AWSVolume{cluster: cluster, volume: volume, metrics: metrics}
106         err := json.Unmarshal(volume.DriverParameters, v)
107         if err != nil {
108                 return nil, err
109         }
110         v.logger = logger.WithField("Volume", v.String())
111         return v, v.check("")
112 }
113
114 func (v *S3AWSVolume) translateError(err error) error {
115         if _, ok := err.(*aws.RequestCanceledError); ok {
116                 return context.Canceled
117         } else if aerr, ok := err.(awserr.Error); ok {
118                 if aerr.Code() == "NotFound" {
119                         return os.ErrNotExist
120                 } else if aerr.Code() == "NoSuchKey" {
121                         return os.ErrNotExist
122                 }
123         }
124         return err
125 }
126
127 // safeCopy calls CopyObjectRequest, and checks the response to make
128 // sure the copy succeeded and updated the timestamp on the
129 // destination object
130 //
131 // (If something goes wrong during the copy, the error will be
132 // embedded in the 200 OK response)
133 func (v *S3AWSVolume) safeCopy(dst, src string) error {
134         input := &s3.CopyObjectInput{
135                 Bucket:      aws.String(v.bucket.bucket),
136                 ContentType: aws.String("application/octet-stream"),
137                 CopySource:  aws.String(v.bucket.bucket + "/" + src),
138                 Key:         aws.String(dst),
139         }
140
141         req := v.bucket.svc.CopyObjectRequest(input)
142         resp, err := req.Send(context.Background())
143
144         err = v.translateError(err)
145         if os.IsNotExist(err) {
146                 return err
147         } else if err != nil {
148                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.bucket+"/"+src, err)
149         }
150
151         if resp.CopyObjectResult.LastModified == nil {
152                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.CopyObjectResult.LastModified, err)
153         } else if time.Now().Sub(*resp.CopyObjectResult.LastModified) > maxClockSkew {
154                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.CopyObjectResult.LastModified, resp.CopyObjectResult.LastModified)
155         }
156         return nil
157 }
158
159 func (v *S3AWSVolume) check(ec2metadataHostname string) error {
160         if v.Bucket == "" {
161                 return errors.New("DriverParameters: Bucket must be provided")
162         }
163         if v.IndexPageSize == 0 {
164                 v.IndexPageSize = 1000
165         }
166         if v.RaceWindow < 0 {
167                 return errors.New("DriverParameters: RaceWindow must not be negative")
168         }
169
170         if v.V2Signature {
171                 return errors.New("DriverParameters: V2Signature is not supported")
172         }
173
174         defaultResolver := endpoints.NewDefaultResolver()
175
176         cfg := defaults.Config()
177
178         if v.Endpoint == "" && v.Region == "" {
179                 return fmt.Errorf("AWS region or endpoint must be specified")
180         } else if v.Endpoint != "" || ec2metadataHostname != "" {
181                 myCustomResolver := func(service, region string) (aws.Endpoint, error) {
182                         if v.Endpoint != "" && service == "s3" {
183                                 return aws.Endpoint{
184                                         URL:           v.Endpoint,
185                                         SigningRegion: v.Region,
186                                 }, nil
187                         } else if service == "ec2metadata" && ec2metadataHostname != "" {
188                                 return aws.Endpoint{
189                                         URL: ec2metadataHostname,
190                                 }, nil
191                         }
192
193                         return defaultResolver.ResolveEndpoint(service, region)
194                 }
195                 cfg.EndpointResolver = aws.EndpointResolverFunc(myCustomResolver)
196         }
197
198         cfg.Region = v.Region
199
200         // Zero timeouts mean "wait forever", which is a bad
201         // default. Default to long timeouts instead.
202         if v.ConnectTimeout == 0 {
203                 v.ConnectTimeout = s3DefaultConnectTimeout
204         }
205         if v.ReadTimeout == 0 {
206                 v.ReadTimeout = s3DefaultReadTimeout
207         }
208
209         creds := aws.NewChainProvider(
210                 []aws.CredentialsProvider{
211                         aws.NewStaticCredentialsProvider(v.AccessKeyID, v.SecretAccessKey, v.AuthToken),
212                         ec2rolecreds.New(ec2metadata.New(cfg)),
213                 })
214
215         cfg.Credentials = creds
216
217         v.bucket = &s3AWSbucket{
218                 bucket: v.Bucket,
219                 svc:    s3.New(cfg),
220         }
221
222         // Set up prometheus metrics
223         lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
224         v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
225
226         return nil
227 }
228
229 // String implements fmt.Stringer.
230 func (v *S3AWSVolume) String() string {
231         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
232 }
233
234 // GetDeviceID returns a globally unique ID for the storage bucket.
235 func (v *S3AWSVolume) GetDeviceID() string {
236         return "s3://" + v.Endpoint + "/" + v.Bucket
237 }
238
239 // Compare the given data with the stored data.
240 func (v *S3AWSVolume) Compare(ctx context.Context, loc string, expect []byte) error {
241         key := v.key(loc)
242         errChan := make(chan error, 1)
243         go func() {
244                 _, err := v.head("recent/" + key)
245                 errChan <- err
246         }()
247         var err error
248         select {
249         case <-ctx.Done():
250                 return ctx.Err()
251         case err = <-errChan:
252         }
253         if err != nil {
254                 // Checking for the key itself here would interfere
255                 // with future GET requests.
256                 //
257                 // On AWS, if X doesn't exist, a HEAD or GET request
258                 // for X causes X's non-existence to be cached. Thus,
259                 // if we test for X, then create X and return a
260                 // signature to our client, the client might still get
261                 // 404 from all keepstores when trying to read it.
262                 //
263                 // To avoid this, we avoid doing HEAD X or GET X until
264                 // we know X has been written.
265                 //
266                 // Note that X might exist even though recent/X
267                 // doesn't: for example, the response to HEAD recent/X
268                 // might itself come from a stale cache. In such
269                 // cases, we will return a false negative and
270                 // PutHandler might needlessly create another replica
271                 // on a different volume. That's not ideal, but it's
272                 // better than passing the eventually-consistent
273                 // problem on to our clients.
274                 return v.translateError(err)
275         }
276
277         input := &s3.GetObjectInput{
278                 Bucket: aws.String(v.bucket.bucket),
279                 Key:    aws.String(key),
280         }
281
282         req := v.bucket.svc.GetObjectRequest(input)
283         result, err := req.Send(ctx)
284         if err != nil {
285                 return v.translateError(err)
286         }
287         return v.translateError(compareReaderWithBuf(ctx, result.Body, expect, loc[:32]))
288 }
289
290 // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
291 // and deletes them from the volume.
292 func (v *S3AWSVolume) EmptyTrash() {
293         if v.cluster.Collections.BlobDeleteConcurrency < 1 {
294                 return
295         }
296
297         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
298
299         // Define "ready to delete" as "...when EmptyTrash started".
300         startT := time.Now()
301
302         emptyOneKey := func(trash *s3.Object) {
303                 key := strings.TrimPrefix(*trash.Key, "trash/")
304                 loc, isblk := v.isKeepBlock(key)
305                 if !isblk {
306                         return
307                 }
308                 atomic.AddInt64(&bytesInTrash, *trash.Size)
309                 atomic.AddInt64(&blocksInTrash, 1)
310
311                 trashT := *trash.LastModified
312                 recent, err := v.head("recent/" + key)
313                 if err != nil && os.IsNotExist(v.translateError(err)) {
314                         v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", *trash.Key, "recent/"+key, err)
315                         err = v.Untrash(loc)
316                         if err != nil {
317                                 v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
318                         }
319                         return
320                 } else if err != nil {
321                         v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+key)
322                         return
323                 }
324                 if trashT.Sub(*recent.LastModified) < v.cluster.Collections.BlobSigningTTL.Duration() {
325                         if age := startT.Sub(*recent.LastModified); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
326                                 // recent/key is too old to protect
327                                 // loc from being Trashed again during
328                                 // the raceWindow that starts if we
329                                 // delete trash/X now.
330                                 //
331                                 // Note this means (TrashSweepInterval
332                                 // < BlobSigningTTL - raceWindow) is
333                                 // necessary to avoid starvation.
334                                 v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
335                                 v.fixRace(key)
336                                 v.Touch(loc)
337                                 return
338                         }
339                         _, err := v.head(key)
340                         if os.IsNotExist(err) {
341                                 v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
342                                 v.fixRace(key)
343                                 return
344                         } else if err != nil {
345                                 v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
346                                 return
347                         }
348                 }
349                 if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
350                         return
351                 }
352                 err = v.bucket.Del(*trash.Key)
353                 if err != nil {
354                         v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", *trash.Key)
355                         return
356                 }
357                 atomic.AddInt64(&bytesDeleted, *trash.Size)
358                 atomic.AddInt64(&blocksDeleted, 1)
359
360                 _, err = v.head(*trash.Key)
361                 if err == nil {
362                         v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
363                         return
364                 }
365                 if !os.IsNotExist(v.translateError(err)) {
366                         v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", key)
367                         return
368                 }
369                 err = v.bucket.Del("recent/" + key)
370                 if err != nil {
371                         v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+key)
372                 }
373         }
374
375         var wg sync.WaitGroup
376         todo := make(chan *s3.Object, v.cluster.Collections.BlobDeleteConcurrency)
377         for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
378                 wg.Add(1)
379                 go func() {
380                         defer wg.Done()
381                         for key := range todo {
382                                 emptyOneKey(key)
383                         }
384                 }()
385         }
386
387         trashL := s3awsLister{
388                 Logger:   v.logger,
389                 Bucket:   v.bucket,
390                 Prefix:   "trash/",
391                 PageSize: v.IndexPageSize,
392                 Stats:    &v.bucket.stats,
393         }
394         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
395                 todo <- trash
396         }
397         close(todo)
398         wg.Wait()
399
400         if err := trashL.Error(); err != nil {
401                 v.logger.WithError(err).Error("EmptyTrash: lister failed")
402         }
403         v.logger.Infof("EmptyTrash: stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
404 }
405
406 // fixRace(X) is called when "recent/X" exists but "X" doesn't
407 // exist. If the timestamps on "recent/X" and "trash/X" indicate there
408 // was a race between Put and Trash, fixRace recovers from the race by
409 // Untrashing the block.
410 func (v *S3AWSVolume) fixRace(key string) bool {
411         trash, err := v.head("trash/" + key)
412         if err != nil {
413                 if !os.IsNotExist(v.translateError(err)) {
414                         v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+key)
415                 }
416                 return false
417         }
418
419         recent, err := v.head("recent/" + key)
420         if err != nil {
421                 v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+key)
422                 return false
423         }
424
425         recentTime := *recent.LastModified
426         trashTime := *trash.LastModified
427         ageWhenTrashed := trashTime.Sub(recentTime)
428         if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() {
429                 // No evidence of a race: block hasn't been written
430                 // since it became eligible for Trash. No fix needed.
431                 return false
432         }
433
434         v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", key, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
435         v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+key, key)
436         err = v.safeCopy(key, "trash/"+key)
437         if err != nil {
438                 v.logger.WithError(err).Error("fixRace: copy failed")
439                 return false
440         }
441         return true
442 }
443
444 func (v *S3AWSVolume) head(key string) (result *s3.HeadObjectOutput, err error) {
445         input := &s3.HeadObjectInput{
446                 Bucket: aws.String(v.bucket.bucket),
447                 Key:    aws.String(key),
448         }
449
450         req := v.bucket.svc.HeadObjectRequest(input)
451         res, err := req.Send(context.TODO())
452
453         v.bucket.stats.TickOps("head")
454         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.HeadOps)
455         v.bucket.stats.TickErr(err)
456
457         if err != nil {
458                 return nil, v.translateError(err)
459         }
460         result = res.HeadObjectOutput
461         return
462 }
463
464 // Get a block: copy the block data into buf, and return the number of
465 // bytes copied.
466 func (v *S3AWSVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
467         // Do not use getWithPipe here: the BlockReader interface does not pass
468         // through 'buf []byte', and we don't want to allocate two buffers for each
469         // read request. Instead, use a version of ReadBlock that accepts 'buf []byte'
470         // as an input.
471         key := v.key(loc)
472         count, err := v.readWorker(ctx, key, buf)
473         if err == nil {
474                 return count, err
475         }
476
477         err = v.translateError(err)
478         if !os.IsNotExist(err) {
479                 return 0, err
480         }
481
482         _, err = v.head("recent/" + key)
483         err = v.translateError(err)
484         if err != nil {
485                 // If we can't read recent/X, there's no point in
486                 // trying fixRace. Give up.
487                 return 0, err
488         }
489         if !v.fixRace(key) {
490                 err = os.ErrNotExist
491                 return 0, err
492         }
493
494         count, err = v.readWorker(ctx, key, buf)
495         if err != nil {
496                 v.logger.Warnf("reading %s after successful fixRace: %s", loc, err)
497                 err = v.translateError(err)
498                 return 0, err
499         }
500         return count, err
501 }
502
503 func (v *S3AWSVolume) readWorker(ctx context.Context, key string, buf []byte) (int, error) {
504         awsBuf := aws.NewWriteAtBuffer(buf)
505         downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) {
506                 u.PartSize = PartSize
507                 u.Concurrency = ReadConcurrency
508         })
509
510         v.logger.Debugf("Partsize: %d; Concurrency: %d\n", downloader.PartSize, downloader.Concurrency)
511
512         count, err := downloader.DownloadWithContext(ctx, awsBuf, &s3.GetObjectInput{
513                 Bucket: aws.String(v.bucket.bucket),
514                 Key:    aws.String(key),
515         })
516         v.bucket.stats.TickOps("get")
517         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.GetOps)
518         v.bucket.stats.TickErr(err)
519         v.bucket.stats.TickInBytes(uint64(count))
520         return int(count), v.translateError(err)
521 }
522
523 func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader) error {
524         if r == nil {
525                 // r == nil leads to a memory violation in func readFillBuf in
526                 // aws-sdk-go-v2@v0.23.0/service/s3/s3manager/upload.go
527                 r = bytes.NewReader(nil)
528         }
529
530         uploadInput := s3manager.UploadInput{
531                 Bucket: aws.String(v.bucket.bucket),
532                 Key:    aws.String(key),
533                 Body:   r,
534         }
535
536         if loc, ok := v.isKeepBlock(key); ok {
537                 var contentMD5 string
538                 md5, err := hex.DecodeString(loc)
539                 if err != nil {
540                         return v.translateError(err)
541                 }
542                 contentMD5 = base64.StdEncoding.EncodeToString(md5)
543                 uploadInput.ContentMD5 = &contentMD5
544         }
545
546         // Experimentation indicated that using concurrency 5 yields the best
547         // throughput, better than higher concurrency (10 or 13) by ~5%.
548         // Defining u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(64 * 1024 * 1024)
549         // is detrimental to througput (minus ~15%).
550         uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
551                 u.PartSize = PartSize
552                 u.Concurrency = WriteConcurrency
553         })
554
555         // Unlike the goamz S3 driver, we don't need to precompute ContentSHA256:
556         // the aws-sdk-go v2 SDK uses a ReadSeeker to avoid having to copy the
557         // block, so there is no extra memory use to be concerned about. See
558         // makeSha256Reader in aws/signer/v4/v4.go. In fact, we explicitly disable
559         // calculating the Sha-256 because we don't need it; we already use md5sum
560         // hashes that match the name of the block.
561         _, err := uploader.UploadWithContext(ctx, &uploadInput, s3manager.WithUploaderRequestOptions(func(r *aws.Request) {
562                 r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
563         }))
564
565         v.bucket.stats.TickOps("put")
566         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
567         v.bucket.stats.TickErr(err)
568
569         return v.translateError(err)
570 }
571
572 // Put writes a block.
573 func (v *S3AWSVolume) Put(ctx context.Context, loc string, block []byte) error {
574         // Do not use putWithPipe here; we want to pass an io.ReadSeeker to the S3
575         // sdk to avoid memory allocation there. See #17339 for more information.
576         if v.volume.ReadOnly {
577                 return MethodDisabledError
578         }
579
580         rdr := bytes.NewReader(block)
581         r := NewCountingReaderAtSeeker(rdr, v.bucket.stats.TickOutBytes)
582         key := v.key(loc)
583         err := v.writeObject(ctx, key, r)
584         if err != nil {
585                 return err
586         }
587         return v.writeObject(ctx, "recent/"+key, nil)
588 }
589
590 type s3awsLister struct {
591         Logger            logrus.FieldLogger
592         Bucket            *s3AWSbucket
593         Prefix            string
594         PageSize          int
595         Stats             *s3awsbucketStats
596         ContinuationToken string
597         buf               []s3.Object
598         err               error
599 }
600
601 // First fetches the first page and returns the first item. It returns
602 // nil if the response is the empty set or an error occurs.
603 func (lister *s3awsLister) First() *s3.Object {
604         lister.getPage()
605         return lister.pop()
606 }
607
608 // Next returns the next item, fetching the next page if necessary. It
609 // returns nil if the last available item has already been fetched, or
610 // an error occurs.
611 func (lister *s3awsLister) Next() *s3.Object {
612         if len(lister.buf) == 0 && lister.ContinuationToken != "" {
613                 lister.getPage()
614         }
615         return lister.pop()
616 }
617
618 // Return the most recent error encountered by First or Next.
619 func (lister *s3awsLister) Error() error {
620         return lister.err
621 }
622
623 func (lister *s3awsLister) getPage() {
624         lister.Stats.TickOps("list")
625         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
626
627         var input *s3.ListObjectsV2Input
628         if lister.ContinuationToken == "" {
629                 input = &s3.ListObjectsV2Input{
630                         Bucket:  aws.String(lister.Bucket.bucket),
631                         MaxKeys: aws.Int64(int64(lister.PageSize)),
632                         Prefix:  aws.String(lister.Prefix),
633                 }
634         } else {
635                 input = &s3.ListObjectsV2Input{
636                         Bucket:            aws.String(lister.Bucket.bucket),
637                         MaxKeys:           aws.Int64(int64(lister.PageSize)),
638                         Prefix:            aws.String(lister.Prefix),
639                         ContinuationToken: &lister.ContinuationToken,
640                 }
641         }
642
643         req := lister.Bucket.svc.ListObjectsV2Request(input)
644         resp, err := req.Send(context.Background())
645         if err != nil {
646                 if aerr, ok := err.(awserr.Error); ok {
647                         lister.err = aerr
648                 } else {
649                         lister.err = err
650                 }
651                 return
652         }
653
654         if *resp.IsTruncated {
655                 lister.ContinuationToken = *resp.NextContinuationToken
656         } else {
657                 lister.ContinuationToken = ""
658         }
659         lister.buf = make([]s3.Object, 0, len(resp.Contents))
660         for _, key := range resp.Contents {
661                 if !strings.HasPrefix(*key.Key, lister.Prefix) {
662                         lister.Logger.Warnf("s3awsLister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, *key.Key)
663                         continue
664                 }
665                 lister.buf = append(lister.buf, key)
666         }
667 }
668
669 func (lister *s3awsLister) pop() (k *s3.Object) {
670         if len(lister.buf) > 0 {
671                 k = &lister.buf[0]
672                 lister.buf = lister.buf[1:]
673         }
674         return
675 }
676
677 // IndexTo writes a complete list of locators with the given prefix
678 // for which Get() can retrieve data.
679 func (v *S3AWSVolume) IndexTo(prefix string, writer io.Writer) error {
680         prefix = v.key(prefix)
681         // Use a merge sort to find matching sets of X and recent/X.
682         dataL := s3awsLister{
683                 Logger:   v.logger,
684                 Bucket:   v.bucket,
685                 Prefix:   prefix,
686                 PageSize: v.IndexPageSize,
687                 Stats:    &v.bucket.stats,
688         }
689         recentL := s3awsLister{
690                 Logger:   v.logger,
691                 Bucket:   v.bucket,
692                 Prefix:   "recent/" + prefix,
693                 PageSize: v.IndexPageSize,
694                 Stats:    &v.bucket.stats,
695         }
696         for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
697                 if *data.Key >= "g" {
698                         // Conveniently, "recent/*" and "trash/*" are
699                         // lexically greater than all hex-encoded data
700                         // hashes, so stopping here avoids iterating
701                         // over all of them needlessly with dataL.
702                         break
703                 }
704                 loc, isblk := v.isKeepBlock(*data.Key)
705                 if !isblk {
706                         continue
707                 }
708
709                 // stamp is the list entry we should use to report the
710                 // last-modified time for this data block: it will be
711                 // the recent/X entry if one exists, otherwise the
712                 // entry for the data block itself.
713                 stamp := data
714
715                 // Advance to the corresponding recent/X marker, if any
716                 for recent != nil && recentL.Error() == nil {
717                         if cmp := strings.Compare((*recent.Key)[7:], *data.Key); cmp < 0 {
718                                 recent = recentL.Next()
719                                 continue
720                         } else if cmp == 0 {
721                                 stamp = recent
722                                 recent = recentL.Next()
723                                 break
724                         } else {
725                                 // recent/X marker is missing: we'll
726                                 // use the timestamp on the data
727                                 // object.
728                                 break
729                         }
730                 }
731                 if err := recentL.Error(); err != nil {
732                         return err
733                 }
734                 // We truncate sub-second precision here. Otherwise
735                 // timestamps will never match the RFC1123-formatted
736                 // Last-Modified values parsed by Mtime().
737                 fmt.Fprintf(writer, "%s+%d %d\n", loc, *data.Size, stamp.LastModified.Unix()*1000000000)
738         }
739         return dataL.Error()
740 }
741
742 // Mtime returns the stored timestamp for the given locator.
743 func (v *S3AWSVolume) Mtime(loc string) (time.Time, error) {
744         key := v.key(loc)
745         _, err := v.head(key)
746         if err != nil {
747                 return s3AWSZeroTime, v.translateError(err)
748         }
749         resp, err := v.head("recent/" + key)
750         err = v.translateError(err)
751         if os.IsNotExist(err) {
752                 // The data object X exists, but recent/X is missing.
753                 err = v.writeObject(context.Background(), "recent/"+key, nil)
754                 if err != nil {
755                         v.logger.WithError(err).Errorf("error creating %q", "recent/"+key)
756                         return s3AWSZeroTime, v.translateError(err)
757                 }
758                 v.logger.Infof("Mtime: created %q to migrate existing block to new storage scheme", "recent/"+key)
759                 resp, err = v.head("recent/" + key)
760                 if err != nil {
761                         v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+key)
762                         return s3AWSZeroTime, v.translateError(err)
763                 }
764         } else if err != nil {
765                 // HEAD recent/X failed for some other reason.
766                 return s3AWSZeroTime, err
767         }
768         return *resp.LastModified, err
769 }
770
771 // Status returns a *VolumeStatus representing the current in-use
772 // storage capacity and a fake available capacity that doesn't make
773 // the volume seem full or nearly-full.
774 func (v *S3AWSVolume) Status() *VolumeStatus {
775         return &VolumeStatus{
776                 DeviceNum: 1,
777                 BytesFree: BlockSize * 1000,
778                 BytesUsed: 1,
779         }
780 }
781
782 // InternalStats returns bucket I/O and API call counters.
783 func (v *S3AWSVolume) InternalStats() interface{} {
784         return &v.bucket.stats
785 }
786
787 // Touch sets the timestamp for the given locator to the current time.
788 func (v *S3AWSVolume) Touch(loc string) error {
789         if v.volume.ReadOnly {
790                 return MethodDisabledError
791         }
792         key := v.key(loc)
793         _, err := v.head(key)
794         err = v.translateError(err)
795         if os.IsNotExist(err) && v.fixRace(key) {
796                 // The data object got trashed in a race, but fixRace
797                 // rescued it.
798         } else if err != nil {
799                 return err
800         }
801         err = v.writeObject(context.Background(), "recent/"+key, nil)
802         return v.translateError(err)
803 }
804
805 // checkRaceWindow returns a non-nil error if trash/key is, or might
806 // be, in the race window (i.e., it's not safe to trash key).
807 func (v *S3AWSVolume) checkRaceWindow(key string) error {
808         resp, err := v.head("trash/" + key)
809         err = v.translateError(err)
810         if os.IsNotExist(err) {
811                 // OK, trash/X doesn't exist so we're not in the race
812                 // window
813                 return nil
814         } else if err != nil {
815                 // Error looking up trash/X. We don't know whether
816                 // we're in the race window
817                 return err
818         }
819         t := resp.LastModified
820         safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
821         if safeWindow <= 0 {
822                 // We can't count on "touch trash/X" to prolong
823                 // trash/X's lifetime. The new timestamp might not
824                 // become visible until now+raceWindow, and EmptyTrash
825                 // is allowed to delete trash/X before then.
826                 return fmt.Errorf("%s: same block is already in trash, and safe window ended %s ago", key, -safeWindow)
827         }
828         // trash/X exists, but it won't be eligible for deletion until
829         // after now+raceWindow, so it's safe to overwrite it.
830         return nil
831 }
832
833 func (b *s3AWSbucket) Del(path string) error {
834         input := &s3.DeleteObjectInput{
835                 Bucket: aws.String(b.bucket),
836                 Key:    aws.String(path),
837         }
838         req := b.svc.DeleteObjectRequest(input)
839         _, err := req.Send(context.Background())
840         b.stats.TickOps("delete")
841         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
842         b.stats.TickErr(err)
843         return err
844 }
845
846 // Trash a Keep block.
847 func (v *S3AWSVolume) Trash(loc string) error {
848         if v.volume.ReadOnly {
849                 return MethodDisabledError
850         }
851         if t, err := v.Mtime(loc); err != nil {
852                 return err
853         } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
854                 return nil
855         }
856         key := v.key(loc)
857         if v.cluster.Collections.BlobTrashLifetime == 0 {
858                 if !v.UnsafeDelete {
859                         return ErrS3TrashDisabled
860                 }
861                 return v.translateError(v.bucket.Del(key))
862         }
863         err := v.checkRaceWindow(key)
864         if err != nil {
865                 return err
866         }
867         err = v.safeCopy("trash/"+key, key)
868         if err != nil {
869                 return err
870         }
871         return v.translateError(v.bucket.Del(key))
872 }
873
874 // Untrash moves block from trash back into store
875 func (v *S3AWSVolume) Untrash(loc string) error {
876         key := v.key(loc)
877         err := v.safeCopy(key, "trash/"+key)
878         if err != nil {
879                 return err
880         }
881         err = v.writeObject(context.Background(), "recent/"+key, nil)
882         return v.translateError(err)
883 }
884
885 type s3awsbucketStats struct {
886         statsTicker
887         Ops     uint64
888         GetOps  uint64
889         PutOps  uint64
890         HeadOps uint64
891         DelOps  uint64
892         ListOps uint64
893 }
894
895 func (s *s3awsbucketStats) TickErr(err error) {
896         if err == nil {
897                 return
898         }
899         errType := fmt.Sprintf("%T", err)
900         if aerr, ok := err.(awserr.Error); ok {
901                 if reqErr, ok := err.(awserr.RequestFailure); ok {
902                         // A service error occurred
903                         errType = errType + fmt.Sprintf(" %d %s", reqErr.StatusCode(), aerr.Code())
904                 } else {
905                         errType = errType + fmt.Sprintf(" 000 %s", aerr.Code())
906                 }
907         }
908         s.statsTicker.TickErr(err, errType)
909 }