X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6fb784416db6651b33b921a0684c2f8de84410fc..515a58c0ef8634fca2397a8609f868524a42132c:/services/keepstore/s3_volume.go diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go index 17923f807d..48ba95b488 100644 --- a/services/keepstore/s3_volume.go +++ b/services/keepstore/s3_volume.go @@ -15,6 +15,7 @@ import ( "regexp" "strings" "sync" + "sync/atomic" "time" "git.curoverse.com/arvados.git/sdk/go/arvados" @@ -148,11 +149,28 @@ type S3Volume struct { ReadOnly bool UnsafeDelete bool - bucket *s3.Bucket + bucket *s3.Bucket + bucketStats bucketStats + volumeStats ioStats startOnce sync.Once } +type bucketStats struct { + Errors uint64 + Ops uint64 + GetOps uint64 + PutOps uint64 + HeadOps uint64 + DelOps uint64 + InBytes uint64 + OutBytes uint64 + + ErrorCodes map[string]uint64 `json:",omitempty"` + + lock sync.Mutex +} + // Examples implements VolumeWithExamples. func (*S3Volume) Examples() []Volume { return []Volume{ @@ -264,13 +282,19 @@ func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io // disappeared in a Trash race, getReader calls fixRace to recover the // data, and tries again. func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) { + v.tick(&v.bucketStats.Ops, &v.bucketStats.GetOps) rdr, err = v.bucket.GetReader(loc) err = v.translateError(err) - if err == nil || !os.IsNotExist(err) { + if err == nil { + rdr = NewCountingReader(rdr, v.tickInBytes) + return + } else if !os.IsNotExist(v.tickErr(err)) { return } + + v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps) _, err = v.bucket.Head("recent/"+loc, nil) - err = v.translateError(err) + err = v.translateError(v.tickErr(err)) if err != nil { // If we can't read recent/X, there's no point in // trying fixRace. Give up. 
@@ -280,11 +304,14 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) { err = os.ErrNotExist return } + + v.tick(&v.bucketStats.Ops, &v.bucketStats.GetOps) rdr, err = v.bucket.GetReader(loc) if err != nil { log.Printf("warning: reading %s after successful fixRace: %s", loc, err) - err = v.translateError(err) + err = v.translateError(v.tickErr(err)) } + rdr = NewCountingReader(rdr, v.tickInBytes) return } @@ -369,11 +396,16 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error { } }() defer close(ready) - err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts) + v.tick(&v.bucketStats.Ops, &v.bucketStats.PutOps) + rdr := NewCountingReader(bufr, v.tickOutBytes) + err = v.bucket.PutReader(loc, rdr, int64(size), "application/octet-stream", s3ACL, opts) if err != nil { + v.tickErr(err) return } + v.tick(&v.bucketStats.Ops, &v.bucketStats.PutOps) err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{}) + v.tickErr(err) }() select { case <-ctx.Done(): @@ -397,38 +429,44 @@ func (v *S3Volume) Touch(loc string) error { if v.ReadOnly { return MethodDisabledError } + v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps) _, err := v.bucket.Head(loc, nil) - err = v.translateError(err) + err = v.translateError(v.tickErr(err)) if os.IsNotExist(err) && v.fixRace(loc) { // The data object got trashed in a race, but fixRace // rescued it. } else if err != nil { return err } + v.tick(&v.bucketStats.Ops, &v.bucketStats.PutOps) err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{}) - return v.translateError(err) + return v.translateError(v.tickErr(err)) } // Mtime returns the stored timestamp for the given locator. 
func (v *S3Volume) Mtime(loc string) (time.Time, error) { + v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps) _, err := v.bucket.Head(loc, nil) if err != nil { - return zeroTime, v.translateError(err) + return zeroTime, v.translateError(v.tickErr(err)) } + v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps) resp, err := v.bucket.Head("recent/"+loc, nil) - err = v.translateError(err) + err = v.translateError(v.tickErr(err)) if os.IsNotExist(err) { // The data object X exists, but recent/X is missing. + v.tick(&v.bucketStats.Ops, &v.bucketStats.PutOps) err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{}) if err != nil { log.Printf("error: creating %q: %s", "recent/"+loc, err) - return zeroTime, v.translateError(err) + return zeroTime, v.translateError(v.tickErr(err)) } log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc) + v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps) resp, err = v.bucket.Head("recent/"+loc, nil) if err != nil { log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err) - return zeroTime, v.translateError(err) + return zeroTime, v.translateError(v.tickErr(err)) } } else if err != nil { // HEAD recent/X failed for some other reason. @@ -508,7 +546,8 @@ func (v *S3Volume) Trash(loc string) error { if !s3UnsafeDelete { return ErrS3TrashDisabled } - return v.bucket.Del(loc) + v.tick(&v.bucketStats.Ops, &v.bucketStats.DelOps) + return v.translateError(v.tickErr(v.bucket.Del(loc))) } err := v.checkRaceWindow(loc) if err != nil { @@ -518,14 +557,16 @@ func (v *S3Volume) Trash(loc string) error { if err != nil { return err } - return v.translateError(v.bucket.Del(loc)) + v.tick(&v.bucketStats.Ops, &v.bucketStats.DelOps) + return v.translateError(v.tickErr(v.bucket.Del(loc))) } // checkRaceWindow returns a non-nil error if trash/loc is, or might // be, in the race window (i.e., it's not safe to trash loc). 
func (v *S3Volume) checkRaceWindow(loc string) error { + v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps) resp, err := v.bucket.Head("trash/"+loc, nil) - err = v.translateError(err) + err = v.translateError(v.tickErr(err)) if os.IsNotExist(err) { // OK, trash/X doesn't exist so we're not in the race // window @@ -558,11 +599,12 @@ func (v *S3Volume) checkRaceWindow(loc string) error { // (PutCopy returns 200 OK if the request was received, even if the // copy failed). func (v *S3Volume) safeCopy(dst, src string) error { + v.tick(&v.bucketStats.Ops, &v.bucketStats.PutOps) resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{ ContentType: "application/octet-stream", MetadataDirective: "REPLACE", }, v.bucket.Name+"/"+src) - err = v.translateError(err) + err = v.translateError(v.tickErr(err)) if err != nil { return err } @@ -596,8 +638,9 @@ func (v *S3Volume) Untrash(loc string) error { if err != nil { return err } + v.tick(&v.bucketStats.Ops, &v.bucketStats.PutOps) err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{}) - return v.translateError(err) + return v.translateError(v.tickErr(err)) } // Status returns a *VolumeStatus representing the current in-use @@ -611,9 +654,14 @@ func (v *S3Volume) Status() *VolumeStatus { } } +// InternalStats implements InternalStatser. +func (v *S3Volume) InternalStats() interface{} { + return &v.bucketStats +} + // String implements fmt.Stringer. func (v *S3Volume) String() string { - return fmt.Sprintf("s3-bucket:%+q", v.bucket.Name) + return fmt.Sprintf("s3-bucket:%+q", v.Bucket) } // Writable returns false if all future Put, Mtime, and Delete calls @@ -639,9 +687,10 @@ func (v *S3Volume) isKeepBlock(s string) bool { // there was a race between Put and Trash, fixRace recovers from the // race by Untrashing the block. 
func (v *S3Volume) fixRace(loc string) bool { + v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps) trash, err := v.bucket.Head("trash/"+loc, nil) if err != nil { - if !os.IsNotExist(v.translateError(err)) { + if !os.IsNotExist(v.translateError(v.tickErr(err))) { log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err) } return false @@ -652,8 +701,10 @@ func (v *S3Volume) fixRace(loc string) bool { return false } + v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps) recent, err := v.bucket.Head("recent/"+loc, nil) if err != nil { + v.tickErr(err) log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err) return false } @@ -721,8 +772,9 @@ func (v *S3Volume) EmptyTrash() { log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err) continue } + v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps) recent, err := v.bucket.Head("recent/"+loc, nil) - if err != nil && os.IsNotExist(v.translateError(err)) { + if err != nil && os.IsNotExist(v.translateError(v.tickErr(err))) { log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err) err = v.Untrash(loc) if err != nil { @@ -752,7 +804,10 @@ func (v *S3Volume) EmptyTrash() { v.fixRace(loc) v.Touch(loc) continue - } else if _, err := v.bucket.Head(loc, nil); os.IsNotExist(err) { + } + v.tick(&v.bucketStats.Ops, &v.bucketStats.HeadOps) + _, err := v.bucket.Head(loc, nil) + if os.IsNotExist(v.tickErr(err)) { log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc) v.fixRace(loc) continue @@ -764,18 +819,23 @@ func (v *S3Volume) EmptyTrash() { if startT.Sub(trashT) < theConfig.TrashLifetime.Duration() { continue } + v.tick(&v.bucketStats.Ops, &v.bucketStats.DelOps) err = v.bucket.Del(trash.Key) if err != nil { + v.tickErr(err) log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err) continue } bytesDeleted += trash.Size blocksDeleted++ + v.tick(&v.bucketStats.Ops, 
&v.bucketStats.HeadOps) _, err = v.bucket.Head(loc, nil) - if os.IsNotExist(err) { + if os.IsNotExist(v.tickErr(err)) { + v.tick(&v.bucketStats.Ops, &v.bucketStats.DelOps) err = v.bucket.Del("recent/" + loc) if err != nil { + v.tickErr(err) log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err) } } else if err != nil { @@ -788,6 +848,37 @@ func (v *S3Volume) EmptyTrash() { log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted) } +func (v *S3Volume) tick(counters ...*uint64) { + for _, counter := range counters { + atomic.AddUint64(counter, 1) + } +} + +func (v *S3Volume) tickErr(err error) error { + if err == nil { + return nil + } + atomic.AddUint64(&v.bucketStats.Errors, 1) + if err, ok := err.(*s3.Error); ok { + errStr := fmt.Sprintf("%d %s", err.StatusCode, err.Code) + v.bucketStats.lock.Lock() + if v.bucketStats.ErrorCodes == nil { + v.bucketStats.ErrorCodes = make(map[string]uint64) + } + v.bucketStats.ErrorCodes[errStr]++ + v.bucketStats.lock.Unlock() + } + return err +} + +func (v *S3Volume) tickInBytes(n uint64) { + atomic.AddUint64(&v.bucketStats.InBytes, n) +} + +func (v *S3Volume) tickOutBytes(n uint64) { + atomic.AddUint64(&v.bucketStats.OutBytes, n) +} + type s3Lister struct { Bucket *s3.Bucket Prefix string