Merge branch '12199-dispatch-to-node-type'
[arvados.git] / services / keepstore / azure_blob_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "errors"
11         "flag"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "net/http"
16         "os"
17         "regexp"
18         "strconv"
19         "strings"
20         "sync"
21         "time"
22
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         "github.com/curoverse/azure-sdk-for-go/storage"
25 )
26
27 const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
28
29 var (
30         azureMaxGetBytes           int
31         azureStorageAccountName    string
32         azureStorageAccountKeyFile string
33         azureStorageReplication    int
34         azureWriteRaceInterval     = 15 * time.Second
35         azureWriteRacePollTime     = time.Second
36 )
37
38 func readKeyFromFile(file string) (string, error) {
39         buf, err := ioutil.ReadFile(file)
40         if err != nil {
41                 return "", errors.New("reading key from " + file + ": " + err.Error())
42         }
43         accountKey := strings.TrimSpace(string(buf))
44         if accountKey == "" {
45                 return "", errors.New("empty account key in " + file)
46         }
47         return accountKey, nil
48 }
49
50 type azureVolumeAdder struct {
51         *Config
52 }
53
54 // String implements flag.Value
55 func (s *azureVolumeAdder) String() string {
56         return "-"
57 }
58
59 func (s *azureVolumeAdder) Set(containerName string) error {
60         s.Config.Volumes = append(s.Config.Volumes, &AzureBlobVolume{
61                 ContainerName:         containerName,
62                 StorageAccountName:    azureStorageAccountName,
63                 StorageAccountKeyFile: azureStorageAccountKeyFile,
64                 AzureReplication:      azureStorageReplication,
65                 ReadOnly:              deprecated.flagReadonly,
66         })
67         return nil
68 }
69
70 func init() {
71         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &AzureBlobVolume{} })
72
73         flag.Var(&azureVolumeAdder{theConfig},
74                 "azure-storage-container-volume",
75                 "Use the given container as a storage volume. Can be given multiple times.")
76         flag.StringVar(
77                 &azureStorageAccountName,
78                 "azure-storage-account-name",
79                 "",
80                 "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
81         flag.StringVar(
82                 &azureStorageAccountKeyFile,
83                 "azure-storage-account-key-file",
84                 "",
85                 "`File` containing the account key used for subsequent --azure-storage-container-volume arguments.")
86         flag.IntVar(
87                 &azureStorageReplication,
88                 "azure-storage-replication",
89                 3,
90                 "Replication level to report to clients when data is stored in an Azure container.")
91         flag.IntVar(
92                 &azureMaxGetBytes,
93                 "azure-max-get-bytes",
94                 BlockSize,
95                 fmt.Sprintf("Maximum bytes to request in a single GET request. If smaller than %d, use multiple concurrent range requests to retrieve a block.", BlockSize))
96 }
97
98 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
99 // container.
100 type AzureBlobVolume struct {
101         StorageAccountName    string
102         StorageAccountKeyFile string
103         StorageBaseURL        string // "" means default, "core.windows.net"
104         ContainerName         string
105         AzureReplication      int
106         ReadOnly              bool
107         RequestTimeout        arvados.Duration
108
109         azClient storage.Client
110         bsClient *azureBlobClient
111 }
112
113 // Examples implements VolumeWithExamples.
114 func (*AzureBlobVolume) Examples() []Volume {
115         return []Volume{
116                 &AzureBlobVolume{
117                         StorageAccountName:    "example-account-name",
118                         StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
119                         ContainerName:         "example-container-name",
120                         AzureReplication:      3,
121                         RequestTimeout:        azureDefaultRequestTimeout,
122                 },
123                 &AzureBlobVolume{
124                         StorageAccountName:    "cn-account-name",
125                         StorageAccountKeyFile: "/etc/azure_cn_storage_account_key.txt",
126                         StorageBaseURL:        "core.chinacloudapi.cn",
127                         ContainerName:         "cn-container-name",
128                         AzureReplication:      3,
129                         RequestTimeout:        azureDefaultRequestTimeout,
130                 },
131         }
132 }
133
134 // Type implements Volume.
135 func (v *AzureBlobVolume) Type() string {
136         return "Azure"
137 }
138
139 // Start implements Volume.
140 func (v *AzureBlobVolume) Start() error {
141         if v.ContainerName == "" {
142                 return errors.New("no container name given")
143         }
144         if v.StorageAccountName == "" || v.StorageAccountKeyFile == "" {
145                 return errors.New("StorageAccountName and StorageAccountKeyFile must be given")
146         }
147         accountKey, err := readKeyFromFile(v.StorageAccountKeyFile)
148         if err != nil {
149                 return err
150         }
151         if v.StorageBaseURL == "" {
152                 v.StorageBaseURL = storage.DefaultBaseURL
153         }
154         v.azClient, err = storage.NewClient(v.StorageAccountName, accountKey, v.StorageBaseURL, storage.DefaultAPIVersion, true)
155         if err != nil {
156                 return fmt.Errorf("creating Azure storage client: %s", err)
157         }
158
159         if v.RequestTimeout == 0 {
160                 v.RequestTimeout = azureDefaultRequestTimeout
161         }
162         v.azClient.HTTPClient = &http.Client{
163                 Timeout: time.Duration(v.RequestTimeout),
164         }
165         bs := v.azClient.GetBlobService()
166         v.bsClient = &azureBlobClient{
167                 client: &bs,
168         }
169
170         ok, err := v.bsClient.ContainerExists(v.ContainerName)
171         if err != nil {
172                 return err
173         }
174         if !ok {
175                 return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
176         }
177         return nil
178 }
179
180 // DeviceID returns a globally unique ID for the storage container.
181 func (v *AzureBlobVolume) DeviceID() string {
182         return "azure://" + v.StorageBaseURL + "/" + v.StorageAccountName + "/" + v.ContainerName
183 }
184
185 // Return true if expires_at metadata attribute is found on the block
186 func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
187         metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
188         if err != nil {
189                 return false, metadata, v.translateError(err)
190         }
191         if metadata["expires_at"] != "" {
192                 return true, metadata, nil
193         }
194         return false, metadata, nil
195 }
196
197 // Get reads a Keep block that has been stored as a block blob in the
198 // container.
199 //
200 // If the block is younger than azureWriteRaceInterval and is
201 // unexpectedly empty, assume a PutBlob operation is in progress, and
202 // wait for it to finish writing.
203 func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
204         trashed, _, err := v.checkTrashed(loc)
205         if err != nil {
206                 return 0, err
207         }
208         if trashed {
209                 return 0, os.ErrNotExist
210         }
211         var deadline time.Time
212         haveDeadline := false
213         size, err := v.get(ctx, loc, buf)
214         for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
215                 // Seeing a brand new empty block probably means we're
216                 // in a race with CreateBlob, which under the hood
217                 // (apparently) does "CreateEmpty" and "CommitData"
218                 // with no additional transaction locking.
219                 if !haveDeadline {
220                         t, err := v.Mtime(loc)
221                         if err != nil {
222                                 log.Print("Got empty block (possible race) but Mtime failed: ", err)
223                                 break
224                         }
225                         deadline = t.Add(azureWriteRaceInterval)
226                         if time.Now().After(deadline) {
227                                 break
228                         }
229                         log.Printf("Race? Block %s is 0 bytes, %s old. Polling until %s", loc, time.Since(t), deadline)
230                         haveDeadline = true
231                 } else if time.Now().After(deadline) {
232                         break
233                 }
234                 select {
235                 case <-ctx.Done():
236                         return 0, ctx.Err()
237                 case <-time.After(azureWriteRacePollTime):
238                 }
239                 size, err = v.get(ctx, loc, buf)
240         }
241         if haveDeadline {
242                 log.Printf("Race ended with size==%d", size)
243         }
244         return size, err
245 }
246
247 func (v *AzureBlobVolume) get(ctx context.Context, loc string, buf []byte) (int, error) {
248         ctx, cancel := context.WithCancel(ctx)
249         defer cancel()
250         expectSize := len(buf)
251         if azureMaxGetBytes < BlockSize {
252                 // Unfortunately the handler doesn't tell us how long the blob
253                 // is expected to be, so we have to ask Azure.
254                 props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
255                 if err != nil {
256                         return 0, v.translateError(err)
257                 }
258                 if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
259                         return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
260                 }
261                 expectSize = int(props.ContentLength)
262         }
263
264         if expectSize == 0 {
265                 return 0, nil
266         }
267
268         // We'll update this actualSize if/when we get the last piece.
269         actualSize := -1
270         pieces := (expectSize + azureMaxGetBytes - 1) / azureMaxGetBytes
271         errors := make(chan error, pieces)
272         var wg sync.WaitGroup
273         wg.Add(pieces)
274         for p := 0; p < pieces; p++ {
275                 // Each goroutine retrieves one piece. If we hit an
276                 // error, it is sent to the errors chan so get() can
277                 // return it -- but only if the error happens before
278                 // ctx is done. This way, if ctx is done before we hit
279                 // any other error (e.g., requesting client has hung
280                 // up), we return the original ctx.Err() instead of
281                 // the secondary errors from the transfers that got
282                 // interrupted as a result.
283                 go func(p int) {
284                         defer wg.Done()
285                         startPos := p * azureMaxGetBytes
286                         endPos := startPos + azureMaxGetBytes
287                         if endPos > expectSize {
288                                 endPos = expectSize
289                         }
290                         var rdr io.ReadCloser
291                         var err error
292                         gotRdr := make(chan struct{})
293                         go func() {
294                                 defer close(gotRdr)
295                                 if startPos == 0 && endPos == expectSize {
296                                         rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
297                                 } else {
298                                         rdr, err = v.bsClient.GetBlobRange(v.ContainerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
299                                 }
300                         }()
301                         select {
302                         case <-ctx.Done():
303                                 go func() {
304                                         <-gotRdr
305                                         if err == nil {
306                                                 rdr.Close()
307                                         }
308                                 }()
309                                 return
310                         case <-gotRdr:
311                         }
312                         if err != nil {
313                                 errors <- err
314                                 cancel()
315                                 return
316                         }
317                         go func() {
318                                 // Close the reader when the client
319                                 // hangs up or another piece fails
320                                 // (possibly interrupting ReadFull())
321                                 // or when all pieces succeed and
322                                 // get() returns.
323                                 <-ctx.Done()
324                                 rdr.Close()
325                         }()
326                         n, err := io.ReadFull(rdr, buf[startPos:endPos])
327                         if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
328                                 // If we don't know the actual size,
329                                 // and just tried reading 64 MiB, it's
330                                 // normal to encounter EOF.
331                         } else if err != nil {
332                                 if ctx.Err() == nil {
333                                         errors <- err
334                                 }
335                                 cancel()
336                                 return
337                         }
338                         if p == pieces-1 {
339                                 actualSize = startPos + n
340                         }
341                 }(p)
342         }
343         wg.Wait()
344         close(errors)
345         if len(errors) > 0 {
346                 return 0, v.translateError(<-errors)
347         }
348         if ctx.Err() != nil {
349                 return 0, ctx.Err()
350         }
351         return actualSize, nil
352 }
353
354 // Compare the given data with existing stored data.
355 func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte) error {
356         trashed, _, err := v.checkTrashed(loc)
357         if err != nil {
358                 return err
359         }
360         if trashed {
361                 return os.ErrNotExist
362         }
363         var rdr io.ReadCloser
364         gotRdr := make(chan struct{})
365         go func() {
366                 defer close(gotRdr)
367                 rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
368         }()
369         select {
370         case <-ctx.Done():
371                 go func() {
372                         <-gotRdr
373                         if err == nil {
374                                 rdr.Close()
375                         }
376                 }()
377                 return ctx.Err()
378         case <-gotRdr:
379         }
380         if err != nil {
381                 return v.translateError(err)
382         }
383         defer rdr.Close()
384         return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
385 }
386
387 // Put stores a Keep block as a block blob in the container.
388 func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) error {
389         if v.ReadOnly {
390                 return MethodDisabledError
391         }
392         // Send the block data through a pipe, so that (if we need to)
393         // we can close the pipe early and abandon our
394         // CreateBlockBlobFromReader() goroutine, without worrying
395         // about CreateBlockBlobFromReader() accessing our block
396         // buffer after we release it.
397         bufr, bufw := io.Pipe()
398         go func() {
399                 io.Copy(bufw, bytes.NewReader(block))
400                 bufw.Close()
401         }()
402         errChan := make(chan error)
403         go func() {
404                 var body io.Reader = bufr
405                 if len(block) == 0 {
406                         // We must send a "Content-Length: 0" header,
407                         // but the http client interprets
408                         // ContentLength==0 as "unknown" unless it can
409                         // confirm by introspection that Body will
410                         // read 0 bytes.
411                         body = http.NoBody
412                         bufr.Close()
413                 }
414                 errChan <- v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), body, nil)
415         }()
416         select {
417         case <-ctx.Done():
418                 theConfig.debugLogf("%s: taking CreateBlockBlobFromReader's input away: %s", v, ctx.Err())
419                 // Our pipe might be stuck in Write(), waiting for
420                 // io.Copy() to read. If so, un-stick it. This means
421                 // CreateBlockBlobFromReader will get corrupt data,
422                 // but that's OK: the size won't match, so the write
423                 // will fail.
424                 go io.Copy(ioutil.Discard, bufr)
425                 // CloseWithError() will return once pending I/O is done.
426                 bufw.CloseWithError(ctx.Err())
427                 theConfig.debugLogf("%s: abandoning CreateBlockBlobFromReader goroutine", v)
428                 return ctx.Err()
429         case err := <-errChan:
430                 return err
431         }
432 }
433
434 // Touch updates the last-modified property of a block blob.
435 func (v *AzureBlobVolume) Touch(loc string) error {
436         if v.ReadOnly {
437                 return MethodDisabledError
438         }
439         trashed, metadata, err := v.checkTrashed(loc)
440         if err != nil {
441                 return err
442         }
443         if trashed {
444                 return os.ErrNotExist
445         }
446
447         metadata["touch"] = fmt.Sprintf("%d", time.Now())
448         return v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
449 }
450
451 // Mtime returns the last-modified property of a block blob.
452 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
453         trashed, _, err := v.checkTrashed(loc)
454         if err != nil {
455                 return time.Time{}, err
456         }
457         if trashed {
458                 return time.Time{}, os.ErrNotExist
459         }
460
461         props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
462         if err != nil {
463                 return time.Time{}, err
464         }
465         return time.Parse(time.RFC1123, props.LastModified)
466 }
467
468 // IndexTo writes a list of Keep blocks that are stored in the
469 // container.
470 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
471         params := storage.ListBlobsParameters{
472                 Prefix:  prefix,
473                 Include: "metadata",
474         }
475         for {
476                 resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
477                 if err != nil {
478                         return err
479                 }
480                 for _, b := range resp.Blobs {
481                         t, err := time.Parse(time.RFC1123, b.Properties.LastModified)
482                         if err != nil {
483                                 return err
484                         }
485                         if !v.isKeepBlock(b.Name) {
486                                 continue
487                         }
488                         if b.Properties.ContentLength == 0 && t.Add(azureWriteRaceInterval).After(time.Now()) {
489                                 // A new zero-length blob is probably
490                                 // just a new non-empty blob that
491                                 // hasn't committed its data yet (see
492                                 // Get()), and in any case has no
493                                 // value.
494                                 continue
495                         }
496                         if b.Metadata["expires_at"] != "" {
497                                 // Trashed blob; exclude it from response
498                                 continue
499                         }
500                         fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.UnixNano())
501                 }
502                 if resp.NextMarker == "" {
503                         return nil
504                 }
505                 params.Marker = resp.NextMarker
506         }
507 }
508
509 // Trash a Keep block.
510 func (v *AzureBlobVolume) Trash(loc string) error {
511         if v.ReadOnly {
512                 return MethodDisabledError
513         }
514
515         // Ideally we would use If-Unmodified-Since, but that
516         // particular condition seems to be ignored by Azure. Instead,
517         // we get the Etag before checking Mtime, and use If-Match to
518         // ensure we don't delete data if Put() or Touch() happens
519         // between our calls to Mtime() and DeleteBlob().
520         props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
521         if err != nil {
522                 return err
523         }
524         if t, err := v.Mtime(loc); err != nil {
525                 return err
526         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
527                 return nil
528         }
529
530         // If TrashLifetime == 0, just delete it
531         if theConfig.TrashLifetime == 0 {
532                 return v.bsClient.DeleteBlob(v.ContainerName, loc, map[string]string{
533                         "If-Match": props.Etag,
534                 })
535         }
536
537         // Otherwise, mark as trash
538         return v.bsClient.SetBlobMetadata(v.ContainerName, loc, map[string]string{
539                 "expires_at": fmt.Sprintf("%d", time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()),
540         }, map[string]string{
541                 "If-Match": props.Etag,
542         })
543 }
544
545 // Untrash a Keep block.
546 // Delete the expires_at metadata attribute
547 func (v *AzureBlobVolume) Untrash(loc string) error {
548         // if expires_at does not exist, return NotFoundError
549         metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
550         if err != nil {
551                 return v.translateError(err)
552         }
553         if metadata["expires_at"] == "" {
554                 return os.ErrNotExist
555         }
556
557         // reset expires_at metadata attribute
558         metadata["expires_at"] = ""
559         err = v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
560         return v.translateError(err)
561 }
562
563 // Status returns a VolumeStatus struct with placeholder data.
564 func (v *AzureBlobVolume) Status() *VolumeStatus {
565         return &VolumeStatus{
566                 DeviceNum: 1,
567                 BytesFree: BlockSize * 1000,
568                 BytesUsed: 1,
569         }
570 }
571
572 // String returns a volume label, including the container name.
573 func (v *AzureBlobVolume) String() string {
574         return fmt.Sprintf("azure-storage-container:%+q", v.ContainerName)
575 }
576
577 // Writable returns true, unless the -readonly flag was on when the
578 // volume was added.
579 func (v *AzureBlobVolume) Writable() bool {
580         return !v.ReadOnly
581 }
582
583 // Replication returns the replication level of the container, as
584 // specified by the -azure-storage-replication argument.
585 func (v *AzureBlobVolume) Replication() int {
586         return v.AzureReplication
587 }
588
589 // If possible, translate an Azure SDK error to a recognizable error
590 // like os.ErrNotExist.
591 func (v *AzureBlobVolume) translateError(err error) error {
592         switch {
593         case err == nil:
594                 return err
595         case strings.Contains(err.Error(), "Not Found"):
596                 // "storage: service returned without a response body (404 Not Found)"
597                 return os.ErrNotExist
598         default:
599                 return err
600         }
601 }
602
603 var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
604
605 func (v *AzureBlobVolume) isKeepBlock(s string) bool {
606         return keepBlockRegexp.MatchString(s)
607 }
608
609 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
610 // and deletes them from the volume.
611 func (v *AzureBlobVolume) EmptyTrash() {
612         var bytesDeleted, bytesInTrash int64
613         var blocksDeleted, blocksInTrash int
614         params := storage.ListBlobsParameters{Include: "metadata"}
615
616         for {
617                 resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
618                 if err != nil {
619                         log.Printf("EmptyTrash: ListBlobs: %v", err)
620                         break
621                 }
622                 for _, b := range resp.Blobs {
623                         // Check if the block is expired
624                         if b.Metadata["expires_at"] == "" {
625                                 continue
626                         }
627
628                         blocksInTrash++
629                         bytesInTrash += b.Properties.ContentLength
630
631                         expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
632                         if err != nil {
633                                 log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
634                                 continue
635                         }
636
637                         if expiresAt > time.Now().Unix() {
638                                 continue
639                         }
640
641                         err = v.bsClient.DeleteBlob(v.ContainerName, b.Name, map[string]string{
642                                 "If-Match": b.Properties.Etag,
643                         })
644                         if err != nil {
645                                 log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
646                                 continue
647                         }
648                         blocksDeleted++
649                         bytesDeleted += b.Properties.ContentLength
650                 }
651                 if resp.NextMarker == "" {
652                         break
653                 }
654                 params.Marker = resp.NextMarker
655         }
656
657         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
658 }
659
660 // InternalStats returns bucket I/O and API call counters.
661 func (v *AzureBlobVolume) InternalStats() interface{} {
662         return &v.bsClient.stats
663 }
664
665 type azureBlobStats struct {
666         statsTicker
667         Ops              uint64
668         GetOps           uint64
669         GetRangeOps      uint64
670         GetMetadataOps   uint64
671         GetPropertiesOps uint64
672         CreateOps        uint64
673         SetMetadataOps   uint64
674         DelOps           uint64
675         ListOps          uint64
676 }
677
678 func (s *azureBlobStats) TickErr(err error) {
679         if err == nil {
680                 return
681         }
682         errType := fmt.Sprintf("%T", err)
683         if err, ok := err.(storage.AzureStorageServiceError); ok {
684                 errType = errType + fmt.Sprintf(" %d (%s)", err.StatusCode, err.Code)
685         }
686         log.Printf("errType %T, err %s", err, err)
687         s.statsTicker.TickErr(err, errType)
688 }
689
690 // azureBlobClient wraps storage.BlobStorageClient in order to count
691 // I/O and API usage stats.
692 type azureBlobClient struct {
693         client *storage.BlobStorageClient
694         stats  azureBlobStats
695 }
696
697 func (c *azureBlobClient) ContainerExists(cname string) (bool, error) {
698         c.stats.Tick(&c.stats.Ops)
699         ok, err := c.client.ContainerExists(cname)
700         c.stats.TickErr(err)
701         return ok, err
702 }
703
704 func (c *azureBlobClient) GetBlobMetadata(cname, bname string) (map[string]string, error) {
705         c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
706         m, err := c.client.GetBlobMetadata(cname, bname)
707         c.stats.TickErr(err)
708         return m, err
709 }
710
711 func (c *azureBlobClient) GetBlobProperties(cname, bname string) (*storage.BlobProperties, error) {
712         c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
713         p, err := c.client.GetBlobProperties(cname, bname)
714         c.stats.TickErr(err)
715         return p, err
716 }
717
718 func (c *azureBlobClient) GetBlob(cname, bname string) (io.ReadCloser, error) {
719         c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
720         rdr, err := c.client.GetBlob(cname, bname)
721         c.stats.TickErr(err)
722         return NewCountingReader(rdr, c.stats.TickInBytes), err
723 }
724
725 func (c *azureBlobClient) GetBlobRange(cname, bname, byterange string, hdrs map[string]string) (io.ReadCloser, error) {
726         c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
727         rdr, err := c.client.GetBlobRange(cname, bname, byterange, hdrs)
728         c.stats.TickErr(err)
729         return NewCountingReader(rdr, c.stats.TickInBytes), err
730 }
731
732 func (c *azureBlobClient) CreateBlockBlobFromReader(cname, bname string, size uint64, rdr io.Reader, hdrs map[string]string) error {
733         c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
734         if size != 0 {
735                 rdr = NewCountingReader(rdr, c.stats.TickOutBytes)
736         }
737         err := c.client.CreateBlockBlobFromReader(cname, bname, size, rdr, hdrs)
738         c.stats.TickErr(err)
739         return err
740 }
741
742 func (c *azureBlobClient) SetBlobMetadata(cname, bname string, m, hdrs map[string]string) error {
743         c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
744         err := c.client.SetBlobMetadata(cname, bname, m, hdrs)
745         c.stats.TickErr(err)
746         return err
747 }
748
749 func (c *azureBlobClient) ListBlobs(cname string, params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
750         c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
751         resp, err := c.client.ListBlobs(cname, params)
752         c.stats.TickErr(err)
753         return resp, err
754 }
755
756 func (c *azureBlobClient) DeleteBlob(cname, bname string, hdrs map[string]string) error {
757         c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
758         err := c.client.DeleteBlob(cname, bname, hdrs)
759         c.stats.TickErr(err)
760         return err
761 }