12199: Merge branch 'master' into 12199-dispatch-to-node-type
[arvados.git] / services / keepstore / azure_blob_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "errors"
11         "flag"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "net/http"
16         "os"
17         "regexp"
18         "strconv"
19         "strings"
20         "sync"
21         "time"
22
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         log "github.com/Sirupsen/logrus"
25         "github.com/curoverse/azure-sdk-for-go/storage"
26 )
27
28 const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
29
30 var (
31         azureMaxGetBytes           int
32         azureStorageAccountName    string
33         azureStorageAccountKeyFile string
34         azureStorageReplication    int
35         azureWriteRaceInterval     = 15 * time.Second
36         azureWriteRacePollTime     = time.Second
37 )
38
39 func readKeyFromFile(file string) (string, error) {
40         buf, err := ioutil.ReadFile(file)
41         if err != nil {
42                 return "", errors.New("reading key from " + file + ": " + err.Error())
43         }
44         accountKey := strings.TrimSpace(string(buf))
45         if accountKey == "" {
46                 return "", errors.New("empty account key in " + file)
47         }
48         return accountKey, nil
49 }
50
51 type azureVolumeAdder struct {
52         *Config
53 }
54
55 // String implements flag.Value
56 func (s *azureVolumeAdder) String() string {
57         return "-"
58 }
59
60 func (s *azureVolumeAdder) Set(containerName string) error {
61         s.Config.Volumes = append(s.Config.Volumes, &AzureBlobVolume{
62                 ContainerName:         containerName,
63                 StorageAccountName:    azureStorageAccountName,
64                 StorageAccountKeyFile: azureStorageAccountKeyFile,
65                 AzureReplication:      azureStorageReplication,
66                 ReadOnly:              deprecated.flagReadonly,
67         })
68         return nil
69 }
70
71 func init() {
72         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &AzureBlobVolume{} })
73
74         flag.Var(&azureVolumeAdder{theConfig},
75                 "azure-storage-container-volume",
76                 "Use the given container as a storage volume. Can be given multiple times.")
77         flag.StringVar(
78                 &azureStorageAccountName,
79                 "azure-storage-account-name",
80                 "",
81                 "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
82         flag.StringVar(
83                 &azureStorageAccountKeyFile,
84                 "azure-storage-account-key-file",
85                 "",
86                 "`File` containing the account key used for subsequent --azure-storage-container-volume arguments.")
87         flag.IntVar(
88                 &azureStorageReplication,
89                 "azure-storage-replication",
90                 3,
91                 "Replication level to report to clients when data is stored in an Azure container.")
92         flag.IntVar(
93                 &azureMaxGetBytes,
94                 "azure-max-get-bytes",
95                 BlockSize,
96                 fmt.Sprintf("Maximum bytes to request in a single GET request. If smaller than %d, use multiple concurrent range requests to retrieve a block.", BlockSize))
97 }
98
99 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
100 // container.
101 type AzureBlobVolume struct {
102         StorageAccountName    string
103         StorageAccountKeyFile string
104         StorageBaseURL        string // "" means default, "core.windows.net"
105         ContainerName         string
106         AzureReplication      int
107         ReadOnly              bool
108         RequestTimeout        arvados.Duration
109
110         azClient storage.Client
111         bsClient *azureBlobClient
112 }
113
114 // Examples implements VolumeWithExamples.
115 func (*AzureBlobVolume) Examples() []Volume {
116         return []Volume{
117                 &AzureBlobVolume{
118                         StorageAccountName:    "example-account-name",
119                         StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
120                         ContainerName:         "example-container-name",
121                         AzureReplication:      3,
122                         RequestTimeout:        azureDefaultRequestTimeout,
123                 },
124                 &AzureBlobVolume{
125                         StorageAccountName:    "cn-account-name",
126                         StorageAccountKeyFile: "/etc/azure_cn_storage_account_key.txt",
127                         StorageBaseURL:        "core.chinacloudapi.cn",
128                         ContainerName:         "cn-container-name",
129                         AzureReplication:      3,
130                         RequestTimeout:        azureDefaultRequestTimeout,
131                 },
132         }
133 }
134
135 // Type implements Volume.
136 func (v *AzureBlobVolume) Type() string {
137         return "Azure"
138 }
139
140 // Start implements Volume.
141 func (v *AzureBlobVolume) Start() error {
142         if v.ContainerName == "" {
143                 return errors.New("no container name given")
144         }
145         if v.StorageAccountName == "" || v.StorageAccountKeyFile == "" {
146                 return errors.New("StorageAccountName and StorageAccountKeyFile must be given")
147         }
148         accountKey, err := readKeyFromFile(v.StorageAccountKeyFile)
149         if err != nil {
150                 return err
151         }
152         if v.StorageBaseURL == "" {
153                 v.StorageBaseURL = storage.DefaultBaseURL
154         }
155         v.azClient, err = storage.NewClient(v.StorageAccountName, accountKey, v.StorageBaseURL, storage.DefaultAPIVersion, true)
156         if err != nil {
157                 return fmt.Errorf("creating Azure storage client: %s", err)
158         }
159
160         if v.RequestTimeout == 0 {
161                 v.RequestTimeout = azureDefaultRequestTimeout
162         }
163         v.azClient.HTTPClient = &http.Client{
164                 Timeout: time.Duration(v.RequestTimeout),
165         }
166         bs := v.azClient.GetBlobService()
167         v.bsClient = &azureBlobClient{
168                 client: &bs,
169         }
170
171         ok, err := v.bsClient.ContainerExists(v.ContainerName)
172         if err != nil {
173                 return err
174         }
175         if !ok {
176                 return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
177         }
178         return nil
179 }
180
181 // DeviceID returns a globally unique ID for the storage container.
182 func (v *AzureBlobVolume) DeviceID() string {
183         return "azure://" + v.StorageBaseURL + "/" + v.StorageAccountName + "/" + v.ContainerName
184 }
185
186 // Return true if expires_at metadata attribute is found on the block
187 func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
188         metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
189         if err != nil {
190                 return false, metadata, v.translateError(err)
191         }
192         if metadata["expires_at"] != "" {
193                 return true, metadata, nil
194         }
195         return false, metadata, nil
196 }
197
198 // Get reads a Keep block that has been stored as a block blob in the
199 // container.
200 //
201 // If the block is younger than azureWriteRaceInterval and is
202 // unexpectedly empty, assume a PutBlob operation is in progress, and
203 // wait for it to finish writing.
204 func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
205         trashed, _, err := v.checkTrashed(loc)
206         if err != nil {
207                 return 0, err
208         }
209         if trashed {
210                 return 0, os.ErrNotExist
211         }
212         var deadline time.Time
213         haveDeadline := false
214         size, err := v.get(ctx, loc, buf)
215         for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
216                 // Seeing a brand new empty block probably means we're
217                 // in a race with CreateBlob, which under the hood
218                 // (apparently) does "CreateEmpty" and "CommitData"
219                 // with no additional transaction locking.
220                 if !haveDeadline {
221                         t, err := v.Mtime(loc)
222                         if err != nil {
223                                 log.Print("Got empty block (possible race) but Mtime failed: ", err)
224                                 break
225                         }
226                         deadline = t.Add(azureWriteRaceInterval)
227                         if time.Now().After(deadline) {
228                                 break
229                         }
230                         log.Printf("Race? Block %s is 0 bytes, %s old. Polling until %s", loc, time.Since(t), deadline)
231                         haveDeadline = true
232                 } else if time.Now().After(deadline) {
233                         break
234                 }
235                 select {
236                 case <-ctx.Done():
237                         return 0, ctx.Err()
238                 case <-time.After(azureWriteRacePollTime):
239                 }
240                 size, err = v.get(ctx, loc, buf)
241         }
242         if haveDeadline {
243                 log.Printf("Race ended with size==%d", size)
244         }
245         return size, err
246 }
247
248 func (v *AzureBlobVolume) get(ctx context.Context, loc string, buf []byte) (int, error) {
249         ctx, cancel := context.WithCancel(ctx)
250         defer cancel()
251         expectSize := len(buf)
252         if azureMaxGetBytes < BlockSize {
253                 // Unfortunately the handler doesn't tell us how long the blob
254                 // is expected to be, so we have to ask Azure.
255                 props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
256                 if err != nil {
257                         return 0, v.translateError(err)
258                 }
259                 if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
260                         return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
261                 }
262                 expectSize = int(props.ContentLength)
263         }
264
265         if expectSize == 0 {
266                 return 0, nil
267         }
268
269         // We'll update this actualSize if/when we get the last piece.
270         actualSize := -1
271         pieces := (expectSize + azureMaxGetBytes - 1) / azureMaxGetBytes
272         errors := make(chan error, pieces)
273         var wg sync.WaitGroup
274         wg.Add(pieces)
275         for p := 0; p < pieces; p++ {
276                 // Each goroutine retrieves one piece. If we hit an
277                 // error, it is sent to the errors chan so get() can
278                 // return it -- but only if the error happens before
279                 // ctx is done. This way, if ctx is done before we hit
280                 // any other error (e.g., requesting client has hung
281                 // up), we return the original ctx.Err() instead of
282                 // the secondary errors from the transfers that got
283                 // interrupted as a result.
284                 go func(p int) {
285                         defer wg.Done()
286                         startPos := p * azureMaxGetBytes
287                         endPos := startPos + azureMaxGetBytes
288                         if endPos > expectSize {
289                                 endPos = expectSize
290                         }
291                         var rdr io.ReadCloser
292                         var err error
293                         gotRdr := make(chan struct{})
294                         go func() {
295                                 defer close(gotRdr)
296                                 if startPos == 0 && endPos == expectSize {
297                                         rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
298                                 } else {
299                                         rdr, err = v.bsClient.GetBlobRange(v.ContainerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
300                                 }
301                         }()
302                         select {
303                         case <-ctx.Done():
304                                 go func() {
305                                         <-gotRdr
306                                         if err == nil {
307                                                 rdr.Close()
308                                         }
309                                 }()
310                                 return
311                         case <-gotRdr:
312                         }
313                         if err != nil {
314                                 errors <- err
315                                 cancel()
316                                 return
317                         }
318                         go func() {
319                                 // Close the reader when the client
320                                 // hangs up or another piece fails
321                                 // (possibly interrupting ReadFull())
322                                 // or when all pieces succeed and
323                                 // get() returns.
324                                 <-ctx.Done()
325                                 rdr.Close()
326                         }()
327                         n, err := io.ReadFull(rdr, buf[startPos:endPos])
328                         if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
329                                 // If we don't know the actual size,
330                                 // and just tried reading 64 MiB, it's
331                                 // normal to encounter EOF.
332                         } else if err != nil {
333                                 if ctx.Err() == nil {
334                                         errors <- err
335                                 }
336                                 cancel()
337                                 return
338                         }
339                         if p == pieces-1 {
340                                 actualSize = startPos + n
341                         }
342                 }(p)
343         }
344         wg.Wait()
345         close(errors)
346         if len(errors) > 0 {
347                 return 0, v.translateError(<-errors)
348         }
349         if ctx.Err() != nil {
350                 return 0, ctx.Err()
351         }
352         return actualSize, nil
353 }
354
355 // Compare the given data with existing stored data.
356 func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte) error {
357         trashed, _, err := v.checkTrashed(loc)
358         if err != nil {
359                 return err
360         }
361         if trashed {
362                 return os.ErrNotExist
363         }
364         var rdr io.ReadCloser
365         gotRdr := make(chan struct{})
366         go func() {
367                 defer close(gotRdr)
368                 rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
369         }()
370         select {
371         case <-ctx.Done():
372                 go func() {
373                         <-gotRdr
374                         if err == nil {
375                                 rdr.Close()
376                         }
377                 }()
378                 return ctx.Err()
379         case <-gotRdr:
380         }
381         if err != nil {
382                 return v.translateError(err)
383         }
384         defer rdr.Close()
385         return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
386 }
387
388 // Put stores a Keep block as a block blob in the container.
389 func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) error {
390         if v.ReadOnly {
391                 return MethodDisabledError
392         }
393         // Send the block data through a pipe, so that (if we need to)
394         // we can close the pipe early and abandon our
395         // CreateBlockBlobFromReader() goroutine, without worrying
396         // about CreateBlockBlobFromReader() accessing our block
397         // buffer after we release it.
398         bufr, bufw := io.Pipe()
399         go func() {
400                 io.Copy(bufw, bytes.NewReader(block))
401                 bufw.Close()
402         }()
403         errChan := make(chan error)
404         go func() {
405                 var body io.Reader = bufr
406                 if len(block) == 0 {
407                         // We must send a "Content-Length: 0" header,
408                         // but the http client interprets
409                         // ContentLength==0 as "unknown" unless it can
410                         // confirm by introspection that Body will
411                         // read 0 bytes.
412                         body = http.NoBody
413                         bufr.Close()
414                 }
415                 errChan <- v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), body, nil)
416         }()
417         select {
418         case <-ctx.Done():
419                 theConfig.debugLogf("%s: taking CreateBlockBlobFromReader's input away: %s", v, ctx.Err())
420                 // Our pipe might be stuck in Write(), waiting for
421                 // io.Copy() to read. If so, un-stick it. This means
422                 // CreateBlockBlobFromReader will get corrupt data,
423                 // but that's OK: the size won't match, so the write
424                 // will fail.
425                 go io.Copy(ioutil.Discard, bufr)
426                 // CloseWithError() will return once pending I/O is done.
427                 bufw.CloseWithError(ctx.Err())
428                 theConfig.debugLogf("%s: abandoning CreateBlockBlobFromReader goroutine", v)
429                 return ctx.Err()
430         case err := <-errChan:
431                 return err
432         }
433 }
434
435 // Touch updates the last-modified property of a block blob.
436 func (v *AzureBlobVolume) Touch(loc string) error {
437         if v.ReadOnly {
438                 return MethodDisabledError
439         }
440         trashed, metadata, err := v.checkTrashed(loc)
441         if err != nil {
442                 return err
443         }
444         if trashed {
445                 return os.ErrNotExist
446         }
447
448         metadata["touch"] = fmt.Sprintf("%d", time.Now())
449         return v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
450 }
451
452 // Mtime returns the last-modified property of a block blob.
453 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
454         trashed, _, err := v.checkTrashed(loc)
455         if err != nil {
456                 return time.Time{}, err
457         }
458         if trashed {
459                 return time.Time{}, os.ErrNotExist
460         }
461
462         props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
463         if err != nil {
464                 return time.Time{}, err
465         }
466         return time.Parse(time.RFC1123, props.LastModified)
467 }
468
469 // IndexTo writes a list of Keep blocks that are stored in the
470 // container.
471 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
472         params := storage.ListBlobsParameters{
473                 Prefix:  prefix,
474                 Include: "metadata",
475         }
476         for {
477                 resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
478                 if err != nil {
479                         return err
480                 }
481                 for _, b := range resp.Blobs {
482                         t, err := time.Parse(time.RFC1123, b.Properties.LastModified)
483                         if err != nil {
484                                 return err
485                         }
486                         if !v.isKeepBlock(b.Name) {
487                                 continue
488                         }
489                         if b.Properties.ContentLength == 0 && t.Add(azureWriteRaceInterval).After(time.Now()) {
490                                 // A new zero-length blob is probably
491                                 // just a new non-empty blob that
492                                 // hasn't committed its data yet (see
493                                 // Get()), and in any case has no
494                                 // value.
495                                 continue
496                         }
497                         if b.Metadata["expires_at"] != "" {
498                                 // Trashed blob; exclude it from response
499                                 continue
500                         }
501                         fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.UnixNano())
502                 }
503                 if resp.NextMarker == "" {
504                         return nil
505                 }
506                 params.Marker = resp.NextMarker
507         }
508 }
509
510 // Trash a Keep block.
511 func (v *AzureBlobVolume) Trash(loc string) error {
512         if v.ReadOnly {
513                 return MethodDisabledError
514         }
515
516         // Ideally we would use If-Unmodified-Since, but that
517         // particular condition seems to be ignored by Azure. Instead,
518         // we get the Etag before checking Mtime, and use If-Match to
519         // ensure we don't delete data if Put() or Touch() happens
520         // between our calls to Mtime() and DeleteBlob().
521         props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
522         if err != nil {
523                 return err
524         }
525         if t, err := v.Mtime(loc); err != nil {
526                 return err
527         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
528                 return nil
529         }
530
531         // If TrashLifetime == 0, just delete it
532         if theConfig.TrashLifetime == 0 {
533                 return v.bsClient.DeleteBlob(v.ContainerName, loc, map[string]string{
534                         "If-Match": props.Etag,
535                 })
536         }
537
538         // Otherwise, mark as trash
539         return v.bsClient.SetBlobMetadata(v.ContainerName, loc, map[string]string{
540                 "expires_at": fmt.Sprintf("%d", time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()),
541         }, map[string]string{
542                 "If-Match": props.Etag,
543         })
544 }
545
546 // Untrash a Keep block.
547 // Delete the expires_at metadata attribute
548 func (v *AzureBlobVolume) Untrash(loc string) error {
549         // if expires_at does not exist, return NotFoundError
550         metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
551         if err != nil {
552                 return v.translateError(err)
553         }
554         if metadata["expires_at"] == "" {
555                 return os.ErrNotExist
556         }
557
558         // reset expires_at metadata attribute
559         metadata["expires_at"] = ""
560         err = v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
561         return v.translateError(err)
562 }
563
564 // Status returns a VolumeStatus struct with placeholder data.
565 func (v *AzureBlobVolume) Status() *VolumeStatus {
566         return &VolumeStatus{
567                 DeviceNum: 1,
568                 BytesFree: BlockSize * 1000,
569                 BytesUsed: 1,
570         }
571 }
572
573 // String returns a volume label, including the container name.
574 func (v *AzureBlobVolume) String() string {
575         return fmt.Sprintf("azure-storage-container:%+q", v.ContainerName)
576 }
577
578 // Writable returns true, unless the -readonly flag was on when the
579 // volume was added.
580 func (v *AzureBlobVolume) Writable() bool {
581         return !v.ReadOnly
582 }
583
584 // Replication returns the replication level of the container, as
585 // specified by the -azure-storage-replication argument.
586 func (v *AzureBlobVolume) Replication() int {
587         return v.AzureReplication
588 }
589
590 // If possible, translate an Azure SDK error to a recognizable error
591 // like os.ErrNotExist.
592 func (v *AzureBlobVolume) translateError(err error) error {
593         switch {
594         case err == nil:
595                 return err
596         case strings.Contains(err.Error(), "Not Found"):
597                 // "storage: service returned without a response body (404 Not Found)"
598                 return os.ErrNotExist
599         default:
600                 return err
601         }
602 }
603
604 var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
605
606 func (v *AzureBlobVolume) isKeepBlock(s string) bool {
607         return keepBlockRegexp.MatchString(s)
608 }
609
610 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
611 // and deletes them from the volume.
612 func (v *AzureBlobVolume) EmptyTrash() {
613         var bytesDeleted, bytesInTrash int64
614         var blocksDeleted, blocksInTrash int
615         params := storage.ListBlobsParameters{Include: "metadata"}
616
617         for {
618                 resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
619                 if err != nil {
620                         log.Printf("EmptyTrash: ListBlobs: %v", err)
621                         break
622                 }
623                 for _, b := range resp.Blobs {
624                         // Check if the block is expired
625                         if b.Metadata["expires_at"] == "" {
626                                 continue
627                         }
628
629                         blocksInTrash++
630                         bytesInTrash += b.Properties.ContentLength
631
632                         expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
633                         if err != nil {
634                                 log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
635                                 continue
636                         }
637
638                         if expiresAt > time.Now().Unix() {
639                                 continue
640                         }
641
642                         err = v.bsClient.DeleteBlob(v.ContainerName, b.Name, map[string]string{
643                                 "If-Match": b.Properties.Etag,
644                         })
645                         if err != nil {
646                                 log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
647                                 continue
648                         }
649                         blocksDeleted++
650                         bytesDeleted += b.Properties.ContentLength
651                 }
652                 if resp.NextMarker == "" {
653                         break
654                 }
655                 params.Marker = resp.NextMarker
656         }
657
658         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
659 }
660
661 // InternalStats returns bucket I/O and API call counters.
662 func (v *AzureBlobVolume) InternalStats() interface{} {
663         return &v.bsClient.stats
664 }
665
666 type azureBlobStats struct {
667         statsTicker
668         Ops              uint64
669         GetOps           uint64
670         GetRangeOps      uint64
671         GetMetadataOps   uint64
672         GetPropertiesOps uint64
673         CreateOps        uint64
674         SetMetadataOps   uint64
675         DelOps           uint64
676         ListOps          uint64
677 }
678
679 func (s *azureBlobStats) TickErr(err error) {
680         if err == nil {
681                 return
682         }
683         errType := fmt.Sprintf("%T", err)
684         if err, ok := err.(storage.AzureStorageServiceError); ok {
685                 errType = errType + fmt.Sprintf(" %d (%s)", err.StatusCode, err.Code)
686         }
687         log.Printf("errType %T, err %s", err, err)
688         s.statsTicker.TickErr(err, errType)
689 }
690
691 // azureBlobClient wraps storage.BlobStorageClient in order to count
692 // I/O and API usage stats.
693 type azureBlobClient struct {
694         client *storage.BlobStorageClient
695         stats  azureBlobStats
696 }
697
698 func (c *azureBlobClient) ContainerExists(cname string) (bool, error) {
699         c.stats.Tick(&c.stats.Ops)
700         ok, err := c.client.ContainerExists(cname)
701         c.stats.TickErr(err)
702         return ok, err
703 }
704
705 func (c *azureBlobClient) GetBlobMetadata(cname, bname string) (map[string]string, error) {
706         c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
707         m, err := c.client.GetBlobMetadata(cname, bname)
708         c.stats.TickErr(err)
709         return m, err
710 }
711
712 func (c *azureBlobClient) GetBlobProperties(cname, bname string) (*storage.BlobProperties, error) {
713         c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
714         p, err := c.client.GetBlobProperties(cname, bname)
715         c.stats.TickErr(err)
716         return p, err
717 }
718
719 func (c *azureBlobClient) GetBlob(cname, bname string) (io.ReadCloser, error) {
720         c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
721         rdr, err := c.client.GetBlob(cname, bname)
722         c.stats.TickErr(err)
723         return NewCountingReader(rdr, c.stats.TickInBytes), err
724 }
725
726 func (c *azureBlobClient) GetBlobRange(cname, bname, byterange string, hdrs map[string]string) (io.ReadCloser, error) {
727         c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
728         rdr, err := c.client.GetBlobRange(cname, bname, byterange, hdrs)
729         c.stats.TickErr(err)
730         return NewCountingReader(rdr, c.stats.TickInBytes), err
731 }
732
733 func (c *azureBlobClient) CreateBlockBlobFromReader(cname, bname string, size uint64, rdr io.Reader, hdrs map[string]string) error {
734         c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
735         if size != 0 {
736                 rdr = NewCountingReader(rdr, c.stats.TickOutBytes)
737         }
738         err := c.client.CreateBlockBlobFromReader(cname, bname, size, rdr, hdrs)
739         c.stats.TickErr(err)
740         return err
741 }
742
743 func (c *azureBlobClient) SetBlobMetadata(cname, bname string, m, hdrs map[string]string) error {
744         c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
745         err := c.client.SetBlobMetadata(cname, bname, m, hdrs)
746         c.stats.TickErr(err)
747         return err
748 }
749
750 func (c *azureBlobClient) ListBlobs(cname string, params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
751         c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
752         resp, err := c.client.ListBlobs(cname, params)
753         c.stats.TickErr(err)
754         return resp, err
755 }
756
757 func (c *azureBlobClient) DeleteBlob(cname, bname string, hdrs map[string]string) error {
758         c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
759         err := c.client.DeleteBlob(cname, bname, hdrs)
760         c.stats.TickErr(err)
761         return err
762 }