10877: Configurable base URL for Azure storage, e.g., StorageBaseURL: core.usgovcloud...
[arvados.git] / services / keepstore / azure_blob_volume.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/http"
12         "os"
13         "regexp"
14         "strconv"
15         "strings"
16         "sync"
17         "time"
18
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         log "github.com/Sirupsen/logrus"
21         "github.com/curoverse/azure-sdk-for-go/storage"
22 )
23
24 const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
25
26 var (
27         azureMaxGetBytes           int
28         azureStorageAccountName    string
29         azureStorageAccountKeyFile string
30         azureStorageReplication    int
31         azureWriteRaceInterval     = 15 * time.Second
32         azureWriteRacePollTime     = time.Second
33 )
34
35 func readKeyFromFile(file string) (string, error) {
36         buf, err := ioutil.ReadFile(file)
37         if err != nil {
38                 return "", errors.New("reading key from " + file + ": " + err.Error())
39         }
40         accountKey := strings.TrimSpace(string(buf))
41         if accountKey == "" {
42                 return "", errors.New("empty account key in " + file)
43         }
44         return accountKey, nil
45 }
46
47 type azureVolumeAdder struct {
48         *Config
49 }
50
51 // String implements flag.Value
52 func (s *azureVolumeAdder) String() string {
53         return "-"
54 }
55
56 func (s *azureVolumeAdder) Set(containerName string) error {
57         s.Config.Volumes = append(s.Config.Volumes, &AzureBlobVolume{
58                 ContainerName:         containerName,
59                 StorageAccountName:    azureStorageAccountName,
60                 StorageAccountKeyFile: azureStorageAccountKeyFile,
61                 AzureReplication:      azureStorageReplication,
62                 ReadOnly:              deprecated.flagReadonly,
63         })
64         return nil
65 }
66
67 func init() {
68         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &AzureBlobVolume{} })
69
70         flag.Var(&azureVolumeAdder{theConfig},
71                 "azure-storage-container-volume",
72                 "Use the given container as a storage volume. Can be given multiple times.")
73         flag.StringVar(
74                 &azureStorageAccountName,
75                 "azure-storage-account-name",
76                 "",
77                 "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
78         flag.StringVar(
79                 &azureStorageAccountKeyFile,
80                 "azure-storage-account-key-file",
81                 "",
82                 "`File` containing the account key used for subsequent --azure-storage-container-volume arguments.")
83         flag.IntVar(
84                 &azureStorageReplication,
85                 "azure-storage-replication",
86                 3,
87                 "Replication level to report to clients when data is stored in an Azure container.")
88         flag.IntVar(
89                 &azureMaxGetBytes,
90                 "azure-max-get-bytes",
91                 BlockSize,
92                 fmt.Sprintf("Maximum bytes to request in a single GET request. If smaller than %d, use multiple concurrent range requests to retrieve a block.", BlockSize))
93 }
94
95 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
96 // container.
97 type AzureBlobVolume struct {
98         StorageAccountName    string
99         StorageAccountKeyFile string
100         StorageBaseURL        string // "" means default, "core.windows.net"
101         ContainerName         string
102         AzureReplication      int
103         ReadOnly              bool
104         RequestTimeout        arvados.Duration
105
106         azClient storage.Client
107         bsClient *azureBlobClient
108 }
109
110 // Examples implements VolumeWithExamples.
111 func (*AzureBlobVolume) Examples() []Volume {
112         return []Volume{
113                 &AzureBlobVolume{
114                         StorageAccountName:    "example-account-name",
115                         StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
116                         ContainerName:         "example-container-name",
117                         AzureReplication:      3,
118                         RequestTimeout:        azureDefaultRequestTimeout,
119                 },
120         }
121 }
122
123 // Type implements Volume.
124 func (v *AzureBlobVolume) Type() string {
125         return "Azure"
126 }
127
128 // Start implements Volume.
129 func (v *AzureBlobVolume) Start() error {
130         if v.ContainerName == "" {
131                 return errors.New("no container name given")
132         }
133         if v.StorageAccountName == "" || v.StorageAccountKeyFile == "" {
134                 return errors.New("StorageAccountName and StorageAccountKeyFile must be given")
135         }
136         accountKey, err := readKeyFromFile(v.StorageAccountKeyFile)
137         if err != nil {
138                 return err
139         }
140         if v.StorageBaseURL == "" {
141                 v.StorageBaseURL = storage.DefaultBaseURL
142         }
143         v.azClient, err = storage.NewClient(v.StorageAccountName, accountKey, v.StorageBaseURL, storage.DefaultAPIVersion, true)
144         if err != nil {
145                 return fmt.Errorf("creating Azure storage client: %s", err)
146         }
147
148         if v.RequestTimeout == 0 {
149                 v.RequestTimeout = azureDefaultRequestTimeout
150         }
151         v.azClient.HTTPClient = &http.Client{
152                 Timeout: time.Duration(v.RequestTimeout),
153         }
154         bs := v.azClient.GetBlobService()
155         v.bsClient = &azureBlobClient{
156                 client: &bs,
157         }
158
159         ok, err := v.bsClient.ContainerExists(v.ContainerName)
160         if err != nil {
161                 return err
162         }
163         if !ok {
164                 return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
165         }
166         return nil
167 }
168
169 // Return true if expires_at metadata attribute is found on the block
170 func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
171         metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
172         if err != nil {
173                 return false, metadata, v.translateError(err)
174         }
175         if metadata["expires_at"] != "" {
176                 return true, metadata, nil
177         }
178         return false, metadata, nil
179 }
180
181 // Get reads a Keep block that has been stored as a block blob in the
182 // container.
183 //
184 // If the block is younger than azureWriteRaceInterval and is
185 // unexpectedly empty, assume a PutBlob operation is in progress, and
186 // wait for it to finish writing.
187 func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
188         trashed, _, err := v.checkTrashed(loc)
189         if err != nil {
190                 return 0, err
191         }
192         if trashed {
193                 return 0, os.ErrNotExist
194         }
195         var deadline time.Time
196         haveDeadline := false
197         size, err := v.get(ctx, loc, buf)
198         for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
199                 // Seeing a brand new empty block probably means we're
200                 // in a race with CreateBlob, which under the hood
201                 // (apparently) does "CreateEmpty" and "CommitData"
202                 // with no additional transaction locking.
203                 if !haveDeadline {
204                         t, err := v.Mtime(loc)
205                         if err != nil {
206                                 log.Print("Got empty block (possible race) but Mtime failed: ", err)
207                                 break
208                         }
209                         deadline = t.Add(azureWriteRaceInterval)
210                         if time.Now().After(deadline) {
211                                 break
212                         }
213                         log.Printf("Race? Block %s is 0 bytes, %s old. Polling until %s", loc, time.Since(t), deadline)
214                         haveDeadline = true
215                 } else if time.Now().After(deadline) {
216                         break
217                 }
218                 select {
219                 case <-ctx.Done():
220                         return 0, ctx.Err()
221                 case <-time.After(azureWriteRacePollTime):
222                 }
223                 size, err = v.get(ctx, loc, buf)
224         }
225         if haveDeadline {
226                 log.Printf("Race ended with size==%d", size)
227         }
228         return size, err
229 }
230
231 func (v *AzureBlobVolume) get(ctx context.Context, loc string, buf []byte) (int, error) {
232         ctx, cancel := context.WithCancel(ctx)
233         defer cancel()
234         expectSize := len(buf)
235         if azureMaxGetBytes < BlockSize {
236                 // Unfortunately the handler doesn't tell us how long the blob
237                 // is expected to be, so we have to ask Azure.
238                 props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
239                 if err != nil {
240                         return 0, v.translateError(err)
241                 }
242                 if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
243                         return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
244                 }
245                 expectSize = int(props.ContentLength)
246         }
247
248         if expectSize == 0 {
249                 return 0, nil
250         }
251
252         // We'll update this actualSize if/when we get the last piece.
253         actualSize := -1
254         pieces := (expectSize + azureMaxGetBytes - 1) / azureMaxGetBytes
255         errors := make(chan error, pieces)
256         var wg sync.WaitGroup
257         wg.Add(pieces)
258         for p := 0; p < pieces; p++ {
259                 // Each goroutine retrieves one piece. If we hit an
260                 // error, it is sent to the errors chan so get() can
261                 // return it -- but only if the error happens before
262                 // ctx is done. This way, if ctx is done before we hit
263                 // any other error (e.g., requesting client has hung
264                 // up), we return the original ctx.Err() instead of
265                 // the secondary errors from the transfers that got
266                 // interrupted as a result.
267                 go func(p int) {
268                         defer wg.Done()
269                         startPos := p * azureMaxGetBytes
270                         endPos := startPos + azureMaxGetBytes
271                         if endPos > expectSize {
272                                 endPos = expectSize
273                         }
274                         var rdr io.ReadCloser
275                         var err error
276                         gotRdr := make(chan struct{})
277                         go func() {
278                                 defer close(gotRdr)
279                                 if startPos == 0 && endPos == expectSize {
280                                         rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
281                                 } else {
282                                         rdr, err = v.bsClient.GetBlobRange(v.ContainerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
283                                 }
284                         }()
285                         select {
286                         case <-ctx.Done():
287                                 go func() {
288                                         <-gotRdr
289                                         if err == nil {
290                                                 rdr.Close()
291                                         }
292                                 }()
293                                 return
294                         case <-gotRdr:
295                         }
296                         if err != nil {
297                                 errors <- err
298                                 cancel()
299                                 return
300                         }
301                         go func() {
302                                 // Close the reader when the client
303                                 // hangs up or another piece fails
304                                 // (possibly interrupting ReadFull())
305                                 // or when all pieces succeed and
306                                 // get() returns.
307                                 <-ctx.Done()
308                                 rdr.Close()
309                         }()
310                         n, err := io.ReadFull(rdr, buf[startPos:endPos])
311                         if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
312                                 // If we don't know the actual size,
313                                 // and just tried reading 64 MiB, it's
314                                 // normal to encounter EOF.
315                         } else if err != nil {
316                                 if ctx.Err() == nil {
317                                         errors <- err
318                                 }
319                                 cancel()
320                                 return
321                         }
322                         if p == pieces-1 {
323                                 actualSize = startPos + n
324                         }
325                 }(p)
326         }
327         wg.Wait()
328         close(errors)
329         if len(errors) > 0 {
330                 return 0, v.translateError(<-errors)
331         }
332         if ctx.Err() != nil {
333                 return 0, ctx.Err()
334         }
335         return actualSize, nil
336 }
337
338 // Compare the given data with existing stored data.
339 func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte) error {
340         trashed, _, err := v.checkTrashed(loc)
341         if err != nil {
342                 return err
343         }
344         if trashed {
345                 return os.ErrNotExist
346         }
347         var rdr io.ReadCloser
348         gotRdr := make(chan struct{})
349         go func() {
350                 defer close(gotRdr)
351                 rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
352         }()
353         select {
354         case <-ctx.Done():
355                 go func() {
356                         <-gotRdr
357                         if err == nil {
358                                 rdr.Close()
359                         }
360                 }()
361                 return ctx.Err()
362         case <-gotRdr:
363         }
364         if err != nil {
365                 return v.translateError(err)
366         }
367         defer rdr.Close()
368         return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
369 }
370
371 // Put stores a Keep block as a block blob in the container.
372 func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) error {
373         if v.ReadOnly {
374                 return MethodDisabledError
375         }
376         // Send the block data through a pipe, so that (if we need to)
377         // we can close the pipe early and abandon our
378         // CreateBlockBlobFromReader() goroutine, without worrying
379         // about CreateBlockBlobFromReader() accessing our block
380         // buffer after we release it.
381         bufr, bufw := io.Pipe()
382         go func() {
383                 io.Copy(bufw, bytes.NewReader(block))
384                 bufw.Close()
385         }()
386         errChan := make(chan error)
387         go func() {
388                 errChan <- v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), bufr, nil)
389         }()
390         select {
391         case <-ctx.Done():
392                 theConfig.debugLogf("%s: taking CreateBlockBlobFromReader's input away: %s", v, ctx.Err())
393                 // Our pipe might be stuck in Write(), waiting for
394                 // io.Copy() to read. If so, un-stick it. This means
395                 // CreateBlockBlobFromReader will get corrupt data,
396                 // but that's OK: the size won't match, so the write
397                 // will fail.
398                 go io.Copy(ioutil.Discard, bufr)
399                 // CloseWithError() will return once pending I/O is done.
400                 bufw.CloseWithError(ctx.Err())
401                 theConfig.debugLogf("%s: abandoning CreateBlockBlobFromReader goroutine", v)
402                 return ctx.Err()
403         case err := <-errChan:
404                 return err
405         }
406 }
407
408 // Touch updates the last-modified property of a block blob.
409 func (v *AzureBlobVolume) Touch(loc string) error {
410         if v.ReadOnly {
411                 return MethodDisabledError
412         }
413         trashed, metadata, err := v.checkTrashed(loc)
414         if err != nil {
415                 return err
416         }
417         if trashed {
418                 return os.ErrNotExist
419         }
420
421         metadata["touch"] = fmt.Sprintf("%d", time.Now())
422         return v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
423 }
424
425 // Mtime returns the last-modified property of a block blob.
426 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
427         trashed, _, err := v.checkTrashed(loc)
428         if err != nil {
429                 return time.Time{}, err
430         }
431         if trashed {
432                 return time.Time{}, os.ErrNotExist
433         }
434
435         props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
436         if err != nil {
437                 return time.Time{}, err
438         }
439         return time.Parse(time.RFC1123, props.LastModified)
440 }
441
442 // IndexTo writes a list of Keep blocks that are stored in the
443 // container.
444 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
445         params := storage.ListBlobsParameters{
446                 Prefix:  prefix,
447                 Include: "metadata",
448         }
449         for {
450                 resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
451                 if err != nil {
452                         return err
453                 }
454                 for _, b := range resp.Blobs {
455                         t, err := time.Parse(time.RFC1123, b.Properties.LastModified)
456                         if err != nil {
457                                 return err
458                         }
459                         if !v.isKeepBlock(b.Name) {
460                                 continue
461                         }
462                         if b.Properties.ContentLength == 0 && t.Add(azureWriteRaceInterval).After(time.Now()) {
463                                 // A new zero-length blob is probably
464                                 // just a new non-empty blob that
465                                 // hasn't committed its data yet (see
466                                 // Get()), and in any case has no
467                                 // value.
468                                 continue
469                         }
470                         if b.Metadata["expires_at"] != "" {
471                                 // Trashed blob; exclude it from response
472                                 continue
473                         }
474                         fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.UnixNano())
475                 }
476                 if resp.NextMarker == "" {
477                         return nil
478                 }
479                 params.Marker = resp.NextMarker
480         }
481 }
482
483 // Trash a Keep block.
484 func (v *AzureBlobVolume) Trash(loc string) error {
485         if v.ReadOnly {
486                 return MethodDisabledError
487         }
488
489         // Ideally we would use If-Unmodified-Since, but that
490         // particular condition seems to be ignored by Azure. Instead,
491         // we get the Etag before checking Mtime, and use If-Match to
492         // ensure we don't delete data if Put() or Touch() happens
493         // between our calls to Mtime() and DeleteBlob().
494         props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
495         if err != nil {
496                 return err
497         }
498         if t, err := v.Mtime(loc); err != nil {
499                 return err
500         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
501                 return nil
502         }
503
504         // If TrashLifetime == 0, just delete it
505         if theConfig.TrashLifetime == 0 {
506                 return v.bsClient.DeleteBlob(v.ContainerName, loc, map[string]string{
507                         "If-Match": props.Etag,
508                 })
509         }
510
511         // Otherwise, mark as trash
512         return v.bsClient.SetBlobMetadata(v.ContainerName, loc, map[string]string{
513                 "expires_at": fmt.Sprintf("%d", time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()),
514         }, map[string]string{
515                 "If-Match": props.Etag,
516         })
517 }
518
519 // Untrash a Keep block.
520 // Delete the expires_at metadata attribute
521 func (v *AzureBlobVolume) Untrash(loc string) error {
522         // if expires_at does not exist, return NotFoundError
523         metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
524         if err != nil {
525                 return v.translateError(err)
526         }
527         if metadata["expires_at"] == "" {
528                 return os.ErrNotExist
529         }
530
531         // reset expires_at metadata attribute
532         metadata["expires_at"] = ""
533         err = v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
534         return v.translateError(err)
535 }
536
537 // Status returns a VolumeStatus struct with placeholder data.
538 func (v *AzureBlobVolume) Status() *VolumeStatus {
539         return &VolumeStatus{
540                 DeviceNum: 1,
541                 BytesFree: BlockSize * 1000,
542                 BytesUsed: 1,
543         }
544 }
545
546 // String returns a volume label, including the container name.
547 func (v *AzureBlobVolume) String() string {
548         return fmt.Sprintf("azure-storage-container:%+q", v.ContainerName)
549 }
550
551 // Writable returns true, unless the -readonly flag was on when the
552 // volume was added.
553 func (v *AzureBlobVolume) Writable() bool {
554         return !v.ReadOnly
555 }
556
557 // Replication returns the replication level of the container, as
558 // specified by the -azure-storage-replication argument.
559 func (v *AzureBlobVolume) Replication() int {
560         return v.AzureReplication
561 }
562
563 // If possible, translate an Azure SDK error to a recognizable error
564 // like os.ErrNotExist.
565 func (v *AzureBlobVolume) translateError(err error) error {
566         switch {
567         case err == nil:
568                 return err
569         case strings.Contains(err.Error(), "Not Found"):
570                 // "storage: service returned without a response body (404 Not Found)"
571                 return os.ErrNotExist
572         default:
573                 return err
574         }
575 }
576
577 var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
578
579 func (v *AzureBlobVolume) isKeepBlock(s string) bool {
580         return keepBlockRegexp.MatchString(s)
581 }
582
583 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
584 // and deletes them from the volume.
585 func (v *AzureBlobVolume) EmptyTrash() {
586         var bytesDeleted, bytesInTrash int64
587         var blocksDeleted, blocksInTrash int
588         params := storage.ListBlobsParameters{Include: "metadata"}
589
590         for {
591                 resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
592                 if err != nil {
593                         log.Printf("EmptyTrash: ListBlobs: %v", err)
594                         break
595                 }
596                 for _, b := range resp.Blobs {
597                         // Check if the block is expired
598                         if b.Metadata["expires_at"] == "" {
599                                 continue
600                         }
601
602                         blocksInTrash++
603                         bytesInTrash += b.Properties.ContentLength
604
605                         expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
606                         if err != nil {
607                                 log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
608                                 continue
609                         }
610
611                         if expiresAt > time.Now().Unix() {
612                                 continue
613                         }
614
615                         err = v.bsClient.DeleteBlob(v.ContainerName, b.Name, map[string]string{
616                                 "If-Match": b.Properties.Etag,
617                         })
618                         if err != nil {
619                                 log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
620                                 continue
621                         }
622                         blocksDeleted++
623                         bytesDeleted += b.Properties.ContentLength
624                 }
625                 if resp.NextMarker == "" {
626                         break
627                 }
628                 params.Marker = resp.NextMarker
629         }
630
631         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
632 }
633
634 // InternalStats returns bucket I/O and API call counters.
635 func (v *AzureBlobVolume) InternalStats() interface{} {
636         return &v.bsClient.stats
637 }
638
639 type azureBlobStats struct {
640         statsTicker
641         Ops              uint64
642         GetOps           uint64
643         GetRangeOps      uint64
644         GetMetadataOps   uint64
645         GetPropertiesOps uint64
646         CreateOps        uint64
647         SetMetadataOps   uint64
648         DelOps           uint64
649         ListOps          uint64
650 }
651
652 func (s *azureBlobStats) TickErr(err error) {
653         if err == nil {
654                 return
655         }
656         errType := fmt.Sprintf("%T", err)
657         if err, ok := err.(storage.AzureStorageServiceError); ok {
658                 errType = errType + fmt.Sprintf(" %d (%s)", err.StatusCode, err.Code)
659         }
660         log.Printf("errType %T, err %s", err, err)
661         s.statsTicker.TickErr(err, errType)
662 }
663
664 // azureBlobClient wraps storage.BlobStorageClient in order to count
665 // I/O and API usage stats.
666 type azureBlobClient struct {
667         client *storage.BlobStorageClient
668         stats  azureBlobStats
669 }
670
671 func (c *azureBlobClient) ContainerExists(cname string) (bool, error) {
672         c.stats.Tick(&c.stats.Ops)
673         ok, err := c.client.ContainerExists(cname)
674         c.stats.TickErr(err)
675         return ok, err
676 }
677
678 func (c *azureBlobClient) GetBlobMetadata(cname, bname string) (map[string]string, error) {
679         c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
680         m, err := c.client.GetBlobMetadata(cname, bname)
681         c.stats.TickErr(err)
682         return m, err
683 }
684
685 func (c *azureBlobClient) GetBlobProperties(cname, bname string) (*storage.BlobProperties, error) {
686         c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
687         p, err := c.client.GetBlobProperties(cname, bname)
688         c.stats.TickErr(err)
689         return p, err
690 }
691
692 func (c *azureBlobClient) GetBlob(cname, bname string) (io.ReadCloser, error) {
693         c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
694         rdr, err := c.client.GetBlob(cname, bname)
695         c.stats.TickErr(err)
696         return NewCountingReader(rdr, c.stats.TickInBytes), err
697 }
698
699 func (c *azureBlobClient) GetBlobRange(cname, bname, byterange string, hdrs map[string]string) (io.ReadCloser, error) {
700         c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
701         rdr, err := c.client.GetBlobRange(cname, bname, byterange, hdrs)
702         c.stats.TickErr(err)
703         return NewCountingReader(rdr, c.stats.TickInBytes), err
704 }
705
706 func (c *azureBlobClient) CreateBlockBlobFromReader(cname, bname string, size uint64, rdr io.Reader, hdrs map[string]string) error {
707         c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
708         rdr = NewCountingReader(rdr, c.stats.TickOutBytes)
709         err := c.client.CreateBlockBlobFromReader(cname, bname, size, rdr, hdrs)
710         c.stats.TickErr(err)
711         return err
712 }
713
714 func (c *azureBlobClient) SetBlobMetadata(cname, bname string, m, hdrs map[string]string) error {
715         c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
716         err := c.client.SetBlobMetadata(cname, bname, m, hdrs)
717         c.stats.TickErr(err)
718         return err
719 }
720
721 func (c *azureBlobClient) ListBlobs(cname string, params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
722         c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
723         resp, err := c.client.ListBlobs(cname, params)
724         c.stats.TickErr(err)
725         return resp, err
726 }
727
728 func (c *azureBlobClient) DeleteBlob(cname, bname string, hdrs map[string]string) error {
729         c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
730         err := c.client.DeleteBlob(cname, bname, hdrs)
731         c.stats.TickErr(err)
732         return err
733 }