13063: Update azure storage client library.
[arvados.git] / services / keepstore / azure_blob_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "errors"
11         "flag"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "net/http"
16         "os"
17         "regexp"
18         "strconv"
19         "strings"
20         "sync"
21         "time"
22
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         "github.com/curoverse/azure-sdk-for-go/storage"
25 )
26
27 const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
28
29 var (
30         azureMaxGetBytes           int
31         azureStorageAccountName    string
32         azureStorageAccountKeyFile string
33         azureStorageReplication    int
34         azureWriteRaceInterval     = 15 * time.Second
35         azureWriteRacePollTime     = time.Second
36 )
37
38 func readKeyFromFile(file string) (string, error) {
39         buf, err := ioutil.ReadFile(file)
40         if err != nil {
41                 return "", errors.New("reading key from " + file + ": " + err.Error())
42         }
43         accountKey := strings.TrimSpace(string(buf))
44         if accountKey == "" {
45                 return "", errors.New("empty account key in " + file)
46         }
47         return accountKey, nil
48 }
49
50 type azureVolumeAdder struct {
51         *Config
52 }
53
54 // String implements flag.Value
55 func (s *azureVolumeAdder) String() string {
56         return "-"
57 }
58
59 func (s *azureVolumeAdder) Set(containerName string) error {
60         s.Config.Volumes = append(s.Config.Volumes, &AzureBlobVolume{
61                 ContainerName:         containerName,
62                 StorageAccountName:    azureStorageAccountName,
63                 StorageAccountKeyFile: azureStorageAccountKeyFile,
64                 AzureReplication:      azureStorageReplication,
65                 ReadOnly:              deprecated.flagReadonly,
66         })
67         return nil
68 }
69
70 func init() {
71         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &AzureBlobVolume{} })
72
73         flag.Var(&azureVolumeAdder{theConfig},
74                 "azure-storage-container-volume",
75                 "Use the given container as a storage volume. Can be given multiple times.")
76         flag.StringVar(
77                 &azureStorageAccountName,
78                 "azure-storage-account-name",
79                 "",
80                 "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
81         flag.StringVar(
82                 &azureStorageAccountKeyFile,
83                 "azure-storage-account-key-file",
84                 "",
85                 "`File` containing the account key used for subsequent --azure-storage-container-volume arguments.")
86         flag.IntVar(
87                 &azureStorageReplication,
88                 "azure-storage-replication",
89                 3,
90                 "Replication level to report to clients when data is stored in an Azure container.")
91         flag.IntVar(
92                 &azureMaxGetBytes,
93                 "azure-max-get-bytes",
94                 BlockSize,
95                 fmt.Sprintf("Maximum bytes to request in a single GET request. If smaller than %d, use multiple concurrent range requests to retrieve a block.", BlockSize))
96 }
97
98 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
99 // container.
100 type AzureBlobVolume struct {
101         StorageAccountName    string
102         StorageAccountKeyFile string
103         StorageBaseURL        string // "" means default, "core.windows.net"
104         ContainerName         string
105         AzureReplication      int
106         ReadOnly              bool
107         RequestTimeout        arvados.Duration
108
109         azClient  storage.Client
110         container *azureContainer
111 }
112
113 // Examples implements VolumeWithExamples.
114 func (*AzureBlobVolume) Examples() []Volume {
115         return []Volume{
116                 &AzureBlobVolume{
117                         StorageAccountName:    "example-account-name",
118                         StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
119                         ContainerName:         "example-container-name",
120                         AzureReplication:      3,
121                         RequestTimeout:        azureDefaultRequestTimeout,
122                 },
123                 &AzureBlobVolume{
124                         StorageAccountName:    "cn-account-name",
125                         StorageAccountKeyFile: "/etc/azure_cn_storage_account_key.txt",
126                         StorageBaseURL:        "core.chinacloudapi.cn",
127                         ContainerName:         "cn-container-name",
128                         AzureReplication:      3,
129                         RequestTimeout:        azureDefaultRequestTimeout,
130                 },
131         }
132 }
133
134 // Type implements Volume.
135 func (v *AzureBlobVolume) Type() string {
136         return "Azure"
137 }
138
139 // Start implements Volume.
140 func (v *AzureBlobVolume) Start() error {
141         if v.ContainerName == "" {
142                 return errors.New("no container name given")
143         }
144         if v.StorageAccountName == "" || v.StorageAccountKeyFile == "" {
145                 return errors.New("StorageAccountName and StorageAccountKeyFile must be given")
146         }
147         accountKey, err := readKeyFromFile(v.StorageAccountKeyFile)
148         if err != nil {
149                 return err
150         }
151         if v.StorageBaseURL == "" {
152                 v.StorageBaseURL = storage.DefaultBaseURL
153         }
154         v.azClient, err = storage.NewClient(v.StorageAccountName, accountKey, v.StorageBaseURL, storage.DefaultAPIVersion, true)
155         if err != nil {
156                 return fmt.Errorf("creating Azure storage client: %s", err)
157         }
158
159         if v.RequestTimeout == 0 {
160                 v.RequestTimeout = azureDefaultRequestTimeout
161         }
162         v.azClient.HTTPClient = &http.Client{
163                 Timeout: time.Duration(v.RequestTimeout),
164         }
165         bs := v.azClient.GetBlobService()
166         v.container = &azureContainer{
167                 ctr: bs.GetContainerReference(v.ContainerName),
168         }
169
170         if ok, err := v.container.Exists(); err != nil {
171                 return err
172         } else if !ok {
173                 return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
174         }
175         return nil
176 }
177
178 // DeviceID returns a globally unique ID for the storage container.
179 func (v *AzureBlobVolume) DeviceID() string {
180         return "azure://" + v.StorageBaseURL + "/" + v.StorageAccountName + "/" + v.ContainerName
181 }
182
183 // Return true if expires_at metadata attribute is found on the block
184 func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
185         metadata, err := v.container.GetBlobMetadata(loc)
186         if err != nil {
187                 return false, metadata, v.translateError(err)
188         }
189         if metadata["expires_at"] != "" {
190                 return true, metadata, nil
191         }
192         return false, metadata, nil
193 }
194
195 // Get reads a Keep block that has been stored as a block blob in the
196 // container.
197 //
198 // If the block is younger than azureWriteRaceInterval and is
199 // unexpectedly empty, assume a PutBlob operation is in progress, and
200 // wait for it to finish writing.
201 func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
202         trashed, _, err := v.checkTrashed(loc)
203         if err != nil {
204                 return 0, err
205         }
206         if trashed {
207                 return 0, os.ErrNotExist
208         }
209         var deadline time.Time
210         haveDeadline := false
211         size, err := v.get(ctx, loc, buf)
212         for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
213                 // Seeing a brand new empty block probably means we're
214                 // in a race with CreateBlob, which under the hood
215                 // (apparently) does "CreateEmpty" and "CommitData"
216                 // with no additional transaction locking.
217                 if !haveDeadline {
218                         t, err := v.Mtime(loc)
219                         if err != nil {
220                                 log.Print("Got empty block (possible race) but Mtime failed: ", err)
221                                 break
222                         }
223                         deadline = t.Add(azureWriteRaceInterval)
224                         if time.Now().After(deadline) {
225                                 break
226                         }
227                         log.Printf("Race? Block %s is 0 bytes, %s old. Polling until %s", loc, time.Since(t), deadline)
228                         haveDeadline = true
229                 } else if time.Now().After(deadline) {
230                         break
231                 }
232                 select {
233                 case <-ctx.Done():
234                         return 0, ctx.Err()
235                 case <-time.After(azureWriteRacePollTime):
236                 }
237                 size, err = v.get(ctx, loc, buf)
238         }
239         if haveDeadline {
240                 log.Printf("Race ended with size==%d", size)
241         }
242         return size, err
243 }
244
245 func (v *AzureBlobVolume) get(ctx context.Context, loc string, buf []byte) (int, error) {
246         ctx, cancel := context.WithCancel(ctx)
247         defer cancel()
248         expectSize := len(buf)
249         if azureMaxGetBytes < BlockSize {
250                 // Unfortunately the handler doesn't tell us how long the blob
251                 // is expected to be, so we have to ask Azure.
252                 props, err := v.container.GetBlobProperties(loc)
253                 if err != nil {
254                         return 0, v.translateError(err)
255                 }
256                 if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
257                         return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
258                 }
259                 expectSize = int(props.ContentLength)
260         }
261
262         if expectSize == 0 {
263                 return 0, nil
264         }
265
266         // We'll update this actualSize if/when we get the last piece.
267         actualSize := -1
268         pieces := (expectSize + azureMaxGetBytes - 1) / azureMaxGetBytes
269         errors := make(chan error, pieces)
270         var wg sync.WaitGroup
271         wg.Add(pieces)
272         for p := 0; p < pieces; p++ {
273                 // Each goroutine retrieves one piece. If we hit an
274                 // error, it is sent to the errors chan so get() can
275                 // return it -- but only if the error happens before
276                 // ctx is done. This way, if ctx is done before we hit
277                 // any other error (e.g., requesting client has hung
278                 // up), we return the original ctx.Err() instead of
279                 // the secondary errors from the transfers that got
280                 // interrupted as a result.
281                 go func(p int) {
282                         defer wg.Done()
283                         startPos := p * azureMaxGetBytes
284                         endPos := startPos + azureMaxGetBytes
285                         if endPos > expectSize {
286                                 endPos = expectSize
287                         }
288                         var rdr io.ReadCloser
289                         var err error
290                         gotRdr := make(chan struct{})
291                         go func() {
292                                 defer close(gotRdr)
293                                 if startPos == 0 && endPos == expectSize {
294                                         rdr, err = v.container.GetBlob(loc)
295                                 } else {
296                                         rdr, err = v.container.GetBlobRange(loc, startPos, endPos-1, nil)
297                                 }
298                         }()
299                         select {
300                         case <-ctx.Done():
301                                 go func() {
302                                         <-gotRdr
303                                         if err == nil {
304                                                 rdr.Close()
305                                         }
306                                 }()
307                                 return
308                         case <-gotRdr:
309                         }
310                         if err != nil {
311                                 errors <- err
312                                 cancel()
313                                 return
314                         }
315                         go func() {
316                                 // Close the reader when the client
317                                 // hangs up or another piece fails
318                                 // (possibly interrupting ReadFull())
319                                 // or when all pieces succeed and
320                                 // get() returns.
321                                 <-ctx.Done()
322                                 rdr.Close()
323                         }()
324                         n, err := io.ReadFull(rdr, buf[startPos:endPos])
325                         if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
326                                 // If we don't know the actual size,
327                                 // and just tried reading 64 MiB, it's
328                                 // normal to encounter EOF.
329                         } else if err != nil {
330                                 if ctx.Err() == nil {
331                                         errors <- err
332                                 }
333                                 cancel()
334                                 return
335                         }
336                         if p == pieces-1 {
337                                 actualSize = startPos + n
338                         }
339                 }(p)
340         }
341         wg.Wait()
342         close(errors)
343         if len(errors) > 0 {
344                 return 0, v.translateError(<-errors)
345         }
346         if ctx.Err() != nil {
347                 return 0, ctx.Err()
348         }
349         return actualSize, nil
350 }
351
352 // Compare the given data with existing stored data.
353 func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte) error {
354         trashed, _, err := v.checkTrashed(loc)
355         if err != nil {
356                 return err
357         }
358         if trashed {
359                 return os.ErrNotExist
360         }
361         var rdr io.ReadCloser
362         gotRdr := make(chan struct{})
363         go func() {
364                 defer close(gotRdr)
365                 rdr, err = v.container.GetBlob(loc)
366         }()
367         select {
368         case <-ctx.Done():
369                 go func() {
370                         <-gotRdr
371                         if err == nil {
372                                 rdr.Close()
373                         }
374                 }()
375                 return ctx.Err()
376         case <-gotRdr:
377         }
378         if err != nil {
379                 return v.translateError(err)
380         }
381         defer rdr.Close()
382         return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
383 }
384
385 // Put stores a Keep block as a block blob in the container.
386 func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) error {
387         if v.ReadOnly {
388                 return MethodDisabledError
389         }
390         // Send the block data through a pipe, so that (if we need to)
391         // we can close the pipe early and abandon our
392         // CreateBlockBlobFromReader() goroutine, without worrying
393         // about CreateBlockBlobFromReader() accessing our block
394         // buffer after we release it.
395         bufr, bufw := io.Pipe()
396         go func() {
397                 io.Copy(bufw, bytes.NewReader(block))
398                 bufw.Close()
399         }()
400         errChan := make(chan error)
401         go func() {
402                 var body io.Reader = bufr
403                 if len(block) == 0 {
404                         // We must send a "Content-Length: 0" header,
405                         // but the http client interprets
406                         // ContentLength==0 as "unknown" unless it can
407                         // confirm by introspection that Body will
408                         // read 0 bytes.
409                         body = http.NoBody
410                         bufr.Close()
411                 }
412                 errChan <- v.container.CreateBlockBlobFromReader(loc, len(block), body, nil)
413         }()
414         select {
415         case <-ctx.Done():
416                 theConfig.debugLogf("%s: taking CreateBlockBlobFromReader's input away: %s", v, ctx.Err())
417                 // Our pipe might be stuck in Write(), waiting for
418                 // io.Copy() to read. If so, un-stick it. This means
419                 // CreateBlockBlobFromReader will get corrupt data,
420                 // but that's OK: the size won't match, so the write
421                 // will fail.
422                 go io.Copy(ioutil.Discard, bufr)
423                 // CloseWithError() will return once pending I/O is done.
424                 bufw.CloseWithError(ctx.Err())
425                 theConfig.debugLogf("%s: abandoning CreateBlockBlobFromReader goroutine", v)
426                 return ctx.Err()
427         case err := <-errChan:
428                 return err
429         }
430 }
431
432 // Touch updates the last-modified property of a block blob.
433 func (v *AzureBlobVolume) Touch(loc string) error {
434         if v.ReadOnly {
435                 return MethodDisabledError
436         }
437         trashed, metadata, err := v.checkTrashed(loc)
438         if err != nil {
439                 return err
440         }
441         if trashed {
442                 return os.ErrNotExist
443         }
444
445         metadata["touch"] = fmt.Sprintf("%d", time.Now())
446         return v.container.SetBlobMetadata(loc, metadata, nil)
447 }
448
449 // Mtime returns the last-modified property of a block blob.
450 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
451         trashed, _, err := v.checkTrashed(loc)
452         if err != nil {
453                 return time.Time{}, err
454         }
455         if trashed {
456                 return time.Time{}, os.ErrNotExist
457         }
458
459         props, err := v.container.GetBlobProperties(loc)
460         if err != nil {
461                 return time.Time{}, err
462         }
463         return time.Time(props.LastModified), nil
464 }
465
466 // IndexTo writes a list of Keep blocks that are stored in the
467 // container.
468 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
469         params := storage.ListBlobsParameters{
470                 Prefix:  prefix,
471                 Include: &storage.IncludeBlobDataset{Metadata: true},
472         }
473         for {
474                 resp, err := v.container.ListBlobs(params)
475                 if err != nil {
476                         return err
477                 }
478                 for _, b := range resp.Blobs {
479                         if !v.isKeepBlock(b.Name) {
480                                 continue
481                         }
482                         modtime := time.Time(b.Properties.LastModified)
483                         if b.Properties.ContentLength == 0 && modtime.Add(azureWriteRaceInterval).After(time.Now()) {
484                                 // A new zero-length blob is probably
485                                 // just a new non-empty blob that
486                                 // hasn't committed its data yet (see
487                                 // Get()), and in any case has no
488                                 // value.
489                                 continue
490                         }
491                         if b.Metadata["expires_at"] != "" {
492                                 // Trashed blob; exclude it from response
493                                 continue
494                         }
495                         fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, modtime.UnixNano())
496                 }
497                 if resp.NextMarker == "" {
498                         return nil
499                 }
500                 params.Marker = resp.NextMarker
501         }
502 }
503
504 // Trash a Keep block.
505 func (v *AzureBlobVolume) Trash(loc string) error {
506         if v.ReadOnly {
507                 return MethodDisabledError
508         }
509
510         // Ideally we would use If-Unmodified-Since, but that
511         // particular condition seems to be ignored by Azure. Instead,
512         // we get the Etag before checking Mtime, and use If-Match to
513         // ensure we don't delete data if Put() or Touch() happens
514         // between our calls to Mtime() and DeleteBlob().
515         props, err := v.container.GetBlobProperties(loc)
516         if err != nil {
517                 return err
518         }
519         if t, err := v.Mtime(loc); err != nil {
520                 return err
521         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
522                 return nil
523         }
524
525         // If TrashLifetime == 0, just delete it
526         if theConfig.TrashLifetime == 0 {
527                 return v.container.DeleteBlob(loc, &storage.DeleteBlobOptions{
528                         IfMatch: props.Etag,
529                 })
530         }
531
532         // Otherwise, mark as trash
533         return v.container.SetBlobMetadata(loc, storage.BlobMetadata{
534                 "expires_at": fmt.Sprintf("%d", time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()),
535         }, &storage.SetBlobMetadataOptions{
536                 IfMatch: props.Etag,
537         })
538 }
539
540 // Untrash a Keep block.
541 // Delete the expires_at metadata attribute
542 func (v *AzureBlobVolume) Untrash(loc string) error {
543         // if expires_at does not exist, return NotFoundError
544         metadata, err := v.container.GetBlobMetadata(loc)
545         if err != nil {
546                 return v.translateError(err)
547         }
548         if metadata["expires_at"] == "" {
549                 return os.ErrNotExist
550         }
551
552         // reset expires_at metadata attribute
553         metadata["expires_at"] = ""
554         err = v.container.SetBlobMetadata(loc, metadata, nil)
555         return v.translateError(err)
556 }
557
558 // Status returns a VolumeStatus struct with placeholder data.
559 func (v *AzureBlobVolume) Status() *VolumeStatus {
560         return &VolumeStatus{
561                 DeviceNum: 1,
562                 BytesFree: BlockSize * 1000,
563                 BytesUsed: 1,
564         }
565 }
566
567 // String returns a volume label, including the container name.
568 func (v *AzureBlobVolume) String() string {
569         return fmt.Sprintf("azure-storage-container:%+q", v.ContainerName)
570 }
571
572 // Writable returns true, unless the -readonly flag was on when the
573 // volume was added.
574 func (v *AzureBlobVolume) Writable() bool {
575         return !v.ReadOnly
576 }
577
578 // Replication returns the replication level of the container, as
579 // specified by the -azure-storage-replication argument.
580 func (v *AzureBlobVolume) Replication() int {
581         return v.AzureReplication
582 }
583
584 // If possible, translate an Azure SDK error to a recognizable error
585 // like os.ErrNotExist.
586 func (v *AzureBlobVolume) translateError(err error) error {
587         switch {
588         case err == nil:
589                 return err
590         case strings.Contains(err.Error(), "Not Found"):
591                 // "storage: service returned without a response body (404 Not Found)"
592                 return os.ErrNotExist
593         default:
594                 return err
595         }
596 }
597
598 var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
599
600 func (v *AzureBlobVolume) isKeepBlock(s string) bool {
601         return keepBlockRegexp.MatchString(s)
602 }
603
604 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
605 // and deletes them from the volume.
606 func (v *AzureBlobVolume) EmptyTrash() {
607         var bytesDeleted, bytesInTrash int64
608         var blocksDeleted, blocksInTrash int
609         params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
610
611         for {
612                 resp, err := v.container.ListBlobs(params)
613                 if err != nil {
614                         log.Printf("EmptyTrash: ListBlobs: %v", err)
615                         break
616                 }
617                 for _, b := range resp.Blobs {
618                         // Check if the block is expired
619                         if b.Metadata["expires_at"] == "" {
620                                 continue
621                         }
622
623                         blocksInTrash++
624                         bytesInTrash += b.Properties.ContentLength
625
626                         expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
627                         if err != nil {
628                                 log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
629                                 continue
630                         }
631
632                         if expiresAt > time.Now().Unix() {
633                                 continue
634                         }
635
636                         err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
637                                 IfMatch: b.Properties.Etag,
638                         })
639                         if err != nil {
640                                 log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
641                                 continue
642                         }
643                         blocksDeleted++
644                         bytesDeleted += b.Properties.ContentLength
645                 }
646                 if resp.NextMarker == "" {
647                         break
648                 }
649                 params.Marker = resp.NextMarker
650         }
651
652         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
653 }
654
655 // InternalStats returns bucket I/O and API call counters.
656 func (v *AzureBlobVolume) InternalStats() interface{} {
657         return &v.container.stats
658 }
659
660 type azureBlobStats struct {
661         statsTicker
662         Ops              uint64
663         GetOps           uint64
664         GetRangeOps      uint64
665         GetMetadataOps   uint64
666         GetPropertiesOps uint64
667         CreateOps        uint64
668         SetMetadataOps   uint64
669         DelOps           uint64
670         ListOps          uint64
671 }
672
673 func (s *azureBlobStats) TickErr(err error) {
674         if err == nil {
675                 return
676         }
677         errType := fmt.Sprintf("%T", err)
678         if err, ok := err.(storage.AzureStorageServiceError); ok {
679                 errType = errType + fmt.Sprintf(" %d (%s)", err.StatusCode, err.Code)
680         }
681         log.Printf("errType %T, err %s", err, err)
682         s.statsTicker.TickErr(err, errType)
683 }
684
685 // azureContainer wraps storage.Container in order to count I/O and
686 // API usage stats.
687 type azureContainer struct {
688         ctr   *storage.Container
689         stats azureBlobStats
690 }
691
692 func (c *azureContainer) Exists() (bool, error) {
693         c.stats.Tick(&c.stats.Ops)
694         ok, err := c.ctr.Exists()
695         c.stats.TickErr(err)
696         return ok, err
697 }
698
699 func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
700         c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
701         b := c.ctr.GetBlobReference(bname)
702         err := b.GetMetadata(nil)
703         c.stats.TickErr(err)
704         return b.Metadata, err
705 }
706
707 func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
708         c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
709         b := c.ctr.GetBlobReference(bname)
710         err := b.GetProperties(nil)
711         c.stats.TickErr(err)
712         return &b.Properties, err
713 }
714
715 func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
716         c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
717         b := c.ctr.GetBlobReference(bname)
718         rdr, err := b.Get(nil)
719         c.stats.TickErr(err)
720         return NewCountingReader(rdr, c.stats.TickInBytes), err
721 }
722
723 func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
724         c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
725         b := c.ctr.GetBlobReference(bname)
726         rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
727                 Range: &storage.BlobRange{
728                         Start: uint64(start),
729                         End:   uint64(end),
730                 },
731                 GetBlobOptions: opts,
732         })
733         c.stats.TickErr(err)
734         return NewCountingReader(rdr, c.stats.TickInBytes), err
735 }
736
737 // If we give it an io.Reader that doesn't also have a Len() int
738 // method, the Azure SDK determines data size by copying the data into
739 // a new buffer, which is not a good use of memory.
740 type readerWithAzureLen struct {
741         io.Reader
742         len int
743 }
744
745 // Len satisfies the private lener interface in azure-sdk-for-go.
746 func (r *readerWithAzureLen) Len() int {
747         return r.len
748 }
749
750 func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
751         c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
752         if size != 0 {
753                 rdr = &readerWithAzureLen{
754                         Reader: NewCountingReader(rdr, c.stats.TickOutBytes),
755                         len:    size,
756                 }
757         }
758         b := c.ctr.GetBlobReference(bname)
759         err := b.CreateBlockBlobFromReader(rdr, opts)
760         c.stats.TickErr(err)
761         return err
762 }
763
764 func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
765         c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
766         b := c.ctr.GetBlobReference(bname)
767         b.Metadata = m
768         err := b.SetMetadata(opts)
769         c.stats.TickErr(err)
770         return err
771 }
772
773 func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
774         c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
775         resp, err := c.ctr.ListBlobs(params)
776         c.stats.TickErr(err)
777         return resp, err
778 }
779
780 func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
781         c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
782         b := c.ctr.GetBlobReference(bname)
783         err := b.Delete(opts)
784         c.stats.TickErr(err)
785         return err
786 }