13063: Don't use azure default delay-and-retry loop.
[arvados.git] / services / keepstore / azure_blob_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "errors"
11         "flag"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "net/http"
16         "os"
17         "regexp"
18         "strconv"
19         "strings"
20         "sync"
21         "time"
22
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         "github.com/curoverse/azure-sdk-for-go/storage"
25 )
26
27 const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
28
29 var (
30         azureMaxGetBytes           int
31         azureStorageAccountName    string
32         azureStorageAccountKeyFile string
33         azureStorageReplication    int
34         azureWriteRaceInterval     = 15 * time.Second
35         azureWriteRacePollTime     = time.Second
36 )
37
38 func readKeyFromFile(file string) (string, error) {
39         buf, err := ioutil.ReadFile(file)
40         if err != nil {
41                 return "", errors.New("reading key from " + file + ": " + err.Error())
42         }
43         accountKey := strings.TrimSpace(string(buf))
44         if accountKey == "" {
45                 return "", errors.New("empty account key in " + file)
46         }
47         return accountKey, nil
48 }
49
50 type azureVolumeAdder struct {
51         *Config
52 }
53
54 // String implements flag.Value
55 func (s *azureVolumeAdder) String() string {
56         return "-"
57 }
58
59 func (s *azureVolumeAdder) Set(containerName string) error {
60         s.Config.Volumes = append(s.Config.Volumes, &AzureBlobVolume{
61                 ContainerName:         containerName,
62                 StorageAccountName:    azureStorageAccountName,
63                 StorageAccountKeyFile: azureStorageAccountKeyFile,
64                 AzureReplication:      azureStorageReplication,
65                 ReadOnly:              deprecated.flagReadonly,
66         })
67         return nil
68 }
69
70 func init() {
71         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &AzureBlobVolume{} })
72
73         flag.Var(&azureVolumeAdder{theConfig},
74                 "azure-storage-container-volume",
75                 "Use the given container as a storage volume. Can be given multiple times.")
76         flag.StringVar(
77                 &azureStorageAccountName,
78                 "azure-storage-account-name",
79                 "",
80                 "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
81         flag.StringVar(
82                 &azureStorageAccountKeyFile,
83                 "azure-storage-account-key-file",
84                 "",
85                 "`File` containing the account key used for subsequent --azure-storage-container-volume arguments.")
86         flag.IntVar(
87                 &azureStorageReplication,
88                 "azure-storage-replication",
89                 3,
90                 "Replication level to report to clients when data is stored in an Azure container.")
91         flag.IntVar(
92                 &azureMaxGetBytes,
93                 "azure-max-get-bytes",
94                 BlockSize,
95                 fmt.Sprintf("Maximum bytes to request in a single GET request. If smaller than %d, use multiple concurrent range requests to retrieve a block.", BlockSize))
96 }
97
98 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
99 // container.
100 type AzureBlobVolume struct {
101         StorageAccountName    string
102         StorageAccountKeyFile string
103         StorageBaseURL        string // "" means default, "core.windows.net"
104         ContainerName         string
105         AzureReplication      int
106         ReadOnly              bool
107         RequestTimeout        arvados.Duration
108
109         azClient  storage.Client
110         container *azureContainer
111 }
112
113 // singleSender is a single-attempt storage.Sender.
114 type singleSender struct{}
115
116 // Send performs req exactly once.
117 func (*singleSender) Send(c *storage.Client, req *http.Request) (resp *http.Response, err error) {
118         return c.HTTPClient.Do(req)
119 }
120
121 // Examples implements VolumeWithExamples.
122 func (*AzureBlobVolume) Examples() []Volume {
123         return []Volume{
124                 &AzureBlobVolume{
125                         StorageAccountName:    "example-account-name",
126                         StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
127                         ContainerName:         "example-container-name",
128                         AzureReplication:      3,
129                         RequestTimeout:        azureDefaultRequestTimeout,
130                 },
131                 &AzureBlobVolume{
132                         StorageAccountName:    "cn-account-name",
133                         StorageAccountKeyFile: "/etc/azure_cn_storage_account_key.txt",
134                         StorageBaseURL:        "core.chinacloudapi.cn",
135                         ContainerName:         "cn-container-name",
136                         AzureReplication:      3,
137                         RequestTimeout:        azureDefaultRequestTimeout,
138                 },
139         }
140 }
141
142 // Type implements Volume.
143 func (v *AzureBlobVolume) Type() string {
144         return "Azure"
145 }
146
147 // Start implements Volume.
148 func (v *AzureBlobVolume) Start() error {
149         if v.ContainerName == "" {
150                 return errors.New("no container name given")
151         }
152         if v.StorageAccountName == "" || v.StorageAccountKeyFile == "" {
153                 return errors.New("StorageAccountName and StorageAccountKeyFile must be given")
154         }
155         accountKey, err := readKeyFromFile(v.StorageAccountKeyFile)
156         if err != nil {
157                 return err
158         }
159         if v.StorageBaseURL == "" {
160                 v.StorageBaseURL = storage.DefaultBaseURL
161         }
162         v.azClient, err = storage.NewClient(v.StorageAccountName, accountKey, v.StorageBaseURL, storage.DefaultAPIVersion, true)
163         if err != nil {
164                 return fmt.Errorf("creating Azure storage client: %s", err)
165         }
166         v.azClient.Sender = &singleSender{}
167
168         if v.RequestTimeout == 0 {
169                 v.RequestTimeout = azureDefaultRequestTimeout
170         }
171         v.azClient.HTTPClient = &http.Client{
172                 Timeout: time.Duration(v.RequestTimeout),
173         }
174         bs := v.azClient.GetBlobService()
175         v.container = &azureContainer{
176                 ctr: bs.GetContainerReference(v.ContainerName),
177         }
178
179         if ok, err := v.container.Exists(); err != nil {
180                 return err
181         } else if !ok {
182                 return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
183         }
184         return nil
185 }
186
187 // DeviceID returns a globally unique ID for the storage container.
188 func (v *AzureBlobVolume) DeviceID() string {
189         return "azure://" + v.StorageBaseURL + "/" + v.StorageAccountName + "/" + v.ContainerName
190 }
191
192 // Return true if expires_at metadata attribute is found on the block
193 func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
194         metadata, err := v.container.GetBlobMetadata(loc)
195         if err != nil {
196                 return false, metadata, v.translateError(err)
197         }
198         if metadata["expires_at"] != "" {
199                 return true, metadata, nil
200         }
201         return false, metadata, nil
202 }
203
204 // Get reads a Keep block that has been stored as a block blob in the
205 // container.
206 //
207 // If the block is younger than azureWriteRaceInterval and is
208 // unexpectedly empty, assume a PutBlob operation is in progress, and
209 // wait for it to finish writing.
210 func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
211         trashed, _, err := v.checkTrashed(loc)
212         if err != nil {
213                 return 0, err
214         }
215         if trashed {
216                 return 0, os.ErrNotExist
217         }
218         var deadline time.Time
219         haveDeadline := false
220         size, err := v.get(ctx, loc, buf)
221         for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
222                 // Seeing a brand new empty block probably means we're
223                 // in a race with CreateBlob, which under the hood
224                 // (apparently) does "CreateEmpty" and "CommitData"
225                 // with no additional transaction locking.
226                 if !haveDeadline {
227                         t, err := v.Mtime(loc)
228                         if err != nil {
229                                 log.Print("Got empty block (possible race) but Mtime failed: ", err)
230                                 break
231                         }
232                         deadline = t.Add(azureWriteRaceInterval)
233                         if time.Now().After(deadline) {
234                                 break
235                         }
236                         log.Printf("Race? Block %s is 0 bytes, %s old. Polling until %s", loc, time.Since(t), deadline)
237                         haveDeadline = true
238                 } else if time.Now().After(deadline) {
239                         break
240                 }
241                 select {
242                 case <-ctx.Done():
243                         return 0, ctx.Err()
244                 case <-time.After(azureWriteRacePollTime):
245                 }
246                 size, err = v.get(ctx, loc, buf)
247         }
248         if haveDeadline {
249                 log.Printf("Race ended with size==%d", size)
250         }
251         return size, err
252 }
253
254 func (v *AzureBlobVolume) get(ctx context.Context, loc string, buf []byte) (int, error) {
255         ctx, cancel := context.WithCancel(ctx)
256         defer cancel()
257         expectSize := len(buf)
258         if azureMaxGetBytes < BlockSize {
259                 // Unfortunately the handler doesn't tell us how long the blob
260                 // is expected to be, so we have to ask Azure.
261                 props, err := v.container.GetBlobProperties(loc)
262                 if err != nil {
263                         return 0, v.translateError(err)
264                 }
265                 if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
266                         return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
267                 }
268                 expectSize = int(props.ContentLength)
269         }
270
271         if expectSize == 0 {
272                 return 0, nil
273         }
274
275         // We'll update this actualSize if/when we get the last piece.
276         actualSize := -1
277         pieces := (expectSize + azureMaxGetBytes - 1) / azureMaxGetBytes
278         errors := make(chan error, pieces)
279         var wg sync.WaitGroup
280         wg.Add(pieces)
281         for p := 0; p < pieces; p++ {
282                 // Each goroutine retrieves one piece. If we hit an
283                 // error, it is sent to the errors chan so get() can
284                 // return it -- but only if the error happens before
285                 // ctx is done. This way, if ctx is done before we hit
286                 // any other error (e.g., requesting client has hung
287                 // up), we return the original ctx.Err() instead of
288                 // the secondary errors from the transfers that got
289                 // interrupted as a result.
290                 go func(p int) {
291                         defer wg.Done()
292                         startPos := p * azureMaxGetBytes
293                         endPos := startPos + azureMaxGetBytes
294                         if endPos > expectSize {
295                                 endPos = expectSize
296                         }
297                         var rdr io.ReadCloser
298                         var err error
299                         gotRdr := make(chan struct{})
300                         go func() {
301                                 defer close(gotRdr)
302                                 if startPos == 0 && endPos == expectSize {
303                                         rdr, err = v.container.GetBlob(loc)
304                                 } else {
305                                         rdr, err = v.container.GetBlobRange(loc, startPos, endPos-1, nil)
306                                 }
307                         }()
308                         select {
309                         case <-ctx.Done():
310                                 go func() {
311                                         <-gotRdr
312                                         if err == nil {
313                                                 rdr.Close()
314                                         }
315                                 }()
316                                 return
317                         case <-gotRdr:
318                         }
319                         if err != nil {
320                                 errors <- err
321                                 cancel()
322                                 return
323                         }
324                         go func() {
325                                 // Close the reader when the client
326                                 // hangs up or another piece fails
327                                 // (possibly interrupting ReadFull())
328                                 // or when all pieces succeed and
329                                 // get() returns.
330                                 <-ctx.Done()
331                                 rdr.Close()
332                         }()
333                         n, err := io.ReadFull(rdr, buf[startPos:endPos])
334                         if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
335                                 // If we don't know the actual size,
336                                 // and just tried reading 64 MiB, it's
337                                 // normal to encounter EOF.
338                         } else if err != nil {
339                                 if ctx.Err() == nil {
340                                         errors <- err
341                                 }
342                                 cancel()
343                                 return
344                         }
345                         if p == pieces-1 {
346                                 actualSize = startPos + n
347                         }
348                 }(p)
349         }
350         wg.Wait()
351         close(errors)
352         if len(errors) > 0 {
353                 return 0, v.translateError(<-errors)
354         }
355         if ctx.Err() != nil {
356                 return 0, ctx.Err()
357         }
358         return actualSize, nil
359 }
360
361 // Compare the given data with existing stored data.
362 func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte) error {
363         trashed, _, err := v.checkTrashed(loc)
364         if err != nil {
365                 return err
366         }
367         if trashed {
368                 return os.ErrNotExist
369         }
370         var rdr io.ReadCloser
371         gotRdr := make(chan struct{})
372         go func() {
373                 defer close(gotRdr)
374                 rdr, err = v.container.GetBlob(loc)
375         }()
376         select {
377         case <-ctx.Done():
378                 go func() {
379                         <-gotRdr
380                         if err == nil {
381                                 rdr.Close()
382                         }
383                 }()
384                 return ctx.Err()
385         case <-gotRdr:
386         }
387         if err != nil {
388                 return v.translateError(err)
389         }
390         defer rdr.Close()
391         return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
392 }
393
394 // Put stores a Keep block as a block blob in the container.
395 func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) error {
396         if v.ReadOnly {
397                 return MethodDisabledError
398         }
399         // Send the block data through a pipe, so that (if we need to)
400         // we can close the pipe early and abandon our
401         // CreateBlockBlobFromReader() goroutine, without worrying
402         // about CreateBlockBlobFromReader() accessing our block
403         // buffer after we release it.
404         bufr, bufw := io.Pipe()
405         go func() {
406                 io.Copy(bufw, bytes.NewReader(block))
407                 bufw.Close()
408         }()
409         errChan := make(chan error)
410         go func() {
411                 var body io.Reader = bufr
412                 if len(block) == 0 {
413                         // We must send a "Content-Length: 0" header,
414                         // but the http client interprets
415                         // ContentLength==0 as "unknown" unless it can
416                         // confirm by introspection that Body will
417                         // read 0 bytes.
418                         body = http.NoBody
419                         bufr.Close()
420                 }
421                 errChan <- v.container.CreateBlockBlobFromReader(loc, len(block), body, nil)
422         }()
423         select {
424         case <-ctx.Done():
425                 theConfig.debugLogf("%s: taking CreateBlockBlobFromReader's input away: %s", v, ctx.Err())
426                 // Our pipe might be stuck in Write(), waiting for
427                 // io.Copy() to read. If so, un-stick it. This means
428                 // CreateBlockBlobFromReader will get corrupt data,
429                 // but that's OK: the size won't match, so the write
430                 // will fail.
431                 go io.Copy(ioutil.Discard, bufr)
432                 // CloseWithError() will return once pending I/O is done.
433                 bufw.CloseWithError(ctx.Err())
434                 theConfig.debugLogf("%s: abandoning CreateBlockBlobFromReader goroutine", v)
435                 return ctx.Err()
436         case err := <-errChan:
437                 return err
438         }
439 }
440
441 // Touch updates the last-modified property of a block blob.
442 func (v *AzureBlobVolume) Touch(loc string) error {
443         if v.ReadOnly {
444                 return MethodDisabledError
445         }
446         trashed, metadata, err := v.checkTrashed(loc)
447         if err != nil {
448                 return err
449         }
450         if trashed {
451                 return os.ErrNotExist
452         }
453
454         metadata["touch"] = fmt.Sprintf("%d", time.Now())
455         return v.container.SetBlobMetadata(loc, metadata, nil)
456 }
457
458 // Mtime returns the last-modified property of a block blob.
459 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
460         trashed, _, err := v.checkTrashed(loc)
461         if err != nil {
462                 return time.Time{}, err
463         }
464         if trashed {
465                 return time.Time{}, os.ErrNotExist
466         }
467
468         props, err := v.container.GetBlobProperties(loc)
469         if err != nil {
470                 return time.Time{}, err
471         }
472         return time.Time(props.LastModified), nil
473 }
474
475 // IndexTo writes a list of Keep blocks that are stored in the
476 // container.
477 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
478         params := storage.ListBlobsParameters{
479                 Prefix:  prefix,
480                 Include: &storage.IncludeBlobDataset{Metadata: true},
481         }
482         for {
483                 resp, err := v.container.ListBlobs(params)
484                 if err != nil {
485                         return err
486                 }
487                 for _, b := range resp.Blobs {
488                         if !v.isKeepBlock(b.Name) {
489                                 continue
490                         }
491                         modtime := time.Time(b.Properties.LastModified)
492                         if b.Properties.ContentLength == 0 && modtime.Add(azureWriteRaceInterval).After(time.Now()) {
493                                 // A new zero-length blob is probably
494                                 // just a new non-empty blob that
495                                 // hasn't committed its data yet (see
496                                 // Get()), and in any case has no
497                                 // value.
498                                 continue
499                         }
500                         if b.Metadata["expires_at"] != "" {
501                                 // Trashed blob; exclude it from response
502                                 continue
503                         }
504                         fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, modtime.UnixNano())
505                 }
506                 if resp.NextMarker == "" {
507                         return nil
508                 }
509                 params.Marker = resp.NextMarker
510         }
511 }
512
513 // Trash a Keep block.
514 func (v *AzureBlobVolume) Trash(loc string) error {
515         if v.ReadOnly {
516                 return MethodDisabledError
517         }
518
519         // Ideally we would use If-Unmodified-Since, but that
520         // particular condition seems to be ignored by Azure. Instead,
521         // we get the Etag before checking Mtime, and use If-Match to
522         // ensure we don't delete data if Put() or Touch() happens
523         // between our calls to Mtime() and DeleteBlob().
524         props, err := v.container.GetBlobProperties(loc)
525         if err != nil {
526                 return err
527         }
528         if t, err := v.Mtime(loc); err != nil {
529                 return err
530         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
531                 return nil
532         }
533
534         // If TrashLifetime == 0, just delete it
535         if theConfig.TrashLifetime == 0 {
536                 return v.container.DeleteBlob(loc, &storage.DeleteBlobOptions{
537                         IfMatch: props.Etag,
538                 })
539         }
540
541         // Otherwise, mark as trash
542         return v.container.SetBlobMetadata(loc, storage.BlobMetadata{
543                 "expires_at": fmt.Sprintf("%d", time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()),
544         }, &storage.SetBlobMetadataOptions{
545                 IfMatch: props.Etag,
546         })
547 }
548
549 // Untrash a Keep block.
550 // Delete the expires_at metadata attribute
551 func (v *AzureBlobVolume) Untrash(loc string) error {
552         // if expires_at does not exist, return NotFoundError
553         metadata, err := v.container.GetBlobMetadata(loc)
554         if err != nil {
555                 return v.translateError(err)
556         }
557         if metadata["expires_at"] == "" {
558                 return os.ErrNotExist
559         }
560
561         // reset expires_at metadata attribute
562         metadata["expires_at"] = ""
563         err = v.container.SetBlobMetadata(loc, metadata, nil)
564         return v.translateError(err)
565 }
566
567 // Status returns a VolumeStatus struct with placeholder data.
568 func (v *AzureBlobVolume) Status() *VolumeStatus {
569         return &VolumeStatus{
570                 DeviceNum: 1,
571                 BytesFree: BlockSize * 1000,
572                 BytesUsed: 1,
573         }
574 }
575
576 // String returns a volume label, including the container name.
577 func (v *AzureBlobVolume) String() string {
578         return fmt.Sprintf("azure-storage-container:%+q", v.ContainerName)
579 }
580
581 // Writable returns true, unless the -readonly flag was on when the
582 // volume was added.
583 func (v *AzureBlobVolume) Writable() bool {
584         return !v.ReadOnly
585 }
586
587 // Replication returns the replication level of the container, as
588 // specified by the -azure-storage-replication argument.
589 func (v *AzureBlobVolume) Replication() int {
590         return v.AzureReplication
591 }
592
593 // If possible, translate an Azure SDK error to a recognizable error
594 // like os.ErrNotExist.
595 func (v *AzureBlobVolume) translateError(err error) error {
596         switch {
597         case err == nil:
598                 return err
599         case strings.Contains(err.Error(), "Not Found"):
600                 // "storage: service returned without a response body (404 Not Found)"
601                 return os.ErrNotExist
602         default:
603                 return err
604         }
605 }
606
607 var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
608
609 func (v *AzureBlobVolume) isKeepBlock(s string) bool {
610         return keepBlockRegexp.MatchString(s)
611 }
612
613 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
614 // and deletes them from the volume.
615 func (v *AzureBlobVolume) EmptyTrash() {
616         var bytesDeleted, bytesInTrash int64
617         var blocksDeleted, blocksInTrash int
618         params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
619
620         for {
621                 resp, err := v.container.ListBlobs(params)
622                 if err != nil {
623                         log.Printf("EmptyTrash: ListBlobs: %v", err)
624                         break
625                 }
626                 for _, b := range resp.Blobs {
627                         // Check if the block is expired
628                         if b.Metadata["expires_at"] == "" {
629                                 continue
630                         }
631
632                         blocksInTrash++
633                         bytesInTrash += b.Properties.ContentLength
634
635                         expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
636                         if err != nil {
637                                 log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
638                                 continue
639                         }
640
641                         if expiresAt > time.Now().Unix() {
642                                 continue
643                         }
644
645                         err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
646                                 IfMatch: b.Properties.Etag,
647                         })
648                         if err != nil {
649                                 log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
650                                 continue
651                         }
652                         blocksDeleted++
653                         bytesDeleted += b.Properties.ContentLength
654                 }
655                 if resp.NextMarker == "" {
656                         break
657                 }
658                 params.Marker = resp.NextMarker
659         }
660
661         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
662 }
663
664 // InternalStats returns bucket I/O and API call counters.
665 func (v *AzureBlobVolume) InternalStats() interface{} {
666         return &v.container.stats
667 }
668
669 type azureBlobStats struct {
670         statsTicker
671         Ops              uint64
672         GetOps           uint64
673         GetRangeOps      uint64
674         GetMetadataOps   uint64
675         GetPropertiesOps uint64
676         CreateOps        uint64
677         SetMetadataOps   uint64
678         DelOps           uint64
679         ListOps          uint64
680 }
681
682 func (s *azureBlobStats) TickErr(err error) {
683         if err == nil {
684                 return
685         }
686         errType := fmt.Sprintf("%T", err)
687         if err, ok := err.(storage.AzureStorageServiceError); ok {
688                 errType = errType + fmt.Sprintf(" %d (%s)", err.StatusCode, err.Code)
689         }
690         log.Printf("errType %T, err %s", err, err)
691         s.statsTicker.TickErr(err, errType)
692 }
693
694 // azureContainer wraps storage.Container in order to count I/O and
695 // API usage stats.
696 type azureContainer struct {
697         ctr   *storage.Container
698         stats azureBlobStats
699 }
700
701 func (c *azureContainer) Exists() (bool, error) {
702         c.stats.Tick(&c.stats.Ops)
703         ok, err := c.ctr.Exists()
704         c.stats.TickErr(err)
705         return ok, err
706 }
707
708 func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
709         c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
710         b := c.ctr.GetBlobReference(bname)
711         err := b.GetMetadata(nil)
712         c.stats.TickErr(err)
713         return b.Metadata, err
714 }
715
716 func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
717         c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
718         b := c.ctr.GetBlobReference(bname)
719         err := b.GetProperties(nil)
720         c.stats.TickErr(err)
721         return &b.Properties, err
722 }
723
724 func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
725         c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
726         b := c.ctr.GetBlobReference(bname)
727         rdr, err := b.Get(nil)
728         c.stats.TickErr(err)
729         return NewCountingReader(rdr, c.stats.TickInBytes), err
730 }
731
732 func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
733         c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
734         b := c.ctr.GetBlobReference(bname)
735         rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
736                 Range: &storage.BlobRange{
737                         Start: uint64(start),
738                         End:   uint64(end),
739                 },
740                 GetBlobOptions: opts,
741         })
742         c.stats.TickErr(err)
743         return NewCountingReader(rdr, c.stats.TickInBytes), err
744 }
745
746 // If we give it an io.Reader that doesn't also have a Len() int
747 // method, the Azure SDK determines data size by copying the data into
748 // a new buffer, which is not a good use of memory.
749 type readerWithAzureLen struct {
750         io.Reader
751         len int
752 }
753
754 // Len satisfies the private lener interface in azure-sdk-for-go.
755 func (r *readerWithAzureLen) Len() int {
756         return r.len
757 }
758
759 func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
760         c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
761         if size != 0 {
762                 rdr = &readerWithAzureLen{
763                         Reader: NewCountingReader(rdr, c.stats.TickOutBytes),
764                         len:    size,
765                 }
766         }
767         b := c.ctr.GetBlobReference(bname)
768         err := b.CreateBlockBlobFromReader(rdr, opts)
769         c.stats.TickErr(err)
770         return err
771 }
772
773 func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
774         c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
775         b := c.ctr.GetBlobReference(bname)
776         b.Metadata = m
777         err := b.SetMetadata(opts)
778         c.stats.TickErr(err)
779         return err
780 }
781
782 func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
783         c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
784         resp, err := c.ctr.ListBlobs(params)
785         c.stats.TickErr(err)
786         return resp, err
787 }
788
789 func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
790         c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
791         b := c.ctr.GetBlobReference(bname)
792         err := b.Delete(opts)
793         c.stats.TickErr(err)
794         return err
795 }