13647: Use cluster config instead of custom keepstore config.
[arvados.git] / services / keepstore / azure_blob_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "encoding/json"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "net/http"
16         "os"
17         "regexp"
18         "strconv"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "time"
23
24         "git.curoverse.com/arvados.git/sdk/go/arvados"
25         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
26         "github.com/Azure/azure-sdk-for-go/storage"
27         "github.com/prometheus/client_golang/prometheus"
28         "github.com/sirupsen/logrus"
29 )
30
31 func init() {
32         driver["Azure"] = newAzureBlobVolume
33 }
34
35 func newAzureBlobVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
36         v := &AzureBlobVolume{
37                 StorageBaseURL:    storage.DefaultBaseURL,
38                 RequestTimeout:    azureDefaultRequestTimeout,
39                 WriteRaceInterval: azureDefaultWriteRaceInterval,
40                 WriteRacePollTime: azureDefaultWriteRacePollTime,
41                 cluster:           cluster,
42                 volume:            volume,
43                 logger:            logger,
44                 metrics:           metrics,
45         }
46         err := json.Unmarshal(volume.DriverParameters, &v)
47         if err != nil {
48                 return nil, err
49         }
50         if v.ListBlobsRetryDelay == 0 {
51                 v.ListBlobsRetryDelay = azureDefaultListBlobsRetryDelay
52         }
53         if v.ListBlobsMaxAttempts == 0 {
54                 v.ListBlobsMaxAttempts = azureDefaultListBlobsMaxAttempts
55         }
56         if v.ContainerName == "" || v.StorageAccountName == "" || v.StorageAccountKey == "" {
57                 return nil, errors.New("DriverParameters: ContainerName, StorageAccountName, and StorageAccountKey must be provided")
58         }
59         azc, err := storage.NewClient(v.StorageAccountName, v.StorageAccountKey, v.StorageBaseURL, storage.DefaultAPIVersion, true)
60         if err != nil {
61                 return nil, fmt.Errorf("creating Azure storage client: %s", err)
62         }
63         v.azClient = azc
64         v.azClient.Sender = &singleSender{}
65         v.azClient.HTTPClient = &http.Client{
66                 Timeout: time.Duration(v.RequestTimeout),
67         }
68         bs := v.azClient.GetBlobService()
69         v.container = &azureContainer{
70                 ctr: bs.GetContainerReference(v.ContainerName),
71         }
72
73         if ok, err := v.container.Exists(); err != nil {
74                 return nil, err
75         } else if !ok {
76                 return nil, fmt.Errorf("Azure container %q does not exist: %s", v.ContainerName, err)
77         }
78         return v, v.check()
79 }
80
81 func (v *AzureBlobVolume) check() error {
82         lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
83         v.container.stats.opsCounters, v.container.stats.errCounters, v.container.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
84         return nil
85 }
86
87 const (
88         azureDefaultRequestTimeout       = arvados.Duration(10 * time.Minute)
89         azureDefaultListBlobsMaxAttempts = 12
90         azureDefaultListBlobsRetryDelay  = arvados.Duration(10 * time.Second)
91         azureDefaultWriteRaceInterval    = arvados.Duration(15 * time.Second)
92         azureDefaultWriteRacePollTime    = arvados.Duration(time.Second)
93 )
94
95 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
96 // container.
97 type AzureBlobVolume struct {
98         StorageAccountName   string
99         StorageAccountKey    string
100         StorageBaseURL       string // "" means default, "core.windows.net"
101         ContainerName        string
102         RequestTimeout       arvados.Duration
103         ListBlobsRetryDelay  arvados.Duration
104         ListBlobsMaxAttempts int
105         MaxGetBytes          int
106         WriteRaceInterval    arvados.Duration
107         WriteRacePollTime    arvados.Duration
108
109         cluster   *arvados.Cluster
110         volume    arvados.Volume
111         logger    logrus.FieldLogger
112         metrics   *volumeMetricsVecs
113         azClient  storage.Client
114         container *azureContainer
115 }
116
117 // singleSender is a single-attempt storage.Sender.
118 type singleSender struct{}
119
120 // Send performs req exactly once.
121 func (*singleSender) Send(c *storage.Client, req *http.Request) (resp *http.Response, err error) {
122         return c.HTTPClient.Do(req)
123 }
124
125 // Type implements Volume.
126 func (v *AzureBlobVolume) Type() string {
127         return "Azure"
128 }
129
130 // GetDeviceID returns a globally unique ID for the storage container.
131 func (v *AzureBlobVolume) GetDeviceID() string {
132         return "azure://" + v.StorageBaseURL + "/" + v.StorageAccountName + "/" + v.ContainerName
133 }
134
135 // Return true if expires_at metadata attribute is found on the block
136 func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
137         metadata, err := v.container.GetBlobMetadata(loc)
138         if err != nil {
139                 return false, metadata, v.translateError(err)
140         }
141         if metadata["expires_at"] != "" {
142                 return true, metadata, nil
143         }
144         return false, metadata, nil
145 }
146
147 // Get reads a Keep block that has been stored as a block blob in the
148 // container.
149 //
150 // If the block is younger than azureWriteRaceInterval and is
151 // unexpectedly empty, assume a PutBlob operation is in progress, and
152 // wait for it to finish writing.
153 func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
154         trashed, _, err := v.checkTrashed(loc)
155         if err != nil {
156                 return 0, err
157         }
158         if trashed {
159                 return 0, os.ErrNotExist
160         }
161         var deadline time.Time
162         haveDeadline := false
163         size, err := v.get(ctx, loc, buf)
164         for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
165                 // Seeing a brand new empty block probably means we're
166                 // in a race with CreateBlob, which under the hood
167                 // (apparently) does "CreateEmpty" and "CommitData"
168                 // with no additional transaction locking.
169                 if !haveDeadline {
170                         t, err := v.Mtime(loc)
171                         if err != nil {
172                                 ctxlog.FromContext(ctx).Print("Got empty block (possible race) but Mtime failed: ", err)
173                                 break
174                         }
175                         deadline = t.Add(v.WriteRaceInterval.Duration())
176                         if time.Now().After(deadline) {
177                                 break
178                         }
179                         ctxlog.FromContext(ctx).Printf("Race? Block %s is 0 bytes, %s old. Polling until %s", loc, time.Since(t), deadline)
180                         haveDeadline = true
181                 } else if time.Now().After(deadline) {
182                         break
183                 }
184                 select {
185                 case <-ctx.Done():
186                         return 0, ctx.Err()
187                 case <-time.After(v.WriteRacePollTime.Duration()):
188                 }
189                 size, err = v.get(ctx, loc, buf)
190         }
191         if haveDeadline {
192                 ctxlog.FromContext(ctx).Printf("Race ended with size==%d", size)
193         }
194         return size, err
195 }
196
197 func (v *AzureBlobVolume) get(ctx context.Context, loc string, buf []byte) (int, error) {
198         ctx, cancel := context.WithCancel(ctx)
199         defer cancel()
200
201         pieceSize := BlockSize
202         if v.MaxGetBytes > 0 && v.MaxGetBytes < BlockSize {
203                 pieceSize = v.MaxGetBytes
204         }
205
206         pieces := 1
207         expectSize := len(buf)
208         if pieceSize < BlockSize {
209                 // Unfortunately the handler doesn't tell us how long the blob
210                 // is expected to be, so we have to ask Azure.
211                 props, err := v.container.GetBlobProperties(loc)
212                 if err != nil {
213                         return 0, v.translateError(err)
214                 }
215                 if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
216                         return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
217                 }
218                 expectSize = int(props.ContentLength)
219                 pieces = (expectSize + pieceSize - 1) / pieceSize
220         }
221
222         if expectSize == 0 {
223                 return 0, nil
224         }
225
226         // We'll update this actualSize if/when we get the last piece.
227         actualSize := -1
228         errors := make(chan error, pieces)
229         var wg sync.WaitGroup
230         wg.Add(pieces)
231         for p := 0; p < pieces; p++ {
232                 // Each goroutine retrieves one piece. If we hit an
233                 // error, it is sent to the errors chan so get() can
234                 // return it -- but only if the error happens before
235                 // ctx is done. This way, if ctx is done before we hit
236                 // any other error (e.g., requesting client has hung
237                 // up), we return the original ctx.Err() instead of
238                 // the secondary errors from the transfers that got
239                 // interrupted as a result.
240                 go func(p int) {
241                         defer wg.Done()
242                         startPos := p * pieceSize
243                         endPos := startPos + pieceSize
244                         if endPos > expectSize {
245                                 endPos = expectSize
246                         }
247                         var rdr io.ReadCloser
248                         var err error
249                         gotRdr := make(chan struct{})
250                         go func() {
251                                 defer close(gotRdr)
252                                 if startPos == 0 && endPos == expectSize {
253                                         rdr, err = v.container.GetBlob(loc)
254                                 } else {
255                                         rdr, err = v.container.GetBlobRange(loc, startPos, endPos-1, nil)
256                                 }
257                         }()
258                         select {
259                         case <-ctx.Done():
260                                 go func() {
261                                         <-gotRdr
262                                         if err == nil {
263                                                 rdr.Close()
264                                         }
265                                 }()
266                                 return
267                         case <-gotRdr:
268                         }
269                         if err != nil {
270                                 errors <- err
271                                 cancel()
272                                 return
273                         }
274                         go func() {
275                                 // Close the reader when the client
276                                 // hangs up or another piece fails
277                                 // (possibly interrupting ReadFull())
278                                 // or when all pieces succeed and
279                                 // get() returns.
280                                 <-ctx.Done()
281                                 rdr.Close()
282                         }()
283                         n, err := io.ReadFull(rdr, buf[startPos:endPos])
284                         if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
285                                 // If we don't know the actual size,
286                                 // and just tried reading 64 MiB, it's
287                                 // normal to encounter EOF.
288                         } else if err != nil {
289                                 if ctx.Err() == nil {
290                                         errors <- err
291                                 }
292                                 cancel()
293                                 return
294                         }
295                         if p == pieces-1 {
296                                 actualSize = startPos + n
297                         }
298                 }(p)
299         }
300         wg.Wait()
301         close(errors)
302         if len(errors) > 0 {
303                 return 0, v.translateError(<-errors)
304         }
305         if ctx.Err() != nil {
306                 return 0, ctx.Err()
307         }
308         return actualSize, nil
309 }
310
311 // Compare the given data with existing stored data.
312 func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte) error {
313         trashed, _, err := v.checkTrashed(loc)
314         if err != nil {
315                 return err
316         }
317         if trashed {
318                 return os.ErrNotExist
319         }
320         var rdr io.ReadCloser
321         gotRdr := make(chan struct{})
322         go func() {
323                 defer close(gotRdr)
324                 rdr, err = v.container.GetBlob(loc)
325         }()
326         select {
327         case <-ctx.Done():
328                 go func() {
329                         <-gotRdr
330                         if err == nil {
331                                 rdr.Close()
332                         }
333                 }()
334                 return ctx.Err()
335         case <-gotRdr:
336         }
337         if err != nil {
338                 return v.translateError(err)
339         }
340         defer rdr.Close()
341         return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
342 }
343
344 // Put stores a Keep block as a block blob in the container.
345 func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) error {
346         if v.volume.ReadOnly {
347                 return MethodDisabledError
348         }
349         // Send the block data through a pipe, so that (if we need to)
350         // we can close the pipe early and abandon our
351         // CreateBlockBlobFromReader() goroutine, without worrying
352         // about CreateBlockBlobFromReader() accessing our block
353         // buffer after we release it.
354         bufr, bufw := io.Pipe()
355         go func() {
356                 io.Copy(bufw, bytes.NewReader(block))
357                 bufw.Close()
358         }()
359         errChan := make(chan error)
360         go func() {
361                 var body io.Reader = bufr
362                 if len(block) == 0 {
363                         // We must send a "Content-Length: 0" header,
364                         // but the http client interprets
365                         // ContentLength==0 as "unknown" unless it can
366                         // confirm by introspection that Body will
367                         // read 0 bytes.
368                         body = http.NoBody
369                         bufr.Close()
370                 }
371                 errChan <- v.container.CreateBlockBlobFromReader(loc, len(block), body, nil)
372         }()
373         select {
374         case <-ctx.Done():
375                 ctxlog.FromContext(ctx).Debugf("%s: taking CreateBlockBlobFromReader's input away: %s", v, ctx.Err())
376                 // Our pipe might be stuck in Write(), waiting for
377                 // io.Copy() to read. If so, un-stick it. This means
378                 // CreateBlockBlobFromReader will get corrupt data,
379                 // but that's OK: the size won't match, so the write
380                 // will fail.
381                 go io.Copy(ioutil.Discard, bufr)
382                 // CloseWithError() will return once pending I/O is done.
383                 bufw.CloseWithError(ctx.Err())
384                 ctxlog.FromContext(ctx).Debugf("%s: abandoning CreateBlockBlobFromReader goroutine", v)
385                 return ctx.Err()
386         case err := <-errChan:
387                 return err
388         }
389 }
390
391 // Touch updates the last-modified property of a block blob.
392 func (v *AzureBlobVolume) Touch(loc string) error {
393         if v.volume.ReadOnly {
394                 return MethodDisabledError
395         }
396         trashed, metadata, err := v.checkTrashed(loc)
397         if err != nil {
398                 return err
399         }
400         if trashed {
401                 return os.ErrNotExist
402         }
403
404         metadata["touch"] = fmt.Sprintf("%d", time.Now().Unix())
405         return v.container.SetBlobMetadata(loc, metadata, nil)
406 }
407
408 // Mtime returns the last-modified property of a block blob.
409 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
410         trashed, _, err := v.checkTrashed(loc)
411         if err != nil {
412                 return time.Time{}, err
413         }
414         if trashed {
415                 return time.Time{}, os.ErrNotExist
416         }
417
418         props, err := v.container.GetBlobProperties(loc)
419         if err != nil {
420                 return time.Time{}, err
421         }
422         return time.Time(props.LastModified), nil
423 }
424
425 // IndexTo writes a list of Keep blocks that are stored in the
426 // container.
427 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
428         params := storage.ListBlobsParameters{
429                 Prefix:  prefix,
430                 Include: &storage.IncludeBlobDataset{Metadata: true},
431         }
432         for page := 1; ; page++ {
433                 resp, err := v.listBlobs(page, params)
434                 if err != nil {
435                         return err
436                 }
437                 for _, b := range resp.Blobs {
438                         if !v.isKeepBlock(b.Name) {
439                                 continue
440                         }
441                         modtime := time.Time(b.Properties.LastModified)
442                         if b.Properties.ContentLength == 0 && modtime.Add(v.WriteRaceInterval.Duration()).After(time.Now()) {
443                                 // A new zero-length blob is probably
444                                 // just a new non-empty blob that
445                                 // hasn't committed its data yet (see
446                                 // Get()), and in any case has no
447                                 // value.
448                                 continue
449                         }
450                         if b.Metadata["expires_at"] != "" {
451                                 // Trashed blob; exclude it from response
452                                 continue
453                         }
454                         fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, modtime.UnixNano())
455                 }
456                 if resp.NextMarker == "" {
457                         return nil
458                 }
459                 params.Marker = resp.NextMarker
460         }
461 }
462
463 // call v.container.ListBlobs, retrying if needed.
464 func (v *AzureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters) (resp storage.BlobListResponse, err error) {
465         for i := 0; i < v.ListBlobsMaxAttempts; i++ {
466                 resp, err = v.container.ListBlobs(params)
467                 err = v.translateError(err)
468                 if err == VolumeBusyError {
469                         v.logger.Printf("ListBlobs: will retry page %d in %s after error: %s", page, v.ListBlobsRetryDelay, err)
470                         time.Sleep(time.Duration(v.ListBlobsRetryDelay))
471                         continue
472                 } else {
473                         break
474                 }
475         }
476         return
477 }
478
479 // Trash a Keep block.
480 func (v *AzureBlobVolume) Trash(loc string) error {
481         if v.volume.ReadOnly {
482                 return MethodDisabledError
483         }
484
485         // Ideally we would use If-Unmodified-Since, but that
486         // particular condition seems to be ignored by Azure. Instead,
487         // we get the Etag before checking Mtime, and use If-Match to
488         // ensure we don't delete data if Put() or Touch() happens
489         // between our calls to Mtime() and DeleteBlob().
490         props, err := v.container.GetBlobProperties(loc)
491         if err != nil {
492                 return err
493         }
494         if t, err := v.Mtime(loc); err != nil {
495                 return err
496         } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
497                 return nil
498         }
499
500         // If TrashLifetime == 0, just delete it
501         if v.cluster.Collections.BlobTrashLifetime == 0 {
502                 return v.container.DeleteBlob(loc, &storage.DeleteBlobOptions{
503                         IfMatch: props.Etag,
504                 })
505         }
506
507         // Otherwise, mark as trash
508         return v.container.SetBlobMetadata(loc, storage.BlobMetadata{
509                 "expires_at": fmt.Sprintf("%d", time.Now().Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Unix()),
510         }, &storage.SetBlobMetadataOptions{
511                 IfMatch: props.Etag,
512         })
513 }
514
515 // Untrash a Keep block.
516 // Delete the expires_at metadata attribute
517 func (v *AzureBlobVolume) Untrash(loc string) error {
518         // if expires_at does not exist, return NotFoundError
519         metadata, err := v.container.GetBlobMetadata(loc)
520         if err != nil {
521                 return v.translateError(err)
522         }
523         if metadata["expires_at"] == "" {
524                 return os.ErrNotExist
525         }
526
527         // reset expires_at metadata attribute
528         metadata["expires_at"] = ""
529         err = v.container.SetBlobMetadata(loc, metadata, nil)
530         return v.translateError(err)
531 }
532
533 // Status returns a VolumeStatus struct with placeholder data.
534 func (v *AzureBlobVolume) Status() *VolumeStatus {
535         return &VolumeStatus{
536                 DeviceNum: 1,
537                 BytesFree: BlockSize * 1000,
538                 BytesUsed: 1,
539         }
540 }
541
542 // String returns a volume label, including the container name.
543 func (v *AzureBlobVolume) String() string {
544         return fmt.Sprintf("azure-storage-container:%+q", v.ContainerName)
545 }
546
547 // If possible, translate an Azure SDK error to a recognizable error
548 // like os.ErrNotExist.
549 func (v *AzureBlobVolume) translateError(err error) error {
550         switch {
551         case err == nil:
552                 return err
553         case strings.Contains(err.Error(), "StatusCode=503"):
554                 // "storage: service returned error: StatusCode=503, ErrorCode=ServerBusy, ErrorMessage=The server is busy" (See #14804)
555                 return VolumeBusyError
556         case strings.Contains(err.Error(), "Not Found"):
557                 // "storage: service returned without a response body (404 Not Found)"
558                 return os.ErrNotExist
559         default:
560                 return err
561         }
562 }
563
564 var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
565
566 func (v *AzureBlobVolume) isKeepBlock(s string) bool {
567         return keepBlockRegexp.MatchString(s)
568 }
569
570 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
571 // and deletes them from the volume.
572 func (v *AzureBlobVolume) EmptyTrash() {
573         if v.cluster.Collections.BlobDeleteConcurrency < 1 {
574                 return
575         }
576
577         var bytesDeleted, bytesInTrash int64
578         var blocksDeleted, blocksInTrash int64
579
580         doBlob := func(b storage.Blob) {
581                 // Check whether the block is flagged as trash
582                 if b.Metadata["expires_at"] == "" {
583                         return
584                 }
585
586                 atomic.AddInt64(&blocksInTrash, 1)
587                 atomic.AddInt64(&bytesInTrash, b.Properties.ContentLength)
588
589                 expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
590                 if err != nil {
591                         v.logger.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
592                         return
593                 }
594
595                 if expiresAt > time.Now().Unix() {
596                         return
597                 }
598
599                 err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
600                         IfMatch: b.Properties.Etag,
601                 })
602                 if err != nil {
603                         v.logger.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
604                         return
605                 }
606                 atomic.AddInt64(&blocksDeleted, 1)
607                 atomic.AddInt64(&bytesDeleted, b.Properties.ContentLength)
608         }
609
610         var wg sync.WaitGroup
611         todo := make(chan storage.Blob, v.cluster.Collections.BlobDeleteConcurrency)
612         for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
613                 wg.Add(1)
614                 go func() {
615                         defer wg.Done()
616                         for b := range todo {
617                                 doBlob(b)
618                         }
619                 }()
620         }
621
622         params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
623         for page := 1; ; page++ {
624                 resp, err := v.listBlobs(page, params)
625                 if err != nil {
626                         v.logger.Printf("EmptyTrash: ListBlobs: %v", err)
627                         break
628                 }
629                 for _, b := range resp.Blobs {
630                         todo <- b
631                 }
632                 if resp.NextMarker == "" {
633                         break
634                 }
635                 params.Marker = resp.NextMarker
636         }
637         close(todo)
638         wg.Wait()
639
640         v.logger.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
641 }
642
643 // InternalStats returns bucket I/O and API call counters.
644 func (v *AzureBlobVolume) InternalStats() interface{} {
645         return &v.container.stats
646 }
647
648 type azureBlobStats struct {
649         statsTicker
650         Ops              uint64
651         GetOps           uint64
652         GetRangeOps      uint64
653         GetMetadataOps   uint64
654         GetPropertiesOps uint64
655         CreateOps        uint64
656         SetMetadataOps   uint64
657         DelOps           uint64
658         ListOps          uint64
659 }
660
661 func (s *azureBlobStats) TickErr(err error) {
662         if err == nil {
663                 return
664         }
665         errType := fmt.Sprintf("%T", err)
666         if err, ok := err.(storage.AzureStorageServiceError); ok {
667                 errType = errType + fmt.Sprintf(" %d (%s)", err.StatusCode, err.Code)
668         }
669         s.statsTicker.TickErr(err, errType)
670 }
671
672 // azureContainer wraps storage.Container in order to count I/O and
673 // API usage stats.
674 type azureContainer struct {
675         ctr   *storage.Container
676         stats azureBlobStats
677 }
678
679 func (c *azureContainer) Exists() (bool, error) {
680         c.stats.TickOps("exists")
681         c.stats.Tick(&c.stats.Ops)
682         ok, err := c.ctr.Exists()
683         c.stats.TickErr(err)
684         return ok, err
685 }
686
687 func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
688         c.stats.TickOps("get_metadata")
689         c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
690         b := c.ctr.GetBlobReference(bname)
691         err := b.GetMetadata(nil)
692         c.stats.TickErr(err)
693         return b.Metadata, err
694 }
695
696 func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
697         c.stats.TickOps("get_properties")
698         c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
699         b := c.ctr.GetBlobReference(bname)
700         err := b.GetProperties(nil)
701         c.stats.TickErr(err)
702         return &b.Properties, err
703 }
704
705 func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
706         c.stats.TickOps("get")
707         c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
708         b := c.ctr.GetBlobReference(bname)
709         rdr, err := b.Get(nil)
710         c.stats.TickErr(err)
711         return NewCountingReader(rdr, c.stats.TickInBytes), err
712 }
713
714 func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
715         c.stats.TickOps("get_range")
716         c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
717         b := c.ctr.GetBlobReference(bname)
718         rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
719                 Range: &storage.BlobRange{
720                         Start: uint64(start),
721                         End:   uint64(end),
722                 },
723                 GetBlobOptions: opts,
724         })
725         c.stats.TickErr(err)
726         return NewCountingReader(rdr, c.stats.TickInBytes), err
727 }
728
729 // If we give it an io.Reader that doesn't also have a Len() int
730 // method, the Azure SDK determines data size by copying the data into
731 // a new buffer, which is not a good use of memory.
732 type readerWithAzureLen struct {
733         io.Reader
734         len int
735 }
736
737 // Len satisfies the private lener interface in azure-sdk-for-go.
738 func (r *readerWithAzureLen) Len() int {
739         return r.len
740 }
741
742 func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
743         c.stats.TickOps("create")
744         c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
745         if size != 0 {
746                 rdr = &readerWithAzureLen{
747                         Reader: NewCountingReader(rdr, c.stats.TickOutBytes),
748                         len:    size,
749                 }
750         }
751         b := c.ctr.GetBlobReference(bname)
752         err := b.CreateBlockBlobFromReader(rdr, opts)
753         c.stats.TickErr(err)
754         return err
755 }
756
757 func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
758         c.stats.TickOps("set_metadata")
759         c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
760         b := c.ctr.GetBlobReference(bname)
761         b.Metadata = m
762         err := b.SetMetadata(opts)
763         c.stats.TickErr(err)
764         return err
765 }
766
767 func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
768         c.stats.TickOps("list")
769         c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
770         resp, err := c.ctr.ListBlobs(params)
771         c.stats.TickErr(err)
772         return resp, err
773 }
774
775 func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
776         c.stats.TickOps("delete")
777         c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
778         b := c.ctr.GetBlobReference(bname)
779         err := b.Delete(opts)
780         c.stats.TickErr(err)
781         return err
782 }