Merge branch '12859-keepstore-fd-leak'
[arvados.git] / services / keepstore / azure_blob_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "encoding/json"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "net/http"
16         "os"
17         "regexp"
18         "strconv"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "time"
23
24         "git.arvados.org/arvados.git/sdk/go/arvados"
25         "git.arvados.org/arvados.git/sdk/go/ctxlog"
26         "github.com/Azure/azure-sdk-for-go/storage"
27         "github.com/prometheus/client_golang/prometheus"
28         "github.com/sirupsen/logrus"
29 )
30
31 func init() {
32         driver["Azure"] = newAzureBlobVolume
33 }
34
35 func newAzureBlobVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
36         v := &AzureBlobVolume{
37                 RequestTimeout:    azureDefaultRequestTimeout,
38                 WriteRaceInterval: azureDefaultWriteRaceInterval,
39                 WriteRacePollTime: azureDefaultWriteRacePollTime,
40                 cluster:           cluster,
41                 volume:            volume,
42                 logger:            logger,
43                 metrics:           metrics,
44         }
45         err := json.Unmarshal(volume.DriverParameters, &v)
46         if err != nil {
47                 return nil, err
48         }
49         if v.ListBlobsRetryDelay == 0 {
50                 v.ListBlobsRetryDelay = azureDefaultListBlobsRetryDelay
51         }
52         if v.ListBlobsMaxAttempts == 0 {
53                 v.ListBlobsMaxAttempts = azureDefaultListBlobsMaxAttempts
54         }
55         if v.StorageBaseURL == "" {
56                 v.StorageBaseURL = storage.DefaultBaseURL
57         }
58         if v.ContainerName == "" || v.StorageAccountName == "" || v.StorageAccountKey == "" {
59                 return nil, errors.New("DriverParameters: ContainerName, StorageAccountName, and StorageAccountKey must be provided")
60         }
61         azc, err := storage.NewClient(v.StorageAccountName, v.StorageAccountKey, v.StorageBaseURL, storage.DefaultAPIVersion, true)
62         if err != nil {
63                 return nil, fmt.Errorf("creating Azure storage client: %s", err)
64         }
65         v.azClient = azc
66         v.azClient.Sender = &singleSender{}
67         v.azClient.HTTPClient = &http.Client{
68                 Timeout: time.Duration(v.RequestTimeout),
69         }
70         bs := v.azClient.GetBlobService()
71         v.container = &azureContainer{
72                 ctr: bs.GetContainerReference(v.ContainerName),
73         }
74
75         if ok, err := v.container.Exists(); err != nil {
76                 return nil, err
77         } else if !ok {
78                 return nil, fmt.Errorf("Azure container %q does not exist: %s", v.ContainerName, err)
79         }
80         return v, v.check()
81 }
82
83 func (v *AzureBlobVolume) check() error {
84         lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
85         v.container.stats.opsCounters, v.container.stats.errCounters, v.container.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
86         return nil
87 }
88
89 const (
90         azureDefaultRequestTimeout       = arvados.Duration(10 * time.Minute)
91         azureDefaultListBlobsMaxAttempts = 12
92         azureDefaultListBlobsRetryDelay  = arvados.Duration(10 * time.Second)
93         azureDefaultWriteRaceInterval    = arvados.Duration(15 * time.Second)
94         azureDefaultWriteRacePollTime    = arvados.Duration(time.Second)
95 )
96
97 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
98 // container.
99 type AzureBlobVolume struct {
100         StorageAccountName   string
101         StorageAccountKey    string
102         StorageBaseURL       string // "" means default, "core.windows.net"
103         ContainerName        string
104         RequestTimeout       arvados.Duration
105         ListBlobsRetryDelay  arvados.Duration
106         ListBlobsMaxAttempts int
107         MaxGetBytes          int
108         WriteRaceInterval    arvados.Duration
109         WriteRacePollTime    arvados.Duration
110
111         cluster   *arvados.Cluster
112         volume    arvados.Volume
113         logger    logrus.FieldLogger
114         metrics   *volumeMetricsVecs
115         azClient  storage.Client
116         container *azureContainer
117 }
118
119 // singleSender is a single-attempt storage.Sender.
120 type singleSender struct{}
121
122 // Send performs req exactly once.
123 func (*singleSender) Send(c *storage.Client, req *http.Request) (resp *http.Response, err error) {
124         return c.HTTPClient.Do(req)
125 }
126
127 // Type implements Volume.
128 func (v *AzureBlobVolume) Type() string {
129         return "Azure"
130 }
131
132 // GetDeviceID returns a globally unique ID for the storage container.
133 func (v *AzureBlobVolume) GetDeviceID() string {
134         return "azure://" + v.StorageBaseURL + "/" + v.StorageAccountName + "/" + v.ContainerName
135 }
136
137 // Return true if expires_at metadata attribute is found on the block
138 func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
139         metadata, err := v.container.GetBlobMetadata(loc)
140         if err != nil {
141                 return false, metadata, v.translateError(err)
142         }
143         if metadata["expires_at"] != "" {
144                 return true, metadata, nil
145         }
146         return false, metadata, nil
147 }
148
149 // Get reads a Keep block that has been stored as a block blob in the
150 // container.
151 //
152 // If the block is younger than azureWriteRaceInterval and is
153 // unexpectedly empty, assume a PutBlob operation is in progress, and
154 // wait for it to finish writing.
155 func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
156         trashed, _, err := v.checkTrashed(loc)
157         if err != nil {
158                 return 0, err
159         }
160         if trashed {
161                 return 0, os.ErrNotExist
162         }
163         var deadline time.Time
164         haveDeadline := false
165         size, err := v.get(ctx, loc, buf)
166         for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
167                 // Seeing a brand new empty block probably means we're
168                 // in a race with CreateBlob, which under the hood
169                 // (apparently) does "CreateEmpty" and "CommitData"
170                 // with no additional transaction locking.
171                 if !haveDeadline {
172                         t, err := v.Mtime(loc)
173                         if err != nil {
174                                 ctxlog.FromContext(ctx).Print("Got empty block (possible race) but Mtime failed: ", err)
175                                 break
176                         }
177                         deadline = t.Add(v.WriteRaceInterval.Duration())
178                         if time.Now().After(deadline) {
179                                 break
180                         }
181                         ctxlog.FromContext(ctx).Printf("Race? Block %s is 0 bytes, %s old. Polling until %s", loc, time.Since(t), deadline)
182                         haveDeadline = true
183                 } else if time.Now().After(deadline) {
184                         break
185                 }
186                 select {
187                 case <-ctx.Done():
188                         return 0, ctx.Err()
189                 case <-time.After(v.WriteRacePollTime.Duration()):
190                 }
191                 size, err = v.get(ctx, loc, buf)
192         }
193         if haveDeadline {
194                 ctxlog.FromContext(ctx).Printf("Race ended with size==%d", size)
195         }
196         return size, err
197 }
198
199 func (v *AzureBlobVolume) get(ctx context.Context, loc string, buf []byte) (int, error) {
200         ctx, cancel := context.WithCancel(ctx)
201         defer cancel()
202
203         pieceSize := BlockSize
204         if v.MaxGetBytes > 0 && v.MaxGetBytes < BlockSize {
205                 pieceSize = v.MaxGetBytes
206         }
207
208         pieces := 1
209         expectSize := len(buf)
210         if pieceSize < BlockSize {
211                 // Unfortunately the handler doesn't tell us how long the blob
212                 // is expected to be, so we have to ask Azure.
213                 props, err := v.container.GetBlobProperties(loc)
214                 if err != nil {
215                         return 0, v.translateError(err)
216                 }
217                 if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
218                         return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
219                 }
220                 expectSize = int(props.ContentLength)
221                 pieces = (expectSize + pieceSize - 1) / pieceSize
222         }
223
224         if expectSize == 0 {
225                 return 0, nil
226         }
227
228         // We'll update this actualSize if/when we get the last piece.
229         actualSize := -1
230         errors := make(chan error, pieces)
231         var wg sync.WaitGroup
232         wg.Add(pieces)
233         for p := 0; p < pieces; p++ {
234                 // Each goroutine retrieves one piece. If we hit an
235                 // error, it is sent to the errors chan so get() can
236                 // return it -- but only if the error happens before
237                 // ctx is done. This way, if ctx is done before we hit
238                 // any other error (e.g., requesting client has hung
239                 // up), we return the original ctx.Err() instead of
240                 // the secondary errors from the transfers that got
241                 // interrupted as a result.
242                 go func(p int) {
243                         defer wg.Done()
244                         startPos := p * pieceSize
245                         endPos := startPos + pieceSize
246                         if endPos > expectSize {
247                                 endPos = expectSize
248                         }
249                         var rdr io.ReadCloser
250                         var err error
251                         gotRdr := make(chan struct{})
252                         go func() {
253                                 defer close(gotRdr)
254                                 if startPos == 0 && endPos == expectSize {
255                                         rdr, err = v.container.GetBlob(loc)
256                                 } else {
257                                         rdr, err = v.container.GetBlobRange(loc, startPos, endPos-1, nil)
258                                 }
259                         }()
260                         select {
261                         case <-ctx.Done():
262                                 go func() {
263                                         <-gotRdr
264                                         if err == nil {
265                                                 rdr.Close()
266                                         }
267                                 }()
268                                 return
269                         case <-gotRdr:
270                         }
271                         if err != nil {
272                                 errors <- err
273                                 cancel()
274                                 return
275                         }
276                         go func() {
277                                 // Close the reader when the client
278                                 // hangs up or another piece fails
279                                 // (possibly interrupting ReadFull())
280                                 // or when all pieces succeed and
281                                 // get() returns.
282                                 <-ctx.Done()
283                                 rdr.Close()
284                         }()
285                         n, err := io.ReadFull(rdr, buf[startPos:endPos])
286                         if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
287                                 // If we don't know the actual size,
288                                 // and just tried reading 64 MiB, it's
289                                 // normal to encounter EOF.
290                         } else if err != nil {
291                                 if ctx.Err() == nil {
292                                         errors <- err
293                                 }
294                                 cancel()
295                                 return
296                         }
297                         if p == pieces-1 {
298                                 actualSize = startPos + n
299                         }
300                 }(p)
301         }
302         wg.Wait()
303         close(errors)
304         if len(errors) > 0 {
305                 return 0, v.translateError(<-errors)
306         }
307         if ctx.Err() != nil {
308                 return 0, ctx.Err()
309         }
310         return actualSize, nil
311 }
312
313 // Compare the given data with existing stored data.
314 func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte) error {
315         trashed, _, err := v.checkTrashed(loc)
316         if err != nil {
317                 return err
318         }
319         if trashed {
320                 return os.ErrNotExist
321         }
322         var rdr io.ReadCloser
323         gotRdr := make(chan struct{})
324         go func() {
325                 defer close(gotRdr)
326                 rdr, err = v.container.GetBlob(loc)
327         }()
328         select {
329         case <-ctx.Done():
330                 go func() {
331                         <-gotRdr
332                         if err == nil {
333                                 rdr.Close()
334                         }
335                 }()
336                 return ctx.Err()
337         case <-gotRdr:
338         }
339         if err != nil {
340                 return v.translateError(err)
341         }
342         defer rdr.Close()
343         return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
344 }
345
346 // Put stores a Keep block as a block blob in the container.
347 func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) error {
348         if v.volume.ReadOnly {
349                 return MethodDisabledError
350         }
351         // Send the block data through a pipe, so that (if we need to)
352         // we can close the pipe early and abandon our
353         // CreateBlockBlobFromReader() goroutine, without worrying
354         // about CreateBlockBlobFromReader() accessing our block
355         // buffer after we release it.
356         bufr, bufw := io.Pipe()
357         go func() {
358                 io.Copy(bufw, bytes.NewReader(block))
359                 bufw.Close()
360         }()
361         errChan := make(chan error)
362         go func() {
363                 var body io.Reader = bufr
364                 if len(block) == 0 {
365                         // We must send a "Content-Length: 0" header,
366                         // but the http client interprets
367                         // ContentLength==0 as "unknown" unless it can
368                         // confirm by introspection that Body will
369                         // read 0 bytes.
370                         body = http.NoBody
371                         bufr.Close()
372                 }
373                 errChan <- v.container.CreateBlockBlobFromReader(loc, len(block), body, nil)
374         }()
375         select {
376         case <-ctx.Done():
377                 ctxlog.FromContext(ctx).Debugf("%s: taking CreateBlockBlobFromReader's input away: %s", v, ctx.Err())
378                 // Our pipe might be stuck in Write(), waiting for
379                 // io.Copy() to read. If so, un-stick it. This means
380                 // CreateBlockBlobFromReader will get corrupt data,
381                 // but that's OK: the size won't match, so the write
382                 // will fail.
383                 go io.Copy(ioutil.Discard, bufr)
384                 // CloseWithError() will return once pending I/O is done.
385                 bufw.CloseWithError(ctx.Err())
386                 ctxlog.FromContext(ctx).Debugf("%s: abandoning CreateBlockBlobFromReader goroutine", v)
387                 return ctx.Err()
388         case err := <-errChan:
389                 return err
390         }
391 }
392
393 // Touch updates the last-modified property of a block blob.
394 func (v *AzureBlobVolume) Touch(loc string) error {
395         if v.volume.ReadOnly {
396                 return MethodDisabledError
397         }
398         trashed, metadata, err := v.checkTrashed(loc)
399         if err != nil {
400                 return err
401         }
402         if trashed {
403                 return os.ErrNotExist
404         }
405
406         metadata["touch"] = fmt.Sprintf("%d", time.Now().Unix())
407         return v.container.SetBlobMetadata(loc, metadata, nil)
408 }
409
410 // Mtime returns the last-modified property of a block blob.
411 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
412         trashed, _, err := v.checkTrashed(loc)
413         if err != nil {
414                 return time.Time{}, err
415         }
416         if trashed {
417                 return time.Time{}, os.ErrNotExist
418         }
419
420         props, err := v.container.GetBlobProperties(loc)
421         if err != nil {
422                 return time.Time{}, err
423         }
424         return time.Time(props.LastModified), nil
425 }
426
427 // IndexTo writes a list of Keep blocks that are stored in the
428 // container.
429 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
430         params := storage.ListBlobsParameters{
431                 Prefix:  prefix,
432                 Include: &storage.IncludeBlobDataset{Metadata: true},
433         }
434         for page := 1; ; page++ {
435                 resp, err := v.listBlobs(page, params)
436                 if err != nil {
437                         return err
438                 }
439                 for _, b := range resp.Blobs {
440                         if !v.isKeepBlock(b.Name) {
441                                 continue
442                         }
443                         modtime := time.Time(b.Properties.LastModified)
444                         if b.Properties.ContentLength == 0 && modtime.Add(v.WriteRaceInterval.Duration()).After(time.Now()) {
445                                 // A new zero-length blob is probably
446                                 // just a new non-empty blob that
447                                 // hasn't committed its data yet (see
448                                 // Get()), and in any case has no
449                                 // value.
450                                 continue
451                         }
452                         if b.Metadata["expires_at"] != "" {
453                                 // Trashed blob; exclude it from response
454                                 continue
455                         }
456                         fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, modtime.UnixNano())
457                 }
458                 if resp.NextMarker == "" {
459                         return nil
460                 }
461                 params.Marker = resp.NextMarker
462         }
463 }
464
465 // call v.container.ListBlobs, retrying if needed.
466 func (v *AzureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters) (resp storage.BlobListResponse, err error) {
467         for i := 0; i < v.ListBlobsMaxAttempts; i++ {
468                 resp, err = v.container.ListBlobs(params)
469                 err = v.translateError(err)
470                 if err == VolumeBusyError {
471                         v.logger.Printf("ListBlobs: will retry page %d in %s after error: %s", page, v.ListBlobsRetryDelay, err)
472                         time.Sleep(time.Duration(v.ListBlobsRetryDelay))
473                         continue
474                 } else {
475                         break
476                 }
477         }
478         return
479 }
480
481 // Trash a Keep block.
482 func (v *AzureBlobVolume) Trash(loc string) error {
483         if v.volume.ReadOnly {
484                 return MethodDisabledError
485         }
486
487         // Ideally we would use If-Unmodified-Since, but that
488         // particular condition seems to be ignored by Azure. Instead,
489         // we get the Etag before checking Mtime, and use If-Match to
490         // ensure we don't delete data if Put() or Touch() happens
491         // between our calls to Mtime() and DeleteBlob().
492         props, err := v.container.GetBlobProperties(loc)
493         if err != nil {
494                 return err
495         }
496         if t, err := v.Mtime(loc); err != nil {
497                 return err
498         } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
499                 return nil
500         }
501
502         // If BlobTrashLifetime == 0, just delete it
503         if v.cluster.Collections.BlobTrashLifetime == 0 {
504                 return v.container.DeleteBlob(loc, &storage.DeleteBlobOptions{
505                         IfMatch: props.Etag,
506                 })
507         }
508
509         // Otherwise, mark as trash
510         return v.container.SetBlobMetadata(loc, storage.BlobMetadata{
511                 "expires_at": fmt.Sprintf("%d", time.Now().Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Unix()),
512         }, &storage.SetBlobMetadataOptions{
513                 IfMatch: props.Etag,
514         })
515 }
516
517 // Untrash a Keep block.
518 // Delete the expires_at metadata attribute
519 func (v *AzureBlobVolume) Untrash(loc string) error {
520         // if expires_at does not exist, return NotFoundError
521         metadata, err := v.container.GetBlobMetadata(loc)
522         if err != nil {
523                 return v.translateError(err)
524         }
525         if metadata["expires_at"] == "" {
526                 return os.ErrNotExist
527         }
528
529         // reset expires_at metadata attribute
530         metadata["expires_at"] = ""
531         err = v.container.SetBlobMetadata(loc, metadata, nil)
532         return v.translateError(err)
533 }
534
535 // Status returns a VolumeStatus struct with placeholder data.
536 func (v *AzureBlobVolume) Status() *VolumeStatus {
537         return &VolumeStatus{
538                 DeviceNum: 1,
539                 BytesFree: BlockSize * 1000,
540                 BytesUsed: 1,
541         }
542 }
543
544 // String returns a volume label, including the container name.
545 func (v *AzureBlobVolume) String() string {
546         return fmt.Sprintf("azure-storage-container:%+q", v.ContainerName)
547 }
548
549 // If possible, translate an Azure SDK error to a recognizable error
550 // like os.ErrNotExist.
551 func (v *AzureBlobVolume) translateError(err error) error {
552         switch {
553         case err == nil:
554                 return err
555         case strings.Contains(err.Error(), "StatusCode=503"):
556                 // "storage: service returned error: StatusCode=503, ErrorCode=ServerBusy, ErrorMessage=The server is busy" (See #14804)
557                 return VolumeBusyError
558         case strings.Contains(err.Error(), "Not Found"):
559                 // "storage: service returned without a response body (404 Not Found)"
560                 return os.ErrNotExist
561         case strings.Contains(err.Error(), "ErrorCode=BlobNotFound"):
562                 // "storage: service returned error: StatusCode=404, ErrorCode=BlobNotFound, ErrorMessage=The specified blob does not exist.\n..."
563                 return os.ErrNotExist
564         default:
565                 return err
566         }
567 }
568
569 var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
570
571 func (v *AzureBlobVolume) isKeepBlock(s string) bool {
572         return keepBlockRegexp.MatchString(s)
573 }
574
575 // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
576 // and deletes them from the volume.
577 func (v *AzureBlobVolume) EmptyTrash() {
578         if v.cluster.Collections.BlobDeleteConcurrency < 1 {
579                 return
580         }
581
582         var bytesDeleted, bytesInTrash int64
583         var blocksDeleted, blocksInTrash int64
584
585         doBlob := func(b storage.Blob) {
586                 // Check whether the block is flagged as trash
587                 if b.Metadata["expires_at"] == "" {
588                         return
589                 }
590
591                 atomic.AddInt64(&blocksInTrash, 1)
592                 atomic.AddInt64(&bytesInTrash, b.Properties.ContentLength)
593
594                 expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
595                 if err != nil {
596                         v.logger.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
597                         return
598                 }
599
600                 if expiresAt > time.Now().Unix() {
601                         return
602                 }
603
604                 err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
605                         IfMatch: b.Properties.Etag,
606                 })
607                 if err != nil {
608                         v.logger.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
609                         return
610                 }
611                 atomic.AddInt64(&blocksDeleted, 1)
612                 atomic.AddInt64(&bytesDeleted, b.Properties.ContentLength)
613         }
614
615         var wg sync.WaitGroup
616         todo := make(chan storage.Blob, v.cluster.Collections.BlobDeleteConcurrency)
617         for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
618                 wg.Add(1)
619                 go func() {
620                         defer wg.Done()
621                         for b := range todo {
622                                 doBlob(b)
623                         }
624                 }()
625         }
626
627         params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
628         for page := 1; ; page++ {
629                 resp, err := v.listBlobs(page, params)
630                 if err != nil {
631                         v.logger.Printf("EmptyTrash: ListBlobs: %v", err)
632                         break
633                 }
634                 for _, b := range resp.Blobs {
635                         todo <- b
636                 }
637                 if resp.NextMarker == "" {
638                         break
639                 }
640                 params.Marker = resp.NextMarker
641         }
642         close(todo)
643         wg.Wait()
644
645         v.logger.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
646 }
647
648 // InternalStats returns bucket I/O and API call counters.
649 func (v *AzureBlobVolume) InternalStats() interface{} {
650         return &v.container.stats
651 }
652
653 type azureBlobStats struct {
654         statsTicker
655         Ops              uint64
656         GetOps           uint64
657         GetRangeOps      uint64
658         GetMetadataOps   uint64
659         GetPropertiesOps uint64
660         CreateOps        uint64
661         SetMetadataOps   uint64
662         DelOps           uint64
663         ListOps          uint64
664 }
665
666 func (s *azureBlobStats) TickErr(err error) {
667         if err == nil {
668                 return
669         }
670         errType := fmt.Sprintf("%T", err)
671         if err, ok := err.(storage.AzureStorageServiceError); ok {
672                 errType = errType + fmt.Sprintf(" %d (%s)", err.StatusCode, err.Code)
673         }
674         s.statsTicker.TickErr(err, errType)
675 }
676
677 // azureContainer wraps storage.Container in order to count I/O and
678 // API usage stats.
679 type azureContainer struct {
680         ctr   *storage.Container
681         stats azureBlobStats
682 }
683
684 func (c *azureContainer) Exists() (bool, error) {
685         c.stats.TickOps("exists")
686         c.stats.Tick(&c.stats.Ops)
687         ok, err := c.ctr.Exists()
688         c.stats.TickErr(err)
689         return ok, err
690 }
691
692 func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
693         c.stats.TickOps("get_metadata")
694         c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
695         b := c.ctr.GetBlobReference(bname)
696         err := b.GetMetadata(nil)
697         c.stats.TickErr(err)
698         return b.Metadata, err
699 }
700
701 func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
702         c.stats.TickOps("get_properties")
703         c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
704         b := c.ctr.GetBlobReference(bname)
705         err := b.GetProperties(nil)
706         c.stats.TickErr(err)
707         return &b.Properties, err
708 }
709
710 func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
711         c.stats.TickOps("get")
712         c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
713         b := c.ctr.GetBlobReference(bname)
714         rdr, err := b.Get(nil)
715         c.stats.TickErr(err)
716         return NewCountingReader(rdr, c.stats.TickInBytes), err
717 }
718
719 func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
720         c.stats.TickOps("get_range")
721         c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
722         b := c.ctr.GetBlobReference(bname)
723         rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
724                 Range: &storage.BlobRange{
725                         Start: uint64(start),
726                         End:   uint64(end),
727                 },
728                 GetBlobOptions: opts,
729         })
730         c.stats.TickErr(err)
731         return NewCountingReader(rdr, c.stats.TickInBytes), err
732 }
733
734 // If we give it an io.Reader that doesn't also have a Len() int
735 // method, the Azure SDK determines data size by copying the data into
736 // a new buffer, which is not a good use of memory.
737 type readerWithAzureLen struct {
738         io.Reader
739         len int
740 }
741
742 // Len satisfies the private lener interface in azure-sdk-for-go.
743 func (r *readerWithAzureLen) Len() int {
744         return r.len
745 }
746
747 func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
748         c.stats.TickOps("create")
749         c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
750         if size != 0 {
751                 rdr = &readerWithAzureLen{
752                         Reader: NewCountingReader(rdr, c.stats.TickOutBytes),
753                         len:    size,
754                 }
755         }
756         b := c.ctr.GetBlobReference(bname)
757         err := b.CreateBlockBlobFromReader(rdr, opts)
758         c.stats.TickErr(err)
759         return err
760 }
761
762 func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
763         c.stats.TickOps("set_metadata")
764         c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
765         b := c.ctr.GetBlobReference(bname)
766         b.Metadata = m
767         err := b.SetMetadata(opts)
768         c.stats.TickErr(err)
769         return err
770 }
771
772 func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
773         c.stats.TickOps("list")
774         c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
775         resp, err := c.ctr.ListBlobs(params)
776         c.stats.TickErr(err)
777         return resp, err
778 }
779
780 func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
781         c.stats.TickOps("delete")
782         c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
783         b := c.ctr.GetBlobReference(bname)
784         err := b.Delete(opts)
785         c.stats.TickErr(err)
786         return err
787 }