20862: Add 'sdk/ruby-google-api-client/' from commit '2f4be67955e48bb65d008ecd9ff6da9...
[arvados.git] / services / keepstore / s3aws_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "encoding/base64"
11         "encoding/hex"
12         "encoding/json"
13         "errors"
14         "fmt"
15         "io"
16         "os"
17         "regexp"
18         "strings"
19         "sync"
20         "sync/atomic"
21         "time"
22
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "github.com/aws/aws-sdk-go-v2/aws"
25         "github.com/aws/aws-sdk-go-v2/aws/awserr"
26         "github.com/aws/aws-sdk-go-v2/aws/defaults"
27         "github.com/aws/aws-sdk-go-v2/aws/ec2metadata"
28         "github.com/aws/aws-sdk-go-v2/aws/ec2rolecreds"
29         "github.com/aws/aws-sdk-go-v2/aws/endpoints"
30         "github.com/aws/aws-sdk-go-v2/service/s3"
31         "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
32         "github.com/prometheus/client_golang/prometheus"
33         "github.com/sirupsen/logrus"
34 )
35
36 func init() {
37         driver["S3"] = newS3AWSVolume
38 }
39
40 const (
41         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
42         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
43         maxClockSkew            = 600 * time.Second
44         nearlyRFC1123           = "Mon, 2 Jan 2006 15:04:05 GMT"
45 )
46
47 var (
48         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
49 )
50
51 // S3AWSVolume implements Volume using an S3 bucket.
52 type S3AWSVolume struct {
53         arvados.S3VolumeDriverParameters
54         AuthToken      string    // populated automatically when IAMRole is used
55         AuthExpiration time.Time // populated automatically when IAMRole is used
56
57         cluster   *arvados.Cluster
58         volume    arvados.Volume
59         logger    logrus.FieldLogger
60         metrics   *volumeMetricsVecs
61         bucket    *s3AWSbucket
62         region    string
63         startOnce sync.Once
64 }
65
66 // s3bucket wraps s3.bucket and counts I/O and API usage stats. The
67 // wrapped bucket can be replaced atomically with SetBucket in order
68 // to update credentials.
69 type s3AWSbucket struct {
70         bucket string
71         svc    *s3.Client
72         stats  s3awsbucketStats
73         mu     sync.Mutex
74 }
75
76 const (
77         PartSize         = 5 * 1024 * 1024
78         ReadConcurrency  = 13
79         WriteConcurrency = 5
80 )
81
82 var s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
83 var s3AWSZeroTime time.Time
84
85 func (v *S3AWSVolume) isKeepBlock(s string) (string, bool) {
86         if v.PrefixLength > 0 && len(s) == v.PrefixLength+33 && s[:v.PrefixLength] == s[v.PrefixLength+1:v.PrefixLength*2+1] {
87                 s = s[v.PrefixLength+1:]
88         }
89         return s, s3AWSKeepBlockRegexp.MatchString(s)
90 }
91
92 // Return the key used for a given loc. If PrefixLength==0 then
93 // key("abcdef0123") is "abcdef0123", if PrefixLength==3 then key is
94 // "abc/abcdef0123", etc.
95 func (v *S3AWSVolume) key(loc string) string {
96         if v.PrefixLength > 0 && v.PrefixLength < len(loc)-1 {
97                 return loc[:v.PrefixLength] + "/" + loc
98         } else {
99                 return loc
100         }
101 }
102
103 func newS3AWSVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
104         v := &S3AWSVolume{cluster: cluster, volume: volume, metrics: metrics}
105         err := json.Unmarshal(volume.DriverParameters, v)
106         if err != nil {
107                 return nil, err
108         }
109         v.logger = logger.WithField("Volume", v.String())
110         return v, v.check("")
111 }
112
113 func (v *S3AWSVolume) translateError(err error) error {
114         if _, ok := err.(*aws.RequestCanceledError); ok {
115                 return context.Canceled
116         } else if aerr, ok := err.(awserr.Error); ok {
117                 if aerr.Code() == "NotFound" {
118                         return os.ErrNotExist
119                 } else if aerr.Code() == "NoSuchKey" {
120                         return os.ErrNotExist
121                 }
122         }
123         return err
124 }
125
126 // safeCopy calls CopyObjectRequest, and checks the response to make
127 // sure the copy succeeded and updated the timestamp on the
128 // destination object
129 //
130 // (If something goes wrong during the copy, the error will be
131 // embedded in the 200 OK response)
132 func (v *S3AWSVolume) safeCopy(dst, src string) error {
133         input := &s3.CopyObjectInput{
134                 Bucket:      aws.String(v.bucket.bucket),
135                 ContentType: aws.String("application/octet-stream"),
136                 CopySource:  aws.String(v.bucket.bucket + "/" + src),
137                 Key:         aws.String(dst),
138         }
139
140         req := v.bucket.svc.CopyObjectRequest(input)
141         resp, err := req.Send(context.Background())
142
143         err = v.translateError(err)
144         if os.IsNotExist(err) {
145                 return err
146         } else if err != nil {
147                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.bucket+"/"+src, err)
148         }
149
150         if resp.CopyObjectResult.LastModified == nil {
151                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.CopyObjectResult.LastModified, err)
152         } else if time.Now().Sub(*resp.CopyObjectResult.LastModified) > maxClockSkew {
153                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.CopyObjectResult.LastModified, resp.CopyObjectResult.LastModified)
154         }
155         return nil
156 }
157
158 func (v *S3AWSVolume) check(ec2metadataHostname string) error {
159         if v.Bucket == "" {
160                 return errors.New("DriverParameters: Bucket must be provided")
161         }
162         if v.IndexPageSize == 0 {
163                 v.IndexPageSize = 1000
164         }
165         if v.RaceWindow < 0 {
166                 return errors.New("DriverParameters: RaceWindow must not be negative")
167         }
168
169         if v.V2Signature {
170                 return errors.New("DriverParameters: V2Signature is not supported")
171         }
172
173         defaultResolver := endpoints.NewDefaultResolver()
174
175         cfg := defaults.Config()
176
177         if v.Endpoint == "" && v.Region == "" {
178                 return fmt.Errorf("AWS region or endpoint must be specified")
179         } else if v.Endpoint != "" || ec2metadataHostname != "" {
180                 myCustomResolver := func(service, region string) (aws.Endpoint, error) {
181                         if v.Endpoint != "" && service == "s3" {
182                                 return aws.Endpoint{
183                                         URL:           v.Endpoint,
184                                         SigningRegion: region,
185                                 }, nil
186                         } else if service == "ec2metadata" && ec2metadataHostname != "" {
187                                 return aws.Endpoint{
188                                         URL: ec2metadataHostname,
189                                 }, nil
190                         } else {
191                                 return defaultResolver.ResolveEndpoint(service, region)
192                         }
193                 }
194                 cfg.EndpointResolver = aws.EndpointResolverFunc(myCustomResolver)
195         }
196         if v.Region == "" {
197                 // Endpoint is already specified (otherwise we would
198                 // have errored out above), but Region is also
199                 // required by the aws sdk, in order to determine
200                 // SignatureVersions.
201                 v.Region = "us-east-1"
202         }
203         cfg.Region = v.Region
204
205         // Zero timeouts mean "wait forever", which is a bad
206         // default. Default to long timeouts instead.
207         if v.ConnectTimeout == 0 {
208                 v.ConnectTimeout = s3DefaultConnectTimeout
209         }
210         if v.ReadTimeout == 0 {
211                 v.ReadTimeout = s3DefaultReadTimeout
212         }
213
214         creds := aws.NewChainProvider(
215                 []aws.CredentialsProvider{
216                         aws.NewStaticCredentialsProvider(v.AccessKeyID, v.SecretAccessKey, v.AuthToken),
217                         ec2rolecreds.New(ec2metadata.New(cfg)),
218                 })
219
220         cfg.Credentials = creds
221
222         v.bucket = &s3AWSbucket{
223                 bucket: v.Bucket,
224                 svc:    s3.New(cfg),
225         }
226
227         // Set up prometheus metrics
228         lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
229         v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
230
231         return nil
232 }
233
234 // String implements fmt.Stringer.
235 func (v *S3AWSVolume) String() string {
236         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
237 }
238
239 // GetDeviceID returns a globally unique ID for the storage bucket.
240 func (v *S3AWSVolume) GetDeviceID() string {
241         return "s3://" + v.Endpoint + "/" + v.Bucket
242 }
243
244 // Compare the given data with the stored data.
245 func (v *S3AWSVolume) Compare(ctx context.Context, loc string, expect []byte) error {
246         key := v.key(loc)
247         errChan := make(chan error, 1)
248         go func() {
249                 _, err := v.head("recent/" + key)
250                 errChan <- err
251         }()
252         var err error
253         select {
254         case <-ctx.Done():
255                 return ctx.Err()
256         case err = <-errChan:
257         }
258         if err != nil {
259                 // Checking for the key itself here would interfere
260                 // with future GET requests.
261                 //
262                 // On AWS, if X doesn't exist, a HEAD or GET request
263                 // for X causes X's non-existence to be cached. Thus,
264                 // if we test for X, then create X and return a
265                 // signature to our client, the client might still get
266                 // 404 from all keepstores when trying to read it.
267                 //
268                 // To avoid this, we avoid doing HEAD X or GET X until
269                 // we know X has been written.
270                 //
271                 // Note that X might exist even though recent/X
272                 // doesn't: for example, the response to HEAD recent/X
273                 // might itself come from a stale cache. In such
274                 // cases, we will return a false negative and
275                 // PutHandler might needlessly create another replica
276                 // on a different volume. That's not ideal, but it's
277                 // better than passing the eventually-consistent
278                 // problem on to our clients.
279                 return v.translateError(err)
280         }
281
282         input := &s3.GetObjectInput{
283                 Bucket: aws.String(v.bucket.bucket),
284                 Key:    aws.String(key),
285         }
286
287         req := v.bucket.svc.GetObjectRequest(input)
288         result, err := req.Send(ctx)
289         if err != nil {
290                 return v.translateError(err)
291         }
292         return v.translateError(compareReaderWithBuf(ctx, result.Body, expect, loc[:32]))
293 }
294
295 // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
296 // and deletes them from the volume.
297 func (v *S3AWSVolume) EmptyTrash() {
298         if v.cluster.Collections.BlobDeleteConcurrency < 1 {
299                 return
300         }
301
302         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
303
304         // Define "ready to delete" as "...when EmptyTrash started".
305         startT := time.Now()
306
307         emptyOneKey := func(trash *s3.Object) {
308                 key := strings.TrimPrefix(*trash.Key, "trash/")
309                 loc, isblk := v.isKeepBlock(key)
310                 if !isblk {
311                         return
312                 }
313                 atomic.AddInt64(&bytesInTrash, *trash.Size)
314                 atomic.AddInt64(&blocksInTrash, 1)
315
316                 trashT := *trash.LastModified
317                 recent, err := v.head("recent/" + key)
318                 if err != nil && os.IsNotExist(v.translateError(err)) {
319                         v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", *trash.Key, "recent/"+key, err)
320                         err = v.Untrash(loc)
321                         if err != nil {
322                                 v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
323                         }
324                         return
325                 } else if err != nil {
326                         v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+key)
327                         return
328                 }
329                 if trashT.Sub(*recent.LastModified) < v.cluster.Collections.BlobSigningTTL.Duration() {
330                         if age := startT.Sub(*recent.LastModified); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
331                                 // recent/key is too old to protect
332                                 // loc from being Trashed again during
333                                 // the raceWindow that starts if we
334                                 // delete trash/X now.
335                                 //
336                                 // Note this means (TrashSweepInterval
337                                 // < BlobSigningTTL - raceWindow) is
338                                 // necessary to avoid starvation.
339                                 v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
340                                 v.fixRace(key)
341                                 v.Touch(loc)
342                                 return
343                         }
344                         _, err := v.head(key)
345                         if os.IsNotExist(err) {
346                                 v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
347                                 v.fixRace(key)
348                                 return
349                         } else if err != nil {
350                                 v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
351                                 return
352                         }
353                 }
354                 if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
355                         return
356                 }
357                 err = v.bucket.Del(*trash.Key)
358                 if err != nil {
359                         v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", *trash.Key)
360                         return
361                 }
362                 atomic.AddInt64(&bytesDeleted, *trash.Size)
363                 atomic.AddInt64(&blocksDeleted, 1)
364
365                 _, err = v.head(*trash.Key)
366                 if err == nil {
367                         v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
368                         return
369                 }
370                 if !os.IsNotExist(v.translateError(err)) {
371                         v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", key)
372                         return
373                 }
374                 err = v.bucket.Del("recent/" + key)
375                 if err != nil {
376                         v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+key)
377                 }
378         }
379
380         var wg sync.WaitGroup
381         todo := make(chan *s3.Object, v.cluster.Collections.BlobDeleteConcurrency)
382         for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
383                 wg.Add(1)
384                 go func() {
385                         defer wg.Done()
386                         for key := range todo {
387                                 emptyOneKey(key)
388                         }
389                 }()
390         }
391
392         trashL := s3awsLister{
393                 Logger:   v.logger,
394                 Bucket:   v.bucket,
395                 Prefix:   "trash/",
396                 PageSize: v.IndexPageSize,
397                 Stats:    &v.bucket.stats,
398         }
399         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
400                 todo <- trash
401         }
402         close(todo)
403         wg.Wait()
404
405         if err := trashL.Error(); err != nil {
406                 v.logger.WithError(err).Error("EmptyTrash: lister failed")
407         }
408         v.logger.Infof("EmptyTrash: stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
409 }
410
411 // fixRace(X) is called when "recent/X" exists but "X" doesn't
412 // exist. If the timestamps on "recent/X" and "trash/X" indicate there
413 // was a race between Put and Trash, fixRace recovers from the race by
414 // Untrashing the block.
415 func (v *S3AWSVolume) fixRace(key string) bool {
416         trash, err := v.head("trash/" + key)
417         if err != nil {
418                 if !os.IsNotExist(v.translateError(err)) {
419                         v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+key)
420                 }
421                 return false
422         }
423
424         recent, err := v.head("recent/" + key)
425         if err != nil {
426                 v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+key)
427                 return false
428         }
429
430         recentTime := *recent.LastModified
431         trashTime := *trash.LastModified
432         ageWhenTrashed := trashTime.Sub(recentTime)
433         if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() {
434                 // No evidence of a race: block hasn't been written
435                 // since it became eligible for Trash. No fix needed.
436                 return false
437         }
438
439         v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", key, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
440         v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+key, key)
441         err = v.safeCopy(key, "trash/"+key)
442         if err != nil {
443                 v.logger.WithError(err).Error("fixRace: copy failed")
444                 return false
445         }
446         return true
447 }
448
449 func (v *S3AWSVolume) head(key string) (result *s3.HeadObjectOutput, err error) {
450         input := &s3.HeadObjectInput{
451                 Bucket: aws.String(v.bucket.bucket),
452                 Key:    aws.String(key),
453         }
454
455         req := v.bucket.svc.HeadObjectRequest(input)
456         res, err := req.Send(context.TODO())
457
458         v.bucket.stats.TickOps("head")
459         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.HeadOps)
460         v.bucket.stats.TickErr(err)
461
462         if err != nil {
463                 return nil, v.translateError(err)
464         }
465         result = res.HeadObjectOutput
466         return
467 }
468
469 // Get a block: copy the block data into buf, and return the number of
470 // bytes copied.
471 func (v *S3AWSVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
472         // Do not use getWithPipe here: the BlockReader interface does not pass
473         // through 'buf []byte', and we don't want to allocate two buffers for each
474         // read request. Instead, use a version of ReadBlock that accepts 'buf []byte'
475         // as an input.
476         key := v.key(loc)
477         count, err := v.readWorker(ctx, key, buf)
478         if err == nil {
479                 return count, err
480         }
481
482         err = v.translateError(err)
483         if !os.IsNotExist(err) {
484                 return 0, err
485         }
486
487         _, err = v.head("recent/" + key)
488         err = v.translateError(err)
489         if err != nil {
490                 // If we can't read recent/X, there's no point in
491                 // trying fixRace. Give up.
492                 return 0, err
493         }
494         if !v.fixRace(key) {
495                 err = os.ErrNotExist
496                 return 0, err
497         }
498
499         count, err = v.readWorker(ctx, key, buf)
500         if err != nil {
501                 v.logger.Warnf("reading %s after successful fixRace: %s", loc, err)
502                 err = v.translateError(err)
503                 return 0, err
504         }
505         return count, err
506 }
507
508 func (v *S3AWSVolume) readWorker(ctx context.Context, key string, buf []byte) (int, error) {
509         awsBuf := aws.NewWriteAtBuffer(buf)
510         downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) {
511                 u.PartSize = PartSize
512                 u.Concurrency = ReadConcurrency
513         })
514
515         v.logger.Debugf("Partsize: %d; Concurrency: %d\n", downloader.PartSize, downloader.Concurrency)
516
517         count, err := downloader.DownloadWithContext(ctx, awsBuf, &s3.GetObjectInput{
518                 Bucket: aws.String(v.bucket.bucket),
519                 Key:    aws.String(key),
520         })
521         v.bucket.stats.TickOps("get")
522         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.GetOps)
523         v.bucket.stats.TickErr(err)
524         v.bucket.stats.TickInBytes(uint64(count))
525         return int(count), v.translateError(err)
526 }
527
528 func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader) error {
529         if r == nil {
530                 // r == nil leads to a memory violation in func readFillBuf in
531                 // aws-sdk-go-v2@v0.23.0/service/s3/s3manager/upload.go
532                 r = bytes.NewReader(nil)
533         }
534
535         uploadInput := s3manager.UploadInput{
536                 Bucket: aws.String(v.bucket.bucket),
537                 Key:    aws.String(key),
538                 Body:   r,
539         }
540
541         if loc, ok := v.isKeepBlock(key); ok {
542                 var contentMD5 string
543                 md5, err := hex.DecodeString(loc)
544                 if err != nil {
545                         return v.translateError(err)
546                 }
547                 contentMD5 = base64.StdEncoding.EncodeToString(md5)
548                 uploadInput.ContentMD5 = &contentMD5
549         }
550
551         // Experimentation indicated that using concurrency 5 yields the best
552         // throughput, better than higher concurrency (10 or 13) by ~5%.
553         // Defining u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(64 * 1024 * 1024)
554         // is detrimental to througput (minus ~15%).
555         uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
556                 u.PartSize = PartSize
557                 u.Concurrency = WriteConcurrency
558         })
559
560         // Unlike the goamz S3 driver, we don't need to precompute ContentSHA256:
561         // the aws-sdk-go v2 SDK uses a ReadSeeker to avoid having to copy the
562         // block, so there is no extra memory use to be concerned about. See
563         // makeSha256Reader in aws/signer/v4/v4.go. In fact, we explicitly disable
564         // calculating the Sha-256 because we don't need it; we already use md5sum
565         // hashes that match the name of the block.
566         _, err := uploader.UploadWithContext(ctx, &uploadInput, s3manager.WithUploaderRequestOptions(func(r *aws.Request) {
567                 r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
568         }))
569
570         v.bucket.stats.TickOps("put")
571         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
572         v.bucket.stats.TickErr(err)
573
574         return v.translateError(err)
575 }
576
577 // Put writes a block.
578 func (v *S3AWSVolume) Put(ctx context.Context, loc string, block []byte) error {
579         // Do not use putWithPipe here; we want to pass an io.ReadSeeker to the S3
580         // sdk to avoid memory allocation there. See #17339 for more information.
581         if v.volume.ReadOnly {
582                 return MethodDisabledError
583         }
584
585         rdr := bytes.NewReader(block)
586         r := NewCountingReaderAtSeeker(rdr, v.bucket.stats.TickOutBytes)
587         key := v.key(loc)
588         err := v.writeObject(ctx, key, r)
589         if err != nil {
590                 return err
591         }
592         return v.writeObject(ctx, "recent/"+key, nil)
593 }
594
595 type s3awsLister struct {
596         Logger            logrus.FieldLogger
597         Bucket            *s3AWSbucket
598         Prefix            string
599         PageSize          int
600         Stats             *s3awsbucketStats
601         ContinuationToken string
602         buf               []s3.Object
603         err               error
604 }
605
606 // First fetches the first page and returns the first item. It returns
607 // nil if the response is the empty set or an error occurs.
608 func (lister *s3awsLister) First() *s3.Object {
609         lister.getPage()
610         return lister.pop()
611 }
612
613 // Next returns the next item, fetching the next page if necessary. It
614 // returns nil if the last available item has already been fetched, or
615 // an error occurs.
616 func (lister *s3awsLister) Next() *s3.Object {
617         if len(lister.buf) == 0 && lister.ContinuationToken != "" {
618                 lister.getPage()
619         }
620         return lister.pop()
621 }
622
623 // Return the most recent error encountered by First or Next.
624 func (lister *s3awsLister) Error() error {
625         return lister.err
626 }
627
628 func (lister *s3awsLister) getPage() {
629         lister.Stats.TickOps("list")
630         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
631
632         var input *s3.ListObjectsV2Input
633         if lister.ContinuationToken == "" {
634                 input = &s3.ListObjectsV2Input{
635                         Bucket:  aws.String(lister.Bucket.bucket),
636                         MaxKeys: aws.Int64(int64(lister.PageSize)),
637                         Prefix:  aws.String(lister.Prefix),
638                 }
639         } else {
640                 input = &s3.ListObjectsV2Input{
641                         Bucket:            aws.String(lister.Bucket.bucket),
642                         MaxKeys:           aws.Int64(int64(lister.PageSize)),
643                         Prefix:            aws.String(lister.Prefix),
644                         ContinuationToken: &lister.ContinuationToken,
645                 }
646         }
647
648         req := lister.Bucket.svc.ListObjectsV2Request(input)
649         resp, err := req.Send(context.Background())
650         if err != nil {
651                 if aerr, ok := err.(awserr.Error); ok {
652                         lister.err = aerr
653                 } else {
654                         lister.err = err
655                 }
656                 return
657         }
658
659         if *resp.IsTruncated {
660                 lister.ContinuationToken = *resp.NextContinuationToken
661         } else {
662                 lister.ContinuationToken = ""
663         }
664         lister.buf = make([]s3.Object, 0, len(resp.Contents))
665         for _, key := range resp.Contents {
666                 if !strings.HasPrefix(*key.Key, lister.Prefix) {
667                         lister.Logger.Warnf("s3awsLister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, *key.Key)
668                         continue
669                 }
670                 lister.buf = append(lister.buf, key)
671         }
672 }
673
674 func (lister *s3awsLister) pop() (k *s3.Object) {
675         if len(lister.buf) > 0 {
676                 k = &lister.buf[0]
677                 lister.buf = lister.buf[1:]
678         }
679         return
680 }
681
682 // IndexTo writes a complete list of locators with the given prefix
683 // for which Get() can retrieve data.
684 func (v *S3AWSVolume) IndexTo(prefix string, writer io.Writer) error {
685         prefix = v.key(prefix)
686         // Use a merge sort to find matching sets of X and recent/X.
687         dataL := s3awsLister{
688                 Logger:   v.logger,
689                 Bucket:   v.bucket,
690                 Prefix:   prefix,
691                 PageSize: v.IndexPageSize,
692                 Stats:    &v.bucket.stats,
693         }
694         recentL := s3awsLister{
695                 Logger:   v.logger,
696                 Bucket:   v.bucket,
697                 Prefix:   "recent/" + prefix,
698                 PageSize: v.IndexPageSize,
699                 Stats:    &v.bucket.stats,
700         }
701         for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
702                 if *data.Key >= "g" {
703                         // Conveniently, "recent/*" and "trash/*" are
704                         // lexically greater than all hex-encoded data
705                         // hashes, so stopping here avoids iterating
706                         // over all of them needlessly with dataL.
707                         break
708                 }
709                 loc, isblk := v.isKeepBlock(*data.Key)
710                 if !isblk {
711                         continue
712                 }
713
714                 // stamp is the list entry we should use to report the
715                 // last-modified time for this data block: it will be
716                 // the recent/X entry if one exists, otherwise the
717                 // entry for the data block itself.
718                 stamp := data
719
720                 // Advance to the corresponding recent/X marker, if any
721                 for recent != nil && recentL.Error() == nil {
722                         if cmp := strings.Compare((*recent.Key)[7:], *data.Key); cmp < 0 {
723                                 recent = recentL.Next()
724                                 continue
725                         } else if cmp == 0 {
726                                 stamp = recent
727                                 recent = recentL.Next()
728                                 break
729                         } else {
730                                 // recent/X marker is missing: we'll
731                                 // use the timestamp on the data
732                                 // object.
733                                 break
734                         }
735                 }
736                 if err := recentL.Error(); err != nil {
737                         return err
738                 }
739                 // We truncate sub-second precision here. Otherwise
740                 // timestamps will never match the RFC1123-formatted
741                 // Last-Modified values parsed by Mtime().
742                 fmt.Fprintf(writer, "%s+%d %d\n", loc, *data.Size, stamp.LastModified.Unix()*1000000000)
743         }
744         return dataL.Error()
745 }
746
747 // Mtime returns the stored timestamp for the given locator.
748 func (v *S3AWSVolume) Mtime(loc string) (time.Time, error) {
749         key := v.key(loc)
750         _, err := v.head(key)
751         if err != nil {
752                 return s3AWSZeroTime, v.translateError(err)
753         }
754         resp, err := v.head("recent/" + key)
755         err = v.translateError(err)
756         if os.IsNotExist(err) {
757                 // The data object X exists, but recent/X is missing.
758                 err = v.writeObject(context.Background(), "recent/"+key, nil)
759                 if err != nil {
760                         v.logger.WithError(err).Errorf("error creating %q", "recent/"+key)
761                         return s3AWSZeroTime, v.translateError(err)
762                 }
763                 v.logger.Infof("Mtime: created %q to migrate existing block to new storage scheme", "recent/"+key)
764                 resp, err = v.head("recent/" + key)
765                 if err != nil {
766                         v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+key)
767                         return s3AWSZeroTime, v.translateError(err)
768                 }
769         } else if err != nil {
770                 // HEAD recent/X failed for some other reason.
771                 return s3AWSZeroTime, err
772         }
773         return *resp.LastModified, err
774 }
775
776 // Status returns a *VolumeStatus representing the current in-use
777 // storage capacity and a fake available capacity that doesn't make
778 // the volume seem full or nearly-full.
779 func (v *S3AWSVolume) Status() *VolumeStatus {
780         return &VolumeStatus{
781                 DeviceNum: 1,
782                 BytesFree: BlockSize * 1000,
783                 BytesUsed: 1,
784         }
785 }
786
787 // InternalStats returns bucket I/O and API call counters.
788 func (v *S3AWSVolume) InternalStats() interface{} {
789         return &v.bucket.stats
790 }
791
792 // Touch sets the timestamp for the given locator to the current time.
793 func (v *S3AWSVolume) Touch(loc string) error {
794         if v.volume.ReadOnly {
795                 return MethodDisabledError
796         }
797         key := v.key(loc)
798         _, err := v.head(key)
799         err = v.translateError(err)
800         if os.IsNotExist(err) && v.fixRace(key) {
801                 // The data object got trashed in a race, but fixRace
802                 // rescued it.
803         } else if err != nil {
804                 return err
805         }
806         err = v.writeObject(context.Background(), "recent/"+key, nil)
807         return v.translateError(err)
808 }
809
810 // checkRaceWindow returns a non-nil error if trash/key is, or might
811 // be, in the race window (i.e., it's not safe to trash key).
812 func (v *S3AWSVolume) checkRaceWindow(key string) error {
813         resp, err := v.head("trash/" + key)
814         err = v.translateError(err)
815         if os.IsNotExist(err) {
816                 // OK, trash/X doesn't exist so we're not in the race
817                 // window
818                 return nil
819         } else if err != nil {
820                 // Error looking up trash/X. We don't know whether
821                 // we're in the race window
822                 return err
823         }
824         t := resp.LastModified
825         safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
826         if safeWindow <= 0 {
827                 // We can't count on "touch trash/X" to prolong
828                 // trash/X's lifetime. The new timestamp might not
829                 // become visible until now+raceWindow, and EmptyTrash
830                 // is allowed to delete trash/X before then.
831                 return fmt.Errorf("%s: same block is already in trash, and safe window ended %s ago", key, -safeWindow)
832         }
833         // trash/X exists, but it won't be eligible for deletion until
834         // after now+raceWindow, so it's safe to overwrite it.
835         return nil
836 }
837
838 func (b *s3AWSbucket) Del(path string) error {
839         input := &s3.DeleteObjectInput{
840                 Bucket: aws.String(b.bucket),
841                 Key:    aws.String(path),
842         }
843         req := b.svc.DeleteObjectRequest(input)
844         _, err := req.Send(context.Background())
845         b.stats.TickOps("delete")
846         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
847         b.stats.TickErr(err)
848         return err
849 }
850
851 // Trash a Keep block.
852 func (v *S3AWSVolume) Trash(loc string) error {
853         if v.volume.ReadOnly {
854                 return MethodDisabledError
855         }
856         if t, err := v.Mtime(loc); err != nil {
857                 return err
858         } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
859                 return nil
860         }
861         key := v.key(loc)
862         if v.cluster.Collections.BlobTrashLifetime == 0 {
863                 if !v.UnsafeDelete {
864                         return ErrS3TrashDisabled
865                 }
866                 return v.translateError(v.bucket.Del(key))
867         }
868         err := v.checkRaceWindow(key)
869         if err != nil {
870                 return err
871         }
872         err = v.safeCopy("trash/"+key, key)
873         if err != nil {
874                 return err
875         }
876         return v.translateError(v.bucket.Del(key))
877 }
878
879 // Untrash moves block from trash back into store
880 func (v *S3AWSVolume) Untrash(loc string) error {
881         key := v.key(loc)
882         err := v.safeCopy(key, "trash/"+key)
883         if err != nil {
884                 return err
885         }
886         err = v.writeObject(context.Background(), "recent/"+key, nil)
887         return v.translateError(err)
888 }
889
890 type s3awsbucketStats struct {
891         statsTicker
892         Ops     uint64
893         GetOps  uint64
894         PutOps  uint64
895         HeadOps uint64
896         DelOps  uint64
897         ListOps uint64
898 }
899
900 func (s *s3awsbucketStats) TickErr(err error) {
901         if err == nil {
902                 return
903         }
904         errType := fmt.Sprintf("%T", err)
905         if aerr, ok := err.(awserr.Error); ok {
906                 if reqErr, ok := err.(awserr.RequestFailure); ok {
907                         // A service error occurred
908                         errType = errType + fmt.Sprintf(" %d %s", reqErr.StatusCode(), aerr.Code())
909                 } else {
910                         errType = errType + fmt.Sprintf(" 000 %s", aerr.Code())
911                 }
912         }
913         s.statsTicker.TickErr(err, errType)
914 }