13647: Use cluster config instead of custom keepstore config.
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/sha256"
11         "encoding/base64"
12         "encoding/hex"
13         "encoding/json"
14         "errors"
15         "fmt"
16         "io"
17         "io/ioutil"
18         "log"
19         "net/http"
20         "os"
21         "regexp"
22         "strings"
23         "sync"
24         "sync/atomic"
25         "time"
26
27         "git.curoverse.com/arvados.git/sdk/go/arvados"
28         "github.com/AdRoll/goamz/aws"
29         "github.com/AdRoll/goamz/s3"
30         "github.com/prometheus/client_golang/prometheus"
31         "github.com/sirupsen/logrus"
32 )
33
34 func init() {
35         driver["S3"] = newS3Volume
36 }
37
38 func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
39         v := &S3Volume{cluster: cluster, volume: volume, logger: logger, metrics: metrics}
40         err := json.Unmarshal(volume.DriverParameters, &v)
41         if err != nil {
42                 return nil, err
43         }
44         return v, v.check()
45 }
46
47 func (v *S3Volume) check() error {
48         if v.Bucket == "" || v.AccessKey == "" || v.SecretKey == "" {
49                 return errors.New("DriverParameters: Bucket, AccessKey, and SecretKey must be provided")
50         }
51         if v.IndexPageSize == 0 {
52                 v.IndexPageSize = 1000
53         }
54         if v.RaceWindow < 0 {
55                 return errors.New("DriverParameters: RaceWindow must not be negative")
56         }
57
58         region, ok := aws.Regions[v.Region]
59         if v.Endpoint == "" {
60                 if !ok {
61                         return fmt.Errorf("unrecognized region %+q; try specifying endpoint instead", v.Region)
62                 }
63         } else if ok {
64                 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
65                         "specify empty endpoint or use a different region name", v.Region, v.Endpoint)
66         } else {
67                 region = aws.Region{
68                         Name:                 v.Region,
69                         S3Endpoint:           v.Endpoint,
70                         S3LocationConstraint: v.LocationConstraint,
71                 }
72         }
73
74         auth := aws.Auth{
75                 AccessKey: v.AccessKey,
76                 SecretKey: v.SecretKey,
77         }
78
79         // Zero timeouts mean "wait forever", which is a bad
80         // default. Default to long timeouts instead.
81         if v.ConnectTimeout == 0 {
82                 v.ConnectTimeout = s3DefaultConnectTimeout
83         }
84         if v.ReadTimeout == 0 {
85                 v.ReadTimeout = s3DefaultReadTimeout
86         }
87
88         client := s3.New(auth, region)
89         if region.EC2Endpoint.Signer == aws.V4Signature {
90                 // Currently affects only eu-central-1
91                 client.Signature = aws.V4Signature
92         }
93         client.ConnectTimeout = time.Duration(v.ConnectTimeout)
94         client.ReadTimeout = time.Duration(v.ReadTimeout)
95         v.bucket = &s3bucket{
96                 Bucket: &s3.Bucket{
97                         S3:   client,
98                         Name: v.Bucket,
99                 },
100         }
101         // Set up prometheus metrics
102         lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
103         v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
104
105         return nil
106 }
107
108 const (
109         s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
110         s3DefaultConnectTimeout = arvados.Duration(time.Minute)
111 )
112
113 var (
114         // ErrS3TrashDisabled is returned by Trash if that operation
115         // is impossible with the current config.
116         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
117
118         s3ACL = s3.Private
119
120         zeroTime time.Time
121 )
122
123 const (
124         maxClockSkew  = 600 * time.Second
125         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
126 )
127
128 func s3regions() (okList []string) {
129         for r := range aws.Regions {
130                 okList = append(okList, r)
131         }
132         return
133 }
134
135 // S3Volume implements Volume using an S3 bucket.
136 type S3Volume struct {
137         AccessKey          string
138         SecretKey          string
139         Endpoint           string
140         Region             string
141         Bucket             string
142         LocationConstraint bool
143         IndexPageSize      int
144         ConnectTimeout     arvados.Duration
145         ReadTimeout        arvados.Duration
146         RaceWindow         arvados.Duration
147         UnsafeDelete       bool
148
149         cluster   *arvados.Cluster
150         volume    arvados.Volume
151         logger    logrus.FieldLogger
152         metrics   *volumeMetricsVecs
153         bucket    *s3bucket
154         startOnce sync.Once
155 }
156
157 // GetDeviceID returns a globally unique ID for the storage bucket.
158 func (v *S3Volume) GetDeviceID() string {
159         return "s3://" + v.Endpoint + "/" + v.Bucket
160 }
161
162 func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
163         ready := make(chan bool)
164         go func() {
165                 rdr, err = v.getReader(loc)
166                 close(ready)
167         }()
168         select {
169         case <-ready:
170                 return
171         case <-ctx.Done():
172                 v.logger.Debugf("s3: abandoning getReader(): %s", ctx.Err())
173                 go func() {
174                         <-ready
175                         if err == nil {
176                                 rdr.Close()
177                         }
178                 }()
179                 return nil, ctx.Err()
180         }
181 }
182
183 // getReader wraps (Bucket)GetReader.
184 //
185 // In situations where (Bucket)GetReader would fail because the block
186 // disappeared in a Trash race, getReader calls fixRace to recover the
187 // data, and tries again.
188 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
189         rdr, err = v.bucket.GetReader(loc)
190         err = v.translateError(err)
191         if err == nil || !os.IsNotExist(err) {
192                 return
193         }
194
195         _, err = v.bucket.Head("recent/"+loc, nil)
196         err = v.translateError(err)
197         if err != nil {
198                 // If we can't read recent/X, there's no point in
199                 // trying fixRace. Give up.
200                 return
201         }
202         if !v.fixRace(loc) {
203                 err = os.ErrNotExist
204                 return
205         }
206
207         rdr, err = v.bucket.GetReader(loc)
208         if err != nil {
209                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
210                 err = v.translateError(err)
211         }
212         return
213 }
214
215 // Get a block: copy the block data into buf, and return the number of
216 // bytes copied.
217 func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
218         rdr, err := v.getReaderWithContext(ctx, loc)
219         if err != nil {
220                 return 0, err
221         }
222
223         var n int
224         ready := make(chan bool)
225         go func() {
226                 defer close(ready)
227
228                 defer rdr.Close()
229                 n, err = io.ReadFull(rdr, buf)
230
231                 switch err {
232                 case nil, io.EOF, io.ErrUnexpectedEOF:
233                         err = nil
234                 default:
235                         err = v.translateError(err)
236                 }
237         }()
238         select {
239         case <-ctx.Done():
240                 v.logger.Debugf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
241                 rdr.Close()
242                 // Must wait for ReadFull to return, to ensure it
243                 // doesn't write to buf after we return.
244                 v.logger.Debug("s3: waiting for ReadFull() to fail")
245                 <-ready
246                 return 0, ctx.Err()
247         case <-ready:
248                 return n, err
249         }
250 }
251
252 // Compare the given data with the stored data.
253 func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
254         errChan := make(chan error, 1)
255         go func() {
256                 _, err := v.bucket.Head("recent/"+loc, nil)
257                 errChan <- err
258         }()
259         var err error
260         select {
261         case <-ctx.Done():
262                 return ctx.Err()
263         case err = <-errChan:
264         }
265         if err != nil {
266                 // Checking for "loc" itself here would interfere with
267                 // future GET requests.
268                 //
269                 // On AWS, if X doesn't exist, a HEAD or GET request
270                 // for X causes X's non-existence to be cached. Thus,
271                 // if we test for X, then create X and return a
272                 // signature to our client, the client might still get
273                 // 404 from all keepstores when trying to read it.
274                 //
275                 // To avoid this, we avoid doing HEAD X or GET X until
276                 // we know X has been written.
277                 //
278                 // Note that X might exist even though recent/X
279                 // doesn't: for example, the response to HEAD recent/X
280                 // might itself come from a stale cache. In such
281                 // cases, we will return a false negative and
282                 // PutHandler might needlessly create another replica
283                 // on a different volume. That's not ideal, but it's
284                 // better than passing the eventually-consistent
285                 // problem on to our clients.
286                 return v.translateError(err)
287         }
288         rdr, err := v.getReaderWithContext(ctx, loc)
289         if err != nil {
290                 return err
291         }
292         defer rdr.Close()
293         return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
294 }
295
296 // Put writes a block.
297 func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
298         if v.volume.ReadOnly {
299                 return MethodDisabledError
300         }
301         var opts s3.Options
302         size := len(block)
303         if size > 0 {
304                 md5, err := hex.DecodeString(loc)
305                 if err != nil {
306                         return err
307                 }
308                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
309                 // In AWS regions that use V4 signatures, we need to
310                 // provide ContentSHA256 up front. Otherwise, the S3
311                 // library reads the request body (from our buffer)
312                 // into another new buffer in order to compute the
313                 // SHA256 before sending the request -- which would
314                 // mean consuming 128 MiB of memory for the duration
315                 // of a 64 MiB write.
316                 opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
317         }
318
319         // Send the block data through a pipe, so that (if we need to)
320         // we can close the pipe early and abandon our PutReader()
321         // goroutine, without worrying about PutReader() accessing our
322         // block buffer after we release it.
323         bufr, bufw := io.Pipe()
324         go func() {
325                 io.Copy(bufw, bytes.NewReader(block))
326                 bufw.Close()
327         }()
328
329         var err error
330         ready := make(chan bool)
331         go func() {
332                 defer func() {
333                         if ctx.Err() != nil {
334                                 v.logger.Debugf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
335                         }
336                 }()
337                 defer close(ready)
338                 err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
339                 if err != nil {
340                         return
341                 }
342                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
343         }()
344         select {
345         case <-ctx.Done():
346                 v.logger.Debugf("%s: taking PutReader's input away: %s", v, ctx.Err())
347                 // Our pipe might be stuck in Write(), waiting for
348                 // PutReader() to read. If so, un-stick it. This means
349                 // PutReader will get corrupt data, but that's OK: the
350                 // size and MD5 won't match, so the write will fail.
351                 go io.Copy(ioutil.Discard, bufr)
352                 // CloseWithError() will return once pending I/O is done.
353                 bufw.CloseWithError(ctx.Err())
354                 v.logger.Debugf("%s: abandoning PutReader goroutine", v)
355                 return ctx.Err()
356         case <-ready:
357                 // Unblock pipe in case PutReader did not consume it.
358                 io.Copy(ioutil.Discard, bufr)
359                 return v.translateError(err)
360         }
361 }
362
363 // Touch sets the timestamp for the given locator to the current time.
364 func (v *S3Volume) Touch(loc string) error {
365         if v.volume.ReadOnly {
366                 return MethodDisabledError
367         }
368         _, err := v.bucket.Head(loc, nil)
369         err = v.translateError(err)
370         if os.IsNotExist(err) && v.fixRace(loc) {
371                 // The data object got trashed in a race, but fixRace
372                 // rescued it.
373         } else if err != nil {
374                 return err
375         }
376         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
377         return v.translateError(err)
378 }
379
380 // Mtime returns the stored timestamp for the given locator.
381 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
382         _, err := v.bucket.Head(loc, nil)
383         if err != nil {
384                 return zeroTime, v.translateError(err)
385         }
386         resp, err := v.bucket.Head("recent/"+loc, nil)
387         err = v.translateError(err)
388         if os.IsNotExist(err) {
389                 // The data object X exists, but recent/X is missing.
390                 err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
391                 if err != nil {
392                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
393                         return zeroTime, v.translateError(err)
394                 }
395                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
396                 resp, err = v.bucket.Head("recent/"+loc, nil)
397                 if err != nil {
398                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
399                         return zeroTime, v.translateError(err)
400                 }
401         } else if err != nil {
402                 // HEAD recent/X failed for some other reason.
403                 return zeroTime, err
404         }
405         return v.lastModified(resp)
406 }
407
408 // IndexTo writes a complete list of locators with the given prefix
409 // for which Get() can retrieve data.
410 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
411         // Use a merge sort to find matching sets of X and recent/X.
412         dataL := s3Lister{
413                 Bucket:   v.bucket.Bucket,
414                 Prefix:   prefix,
415                 PageSize: v.IndexPageSize,
416                 Stats:    &v.bucket.stats,
417         }
418         recentL := s3Lister{
419                 Bucket:   v.bucket.Bucket,
420                 Prefix:   "recent/" + prefix,
421                 PageSize: v.IndexPageSize,
422                 Stats:    &v.bucket.stats,
423         }
424         for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
425                 if data.Key >= "g" {
426                         // Conveniently, "recent/*" and "trash/*" are
427                         // lexically greater than all hex-encoded data
428                         // hashes, so stopping here avoids iterating
429                         // over all of them needlessly with dataL.
430                         break
431                 }
432                 if !v.isKeepBlock(data.Key) {
433                         continue
434                 }
435
436                 // stamp is the list entry we should use to report the
437                 // last-modified time for this data block: it will be
438                 // the recent/X entry if one exists, otherwise the
439                 // entry for the data block itself.
440                 stamp := data
441
442                 // Advance to the corresponding recent/X marker, if any
443                 for recent != nil && recentL.Error() == nil {
444                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
445                                 recent = recentL.Next()
446                                 continue
447                         } else if cmp == 0 {
448                                 stamp = recent
449                                 recent = recentL.Next()
450                                 break
451                         } else {
452                                 // recent/X marker is missing: we'll
453                                 // use the timestamp on the data
454                                 // object.
455                                 break
456                         }
457                 }
458                 if err := recentL.Error(); err != nil {
459                         return err
460                 }
461                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
462                 if err != nil {
463                         return err
464                 }
465                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
466         }
467         return dataL.Error()
468 }
469
470 // Trash a Keep block.
471 func (v *S3Volume) Trash(loc string) error {
472         if v.volume.ReadOnly {
473                 return MethodDisabledError
474         }
475         if t, err := v.Mtime(loc); err != nil {
476                 return err
477         } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
478                 return nil
479         }
480         if v.cluster.Collections.BlobTrashLifetime == 0 {
481                 if !v.UnsafeDelete {
482                         return ErrS3TrashDisabled
483                 }
484                 return v.translateError(v.bucket.Del(loc))
485         }
486         err := v.checkRaceWindow(loc)
487         if err != nil {
488                 return err
489         }
490         err = v.safeCopy("trash/"+loc, loc)
491         if err != nil {
492                 return err
493         }
494         return v.translateError(v.bucket.Del(loc))
495 }
496
497 // checkRaceWindow returns a non-nil error if trash/loc is, or might
498 // be, in the race window (i.e., it's not safe to trash loc).
499 func (v *S3Volume) checkRaceWindow(loc string) error {
500         resp, err := v.bucket.Head("trash/"+loc, nil)
501         err = v.translateError(err)
502         if os.IsNotExist(err) {
503                 // OK, trash/X doesn't exist so we're not in the race
504                 // window
505                 return nil
506         } else if err != nil {
507                 // Error looking up trash/X. We don't know whether
508                 // we're in the race window
509                 return err
510         }
511         t, err := v.lastModified(resp)
512         if err != nil {
513                 // Can't parse timestamp
514                 return err
515         }
516         safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
517         if safeWindow <= 0 {
518                 // We can't count on "touch trash/X" to prolong
519                 // trash/X's lifetime. The new timestamp might not
520                 // become visible until now+raceWindow, and EmptyTrash
521                 // is allowed to delete trash/X before then.
522                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
523         }
524         // trash/X exists, but it won't be eligible for deletion until
525         // after now+raceWindow, so it's safe to overwrite it.
526         return nil
527 }
528
529 // safeCopy calls PutCopy, and checks the response to make sure the
530 // copy succeeded and updated the timestamp on the destination object
531 // (PutCopy returns 200 OK if the request was received, even if the
532 // copy failed).
533 func (v *S3Volume) safeCopy(dst, src string) error {
534         resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
535                 ContentType:       "application/octet-stream",
536                 MetadataDirective: "REPLACE",
537         }, v.bucket.Name+"/"+src)
538         err = v.translateError(err)
539         if os.IsNotExist(err) {
540                 return err
541         } else if err != nil {
542                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Name+"/"+src, err)
543         }
544         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
545                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
546         } else if time.Now().Sub(t) > maxClockSkew {
547                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
548         }
549         return nil
550 }
551
552 // Get the LastModified header from resp, and parse it as RFC1123 or
553 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
554 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
555         s := resp.Header.Get("Last-Modified")
556         t, err = time.Parse(time.RFC1123, s)
557         if err != nil && s != "" {
558                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
559                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
560                 // as required by HTTP spec. If it's not a valid HTTP
561                 // header value, it's probably AWS (or s3test) giving
562                 // us a nearly-RFC1123 timestamp.
563                 t, err = time.Parse(nearlyRFC1123, s)
564         }
565         return
566 }
567
568 // Untrash moves block from trash back into store
569 func (v *S3Volume) Untrash(loc string) error {
570         err := v.safeCopy(loc, "trash/"+loc)
571         if err != nil {
572                 return err
573         }
574         err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
575         return v.translateError(err)
576 }
577
578 // Status returns a *VolumeStatus representing the current in-use
579 // storage capacity and a fake available capacity that doesn't make
580 // the volume seem full or nearly-full.
581 func (v *S3Volume) Status() *VolumeStatus {
582         return &VolumeStatus{
583                 DeviceNum: 1,
584                 BytesFree: BlockSize * 1000,
585                 BytesUsed: 1,
586         }
587 }
588
589 // InternalStats returns bucket I/O and API call counters.
590 func (v *S3Volume) InternalStats() interface{} {
591         return &v.bucket.stats
592 }
593
594 // String implements fmt.Stringer.
595 func (v *S3Volume) String() string {
596         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
597 }
598
599 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
600
601 func (v *S3Volume) isKeepBlock(s string) bool {
602         return s3KeepBlockRegexp.MatchString(s)
603 }
604
605 // fixRace(X) is called when "recent/X" exists but "X" doesn't
606 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
607 // there was a race between Put and Trash, fixRace recovers from the
608 // race by Untrashing the block.
609 func (v *S3Volume) fixRace(loc string) bool {
610         trash, err := v.bucket.Head("trash/"+loc, nil)
611         if err != nil {
612                 if !os.IsNotExist(v.translateError(err)) {
613                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
614                 }
615                 return false
616         }
617         trashTime, err := v.lastModified(trash)
618         if err != nil {
619                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
620                 return false
621         }
622
623         recent, err := v.bucket.Head("recent/"+loc, nil)
624         if err != nil {
625                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
626                 return false
627         }
628         recentTime, err := v.lastModified(recent)
629         if err != nil {
630                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
631                 return false
632         }
633
634         ageWhenTrashed := trashTime.Sub(recentTime)
635         if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() {
636                 // No evidence of a race: block hasn't been written
637                 // since it became eligible for Trash. No fix needed.
638                 return false
639         }
640
641         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
642         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
643         err = v.safeCopy(loc, "trash/"+loc)
644         if err != nil {
645                 log.Printf("error: fixRace: %s", err)
646                 return false
647         }
648         return true
649 }
650
651 func (v *S3Volume) translateError(err error) error {
652         switch err := err.(type) {
653         case *s3.Error:
654                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
655                         strings.Contains(err.Error(), "Not Found") {
656                         return os.ErrNotExist
657                 }
658                 // Other 404 errors like NoSuchVersion and
659                 // NoSuchBucket are different problems which should
660                 // get called out downstream, so we don't convert them
661                 // to os.ErrNotExist.
662         }
663         return err
664 }
665
666 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
667 // and deletes them from the volume.
668 func (v *S3Volume) EmptyTrash() {
669         if v.cluster.Collections.BlobDeleteConcurrency < 1 {
670                 return
671         }
672
673         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
674
675         // Define "ready to delete" as "...when EmptyTrash started".
676         startT := time.Now()
677
678         emptyOneKey := func(trash *s3.Key) {
679                 loc := trash.Key[6:]
680                 if !v.isKeepBlock(loc) {
681                         return
682                 }
683                 atomic.AddInt64(&bytesInTrash, trash.Size)
684                 atomic.AddInt64(&blocksInTrash, 1)
685
686                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
687                 if err != nil {
688                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
689                         return
690                 }
691                 recent, err := v.bucket.Head("recent/"+loc, nil)
692                 if err != nil && os.IsNotExist(v.translateError(err)) {
693                         log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
694                         err = v.Untrash(loc)
695                         if err != nil {
696                                 log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
697                         }
698                         return
699                 } else if err != nil {
700                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
701                         return
702                 }
703                 recentT, err := v.lastModified(recent)
704                 if err != nil {
705                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
706                         return
707                 }
708                 if trashT.Sub(recentT) < v.cluster.Collections.BlobSigningTTL.Duration() {
709                         if age := startT.Sub(recentT); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
710                                 // recent/loc is too old to protect
711                                 // loc from being Trashed again during
712                                 // the raceWindow that starts if we
713                                 // delete trash/X now.
714                                 //
715                                 // Note this means (TrashCheckInterval
716                                 // < BlobSignatureTTL - raceWindow) is
717                                 // necessary to avoid starvation.
718                                 log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
719                                 v.fixRace(loc)
720                                 v.Touch(loc)
721                                 return
722                         }
723                         _, err := v.bucket.Head(loc, nil)
724                         if os.IsNotExist(err) {
725                                 log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
726                                 v.fixRace(loc)
727                                 return
728                         } else if err != nil {
729                                 log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
730                                 return
731                         }
732                 }
733                 if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
734                         return
735                 }
736                 err = v.bucket.Del(trash.Key)
737                 if err != nil {
738                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
739                         return
740                 }
741                 atomic.AddInt64(&bytesDeleted, trash.Size)
742                 atomic.AddInt64(&blocksDeleted, 1)
743
744                 _, err = v.bucket.Head(loc, nil)
745                 if err == nil {
746                         log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
747                         return
748                 }
749                 if !os.IsNotExist(v.translateError(err)) {
750                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
751                         return
752                 }
753                 err = v.bucket.Del("recent/" + loc)
754                 if err != nil {
755                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
756                 }
757         }
758
759         var wg sync.WaitGroup
760         todo := make(chan *s3.Key, v.cluster.Collections.BlobDeleteConcurrency)
761         for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
762                 wg.Add(1)
763                 go func() {
764                         defer wg.Done()
765                         for key := range todo {
766                                 emptyOneKey(key)
767                         }
768                 }()
769         }
770
771         trashL := s3Lister{
772                 Bucket:   v.bucket.Bucket,
773                 Prefix:   "trash/",
774                 PageSize: v.IndexPageSize,
775                 Stats:    &v.bucket.stats,
776         }
777         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
778                 todo <- trash
779         }
780         close(todo)
781         wg.Wait()
782
783         if err := trashL.Error(); err != nil {
784                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
785         }
786         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
787 }
788
789 type s3Lister struct {
790         Bucket     *s3.Bucket
791         Prefix     string
792         PageSize   int
793         Stats      *s3bucketStats
794         nextMarker string
795         buf        []s3.Key
796         err        error
797 }
798
799 // First fetches the first page and returns the first item. It returns
800 // nil if the response is the empty set or an error occurs.
801 func (lister *s3Lister) First() *s3.Key {
802         lister.getPage()
803         return lister.pop()
804 }
805
806 // Next returns the next item, fetching the next page if necessary. It
807 // returns nil if the last available item has already been fetched, or
808 // an error occurs.
809 func (lister *s3Lister) Next() *s3.Key {
810         if len(lister.buf) == 0 && lister.nextMarker != "" {
811                 lister.getPage()
812         }
813         return lister.pop()
814 }
815
816 // Return the most recent error encountered by First or Next.
817 func (lister *s3Lister) Error() error {
818         return lister.err
819 }
820
821 func (lister *s3Lister) getPage() {
822         lister.Stats.TickOps("list")
823         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
824         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
825         lister.nextMarker = ""
826         if err != nil {
827                 lister.err = err
828                 return
829         }
830         if resp.IsTruncated {
831                 lister.nextMarker = resp.NextMarker
832         }
833         lister.buf = make([]s3.Key, 0, len(resp.Contents))
834         for _, key := range resp.Contents {
835                 if !strings.HasPrefix(key.Key, lister.Prefix) {
836                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
837                         continue
838                 }
839                 lister.buf = append(lister.buf, key)
840         }
841 }
842
843 func (lister *s3Lister) pop() (k *s3.Key) {
844         if len(lister.buf) > 0 {
845                 k = &lister.buf[0]
846                 lister.buf = lister.buf[1:]
847         }
848         return
849 }
850
851 // s3bucket wraps s3.bucket and counts I/O and API usage stats.
852 type s3bucket struct {
853         *s3.Bucket
854         stats s3bucketStats
855 }
856
857 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
858         rdr, err := b.Bucket.GetReader(path)
859         b.stats.TickOps("get")
860         b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
861         b.stats.TickErr(err)
862         return NewCountingReader(rdr, b.stats.TickInBytes), err
863 }
864
865 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
866         resp, err := b.Bucket.Head(path, headers)
867         b.stats.TickOps("head")
868         b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
869         b.stats.TickErr(err)
870         return resp, err
871 }
872
873 func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
874         if length == 0 {
875                 // goamz will only send Content-Length: 0 when reader
876                 // is nil due to net.http.Request.ContentLength
877                 // behavior.  Otherwise, Content-Length header is
878                 // omitted which will cause some S3 services
879                 // (including AWS and Ceph RadosGW) to fail to create
880                 // empty objects.
881                 r = nil
882         } else {
883                 r = NewCountingReader(r, b.stats.TickOutBytes)
884         }
885         err := b.Bucket.PutReader(path, r, length, contType, perm, options)
886         b.stats.TickOps("put")
887         b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
888         b.stats.TickErr(err)
889         return err
890 }
891
892 func (b *s3bucket) Del(path string) error {
893         err := b.Bucket.Del(path)
894         b.stats.TickOps("delete")
895         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
896         b.stats.TickErr(err)
897         return err
898 }
899
900 type s3bucketStats struct {
901         statsTicker
902         Ops     uint64
903         GetOps  uint64
904         PutOps  uint64
905         HeadOps uint64
906         DelOps  uint64
907         ListOps uint64
908 }
909
910 func (s *s3bucketStats) TickErr(err error) {
911         if err == nil {
912                 return
913         }
914         errType := fmt.Sprintf("%T", err)
915         if err, ok := err.(*s3.Error); ok {
916                 errType = errType + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
917         }
918         s.statsTicker.TickErr(err, errType)
919 }