Merge branch '11642-null-job-log' refs #11642
[arvados.git] / services / keepstore / azure_blob_volume.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/http"
12         "os"
13         "regexp"
14         "strconv"
15         "strings"
16         "sync"
17         "time"
18
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         log "github.com/Sirupsen/logrus"
21         "github.com/curoverse/azure-sdk-for-go/storage"
22 )
23
24 const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
25
26 var (
27         azureMaxGetBytes           int
28         azureStorageAccountName    string
29         azureStorageAccountKeyFile string
30         azureStorageReplication    int
31         azureWriteRaceInterval     = 15 * time.Second
32         azureWriteRacePollTime     = time.Second
33 )
34
35 func readKeyFromFile(file string) (string, error) {
36         buf, err := ioutil.ReadFile(file)
37         if err != nil {
38                 return "", errors.New("reading key from " + file + ": " + err.Error())
39         }
40         accountKey := strings.TrimSpace(string(buf))
41         if accountKey == "" {
42                 return "", errors.New("empty account key in " + file)
43         }
44         return accountKey, nil
45 }
46
47 type azureVolumeAdder struct {
48         *Config
49 }
50
51 // String implements flag.Value
52 func (s *azureVolumeAdder) String() string {
53         return "-"
54 }
55
56 func (s *azureVolumeAdder) Set(containerName string) error {
57         s.Config.Volumes = append(s.Config.Volumes, &AzureBlobVolume{
58                 ContainerName:         containerName,
59                 StorageAccountName:    azureStorageAccountName,
60                 StorageAccountKeyFile: azureStorageAccountKeyFile,
61                 AzureReplication:      azureStorageReplication,
62                 ReadOnly:              deprecated.flagReadonly,
63         })
64         return nil
65 }
66
67 func init() {
68         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &AzureBlobVolume{} })
69
70         flag.Var(&azureVolumeAdder{theConfig},
71                 "azure-storage-container-volume",
72                 "Use the given container as a storage volume. Can be given multiple times.")
73         flag.StringVar(
74                 &azureStorageAccountName,
75                 "azure-storage-account-name",
76                 "",
77                 "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
78         flag.StringVar(
79                 &azureStorageAccountKeyFile,
80                 "azure-storage-account-key-file",
81                 "",
82                 "`File` containing the account key used for subsequent --azure-storage-container-volume arguments.")
83         flag.IntVar(
84                 &azureStorageReplication,
85                 "azure-storage-replication",
86                 3,
87                 "Replication level to report to clients when data is stored in an Azure container.")
88         flag.IntVar(
89                 &azureMaxGetBytes,
90                 "azure-max-get-bytes",
91                 BlockSize,
92                 fmt.Sprintf("Maximum bytes to request in a single GET request. If smaller than %d, use multiple concurrent range requests to retrieve a block.", BlockSize))
93 }
94
95 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
96 // container.
97 type AzureBlobVolume struct {
98         StorageAccountName    string
99         StorageAccountKeyFile string
100         StorageBaseURL        string // "" means default, "core.windows.net"
101         ContainerName         string
102         AzureReplication      int
103         ReadOnly              bool
104         RequestTimeout        arvados.Duration
105
106         azClient storage.Client
107         bsClient *azureBlobClient
108 }
109
110 // Examples implements VolumeWithExamples.
111 func (*AzureBlobVolume) Examples() []Volume {
112         return []Volume{
113                 &AzureBlobVolume{
114                         StorageAccountName:    "example-account-name",
115                         StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
116                         ContainerName:         "example-container-name",
117                         AzureReplication:      3,
118                         RequestTimeout:        azureDefaultRequestTimeout,
119                 },
120                 &AzureBlobVolume{
121                         StorageAccountName:    "cn-account-name",
122                         StorageAccountKeyFile: "/etc/azure_cn_storage_account_key.txt",
123                         StorageBaseURL:        "core.chinacloudapi.cn",
124                         ContainerName:         "cn-container-name",
125                         AzureReplication:      3,
126                         RequestTimeout:        azureDefaultRequestTimeout,
127                 },
128         }
129 }
130
131 // Type implements Volume.
132 func (v *AzureBlobVolume) Type() string {
133         return "Azure"
134 }
135
136 // Start implements Volume.
137 func (v *AzureBlobVolume) Start() error {
138         if v.ContainerName == "" {
139                 return errors.New("no container name given")
140         }
141         if v.StorageAccountName == "" || v.StorageAccountKeyFile == "" {
142                 return errors.New("StorageAccountName and StorageAccountKeyFile must be given")
143         }
144         accountKey, err := readKeyFromFile(v.StorageAccountKeyFile)
145         if err != nil {
146                 return err
147         }
148         if v.StorageBaseURL == "" {
149                 v.StorageBaseURL = storage.DefaultBaseURL
150         }
151         v.azClient, err = storage.NewClient(v.StorageAccountName, accountKey, v.StorageBaseURL, storage.DefaultAPIVersion, true)
152         if err != nil {
153                 return fmt.Errorf("creating Azure storage client: %s", err)
154         }
155
156         if v.RequestTimeout == 0 {
157                 v.RequestTimeout = azureDefaultRequestTimeout
158         }
159         v.azClient.HTTPClient = &http.Client{
160                 Timeout: time.Duration(v.RequestTimeout),
161         }
162         bs := v.azClient.GetBlobService()
163         v.bsClient = &azureBlobClient{
164                 client: &bs,
165         }
166
167         ok, err := v.bsClient.ContainerExists(v.ContainerName)
168         if err != nil {
169                 return err
170         }
171         if !ok {
172                 return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
173         }
174         return nil
175 }
176
177 // DeviceID returns a globally unique ID for the storage container.
178 func (v *AzureBlobVolume) DeviceID() string {
179         return "azure://" + v.StorageBaseURL + "/" + v.StorageAccountName + "/" + v.ContainerName
180 }
181
182 // Return true if expires_at metadata attribute is found on the block
183 func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
184         metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
185         if err != nil {
186                 return false, metadata, v.translateError(err)
187         }
188         if metadata["expires_at"] != "" {
189                 return true, metadata, nil
190         }
191         return false, metadata, nil
192 }
193
194 // Get reads a Keep block that has been stored as a block blob in the
195 // container.
196 //
197 // If the block is younger than azureWriteRaceInterval and is
198 // unexpectedly empty, assume a PutBlob operation is in progress, and
199 // wait for it to finish writing.
200 func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
201         trashed, _, err := v.checkTrashed(loc)
202         if err != nil {
203                 return 0, err
204         }
205         if trashed {
206                 return 0, os.ErrNotExist
207         }
208         var deadline time.Time
209         haveDeadline := false
210         size, err := v.get(ctx, loc, buf)
211         for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
212                 // Seeing a brand new empty block probably means we're
213                 // in a race with CreateBlob, which under the hood
214                 // (apparently) does "CreateEmpty" and "CommitData"
215                 // with no additional transaction locking.
216                 if !haveDeadline {
217                         t, err := v.Mtime(loc)
218                         if err != nil {
219                                 log.Print("Got empty block (possible race) but Mtime failed: ", err)
220                                 break
221                         }
222                         deadline = t.Add(azureWriteRaceInterval)
223                         if time.Now().After(deadline) {
224                                 break
225                         }
226                         log.Printf("Race? Block %s is 0 bytes, %s old. Polling until %s", loc, time.Since(t), deadline)
227                         haveDeadline = true
228                 } else if time.Now().After(deadline) {
229                         break
230                 }
231                 select {
232                 case <-ctx.Done():
233                         return 0, ctx.Err()
234                 case <-time.After(azureWriteRacePollTime):
235                 }
236                 size, err = v.get(ctx, loc, buf)
237         }
238         if haveDeadline {
239                 log.Printf("Race ended with size==%d", size)
240         }
241         return size, err
242 }
243
244 func (v *AzureBlobVolume) get(ctx context.Context, loc string, buf []byte) (int, error) {
245         ctx, cancel := context.WithCancel(ctx)
246         defer cancel()
247         expectSize := len(buf)
248         if azureMaxGetBytes < BlockSize {
249                 // Unfortunately the handler doesn't tell us how long the blob
250                 // is expected to be, so we have to ask Azure.
251                 props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
252                 if err != nil {
253                         return 0, v.translateError(err)
254                 }
255                 if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
256                         return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
257                 }
258                 expectSize = int(props.ContentLength)
259         }
260
261         if expectSize == 0 {
262                 return 0, nil
263         }
264
265         // We'll update this actualSize if/when we get the last piece.
266         actualSize := -1
267         pieces := (expectSize + azureMaxGetBytes - 1) / azureMaxGetBytes
268         errors := make(chan error, pieces)
269         var wg sync.WaitGroup
270         wg.Add(pieces)
271         for p := 0; p < pieces; p++ {
272                 // Each goroutine retrieves one piece. If we hit an
273                 // error, it is sent to the errors chan so get() can
274                 // return it -- but only if the error happens before
275                 // ctx is done. This way, if ctx is done before we hit
276                 // any other error (e.g., requesting client has hung
277                 // up), we return the original ctx.Err() instead of
278                 // the secondary errors from the transfers that got
279                 // interrupted as a result.
280                 go func(p int) {
281                         defer wg.Done()
282                         startPos := p * azureMaxGetBytes
283                         endPos := startPos + azureMaxGetBytes
284                         if endPos > expectSize {
285                                 endPos = expectSize
286                         }
287                         var rdr io.ReadCloser
288                         var err error
289                         gotRdr := make(chan struct{})
290                         go func() {
291                                 defer close(gotRdr)
292                                 if startPos == 0 && endPos == expectSize {
293                                         rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
294                                 } else {
295                                         rdr, err = v.bsClient.GetBlobRange(v.ContainerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
296                                 }
297                         }()
298                         select {
299                         case <-ctx.Done():
300                                 go func() {
301                                         <-gotRdr
302                                         if err == nil {
303                                                 rdr.Close()
304                                         }
305                                 }()
306                                 return
307                         case <-gotRdr:
308                         }
309                         if err != nil {
310                                 errors <- err
311                                 cancel()
312                                 return
313                         }
314                         go func() {
315                                 // Close the reader when the client
316                                 // hangs up or another piece fails
317                                 // (possibly interrupting ReadFull())
318                                 // or when all pieces succeed and
319                                 // get() returns.
320                                 <-ctx.Done()
321                                 rdr.Close()
322                         }()
323                         n, err := io.ReadFull(rdr, buf[startPos:endPos])
324                         if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
325                                 // If we don't know the actual size,
326                                 // and just tried reading 64 MiB, it's
327                                 // normal to encounter EOF.
328                         } else if err != nil {
329                                 if ctx.Err() == nil {
330                                         errors <- err
331                                 }
332                                 cancel()
333                                 return
334                         }
335                         if p == pieces-1 {
336                                 actualSize = startPos + n
337                         }
338                 }(p)
339         }
340         wg.Wait()
341         close(errors)
342         if len(errors) > 0 {
343                 return 0, v.translateError(<-errors)
344         }
345         if ctx.Err() != nil {
346                 return 0, ctx.Err()
347         }
348         return actualSize, nil
349 }
350
351 // Compare the given data with existing stored data.
352 func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte) error {
353         trashed, _, err := v.checkTrashed(loc)
354         if err != nil {
355                 return err
356         }
357         if trashed {
358                 return os.ErrNotExist
359         }
360         var rdr io.ReadCloser
361         gotRdr := make(chan struct{})
362         go func() {
363                 defer close(gotRdr)
364                 rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
365         }()
366         select {
367         case <-ctx.Done():
368                 go func() {
369                         <-gotRdr
370                         if err == nil {
371                                 rdr.Close()
372                         }
373                 }()
374                 return ctx.Err()
375         case <-gotRdr:
376         }
377         if err != nil {
378                 return v.translateError(err)
379         }
380         defer rdr.Close()
381         return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
382 }
383
384 // Put stores a Keep block as a block blob in the container.
385 func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) error {
386         if v.ReadOnly {
387                 return MethodDisabledError
388         }
389         // Send the block data through a pipe, so that (if we need to)
390         // we can close the pipe early and abandon our
391         // CreateBlockBlobFromReader() goroutine, without worrying
392         // about CreateBlockBlobFromReader() accessing our block
393         // buffer after we release it.
394         bufr, bufw := io.Pipe()
395         go func() {
396                 io.Copy(bufw, bytes.NewReader(block))
397                 bufw.Close()
398         }()
399         errChan := make(chan error)
400         go func() {
401                 errChan <- v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), bufr, nil)
402         }()
403         select {
404         case <-ctx.Done():
405                 theConfig.debugLogf("%s: taking CreateBlockBlobFromReader's input away: %s", v, ctx.Err())
406                 // Our pipe might be stuck in Write(), waiting for
407                 // io.Copy() to read. If so, un-stick it. This means
408                 // CreateBlockBlobFromReader will get corrupt data,
409                 // but that's OK: the size won't match, so the write
410                 // will fail.
411                 go io.Copy(ioutil.Discard, bufr)
412                 // CloseWithError() will return once pending I/O is done.
413                 bufw.CloseWithError(ctx.Err())
414                 theConfig.debugLogf("%s: abandoning CreateBlockBlobFromReader goroutine", v)
415                 return ctx.Err()
416         case err := <-errChan:
417                 return err
418         }
419 }
420
421 // Touch updates the last-modified property of a block blob.
422 func (v *AzureBlobVolume) Touch(loc string) error {
423         if v.ReadOnly {
424                 return MethodDisabledError
425         }
426         trashed, metadata, err := v.checkTrashed(loc)
427         if err != nil {
428                 return err
429         }
430         if trashed {
431                 return os.ErrNotExist
432         }
433
434         metadata["touch"] = fmt.Sprintf("%d", time.Now())
435         return v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
436 }
437
438 // Mtime returns the last-modified property of a block blob.
439 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
440         trashed, _, err := v.checkTrashed(loc)
441         if err != nil {
442                 return time.Time{}, err
443         }
444         if trashed {
445                 return time.Time{}, os.ErrNotExist
446         }
447
448         props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
449         if err != nil {
450                 return time.Time{}, err
451         }
452         return time.Parse(time.RFC1123, props.LastModified)
453 }
454
455 // IndexTo writes a list of Keep blocks that are stored in the
456 // container.
457 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
458         params := storage.ListBlobsParameters{
459                 Prefix:  prefix,
460                 Include: "metadata",
461         }
462         for {
463                 resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
464                 if err != nil {
465                         return err
466                 }
467                 for _, b := range resp.Blobs {
468                         t, err := time.Parse(time.RFC1123, b.Properties.LastModified)
469                         if err != nil {
470                                 return err
471                         }
472                         if !v.isKeepBlock(b.Name) {
473                                 continue
474                         }
475                         if b.Properties.ContentLength == 0 && t.Add(azureWriteRaceInterval).After(time.Now()) {
476                                 // A new zero-length blob is probably
477                                 // just a new non-empty blob that
478                                 // hasn't committed its data yet (see
479                                 // Get()), and in any case has no
480                                 // value.
481                                 continue
482                         }
483                         if b.Metadata["expires_at"] != "" {
484                                 // Trashed blob; exclude it from response
485                                 continue
486                         }
487                         fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.UnixNano())
488                 }
489                 if resp.NextMarker == "" {
490                         return nil
491                 }
492                 params.Marker = resp.NextMarker
493         }
494 }
495
496 // Trash a Keep block.
497 func (v *AzureBlobVolume) Trash(loc string) error {
498         if v.ReadOnly {
499                 return MethodDisabledError
500         }
501
502         // Ideally we would use If-Unmodified-Since, but that
503         // particular condition seems to be ignored by Azure. Instead,
504         // we get the Etag before checking Mtime, and use If-Match to
505         // ensure we don't delete data if Put() or Touch() happens
506         // between our calls to Mtime() and DeleteBlob().
507         props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
508         if err != nil {
509                 return err
510         }
511         if t, err := v.Mtime(loc); err != nil {
512                 return err
513         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
514                 return nil
515         }
516
517         // If TrashLifetime == 0, just delete it
518         if theConfig.TrashLifetime == 0 {
519                 return v.bsClient.DeleteBlob(v.ContainerName, loc, map[string]string{
520                         "If-Match": props.Etag,
521                 })
522         }
523
524         // Otherwise, mark as trash
525         return v.bsClient.SetBlobMetadata(v.ContainerName, loc, map[string]string{
526                 "expires_at": fmt.Sprintf("%d", time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()),
527         }, map[string]string{
528                 "If-Match": props.Etag,
529         })
530 }
531
532 // Untrash a Keep block.
533 // Delete the expires_at metadata attribute
534 func (v *AzureBlobVolume) Untrash(loc string) error {
535         // if expires_at does not exist, return NotFoundError
536         metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
537         if err != nil {
538                 return v.translateError(err)
539         }
540         if metadata["expires_at"] == "" {
541                 return os.ErrNotExist
542         }
543
544         // reset expires_at metadata attribute
545         metadata["expires_at"] = ""
546         err = v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
547         return v.translateError(err)
548 }
549
550 // Status returns a VolumeStatus struct with placeholder data.
551 func (v *AzureBlobVolume) Status() *VolumeStatus {
552         return &VolumeStatus{
553                 DeviceNum: 1,
554                 BytesFree: BlockSize * 1000,
555                 BytesUsed: 1,
556         }
557 }
558
559 // String returns a volume label, including the container name.
560 func (v *AzureBlobVolume) String() string {
561         return fmt.Sprintf("azure-storage-container:%+q", v.ContainerName)
562 }
563
564 // Writable returns true, unless the -readonly flag was on when the
565 // volume was added.
566 func (v *AzureBlobVolume) Writable() bool {
567         return !v.ReadOnly
568 }
569
570 // Replication returns the replication level of the container, as
571 // specified by the -azure-storage-replication argument.
572 func (v *AzureBlobVolume) Replication() int {
573         return v.AzureReplication
574 }
575
576 // If possible, translate an Azure SDK error to a recognizable error
577 // like os.ErrNotExist.
578 func (v *AzureBlobVolume) translateError(err error) error {
579         switch {
580         case err == nil:
581                 return err
582         case strings.Contains(err.Error(), "Not Found"):
583                 // "storage: service returned without a response body (404 Not Found)"
584                 return os.ErrNotExist
585         default:
586                 return err
587         }
588 }
589
590 var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
591
592 func (v *AzureBlobVolume) isKeepBlock(s string) bool {
593         return keepBlockRegexp.MatchString(s)
594 }
595
596 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
597 // and deletes them from the volume.
598 func (v *AzureBlobVolume) EmptyTrash() {
599         var bytesDeleted, bytesInTrash int64
600         var blocksDeleted, blocksInTrash int
601         params := storage.ListBlobsParameters{Include: "metadata"}
602
603         for {
604                 resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
605                 if err != nil {
606                         log.Printf("EmptyTrash: ListBlobs: %v", err)
607                         break
608                 }
609                 for _, b := range resp.Blobs {
610                         // Check if the block is expired
611                         if b.Metadata["expires_at"] == "" {
612                                 continue
613                         }
614
615                         blocksInTrash++
616                         bytesInTrash += b.Properties.ContentLength
617
618                         expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
619                         if err != nil {
620                                 log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
621                                 continue
622                         }
623
624                         if expiresAt > time.Now().Unix() {
625                                 continue
626                         }
627
628                         err = v.bsClient.DeleteBlob(v.ContainerName, b.Name, map[string]string{
629                                 "If-Match": b.Properties.Etag,
630                         })
631                         if err != nil {
632                                 log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
633                                 continue
634                         }
635                         blocksDeleted++
636                         bytesDeleted += b.Properties.ContentLength
637                 }
638                 if resp.NextMarker == "" {
639                         break
640                 }
641                 params.Marker = resp.NextMarker
642         }
643
644         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
645 }
646
647 // InternalStats returns bucket I/O and API call counters.
648 func (v *AzureBlobVolume) InternalStats() interface{} {
649         return &v.bsClient.stats
650 }
651
652 type azureBlobStats struct {
653         statsTicker
654         Ops              uint64
655         GetOps           uint64
656         GetRangeOps      uint64
657         GetMetadataOps   uint64
658         GetPropertiesOps uint64
659         CreateOps        uint64
660         SetMetadataOps   uint64
661         DelOps           uint64
662         ListOps          uint64
663 }
664
665 func (s *azureBlobStats) TickErr(err error) {
666         if err == nil {
667                 return
668         }
669         errType := fmt.Sprintf("%T", err)
670         if err, ok := err.(storage.AzureStorageServiceError); ok {
671                 errType = errType + fmt.Sprintf(" %d (%s)", err.StatusCode, err.Code)
672         }
673         log.Printf("errType %T, err %s", err, err)
674         s.statsTicker.TickErr(err, errType)
675 }
676
677 // azureBlobClient wraps storage.BlobStorageClient in order to count
678 // I/O and API usage stats.
679 type azureBlobClient struct {
680         client *storage.BlobStorageClient
681         stats  azureBlobStats
682 }
683
684 func (c *azureBlobClient) ContainerExists(cname string) (bool, error) {
685         c.stats.Tick(&c.stats.Ops)
686         ok, err := c.client.ContainerExists(cname)
687         c.stats.TickErr(err)
688         return ok, err
689 }
690
691 func (c *azureBlobClient) GetBlobMetadata(cname, bname string) (map[string]string, error) {
692         c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
693         m, err := c.client.GetBlobMetadata(cname, bname)
694         c.stats.TickErr(err)
695         return m, err
696 }
697
698 func (c *azureBlobClient) GetBlobProperties(cname, bname string) (*storage.BlobProperties, error) {
699         c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
700         p, err := c.client.GetBlobProperties(cname, bname)
701         c.stats.TickErr(err)
702         return p, err
703 }
704
705 func (c *azureBlobClient) GetBlob(cname, bname string) (io.ReadCloser, error) {
706         c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
707         rdr, err := c.client.GetBlob(cname, bname)
708         c.stats.TickErr(err)
709         return NewCountingReader(rdr, c.stats.TickInBytes), err
710 }
711
712 func (c *azureBlobClient) GetBlobRange(cname, bname, byterange string, hdrs map[string]string) (io.ReadCloser, error) {
713         c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
714         rdr, err := c.client.GetBlobRange(cname, bname, byterange, hdrs)
715         c.stats.TickErr(err)
716         return NewCountingReader(rdr, c.stats.TickInBytes), err
717 }
718
719 func (c *azureBlobClient) CreateBlockBlobFromReader(cname, bname string, size uint64, rdr io.Reader, hdrs map[string]string) error {
720         c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
721         rdr = NewCountingReader(rdr, c.stats.TickOutBytes)
722         err := c.client.CreateBlockBlobFromReader(cname, bname, size, rdr, hdrs)
723         c.stats.TickErr(err)
724         return err
725 }
726
727 func (c *azureBlobClient) SetBlobMetadata(cname, bname string, m, hdrs map[string]string) error {
728         c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
729         err := c.client.SetBlobMetadata(cname, bname, m, hdrs)
730         c.stats.TickErr(err)
731         return err
732 }
733
734 func (c *azureBlobClient) ListBlobs(cname string, params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
735         c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
736         resp, err := c.client.ListBlobs(cname, params)
737         c.stats.TickErr(err)
738         return resp, err
739 }
740
741 func (c *azureBlobClient) DeleteBlob(cname, bname string, hdrs map[string]string) error {
742         c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
743         err := c.client.DeleteBlob(cname, bname, hdrs)
744         c.stats.TickErr(err)
745         return err
746 }