refs #10516
[arvados.git] / services / keepstore / azure_blob_volume.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/http"
12         "os"
13         "regexp"
14         "strconv"
15         "strings"
16         "sync"
17         "time"
18
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         log "github.com/Sirupsen/logrus"
21         "github.com/curoverse/azure-sdk-for-go/storage"
22 )
23
24 const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
25
26 var (
27         azureMaxGetBytes           int
28         azureStorageAccountName    string
29         azureStorageAccountKeyFile string
30         azureStorageReplication    int
31         azureWriteRaceInterval     = 15 * time.Second
32         azureWriteRacePollTime     = time.Second
33 )
34
35 func readKeyFromFile(file string) (string, error) {
36         buf, err := ioutil.ReadFile(file)
37         if err != nil {
38                 return "", errors.New("reading key from " + file + ": " + err.Error())
39         }
40         accountKey := strings.TrimSpace(string(buf))
41         if accountKey == "" {
42                 return "", errors.New("empty account key in " + file)
43         }
44         return accountKey, nil
45 }
46
47 type azureVolumeAdder struct {
48         *Config
49 }
50
51 // String implements flag.Value
52 func (s *azureVolumeAdder) String() string {
53         return "-"
54 }
55
56 func (s *azureVolumeAdder) Set(containerName string) error {
57         s.Config.Volumes = append(s.Config.Volumes, &AzureBlobVolume{
58                 ContainerName:         containerName,
59                 StorageAccountName:    azureStorageAccountName,
60                 StorageAccountKeyFile: azureStorageAccountKeyFile,
61                 AzureReplication:      azureStorageReplication,
62                 ReadOnly:              deprecated.flagReadonly,
63         })
64         return nil
65 }
66
67 func init() {
68         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &AzureBlobVolume{} })
69
70         flag.Var(&azureVolumeAdder{theConfig},
71                 "azure-storage-container-volume",
72                 "Use the given container as a storage volume. Can be given multiple times.")
73         flag.StringVar(
74                 &azureStorageAccountName,
75                 "azure-storage-account-name",
76                 "",
77                 "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
78         flag.StringVar(
79                 &azureStorageAccountKeyFile,
80                 "azure-storage-account-key-file",
81                 "",
82                 "`File` containing the account key used for subsequent --azure-storage-container-volume arguments.")
83         flag.IntVar(
84                 &azureStorageReplication,
85                 "azure-storage-replication",
86                 3,
87                 "Replication level to report to clients when data is stored in an Azure container.")
88         flag.IntVar(
89                 &azureMaxGetBytes,
90                 "azure-max-get-bytes",
91                 BlockSize,
92                 fmt.Sprintf("Maximum bytes to request in a single GET request. If smaller than %d, use multiple concurrent range requests to retrieve a block.", BlockSize))
93 }
94
95 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
96 // container.
97 type AzureBlobVolume struct {
98         StorageAccountName    string
99         StorageAccountKeyFile string
100         ContainerName         string
101         AzureReplication      int
102         ReadOnly              bool
103         RequestTimeout        arvados.Duration
104
105         azClient storage.Client
106         bsClient *azureBlobClient
107 }
108
109 // Examples implements VolumeWithExamples.
110 func (*AzureBlobVolume) Examples() []Volume {
111         return []Volume{
112                 &AzureBlobVolume{
113                         StorageAccountName:    "example-account-name",
114                         StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
115                         ContainerName:         "example-container-name",
116                         AzureReplication:      3,
117                         RequestTimeout:        azureDefaultRequestTimeout,
118                 },
119         }
120 }
121
122 // Type implements Volume.
123 func (v *AzureBlobVolume) Type() string {
124         return "Azure"
125 }
126
127 // Start implements Volume.
128 func (v *AzureBlobVolume) Start() error {
129         if v.ContainerName == "" {
130                 return errors.New("no container name given")
131         }
132         if v.StorageAccountName == "" || v.StorageAccountKeyFile == "" {
133                 return errors.New("StorageAccountName and StorageAccountKeyFile must be given")
134         }
135         accountKey, err := readKeyFromFile(v.StorageAccountKeyFile)
136         if err != nil {
137                 return err
138         }
139         v.azClient, err = storage.NewBasicClient(v.StorageAccountName, accountKey)
140         if err != nil {
141                 return fmt.Errorf("creating Azure storage client: %s", err)
142         }
143
144         if v.RequestTimeout == 0 {
145                 v.RequestTimeout = azureDefaultRequestTimeout
146         }
147         v.azClient.HTTPClient = &http.Client{
148                 Timeout: time.Duration(v.RequestTimeout),
149         }
150         bs := v.azClient.GetBlobService()
151         v.bsClient = &azureBlobClient{
152                 client: &bs,
153         }
154
155         ok, err := v.bsClient.ContainerExists(v.ContainerName)
156         if err != nil {
157                 return err
158         }
159         if !ok {
160                 return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
161         }
162         return nil
163 }
164
165 // Return true if expires_at metadata attribute is found on the block
166 func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
167         metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
168         if err != nil {
169                 return false, metadata, v.translateError(err)
170         }
171         if metadata["expires_at"] != "" {
172                 return true, metadata, nil
173         }
174         return false, metadata, nil
175 }
176
177 // Get reads a Keep block that has been stored as a block blob in the
178 // container.
179 //
180 // If the block is younger than azureWriteRaceInterval and is
181 // unexpectedly empty, assume a PutBlob operation is in progress, and
182 // wait for it to finish writing.
183 func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
184         trashed, _, err := v.checkTrashed(loc)
185         if err != nil {
186                 return 0, err
187         }
188         if trashed {
189                 return 0, os.ErrNotExist
190         }
191         var deadline time.Time
192         haveDeadline := false
193         size, err := v.get(ctx, loc, buf)
194         for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
195                 // Seeing a brand new empty block probably means we're
196                 // in a race with CreateBlob, which under the hood
197                 // (apparently) does "CreateEmpty" and "CommitData"
198                 // with no additional transaction locking.
199                 if !haveDeadline {
200                         t, err := v.Mtime(loc)
201                         if err != nil {
202                                 log.Print("Got empty block (possible race) but Mtime failed: ", err)
203                                 break
204                         }
205                         deadline = t.Add(azureWriteRaceInterval)
206                         if time.Now().After(deadline) {
207                                 break
208                         }
209                         log.Printf("Race? Block %s is 0 bytes, %s old. Polling until %s", loc, time.Since(t), deadline)
210                         haveDeadline = true
211                 } else if time.Now().After(deadline) {
212                         break
213                 }
214                 select {
215                 case <-ctx.Done():
216                         return 0, ctx.Err()
217                 case <-time.After(azureWriteRacePollTime):
218                 }
219                 size, err = v.get(ctx, loc, buf)
220         }
221         if haveDeadline {
222                 log.Printf("Race ended with size==%d", size)
223         }
224         return size, err
225 }
226
227 func (v *AzureBlobVolume) get(ctx context.Context, loc string, buf []byte) (int, error) {
228         ctx, cancel := context.WithCancel(ctx)
229         defer cancel()
230         expectSize := len(buf)
231         if azureMaxGetBytes < BlockSize {
232                 // Unfortunately the handler doesn't tell us how long the blob
233                 // is expected to be, so we have to ask Azure.
234                 props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
235                 if err != nil {
236                         return 0, v.translateError(err)
237                 }
238                 if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
239                         return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
240                 }
241                 expectSize = int(props.ContentLength)
242         }
243
244         if expectSize == 0 {
245                 return 0, nil
246         }
247
248         // We'll update this actualSize if/when we get the last piece.
249         actualSize := -1
250         pieces := (expectSize + azureMaxGetBytes - 1) / azureMaxGetBytes
251         errors := make(chan error, pieces)
252         var wg sync.WaitGroup
253         wg.Add(pieces)
254         for p := 0; p < pieces; p++ {
255                 // Each goroutine retrieves one piece. If we hit an
256                 // error, it is sent to the errors chan so get() can
257                 // return it -- but only if the error happens before
258                 // ctx is done. This way, if ctx is done before we hit
259                 // any other error (e.g., requesting client has hung
260                 // up), we return the original ctx.Err() instead of
261                 // the secondary errors from the transfers that got
262                 // interrupted as a result.
263                 go func(p int) {
264                         defer wg.Done()
265                         startPos := p * azureMaxGetBytes
266                         endPos := startPos + azureMaxGetBytes
267                         if endPos > expectSize {
268                                 endPos = expectSize
269                         }
270                         var rdr io.ReadCloser
271                         var err error
272                         gotRdr := make(chan struct{})
273                         go func() {
274                                 defer close(gotRdr)
275                                 if startPos == 0 && endPos == expectSize {
276                                         rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
277                                 } else {
278                                         rdr, err = v.bsClient.GetBlobRange(v.ContainerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
279                                 }
280                         }()
281                         select {
282                         case <-ctx.Done():
283                                 go func() {
284                                         <-gotRdr
285                                         if err == nil {
286                                                 rdr.Close()
287                                         }
288                                 }()
289                                 return
290                         case <-gotRdr:
291                         }
292                         if err != nil {
293                                 errors <- err
294                                 cancel()
295                                 return
296                         }
297                         go func() {
298                                 // Close the reader when the client
299                                 // hangs up or another piece fails
300                                 // (possibly interrupting ReadFull())
301                                 // or when all pieces succeed and
302                                 // get() returns.
303                                 <-ctx.Done()
304                                 rdr.Close()
305                         }()
306                         n, err := io.ReadFull(rdr, buf[startPos:endPos])
307                         if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
308                                 // If we don't know the actual size,
309                                 // and just tried reading 64 MiB, it's
310                                 // normal to encounter EOF.
311                         } else if err != nil {
312                                 if ctx.Err() == nil {
313                                         errors <- err
314                                 }
315                                 cancel()
316                                 return
317                         }
318                         if p == pieces-1 {
319                                 actualSize = startPos + n
320                         }
321                 }(p)
322         }
323         wg.Wait()
324         close(errors)
325         if len(errors) > 0 {
326                 return 0, v.translateError(<-errors)
327         }
328         if ctx.Err() != nil {
329                 return 0, ctx.Err()
330         }
331         return actualSize, nil
332 }
333
334 // Compare the given data with existing stored data.
335 func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte) error {
336         trashed, _, err := v.checkTrashed(loc)
337         if err != nil {
338                 return err
339         }
340         if trashed {
341                 return os.ErrNotExist
342         }
343         var rdr io.ReadCloser
344         gotRdr := make(chan struct{})
345         go func() {
346                 defer close(gotRdr)
347                 rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
348         }()
349         select {
350         case <-ctx.Done():
351                 go func() {
352                         <-gotRdr
353                         if err == nil {
354                                 rdr.Close()
355                         }
356                 }()
357                 return ctx.Err()
358         case <-gotRdr:
359         }
360         if err != nil {
361                 return v.translateError(err)
362         }
363         defer rdr.Close()
364         return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
365 }
366
367 // Put stores a Keep block as a block blob in the container.
368 func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) error {
369         if v.ReadOnly {
370                 return MethodDisabledError
371         }
372         // Send the block data through a pipe, so that (if we need to)
373         // we can close the pipe early and abandon our
374         // CreateBlockBlobFromReader() goroutine, without worrying
375         // about CreateBlockBlobFromReader() accessing our block
376         // buffer after we release it.
377         bufr, bufw := io.Pipe()
378         go func() {
379                 io.Copy(bufw, bytes.NewReader(block))
380                 bufw.Close()
381         }()
382         errChan := make(chan error)
383         go func() {
384                 errChan <- v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), bufr, nil)
385         }()
386         select {
387         case <-ctx.Done():
388                 theConfig.debugLogf("%s: taking CreateBlockBlobFromReader's input away: %s", v, ctx.Err())
389                 // Our pipe might be stuck in Write(), waiting for
390                 // io.Copy() to read. If so, un-stick it. This means
391                 // CreateBlockBlobFromReader will get corrupt data,
392                 // but that's OK: the size won't match, so the write
393                 // will fail.
394                 go io.Copy(ioutil.Discard, bufr)
395                 // CloseWithError() will return once pending I/O is done.
396                 bufw.CloseWithError(ctx.Err())
397                 theConfig.debugLogf("%s: abandoning CreateBlockBlobFromReader goroutine", v)
398                 return ctx.Err()
399         case err := <-errChan:
400                 return err
401         }
402 }
403
404 // Touch updates the last-modified property of a block blob.
405 func (v *AzureBlobVolume) Touch(loc string) error {
406         if v.ReadOnly {
407                 return MethodDisabledError
408         }
409         trashed, metadata, err := v.checkTrashed(loc)
410         if err != nil {
411                 return err
412         }
413         if trashed {
414                 return os.ErrNotExist
415         }
416
417         metadata["touch"] = fmt.Sprintf("%d", time.Now())
418         return v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
419 }
420
421 // Mtime returns the last-modified property of a block blob.
422 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
423         trashed, _, err := v.checkTrashed(loc)
424         if err != nil {
425                 return time.Time{}, err
426         }
427         if trashed {
428                 return time.Time{}, os.ErrNotExist
429         }
430
431         props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
432         if err != nil {
433                 return time.Time{}, err
434         }
435         return time.Parse(time.RFC1123, props.LastModified)
436 }
437
438 // IndexTo writes a list of Keep blocks that are stored in the
439 // container.
440 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
441         params := storage.ListBlobsParameters{
442                 Prefix:  prefix,
443                 Include: "metadata",
444         }
445         for {
446                 resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
447                 if err != nil {
448                         return err
449                 }
450                 for _, b := range resp.Blobs {
451                         t, err := time.Parse(time.RFC1123, b.Properties.LastModified)
452                         if err != nil {
453                                 return err
454                         }
455                         if !v.isKeepBlock(b.Name) {
456                                 continue
457                         }
458                         if b.Properties.ContentLength == 0 && t.Add(azureWriteRaceInterval).After(time.Now()) {
459                                 // A new zero-length blob is probably
460                                 // just a new non-empty blob that
461                                 // hasn't committed its data yet (see
462                                 // Get()), and in any case has no
463                                 // value.
464                                 continue
465                         }
466                         if b.Metadata["expires_at"] != "" {
467                                 // Trashed blob; exclude it from response
468                                 continue
469                         }
470                         fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.UnixNano())
471                 }
472                 if resp.NextMarker == "" {
473                         return nil
474                 }
475                 params.Marker = resp.NextMarker
476         }
477 }
478
479 // Trash a Keep block.
480 func (v *AzureBlobVolume) Trash(loc string) error {
481         if v.ReadOnly {
482                 return MethodDisabledError
483         }
484
485         // Ideally we would use If-Unmodified-Since, but that
486         // particular condition seems to be ignored by Azure. Instead,
487         // we get the Etag before checking Mtime, and use If-Match to
488         // ensure we don't delete data if Put() or Touch() happens
489         // between our calls to Mtime() and DeleteBlob().
490         props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
491         if err != nil {
492                 return err
493         }
494         if t, err := v.Mtime(loc); err != nil {
495                 return err
496         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
497                 return nil
498         }
499
500         // If TrashLifetime == 0, just delete it
501         if theConfig.TrashLifetime == 0 {
502                 return v.bsClient.DeleteBlob(v.ContainerName, loc, map[string]string{
503                         "If-Match": props.Etag,
504                 })
505         }
506
507         // Otherwise, mark as trash
508         return v.bsClient.SetBlobMetadata(v.ContainerName, loc, map[string]string{
509                 "expires_at": fmt.Sprintf("%d", time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()),
510         }, map[string]string{
511                 "If-Match": props.Etag,
512         })
513 }
514
515 // Untrash a Keep block.
516 // Delete the expires_at metadata attribute
517 func (v *AzureBlobVolume) Untrash(loc string) error {
518         // if expires_at does not exist, return NotFoundError
519         metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
520         if err != nil {
521                 return v.translateError(err)
522         }
523         if metadata["expires_at"] == "" {
524                 return os.ErrNotExist
525         }
526
527         // reset expires_at metadata attribute
528         metadata["expires_at"] = ""
529         err = v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
530         return v.translateError(err)
531 }
532
533 // Status returns a VolumeStatus struct with placeholder data.
534 func (v *AzureBlobVolume) Status() *VolumeStatus {
535         return &VolumeStatus{
536                 DeviceNum: 1,
537                 BytesFree: BlockSize * 1000,
538                 BytesUsed: 1,
539         }
540 }
541
542 // String returns a volume label, including the container name.
543 func (v *AzureBlobVolume) String() string {
544         return fmt.Sprintf("azure-storage-container:%+q", v.ContainerName)
545 }
546
547 // Writable returns true, unless the -readonly flag was on when the
548 // volume was added.
549 func (v *AzureBlobVolume) Writable() bool {
550         return !v.ReadOnly
551 }
552
553 // Replication returns the replication level of the container, as
554 // specified by the -azure-storage-replication argument.
555 func (v *AzureBlobVolume) Replication() int {
556         return v.AzureReplication
557 }
558
559 // If possible, translate an Azure SDK error to a recognizable error
560 // like os.ErrNotExist.
561 func (v *AzureBlobVolume) translateError(err error) error {
562         switch {
563         case err == nil:
564                 return err
565         case strings.Contains(err.Error(), "Not Found"):
566                 // "storage: service returned without a response body (404 Not Found)"
567                 return os.ErrNotExist
568         default:
569                 return err
570         }
571 }
572
573 var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
574
575 func (v *AzureBlobVolume) isKeepBlock(s string) bool {
576         return keepBlockRegexp.MatchString(s)
577 }
578
579 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
580 // and deletes them from the volume.
581 func (v *AzureBlobVolume) EmptyTrash() {
582         var bytesDeleted, bytesInTrash int64
583         var blocksDeleted, blocksInTrash int
584         params := storage.ListBlobsParameters{Include: "metadata"}
585
586         for {
587                 resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
588                 if err != nil {
589                         log.Printf("EmptyTrash: ListBlobs: %v", err)
590                         break
591                 }
592                 for _, b := range resp.Blobs {
593                         // Check if the block is expired
594                         if b.Metadata["expires_at"] == "" {
595                                 continue
596                         }
597
598                         blocksInTrash++
599                         bytesInTrash += b.Properties.ContentLength
600
601                         expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
602                         if err != nil {
603                                 log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
604                                 continue
605                         }
606
607                         if expiresAt > time.Now().Unix() {
608                                 continue
609                         }
610
611                         err = v.bsClient.DeleteBlob(v.ContainerName, b.Name, map[string]string{
612                                 "If-Match": b.Properties.Etag,
613                         })
614                         if err != nil {
615                                 log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
616                                 continue
617                         }
618                         blocksDeleted++
619                         bytesDeleted += b.Properties.ContentLength
620                 }
621                 if resp.NextMarker == "" {
622                         break
623                 }
624                 params.Marker = resp.NextMarker
625         }
626
627         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
628 }
629
630 // InternalStats returns bucket I/O and API call counters.
631 func (v *AzureBlobVolume) InternalStats() interface{} {
632         return &v.bsClient.stats
633 }
634
635 type azureBlobStats struct {
636         statsTicker
637         Ops              uint64
638         GetOps           uint64
639         GetRangeOps      uint64
640         GetMetadataOps   uint64
641         GetPropertiesOps uint64
642         CreateOps        uint64
643         SetMetadataOps   uint64
644         DelOps           uint64
645         ListOps          uint64
646 }
647
648 func (s *azureBlobStats) TickErr(err error) {
649         if err == nil {
650                 return
651         }
652         errType := fmt.Sprintf("%T", err)
653         if err, ok := err.(storage.AzureStorageServiceError); ok {
654                 errType = errType + fmt.Sprintf(" %d (%s)", err.StatusCode, err.Code)
655         }
656         log.Printf("errType %T, err %s", err, err)
657         s.statsTicker.TickErr(err, errType)
658 }
659
660 // azureBlobClient wraps storage.BlobStorageClient in order to count
661 // I/O and API usage stats.
662 type azureBlobClient struct {
663         client *storage.BlobStorageClient
664         stats  azureBlobStats
665 }
666
667 func (c *azureBlobClient) ContainerExists(cname string) (bool, error) {
668         c.stats.Tick(&c.stats.Ops)
669         ok, err := c.client.ContainerExists(cname)
670         c.stats.TickErr(err)
671         return ok, err
672 }
673
674 func (c *azureBlobClient) GetBlobMetadata(cname, bname string) (map[string]string, error) {
675         c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
676         m, err := c.client.GetBlobMetadata(cname, bname)
677         c.stats.TickErr(err)
678         return m, err
679 }
680
681 func (c *azureBlobClient) GetBlobProperties(cname, bname string) (*storage.BlobProperties, error) {
682         c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
683         p, err := c.client.GetBlobProperties(cname, bname)
684         c.stats.TickErr(err)
685         return p, err
686 }
687
688 func (c *azureBlobClient) GetBlob(cname, bname string) (io.ReadCloser, error) {
689         c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
690         rdr, err := c.client.GetBlob(cname, bname)
691         c.stats.TickErr(err)
692         return NewCountingReader(rdr, c.stats.TickInBytes), err
693 }
694
695 func (c *azureBlobClient) GetBlobRange(cname, bname, byterange string, hdrs map[string]string) (io.ReadCloser, error) {
696         c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
697         rdr, err := c.client.GetBlobRange(cname, bname, byterange, hdrs)
698         c.stats.TickErr(err)
699         return NewCountingReader(rdr, c.stats.TickInBytes), err
700 }
701
702 func (c *azureBlobClient) CreateBlockBlobFromReader(cname, bname string, size uint64, rdr io.Reader, hdrs map[string]string) error {
703         c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
704         rdr = NewCountingReader(rdr, c.stats.TickOutBytes)
705         err := c.client.CreateBlockBlobFromReader(cname, bname, size, rdr, hdrs)
706         c.stats.TickErr(err)
707         return err
708 }
709
710 func (c *azureBlobClient) SetBlobMetadata(cname, bname string, m, hdrs map[string]string) error {
711         c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
712         err := c.client.SetBlobMetadata(cname, bname, m, hdrs)
713         c.stats.TickErr(err)
714         return err
715 }
716
717 func (c *azureBlobClient) ListBlobs(cname string, params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
718         c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
719         resp, err := c.client.ListBlobs(cname, params)
720         c.stats.TickErr(err)
721         return resp, err
722 }
723
724 func (c *azureBlobClient) DeleteBlob(cname, bname string, hdrs map[string]string) error {
725         c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
726         err := c.client.DeleteBlob(cname, bname, hdrs)
727         c.stats.TickErr(err)
728         return err
729 }