Merge branch '11645-keepstore-storageclasses' closes #11645
[arvados.git] / services / keepstore / azure_blob_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "errors"
11         "flag"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "net/http"
16         "os"
17         "regexp"
18         "strconv"
19         "strings"
20         "sync"
21         "time"
22
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         "github.com/Azure/azure-sdk-for-go/storage"
25 )
26
27 const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
28
29 var (
30         azureMaxGetBytes           int
31         azureStorageAccountName    string
32         azureStorageAccountKeyFile string
33         azureStorageReplication    int
34         azureWriteRaceInterval     = 15 * time.Second
35         azureWriteRacePollTime     = time.Second
36 )
37
38 func readKeyFromFile(file string) (string, error) {
39         buf, err := ioutil.ReadFile(file)
40         if err != nil {
41                 return "", errors.New("reading key from " + file + ": " + err.Error())
42         }
43         accountKey := strings.TrimSpace(string(buf))
44         if accountKey == "" {
45                 return "", errors.New("empty account key in " + file)
46         }
47         return accountKey, nil
48 }
49
50 type azureVolumeAdder struct {
51         *Config
52 }
53
54 // String implements flag.Value
55 func (s *azureVolumeAdder) String() string {
56         return "-"
57 }
58
59 func (s *azureVolumeAdder) Set(containerName string) error {
60         s.Config.Volumes = append(s.Config.Volumes, &AzureBlobVolume{
61                 ContainerName:         containerName,
62                 StorageAccountName:    azureStorageAccountName,
63                 StorageAccountKeyFile: azureStorageAccountKeyFile,
64                 AzureReplication:      azureStorageReplication,
65                 ReadOnly:              deprecated.flagReadonly,
66         })
67         return nil
68 }
69
70 func init() {
71         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &AzureBlobVolume{} })
72
73         flag.Var(&azureVolumeAdder{theConfig},
74                 "azure-storage-container-volume",
75                 "Use the given container as a storage volume. Can be given multiple times.")
76         flag.StringVar(
77                 &azureStorageAccountName,
78                 "azure-storage-account-name",
79                 "",
80                 "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
81         flag.StringVar(
82                 &azureStorageAccountKeyFile,
83                 "azure-storage-account-key-file",
84                 "",
85                 "`File` containing the account key used for subsequent --azure-storage-container-volume arguments.")
86         flag.IntVar(
87                 &azureStorageReplication,
88                 "azure-storage-replication",
89                 3,
90                 "Replication level to report to clients when data is stored in an Azure container.")
91         flag.IntVar(
92                 &azureMaxGetBytes,
93                 "azure-max-get-bytes",
94                 BlockSize,
95                 fmt.Sprintf("Maximum bytes to request in a single GET request. If smaller than %d, use multiple concurrent range requests to retrieve a block.", BlockSize))
96 }
97
98 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
99 // container.
100 type AzureBlobVolume struct {
101         StorageAccountName    string
102         StorageAccountKeyFile string
103         StorageBaseURL        string // "" means default, "core.windows.net"
104         ContainerName         string
105         AzureReplication      int
106         ReadOnly              bool
107         RequestTimeout        arvados.Duration
108         StorageClasses        []string
109
110         azClient  storage.Client
111         container *azureContainer
112 }
113
114 // singleSender is a single-attempt storage.Sender.
115 type singleSender struct{}
116
117 // Send performs req exactly once.
118 func (*singleSender) Send(c *storage.Client, req *http.Request) (resp *http.Response, err error) {
119         return c.HTTPClient.Do(req)
120 }
121
122 // Examples implements VolumeWithExamples.
123 func (*AzureBlobVolume) Examples() []Volume {
124         return []Volume{
125                 &AzureBlobVolume{
126                         StorageAccountName:    "example-account-name",
127                         StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
128                         ContainerName:         "example-container-name",
129                         AzureReplication:      3,
130                         RequestTimeout:        azureDefaultRequestTimeout,
131                 },
132                 &AzureBlobVolume{
133                         StorageAccountName:    "cn-account-name",
134                         StorageAccountKeyFile: "/etc/azure_cn_storage_account_key.txt",
135                         StorageBaseURL:        "core.chinacloudapi.cn",
136                         ContainerName:         "cn-container-name",
137                         AzureReplication:      3,
138                         RequestTimeout:        azureDefaultRequestTimeout,
139                 },
140         }
141 }
142
143 // Type implements Volume.
144 func (v *AzureBlobVolume) Type() string {
145         return "Azure"
146 }
147
148 // Start implements Volume.
149 func (v *AzureBlobVolume) Start() error {
150         if v.ContainerName == "" {
151                 return errors.New("no container name given")
152         }
153         if v.StorageAccountName == "" || v.StorageAccountKeyFile == "" {
154                 return errors.New("StorageAccountName and StorageAccountKeyFile must be given")
155         }
156         accountKey, err := readKeyFromFile(v.StorageAccountKeyFile)
157         if err != nil {
158                 return err
159         }
160         if v.StorageBaseURL == "" {
161                 v.StorageBaseURL = storage.DefaultBaseURL
162         }
163         v.azClient, err = storage.NewClient(v.StorageAccountName, accountKey, v.StorageBaseURL, storage.DefaultAPIVersion, true)
164         if err != nil {
165                 return fmt.Errorf("creating Azure storage client: %s", err)
166         }
167         v.azClient.Sender = &singleSender{}
168
169         if v.RequestTimeout == 0 {
170                 v.RequestTimeout = azureDefaultRequestTimeout
171         }
172         v.azClient.HTTPClient = &http.Client{
173                 Timeout: time.Duration(v.RequestTimeout),
174         }
175         bs := v.azClient.GetBlobService()
176         v.container = &azureContainer{
177                 ctr: bs.GetContainerReference(v.ContainerName),
178         }
179
180         if ok, err := v.container.Exists(); err != nil {
181                 return err
182         } else if !ok {
183                 return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
184         }
185         return nil
186 }
187
188 // DeviceID returns a globally unique ID for the storage container.
189 func (v *AzureBlobVolume) DeviceID() string {
190         return "azure://" + v.StorageBaseURL + "/" + v.StorageAccountName + "/" + v.ContainerName
191 }
192
193 // Return true if expires_at metadata attribute is found on the block
194 func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
195         metadata, err := v.container.GetBlobMetadata(loc)
196         if err != nil {
197                 return false, metadata, v.translateError(err)
198         }
199         if metadata["expires_at"] != "" {
200                 return true, metadata, nil
201         }
202         return false, metadata, nil
203 }
204
205 // Get reads a Keep block that has been stored as a block blob in the
206 // container.
207 //
208 // If the block is younger than azureWriteRaceInterval and is
209 // unexpectedly empty, assume a PutBlob operation is in progress, and
210 // wait for it to finish writing.
211 func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
212         trashed, _, err := v.checkTrashed(loc)
213         if err != nil {
214                 return 0, err
215         }
216         if trashed {
217                 return 0, os.ErrNotExist
218         }
219         var deadline time.Time
220         haveDeadline := false
221         size, err := v.get(ctx, loc, buf)
222         for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
223                 // Seeing a brand new empty block probably means we're
224                 // in a race with CreateBlob, which under the hood
225                 // (apparently) does "CreateEmpty" and "CommitData"
226                 // with no additional transaction locking.
227                 if !haveDeadline {
228                         t, err := v.Mtime(loc)
229                         if err != nil {
230                                 log.Print("Got empty block (possible race) but Mtime failed: ", err)
231                                 break
232                         }
233                         deadline = t.Add(azureWriteRaceInterval)
234                         if time.Now().After(deadline) {
235                                 break
236                         }
237                         log.Printf("Race? Block %s is 0 bytes, %s old. Polling until %s", loc, time.Since(t), deadline)
238                         haveDeadline = true
239                 } else if time.Now().After(deadline) {
240                         break
241                 }
242                 select {
243                 case <-ctx.Done():
244                         return 0, ctx.Err()
245                 case <-time.After(azureWriteRacePollTime):
246                 }
247                 size, err = v.get(ctx, loc, buf)
248         }
249         if haveDeadline {
250                 log.Printf("Race ended with size==%d", size)
251         }
252         return size, err
253 }
254
255 func (v *AzureBlobVolume) get(ctx context.Context, loc string, buf []byte) (int, error) {
256         ctx, cancel := context.WithCancel(ctx)
257         defer cancel()
258         expectSize := len(buf)
259         if azureMaxGetBytes < BlockSize {
260                 // Unfortunately the handler doesn't tell us how long the blob
261                 // is expected to be, so we have to ask Azure.
262                 props, err := v.container.GetBlobProperties(loc)
263                 if err != nil {
264                         return 0, v.translateError(err)
265                 }
266                 if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
267                         return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
268                 }
269                 expectSize = int(props.ContentLength)
270         }
271
272         if expectSize == 0 {
273                 return 0, nil
274         }
275
276         // We'll update this actualSize if/when we get the last piece.
277         actualSize := -1
278         pieces := (expectSize + azureMaxGetBytes - 1) / azureMaxGetBytes
279         errors := make(chan error, pieces)
280         var wg sync.WaitGroup
281         wg.Add(pieces)
282         for p := 0; p < pieces; p++ {
283                 // Each goroutine retrieves one piece. If we hit an
284                 // error, it is sent to the errors chan so get() can
285                 // return it -- but only if the error happens before
286                 // ctx is done. This way, if ctx is done before we hit
287                 // any other error (e.g., requesting client has hung
288                 // up), we return the original ctx.Err() instead of
289                 // the secondary errors from the transfers that got
290                 // interrupted as a result.
291                 go func(p int) {
292                         defer wg.Done()
293                         startPos := p * azureMaxGetBytes
294                         endPos := startPos + azureMaxGetBytes
295                         if endPos > expectSize {
296                                 endPos = expectSize
297                         }
298                         var rdr io.ReadCloser
299                         var err error
300                         gotRdr := make(chan struct{})
301                         go func() {
302                                 defer close(gotRdr)
303                                 if startPos == 0 && endPos == expectSize {
304                                         rdr, err = v.container.GetBlob(loc)
305                                 } else {
306                                         rdr, err = v.container.GetBlobRange(loc, startPos, endPos-1, nil)
307                                 }
308                         }()
309                         select {
310                         case <-ctx.Done():
311                                 go func() {
312                                         <-gotRdr
313                                         if err == nil {
314                                                 rdr.Close()
315                                         }
316                                 }()
317                                 return
318                         case <-gotRdr:
319                         }
320                         if err != nil {
321                                 errors <- err
322                                 cancel()
323                                 return
324                         }
325                         go func() {
326                                 // Close the reader when the client
327                                 // hangs up or another piece fails
328                                 // (possibly interrupting ReadFull())
329                                 // or when all pieces succeed and
330                                 // get() returns.
331                                 <-ctx.Done()
332                                 rdr.Close()
333                         }()
334                         n, err := io.ReadFull(rdr, buf[startPos:endPos])
335                         if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
336                                 // If we don't know the actual size,
337                                 // and just tried reading 64 MiB, it's
338                                 // normal to encounter EOF.
339                         } else if err != nil {
340                                 if ctx.Err() == nil {
341                                         errors <- err
342                                 }
343                                 cancel()
344                                 return
345                         }
346                         if p == pieces-1 {
347                                 actualSize = startPos + n
348                         }
349                 }(p)
350         }
351         wg.Wait()
352         close(errors)
353         if len(errors) > 0 {
354                 return 0, v.translateError(<-errors)
355         }
356         if ctx.Err() != nil {
357                 return 0, ctx.Err()
358         }
359         return actualSize, nil
360 }
361
362 // Compare the given data with existing stored data.
363 func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte) error {
364         trashed, _, err := v.checkTrashed(loc)
365         if err != nil {
366                 return err
367         }
368         if trashed {
369                 return os.ErrNotExist
370         }
371         var rdr io.ReadCloser
372         gotRdr := make(chan struct{})
373         go func() {
374                 defer close(gotRdr)
375                 rdr, err = v.container.GetBlob(loc)
376         }()
377         select {
378         case <-ctx.Done():
379                 go func() {
380                         <-gotRdr
381                         if err == nil {
382                                 rdr.Close()
383                         }
384                 }()
385                 return ctx.Err()
386         case <-gotRdr:
387         }
388         if err != nil {
389                 return v.translateError(err)
390         }
391         defer rdr.Close()
392         return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
393 }
394
395 // Put stores a Keep block as a block blob in the container.
396 func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) error {
397         if v.ReadOnly {
398                 return MethodDisabledError
399         }
400         // Send the block data through a pipe, so that (if we need to)
401         // we can close the pipe early and abandon our
402         // CreateBlockBlobFromReader() goroutine, without worrying
403         // about CreateBlockBlobFromReader() accessing our block
404         // buffer after we release it.
405         bufr, bufw := io.Pipe()
406         go func() {
407                 io.Copy(bufw, bytes.NewReader(block))
408                 bufw.Close()
409         }()
410         errChan := make(chan error)
411         go func() {
412                 var body io.Reader = bufr
413                 if len(block) == 0 {
414                         // We must send a "Content-Length: 0" header,
415                         // but the http client interprets
416                         // ContentLength==0 as "unknown" unless it can
417                         // confirm by introspection that Body will
418                         // read 0 bytes.
419                         body = http.NoBody
420                         bufr.Close()
421                 }
422                 errChan <- v.container.CreateBlockBlobFromReader(loc, len(block), body, nil)
423         }()
424         select {
425         case <-ctx.Done():
426                 theConfig.debugLogf("%s: taking CreateBlockBlobFromReader's input away: %s", v, ctx.Err())
427                 // Our pipe might be stuck in Write(), waiting for
428                 // io.Copy() to read. If so, un-stick it. This means
429                 // CreateBlockBlobFromReader will get corrupt data,
430                 // but that's OK: the size won't match, so the write
431                 // will fail.
432                 go io.Copy(ioutil.Discard, bufr)
433                 // CloseWithError() will return once pending I/O is done.
434                 bufw.CloseWithError(ctx.Err())
435                 theConfig.debugLogf("%s: abandoning CreateBlockBlobFromReader goroutine", v)
436                 return ctx.Err()
437         case err := <-errChan:
438                 return err
439         }
440 }
441
442 // Touch updates the last-modified property of a block blob.
443 func (v *AzureBlobVolume) Touch(loc string) error {
444         if v.ReadOnly {
445                 return MethodDisabledError
446         }
447         trashed, metadata, err := v.checkTrashed(loc)
448         if err != nil {
449                 return err
450         }
451         if trashed {
452                 return os.ErrNotExist
453         }
454
455         metadata["touch"] = fmt.Sprintf("%d", time.Now())
456         return v.container.SetBlobMetadata(loc, metadata, nil)
457 }
458
459 // Mtime returns the last-modified property of a block blob.
460 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
461         trashed, _, err := v.checkTrashed(loc)
462         if err != nil {
463                 return time.Time{}, err
464         }
465         if trashed {
466                 return time.Time{}, os.ErrNotExist
467         }
468
469         props, err := v.container.GetBlobProperties(loc)
470         if err != nil {
471                 return time.Time{}, err
472         }
473         return time.Time(props.LastModified), nil
474 }
475
476 // IndexTo writes a list of Keep blocks that are stored in the
477 // container.
478 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
479         params := storage.ListBlobsParameters{
480                 Prefix:  prefix,
481                 Include: &storage.IncludeBlobDataset{Metadata: true},
482         }
483         for {
484                 resp, err := v.container.ListBlobs(params)
485                 if err != nil {
486                         return err
487                 }
488                 for _, b := range resp.Blobs {
489                         if !v.isKeepBlock(b.Name) {
490                                 continue
491                         }
492                         modtime := time.Time(b.Properties.LastModified)
493                         if b.Properties.ContentLength == 0 && modtime.Add(azureWriteRaceInterval).After(time.Now()) {
494                                 // A new zero-length blob is probably
495                                 // just a new non-empty blob that
496                                 // hasn't committed its data yet (see
497                                 // Get()), and in any case has no
498                                 // value.
499                                 continue
500                         }
501                         if b.Metadata["expires_at"] != "" {
502                                 // Trashed blob; exclude it from response
503                                 continue
504                         }
505                         fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, modtime.UnixNano())
506                 }
507                 if resp.NextMarker == "" {
508                         return nil
509                 }
510                 params.Marker = resp.NextMarker
511         }
512 }
513
514 // Trash a Keep block.
515 func (v *AzureBlobVolume) Trash(loc string) error {
516         if v.ReadOnly {
517                 return MethodDisabledError
518         }
519
520         // Ideally we would use If-Unmodified-Since, but that
521         // particular condition seems to be ignored by Azure. Instead,
522         // we get the Etag before checking Mtime, and use If-Match to
523         // ensure we don't delete data if Put() or Touch() happens
524         // between our calls to Mtime() and DeleteBlob().
525         props, err := v.container.GetBlobProperties(loc)
526         if err != nil {
527                 return err
528         }
529         if t, err := v.Mtime(loc); err != nil {
530                 return err
531         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
532                 return nil
533         }
534
535         // If TrashLifetime == 0, just delete it
536         if theConfig.TrashLifetime == 0 {
537                 return v.container.DeleteBlob(loc, &storage.DeleteBlobOptions{
538                         IfMatch: props.Etag,
539                 })
540         }
541
542         // Otherwise, mark as trash
543         return v.container.SetBlobMetadata(loc, storage.BlobMetadata{
544                 "expires_at": fmt.Sprintf("%d", time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()),
545         }, &storage.SetBlobMetadataOptions{
546                 IfMatch: props.Etag,
547         })
548 }
549
550 // Untrash a Keep block.
551 // Delete the expires_at metadata attribute
552 func (v *AzureBlobVolume) Untrash(loc string) error {
553         // if expires_at does not exist, return NotFoundError
554         metadata, err := v.container.GetBlobMetadata(loc)
555         if err != nil {
556                 return v.translateError(err)
557         }
558         if metadata["expires_at"] == "" {
559                 return os.ErrNotExist
560         }
561
562         // reset expires_at metadata attribute
563         metadata["expires_at"] = ""
564         err = v.container.SetBlobMetadata(loc, metadata, nil)
565         return v.translateError(err)
566 }
567
568 // Status returns a VolumeStatus struct with placeholder data.
569 func (v *AzureBlobVolume) Status() *VolumeStatus {
570         return &VolumeStatus{
571                 DeviceNum: 1,
572                 BytesFree: BlockSize * 1000,
573                 BytesUsed: 1,
574         }
575 }
576
577 // String returns a volume label, including the container name.
578 func (v *AzureBlobVolume) String() string {
579         return fmt.Sprintf("azure-storage-container:%+q", v.ContainerName)
580 }
581
582 // Writable returns true, unless the -readonly flag was on when the
583 // volume was added.
584 func (v *AzureBlobVolume) Writable() bool {
585         return !v.ReadOnly
586 }
587
588 // Replication returns the replication level of the container, as
589 // specified by the -azure-storage-replication argument.
590 func (v *AzureBlobVolume) Replication() int {
591         return v.AzureReplication
592 }
593
594 // GetStorageClasses implements Volume
595 func (v *AzureBlobVolume) GetStorageClasses() []string {
596         return v.StorageClasses
597 }
598
599 // If possible, translate an Azure SDK error to a recognizable error
600 // like os.ErrNotExist.
601 func (v *AzureBlobVolume) translateError(err error) error {
602         switch {
603         case err == nil:
604                 return err
605         case strings.Contains(err.Error(), "Not Found"):
606                 // "storage: service returned without a response body (404 Not Found)"
607                 return os.ErrNotExist
608         default:
609                 return err
610         }
611 }
612
613 var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
614
615 func (v *AzureBlobVolume) isKeepBlock(s string) bool {
616         return keepBlockRegexp.MatchString(s)
617 }
618
619 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
620 // and deletes them from the volume.
621 func (v *AzureBlobVolume) EmptyTrash() {
622         var bytesDeleted, bytesInTrash int64
623         var blocksDeleted, blocksInTrash int
624         params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
625
626         for {
627                 resp, err := v.container.ListBlobs(params)
628                 if err != nil {
629                         log.Printf("EmptyTrash: ListBlobs: %v", err)
630                         break
631                 }
632                 for _, b := range resp.Blobs {
633                         // Check if the block is expired
634                         if b.Metadata["expires_at"] == "" {
635                                 continue
636                         }
637
638                         blocksInTrash++
639                         bytesInTrash += b.Properties.ContentLength
640
641                         expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
642                         if err != nil {
643                                 log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
644                                 continue
645                         }
646
647                         if expiresAt > time.Now().Unix() {
648                                 continue
649                         }
650
651                         err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
652                                 IfMatch: b.Properties.Etag,
653                         })
654                         if err != nil {
655                                 log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
656                                 continue
657                         }
658                         blocksDeleted++
659                         bytesDeleted += b.Properties.ContentLength
660                 }
661                 if resp.NextMarker == "" {
662                         break
663                 }
664                 params.Marker = resp.NextMarker
665         }
666
667         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
668 }
669
670 // InternalStats returns bucket I/O and API call counters.
671 func (v *AzureBlobVolume) InternalStats() interface{} {
672         return &v.container.stats
673 }
674
675 type azureBlobStats struct {
676         statsTicker
677         Ops              uint64
678         GetOps           uint64
679         GetRangeOps      uint64
680         GetMetadataOps   uint64
681         GetPropertiesOps uint64
682         CreateOps        uint64
683         SetMetadataOps   uint64
684         DelOps           uint64
685         ListOps          uint64
686 }
687
688 func (s *azureBlobStats) TickErr(err error) {
689         if err == nil {
690                 return
691         }
692         errType := fmt.Sprintf("%T", err)
693         if err, ok := err.(storage.AzureStorageServiceError); ok {
694                 errType = errType + fmt.Sprintf(" %d (%s)", err.StatusCode, err.Code)
695         }
696         log.Printf("errType %T, err %s", err, err)
697         s.statsTicker.TickErr(err, errType)
698 }
699
700 // azureContainer wraps storage.Container in order to count I/O and
701 // API usage stats.
702 type azureContainer struct {
703         ctr   *storage.Container
704         stats azureBlobStats
705 }
706
707 func (c *azureContainer) Exists() (bool, error) {
708         c.stats.Tick(&c.stats.Ops)
709         ok, err := c.ctr.Exists()
710         c.stats.TickErr(err)
711         return ok, err
712 }
713
714 func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
715         c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
716         b := c.ctr.GetBlobReference(bname)
717         err := b.GetMetadata(nil)
718         c.stats.TickErr(err)
719         return b.Metadata, err
720 }
721
722 func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
723         c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
724         b := c.ctr.GetBlobReference(bname)
725         err := b.GetProperties(nil)
726         c.stats.TickErr(err)
727         return &b.Properties, err
728 }
729
730 func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
731         c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
732         b := c.ctr.GetBlobReference(bname)
733         rdr, err := b.Get(nil)
734         c.stats.TickErr(err)
735         return NewCountingReader(rdr, c.stats.TickInBytes), err
736 }
737
738 func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
739         c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
740         b := c.ctr.GetBlobReference(bname)
741         rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
742                 Range: &storage.BlobRange{
743                         Start: uint64(start),
744                         End:   uint64(end),
745                 },
746                 GetBlobOptions: opts,
747         })
748         c.stats.TickErr(err)
749         return NewCountingReader(rdr, c.stats.TickInBytes), err
750 }
751
752 // If we give it an io.Reader that doesn't also have a Len() int
753 // method, the Azure SDK determines data size by copying the data into
754 // a new buffer, which is not a good use of memory.
755 type readerWithAzureLen struct {
756         io.Reader
757         len int
758 }
759
760 // Len satisfies the private lener interface in azure-sdk-for-go.
761 func (r *readerWithAzureLen) Len() int {
762         return r.len
763 }
764
765 func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
766         c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
767         if size != 0 {
768                 rdr = &readerWithAzureLen{
769                         Reader: NewCountingReader(rdr, c.stats.TickOutBytes),
770                         len:    size,
771                 }
772         }
773         b := c.ctr.GetBlobReference(bname)
774         err := b.CreateBlockBlobFromReader(rdr, opts)
775         c.stats.TickErr(err)
776         return err
777 }
778
779 func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
780         c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
781         b := c.ctr.GetBlobReference(bname)
782         b.Metadata = m
783         err := b.SetMetadata(opts)
784         c.stats.TickErr(err)
785         return err
786 }
787
788 func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
789         c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
790         resp, err := c.ctr.ListBlobs(params)
791         c.stats.TickErr(err)
792         return resp, err
793 }
794
795 func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
796         c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
797         b := c.ctr.GetBlobReference(bname)
798         err := b.Delete(opts)
799         c.stats.TickErr(err)
800         return err
801 }