+
+// s3bucket wraps s3.bucket and counts I/O and API usage stats.
+type s3bucket struct {
+ *s3.Bucket
+ stats s3bucketStats
+}
+
+func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
+ rdr, err := b.Bucket.GetReader(path)
+ b.stats.tick(&b.stats.Ops, &b.stats.GetOps)
+ b.stats.tickErr(err)
+ return NewCountingReader(rdr, b.stats.tickInBytes), err
+}
+
+func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
+ resp, err := b.Bucket.Head(path, headers)
+ b.stats.tick(&b.stats.Ops, &b.stats.HeadOps)
+ b.stats.tickErr(err)
+ return resp, err
+}
+
+func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
+ err := b.Bucket.PutReader(path, NewCountingReader(r, b.stats.tickOutBytes), length, contType, perm, options)
+ b.stats.tick(&b.stats.Ops, &b.stats.PutOps)
+ b.stats.tickErr(err)
+ return err
+}
+
+func (b *s3bucket) Put(path string, data []byte, contType string, perm s3.ACL, options s3.Options) error {
+ err := b.Bucket.PutReader(path, NewCountingReader(bytes.NewBuffer(data), b.stats.tickOutBytes), int64(len(data)), contType, perm, options)
+ b.stats.tick(&b.stats.Ops, &b.stats.PutOps)
+ b.stats.tickErr(err)
+ return err
+}
+
+func (b *s3bucket) Del(path string) error {
+ err := b.Bucket.Del(path)
+ b.stats.tick(&b.stats.Ops, &b.stats.DelOps)
+ b.stats.tickErr(err)
+ return err
+}
+
+type s3bucketStats struct {
+ Errors uint64
+ Ops uint64
+ GetOps uint64
+ PutOps uint64
+ HeadOps uint64
+ DelOps uint64
+ ListOps uint64
+ InBytes uint64
+ OutBytes uint64
+
+ ErrorCodes map[string]uint64 `json:",omitempty"`
+
+ lock sync.Mutex
+}
+
+func (s *s3bucketStats) tickInBytes(n uint64) {
+ atomic.AddUint64(&s.InBytes, n)
+}
+
+func (s *s3bucketStats) tickOutBytes(n uint64) {
+ atomic.AddUint64(&s.OutBytes, n)
+}
+
+func (s *s3bucketStats) tick(counters ...*uint64) {
+ for _, counter := range counters {
+ atomic.AddUint64(counter, 1)
+ }
+}
+
+func (s *s3bucketStats) tickErr(err error) {
+ if err == nil {
+ return
+ }
+ atomic.AddUint64(&s.Errors, 1)
+ errStr := fmt.Sprintf("%T", err)
+ if err, ok := err.(*s3.Error); ok {
+ errStr = errStr + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
+ }
+ s.lock.Lock()
+ if s.ErrorCodes == nil {
+ s.ErrorCodes = make(map[string]uint64)
+ }
+ s.ErrorCodes[errStr]++
+ s.lock.Unlock()
+}