//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepstore
import (
"bufio"
}
func (v *S3Volume) bootstrapIAMCredentials() error {
- if v.AccessKey != "" || v.SecretKey != "" {
+ if v.AccessKeyID != "" || v.SecretAccessKey != "" {
if v.IAMRole != "" {
- return errors.New("invalid DriverParameters: AccessKey and SecretKey must be blank if IAMRole is specified")
+ return errors.New("invalid DriverParameters: AccessKeyID and SecretAccessKey must be blank if IAMRole is specified")
}
return nil
}
}
func (v *S3Volume) newS3Client() *s3.S3 {
- auth := aws.NewAuth(v.AccessKey, v.SecretKey, v.AuthToken, v.AuthExpiration)
+ auth := aws.NewAuth(v.AccessKeyID, v.SecretAccessKey, v.AuthToken, v.AuthExpiration)
client := s3.New(*auth, v.region)
if !v.V2Signature {
client.Signature = aws.V4Signature
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound {
- return 0, fmt.Errorf("this instance does not have an IAM role assigned -- either assign a role, or configure AccessKey and SecretKey explicitly in DriverParameters (error getting %s: HTTP status %s)", url, resp.Status)
+ return 0, fmt.Errorf("this instance does not have an IAM role assigned -- either assign a role, or configure AccessKeyID and SecretAccessKey explicitly in DriverParameters (error getting %s: HTTP status %s)", url, resp.Status)
} else if resp.StatusCode != http.StatusOK {
return 0, fmt.Errorf("error getting %s: HTTP status %s", url, resp.Status)
}
if err != nil {
return 0, fmt.Errorf("error decoding credentials from %s: %s", url, err)
}
- v.AccessKey, v.SecretKey, v.AuthToken, v.AuthExpiration = cred.AccessKeyID, cred.SecretAccessKey, cred.Token, cred.Expiration
+ v.AccessKeyID, v.SecretAccessKey, v.AuthToken, v.AuthExpiration = cred.AccessKeyID, cred.SecretAccessKey, cred.Token, cred.Expiration
v.bucket.SetBucket(&s3.Bucket{
S3: v.newS3Client(),
Name: v.Bucket,
return ttl, nil
}
-func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
+func (v *S3Volume) getReaderWithContext(ctx context.Context, key string) (rdr io.ReadCloser, err error) {
ready := make(chan bool)
go func() {
- rdr, err = v.getReader(loc)
+ rdr, err = v.getReader(key)
close(ready)
}()
select {
case <-ready:
return
case <-ctx.Done():
- v.logger.Debugf("s3: abandoning getReader(): %s", ctx.Err())
+ v.logger.Debugf("s3: abandoning getReader(%s): %s", key, ctx.Err())
go func() {
<-ready
if err == nil {
// In situations where (Bucket)GetReader would fail because the block
// disappeared in a Trash race, getReader calls fixRace to recover the
// data, and tries again.
-func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
- rdr, err = v.bucket.GetReader(loc)
+func (v *S3Volume) getReader(key string) (rdr io.ReadCloser, err error) {
+ rdr, err = v.bucket.GetReader(key)
err = v.translateError(err)
if err == nil || !os.IsNotExist(err) {
return
}
- _, err = v.bucket.Head("recent/"+loc, nil)
+ _, err = v.bucket.Head("recent/"+key, nil)
err = v.translateError(err)
if err != nil {
// If we can't read recent/X, there's no point in
// trying fixRace. Give up.
return
}
- if !v.fixRace(loc) {
+ if !v.fixRace(key) {
err = os.ErrNotExist
return
}
- rdr, err = v.bucket.GetReader(loc)
+ rdr, err = v.bucket.GetReader(key)
if err != nil {
- v.logger.Warnf("reading %s after successful fixRace: %s", loc, err)
+ v.logger.Warnf("reading %s after successful fixRace: %s", key, err)
err = v.translateError(err)
}
return
// Get a block: copy the block data into buf, and return the number of
// bytes copied.
func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
- rdr, err := v.getReaderWithContext(ctx, loc)
+ key := v.key(loc)
+ rdr, err := v.getReaderWithContext(ctx, key)
if err != nil {
return 0, err
}
// Compare the given data with the stored data.
func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
+ key := v.key(loc)
errChan := make(chan error, 1)
go func() {
- _, err := v.bucket.Head("recent/"+loc, nil)
+ _, err := v.bucket.Head("recent/"+key, nil)
errChan <- err
}()
var err error
// problem on to our clients.
return v.translateError(err)
}
- rdr, err := v.getReaderWithContext(ctx, loc)
+ rdr, err := v.getReaderWithContext(ctx, key)
if err != nil {
return err
}
opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
}
+ key := v.key(loc)
+
// Send the block data through a pipe, so that (if we need to)
// we can close the pipe early and abandon our PutReader()
// goroutine, without worrying about PutReader() accessing our
}
}()
defer close(ready)
- err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
+ err = v.bucket.PutReader(key, bufr, int64(size), "application/octet-stream", s3ACL, opts)
if err != nil {
return
}
- err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
}()
select {
case <-ctx.Done():
if v.volume.ReadOnly {
return MethodDisabledError
}
- _, err := v.bucket.Head(loc, nil)
+ key := v.key(loc)
+ _, err := v.bucket.Head(key, nil)
err = v.translateError(err)
- if os.IsNotExist(err) && v.fixRace(loc) {
+ if os.IsNotExist(err) && v.fixRace(key) {
// The data object got trashed in a race, but fixRace
// rescued it.
} else if err != nil {
return err
}
- err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
return v.translateError(err)
}
// Mtime returns the stored timestamp for the given locator.
func (v *S3Volume) Mtime(loc string) (time.Time, error) {
- _, err := v.bucket.Head(loc, nil)
+ key := v.key(loc)
+ _, err := v.bucket.Head(key, nil)
if err != nil {
return zeroTime, v.translateError(err)
}
- resp, err := v.bucket.Head("recent/"+loc, nil)
+ resp, err := v.bucket.Head("recent/"+key, nil)
err = v.translateError(err)
if os.IsNotExist(err) {
// The data object X exists, but recent/X is missing.
- err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
- v.logger.WithError(err).Errorf("error creating %q", "recent/"+loc)
+ v.logger.WithError(err).Errorf("error creating %q", "recent/"+key)
return zeroTime, v.translateError(err)
}
- v.logger.Infof("created %q to migrate existing block to new storage scheme", "recent/"+loc)
- resp, err = v.bucket.Head("recent/"+loc, nil)
+ v.logger.Infof("created %q to migrate existing block to new storage scheme", "recent/"+key)
+ resp, err = v.bucket.Head("recent/"+key, nil)
if err != nil {
- v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+loc)
+ v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+key)
return zeroTime, v.translateError(err)
}
} else if err != nil {
dataL := s3Lister{
Logger: v.logger,
Bucket: v.bucket.Bucket(),
- Prefix: prefix,
+ Prefix: v.key(prefix),
PageSize: v.IndexPageSize,
Stats: &v.bucket.stats,
}
recentL := s3Lister{
Logger: v.logger,
Bucket: v.bucket.Bucket(),
- Prefix: "recent/" + prefix,
+ Prefix: "recent/" + v.key(prefix),
PageSize: v.IndexPageSize,
Stats: &v.bucket.stats,
}
// over all of them needlessly with dataL.
break
}
- if !v.isKeepBlock(data.Key) {
+ loc, isBlk := v.isKeepBlock(data.Key)
+ if !isBlk {
continue
}
// We truncate sub-second precision here. Otherwise
// timestamps will never match the RFC1123-formatted
// Last-Modified values parsed by Mtime().
- fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.Unix()*1000000000)
+ fmt.Fprintf(writer, "%s+%d %d\n", loc, data.Size, t.Unix()*1000000000)
}
return dataL.Error()
}
} else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
return nil
}
+ key := v.key(loc)
if v.cluster.Collections.BlobTrashLifetime == 0 {
if !v.UnsafeDelete {
return ErrS3TrashDisabled
}
- return v.translateError(v.bucket.Del(loc))
+ return v.translateError(v.bucket.Del(key))
}
- err := v.checkRaceWindow(loc)
+ err := v.checkRaceWindow(key)
if err != nil {
return err
}
- err = v.safeCopy("trash/"+loc, loc)
+ err = v.safeCopy("trash/"+key, key)
if err != nil {
return err
}
- return v.translateError(v.bucket.Del(loc))
+ return v.translateError(v.bucket.Del(key))
}
-// checkRaceWindow returns a non-nil error if trash/loc is, or might
-// be, in the race window (i.e., it's not safe to trash loc).
-func (v *S3Volume) checkRaceWindow(loc string) error {
- resp, err := v.bucket.Head("trash/"+loc, nil)
+// checkRaceWindow returns a non-nil error if trash/key is, or might
+// be, in the race window (i.e., it's not safe to trash key).
+func (v *S3Volume) checkRaceWindow(key string) error {
+ resp, err := v.bucket.Head("trash/"+key, nil)
err = v.translateError(err)
if os.IsNotExist(err) {
// OK, trash/X doesn't exist so we're not in the race
// trash/X's lifetime. The new timestamp might not
// become visible until now+raceWindow, and EmptyTrash
// is allowed to delete trash/X before then.
- return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
+ return fmt.Errorf("%s: same block is already in trash, and safe window ended %s ago", key, -safeWindow)
}
// trash/X exists, but it won't be eligible for deletion until
// after now+raceWindow, so it's safe to overwrite it.
// Untrash moves the block identified by loc from trash back into the
// store.
//
// It copies the object at "trash/<key>" to "<key>" (where key is
// v.key(loc)) and then writes an empty object at "recent/<key>". A
// failed copy is returned as-is; a failure writing the "recent/"
// object is passed through translateError.
func (v *S3Volume) Untrash(loc string) error {
	key := v.key(loc)
	if err := v.safeCopy(key, "trash/"+key); err != nil {
		return err
	}
	return v.translateError(v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{}))
}
var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
-func (v *S3Volume) isKeepBlock(s string) bool {
- return s3KeepBlockRegexp.MatchString(s)
+func (v *S3Volume) isKeepBlock(s string) (string, bool) {
+ if v.PrefixLength > 0 && len(s) == v.PrefixLength+33 && s[:v.PrefixLength] == s[v.PrefixLength+1:v.PrefixLength*2+1] {
+ s = s[v.PrefixLength+1:]
+ }
+ return s, s3KeepBlockRegexp.MatchString(s)
+}
+
+// Return the key used for a given loc. If PrefixLength==0 then
+// key("abcdef0123") is "abcdef0123", if PrefixLength==3 then key is
+// "abc/abcdef0123", etc.
+func (v *S3Volume) key(loc string) string {
+ if v.PrefixLength > 0 && v.PrefixLength < len(loc)-1 {
+ return loc[:v.PrefixLength] + "/" + loc
+ } else {
+ return loc
+ }
}
// fixRace(X) is called when "recent/X" exists but "X" doesn't
-// exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
-// there was a race between Put and Trash, fixRace recovers from the
-// race by Untrashing the block.
-func (v *S3Volume) fixRace(loc string) bool {
- trash, err := v.bucket.Head("trash/"+loc, nil)
+// exist. If the timestamps on "recent/X" and "trash/X" indicate there
+// was a race between Put and Trash, fixRace recovers from the race by
+// Untrashing the block.
+func (v *S3Volume) fixRace(key string) bool {
+ trash, err := v.bucket.Head("trash/"+key, nil)
if err != nil {
if !os.IsNotExist(v.translateError(err)) {
- v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+loc)
+ v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+key)
}
return false
}
return false
}
- recent, err := v.bucket.Head("recent/"+loc, nil)
+ recent, err := v.bucket.Head("recent/"+key, nil)
if err != nil {
- v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+loc)
+ v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+key)
return false
}
recentTime, err := v.lastModified(recent)
return false
}
- v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
- v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
- err = v.safeCopy(loc, "trash/"+loc)
+ v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", key, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
+ v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+key, key)
+ err = v.safeCopy(key, "trash/"+key)
if err != nil {
v.logger.WithError(err).Error("fixRace: copy failed")
return false
startT := time.Now()
emptyOneKey := func(trash *s3.Key) {
- loc := trash.Key[6:]
- if !v.isKeepBlock(loc) {
+ key := trash.Key[6:]
+ loc, isBlk := v.isKeepBlock(key)
+ if !isBlk {
return
}
atomic.AddInt64(&bytesInTrash, trash.Size)
v.logger.Warnf("EmptyTrash: %q: parse %q: %s", trash.Key, trash.LastModified, err)
return
}
- recent, err := v.bucket.Head("recent/"+loc, nil)
+ recent, err := v.bucket.Head("recent/"+key, nil)
if err != nil && os.IsNotExist(v.translateError(err)) {
v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", trash.Key, "recent/"+loc, err)
err = v.Untrash(loc)
}
return
} else if err != nil {
- v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+loc)
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+key)
return
}
recentT, err := v.lastModified(recent)
if err != nil {
- v.logger.WithError(err).Warnf("EmptyTrash: %q: error parsing %q", "recent/"+loc, recent.Header.Get("Last-Modified"))
+ v.logger.WithError(err).Warnf("EmptyTrash: %q: error parsing %q", "recent/"+key, recent.Header.Get("Last-Modified"))
return
}
if trashT.Sub(recentT) < v.cluster.Collections.BlobSigningTTL.Duration() {
// < BlobSigningTTL - raceWindow) is
// necessary to avoid starvation.
v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
- v.fixRace(loc)
+ v.fixRace(key)
v.Touch(loc)
return
}
- _, err := v.bucket.Head(loc, nil)
+ _, err := v.bucket.Head(key, nil)
if os.IsNotExist(err) {
v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
- v.fixRace(loc)
+ v.fixRace(key)
return
} else if err != nil {
v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
atomic.AddInt64(&bytesDeleted, trash.Size)
atomic.AddInt64(&blocksDeleted, 1)
- _, err = v.bucket.Head(loc, nil)
+ _, err = v.bucket.Head(key, nil)
if err == nil {
- v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
+ v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", key, key)
return
}
if !os.IsNotExist(v.translateError(err)) {
- v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", key)
return
}
- err = v.bucket.Del("recent/" + loc)
+ err = v.bucket.Del("recent/" + key)
if err != nil {
- v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+loc)
+ v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+key)
}
}