6e8def82222eb1e169af6fa2632c21b20bc1bdf8
[arvados.git] / services / keepstore / azure_blob_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "encoding/json"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "net/http"
16         "os"
17         "regexp"
18         "strconv"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "time"
23
24         "git.arvados.org/arvados.git/sdk/go/arvados"
25         "git.arvados.org/arvados.git/sdk/go/ctxlog"
26         "github.com/Azure/azure-sdk-for-go/storage"
27         "github.com/prometheus/client_golang/prometheus"
28         "github.com/sirupsen/logrus"
29 )
30
31 func init() {
32         driver["Azure"] = newAzureBlobVolume
33 }
34
35 func newAzureBlobVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
36         v := &AzureBlobVolume{
37                 RequestTimeout:    azureDefaultRequestTimeout,
38                 WriteRaceInterval: azureDefaultWriteRaceInterval,
39                 WriteRacePollTime: azureDefaultWriteRacePollTime,
40                 cluster:           cluster,
41                 volume:            volume,
42                 logger:            logger,
43                 metrics:           metrics,
44         }
45         err := json.Unmarshal(volume.DriverParameters, &v)
46         if err != nil {
47                 return nil, err
48         }
49         if v.ListBlobsRetryDelay == 0 {
50                 v.ListBlobsRetryDelay = azureDefaultListBlobsRetryDelay
51         }
52         if v.ListBlobsMaxAttempts == 0 {
53                 v.ListBlobsMaxAttempts = azureDefaultListBlobsMaxAttempts
54         }
55         if v.StorageBaseURL == "" {
56                 v.StorageBaseURL = storage.DefaultBaseURL
57         }
58         if v.ContainerName == "" || v.StorageAccountName == "" || v.StorageAccountKey == "" {
59                 return nil, errors.New("DriverParameters: ContainerName, StorageAccountName, and StorageAccountKey must be provided")
60         }
61         azc, err := storage.NewClient(v.StorageAccountName, v.StorageAccountKey, v.StorageBaseURL, storage.DefaultAPIVersion, true)
62         if err != nil {
63                 return nil, fmt.Errorf("creating Azure storage client: %s", err)
64         }
65         v.azClient = azc
66         v.azClient.Sender = &singleSender{}
67         v.azClient.HTTPClient = &http.Client{
68                 Timeout: time.Duration(v.RequestTimeout),
69         }
70         bs := v.azClient.GetBlobService()
71         v.container = &azureContainer{
72                 ctr: bs.GetContainerReference(v.ContainerName),
73         }
74
75         if ok, err := v.container.Exists(); err != nil {
76                 return nil, err
77         } else if !ok {
78                 return nil, fmt.Errorf("Azure container %q does not exist: %s", v.ContainerName, err)
79         }
80         return v, v.check()
81 }
82
83 func (v *AzureBlobVolume) check() error {
84         lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
85         v.container.stats.opsCounters, v.container.stats.errCounters, v.container.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
86         return nil
87 }
88
89 const (
90         azureDefaultRequestTimeout       = arvados.Duration(10 * time.Minute)
91         azureDefaultListBlobsMaxAttempts = 12
92         azureDefaultListBlobsRetryDelay  = arvados.Duration(10 * time.Second)
93         azureDefaultWriteRaceInterval    = arvados.Duration(15 * time.Second)
94         azureDefaultWriteRacePollTime    = arvados.Duration(time.Second)
95 )
96
97 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
98 // container.
99 type AzureBlobVolume struct {
100         StorageAccountName   string
101         StorageAccountKey    string
102         StorageBaseURL       string // "" means default, "core.windows.net"
103         ContainerName        string
104         RequestTimeout       arvados.Duration
105         ListBlobsRetryDelay  arvados.Duration
106         ListBlobsMaxAttempts int
107         MaxGetBytes          int
108         WriteRaceInterval    arvados.Duration
109         WriteRacePollTime    arvados.Duration
110
111         cluster   *arvados.Cluster
112         volume    arvados.Volume
113         logger    logrus.FieldLogger
114         metrics   *volumeMetricsVecs
115         azClient  storage.Client
116         container *azureContainer
117 }
118
119 // singleSender is a single-attempt storage.Sender.
120 type singleSender struct{}
121
122 // Send performs req exactly once.
123 func (*singleSender) Send(c *storage.Client, req *http.Request) (resp *http.Response, err error) {
124         return c.HTTPClient.Do(req)
125 }
126
127 // Type implements Volume.
128 func (v *AzureBlobVolume) Type() string {
129         return "Azure"
130 }
131
132 // GetDeviceID returns a globally unique ID for the storage container.
133 func (v *AzureBlobVolume) GetDeviceID() string {
134         return "azure://" + v.StorageBaseURL + "/" + v.StorageAccountName + "/" + v.ContainerName
135 }
136
137 // Return true if expires_at metadata attribute is found on the block
138 func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
139         metadata, err := v.container.GetBlobMetadata(loc)
140         if err != nil {
141                 return false, metadata, v.translateError(err)
142         }
143         if metadata["expires_at"] != "" {
144                 return true, metadata, nil
145         }
146         return false, metadata, nil
147 }
148
149 // Get reads a Keep block that has been stored as a block blob in the
150 // container.
151 //
152 // If the block is younger than azureWriteRaceInterval and is
153 // unexpectedly empty, assume a PutBlob operation is in progress, and
154 // wait for it to finish writing.
155 func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
156         trashed, _, err := v.checkTrashed(loc)
157         if err != nil {
158                 return 0, err
159         }
160         if trashed {
161                 return 0, os.ErrNotExist
162         }
163         var deadline time.Time
164         haveDeadline := false
165         size, err := v.get(ctx, loc, buf)
166         for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
167                 // Seeing a brand new empty block probably means we're
168                 // in a race with CreateBlob, which under the hood
169                 // (apparently) does "CreateEmpty" and "CommitData"
170                 // with no additional transaction locking.
171                 if !haveDeadline {
172                         t, err := v.Mtime(loc)
173                         if err != nil {
174                                 ctxlog.FromContext(ctx).Print("Got empty block (possible race) but Mtime failed: ", err)
175                                 break
176                         }
177                         deadline = t.Add(v.WriteRaceInterval.Duration())
178                         if time.Now().After(deadline) {
179                                 break
180                         }
181                         ctxlog.FromContext(ctx).Printf("Race? Block %s is 0 bytes, %s old. Polling until %s", loc, time.Since(t), deadline)
182                         haveDeadline = true
183                 } else if time.Now().After(deadline) {
184                         break
185                 }
186                 select {
187                 case <-ctx.Done():
188                         return 0, ctx.Err()
189                 case <-time.After(v.WriteRacePollTime.Duration()):
190                 }
191                 size, err = v.get(ctx, loc, buf)
192         }
193         if haveDeadline {
194                 ctxlog.FromContext(ctx).Printf("Race ended with size==%d", size)
195         }
196         return size, err
197 }
198
199 func (v *AzureBlobVolume) get(ctx context.Context, loc string, buf []byte) (int, error) {
200         ctx, cancel := context.WithCancel(ctx)
201         defer cancel()
202
203         pieceSize := BlockSize
204         if v.MaxGetBytes > 0 && v.MaxGetBytes < BlockSize {
205                 pieceSize = v.MaxGetBytes
206         }
207
208         pieces := 1
209         expectSize := len(buf)
210         if pieceSize < BlockSize {
211                 // Unfortunately the handler doesn't tell us how long the blob
212                 // is expected to be, so we have to ask Azure.
213                 props, err := v.container.GetBlobProperties(loc)
214                 if err != nil {
215                         return 0, v.translateError(err)
216                 }
217                 if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
218                         return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
219                 }
220                 expectSize = int(props.ContentLength)
221                 pieces = (expectSize + pieceSize - 1) / pieceSize
222         }
223
224         if expectSize == 0 {
225                 return 0, nil
226         }
227
228         // We'll update this actualSize if/when we get the last piece.
229         actualSize := -1
230         errors := make(chan error, pieces)
231         var wg sync.WaitGroup
232         wg.Add(pieces)
233         for p := 0; p < pieces; p++ {
234                 // Each goroutine retrieves one piece. If we hit an
235                 // error, it is sent to the errors chan so get() can
236                 // return it -- but only if the error happens before
237                 // ctx is done. This way, if ctx is done before we hit
238                 // any other error (e.g., requesting client has hung
239                 // up), we return the original ctx.Err() instead of
240                 // the secondary errors from the transfers that got
241                 // interrupted as a result.
242                 go func(p int) {
243                         defer wg.Done()
244                         startPos := p * pieceSize
245                         endPos := startPos + pieceSize
246                         if endPos > expectSize {
247                                 endPos = expectSize
248                         }
249                         var rdr io.ReadCloser
250                         var err error
251                         gotRdr := make(chan struct{})
252                         go func() {
253                                 defer close(gotRdr)
254                                 if startPos == 0 && endPos == expectSize {
255                                         rdr, err = v.container.GetBlob(loc)
256                                 } else {
257                                         rdr, err = v.container.GetBlobRange(loc, startPos, endPos-1, nil)
258                                 }
259                         }()
260                         select {
261                         case <-ctx.Done():
262                                 go func() {
263                                         <-gotRdr
264                                         if err == nil {
265                                                 rdr.Close()
266                                         }
267                                 }()
268                                 return
269                         case <-gotRdr:
270                         }
271                         if err != nil {
272                                 errors <- err
273                                 cancel()
274                                 return
275                         }
276                         go func() {
277                                 // Close the reader when the client
278                                 // hangs up or another piece fails
279                                 // (possibly interrupting ReadFull())
280                                 // or when all pieces succeed and
281                                 // get() returns.
282                                 <-ctx.Done()
283                                 rdr.Close()
284                         }()
285                         n, err := io.ReadFull(rdr, buf[startPos:endPos])
286                         if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
287                                 // If we don't know the actual size,
288                                 // and just tried reading 64 MiB, it's
289                                 // normal to encounter EOF.
290                         } else if err != nil {
291                                 if ctx.Err() == nil {
292                                         errors <- err
293                                 }
294                                 cancel()
295                                 return
296                         }
297                         if p == pieces-1 {
298                                 actualSize = startPos + n
299                         }
300                 }(p)
301         }
302         wg.Wait()
303         close(errors)
304         if len(errors) > 0 {
305                 return 0, v.translateError(<-errors)
306         }
307         if ctx.Err() != nil {
308                 return 0, ctx.Err()
309         }
310         return actualSize, nil
311 }
312
313 // Compare the given data with existing stored data.
314 func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte) error {
315         trashed, _, err := v.checkTrashed(loc)
316         if err != nil {
317                 return err
318         }
319         if trashed {
320                 return os.ErrNotExist
321         }
322         var rdr io.ReadCloser
323         gotRdr := make(chan struct{})
324         go func() {
325                 defer close(gotRdr)
326                 rdr, err = v.container.GetBlob(loc)
327         }()
328         select {
329         case <-ctx.Done():
330                 go func() {
331                         <-gotRdr
332                         if err == nil {
333                                 rdr.Close()
334                         }
335                 }()
336                 return ctx.Err()
337         case <-gotRdr:
338         }
339         if err != nil {
340                 return v.translateError(err)
341         }
342         defer rdr.Close()
343         return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
344 }
345
346 // Put stores a Keep block as a block blob in the container.
347 func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) error {
348         if v.volume.ReadOnly {
349                 return MethodDisabledError
350         }
351         // Send the block data through a pipe, so that (if we need to)
352         // we can close the pipe early and abandon our
353         // CreateBlockBlobFromReader() goroutine, without worrying
354         // about CreateBlockBlobFromReader() accessing our block
355         // buffer after we release it.
356         bufr, bufw := io.Pipe()
357         go func() {
358                 io.Copy(bufw, bytes.NewReader(block))
359                 bufw.Close()
360         }()
361         errChan := make(chan error)
362         go func() {
363                 var body io.Reader = bufr
364                 if len(block) == 0 {
365                         // We must send a "Content-Length: 0" header,
366                         // but the http client interprets
367                         // ContentLength==0 as "unknown" unless it can
368                         // confirm by introspection that Body will
369                         // read 0 bytes.
370                         body = http.NoBody
371                         bufr.Close()
372                 }
373                 errChan <- v.container.CreateBlockBlobFromReader(loc, len(block), body, nil)
374         }()
375         select {
376         case <-ctx.Done():
377                 ctxlog.FromContext(ctx).Debugf("%s: taking CreateBlockBlobFromReader's input away: %s", v, ctx.Err())
378                 // Our pipe might be stuck in Write(), waiting for
379                 // io.Copy() to read. If so, un-stick it. This means
380                 // CreateBlockBlobFromReader will get corrupt data,
381                 // but that's OK: the size won't match, so the write
382                 // will fail.
383                 go io.Copy(ioutil.Discard, bufr)
384                 // CloseWithError() will return once pending I/O is done.
385                 bufw.CloseWithError(ctx.Err())
386                 ctxlog.FromContext(ctx).Debugf("%s: abandoning CreateBlockBlobFromReader goroutine", v)
387                 return ctx.Err()
388         case err := <-errChan:
389                 return err
390         }
391 }
392
393 // Touch updates the last-modified property of a block blob.
394 func (v *AzureBlobVolume) Touch(loc string) error {
395         if v.volume.ReadOnly {
396                 return MethodDisabledError
397         }
398         trashed, metadata, err := v.checkTrashed(loc)
399         if err != nil {
400                 return err
401         }
402         if trashed {
403                 return os.ErrNotExist
404         }
405
406         metadata["touch"] = fmt.Sprintf("%d", time.Now().Unix())
407         return v.container.SetBlobMetadata(loc, metadata, nil)
408 }
409
410 // Mtime returns the last-modified property of a block blob.
411 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
412         trashed, _, err := v.checkTrashed(loc)
413         if err != nil {
414                 return time.Time{}, err
415         }
416         if trashed {
417                 return time.Time{}, os.ErrNotExist
418         }
419
420         props, err := v.container.GetBlobProperties(loc)
421         if err != nil {
422                 return time.Time{}, err
423         }
424         return time.Time(props.LastModified), nil
425 }
426
427 // IndexTo writes a list of Keep blocks that are stored in the
428 // container.
429 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
430         params := storage.ListBlobsParameters{
431                 Prefix:  prefix,
432                 Include: &storage.IncludeBlobDataset{Metadata: true},
433         }
434         for page := 1; ; page++ {
435                 resp, err := v.listBlobs(page, params)
436                 if err != nil {
437                         return err
438                 }
439                 for _, b := range resp.Blobs {
440                         if !v.isKeepBlock(b.Name) {
441                                 continue
442                         }
443                         modtime := time.Time(b.Properties.LastModified)
444                         if b.Properties.ContentLength == 0 && modtime.Add(v.WriteRaceInterval.Duration()).After(time.Now()) {
445                                 // A new zero-length blob is probably
446                                 // just a new non-empty blob that
447                                 // hasn't committed its data yet (see
448                                 // Get()), and in any case has no
449                                 // value.
450                                 continue
451                         }
452                         if b.Metadata["expires_at"] != "" {
453                                 // Trashed blob; exclude it from response
454                                 continue
455                         }
456                         fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, modtime.UnixNano())
457                 }
458                 if resp.NextMarker == "" {
459                         return nil
460                 }
461                 params.Marker = resp.NextMarker
462         }
463 }
464
465 // call v.container.ListBlobs, retrying if needed.
466 func (v *AzureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters) (resp storage.BlobListResponse, err error) {
467         for i := 0; i < v.ListBlobsMaxAttempts; i++ {
468                 resp, err = v.container.ListBlobs(params)
469                 err = v.translateError(err)
470                 if err == VolumeBusyError {
471                         v.logger.Printf("ListBlobs: will retry page %d in %s after error: %s", page, v.ListBlobsRetryDelay, err)
472                         time.Sleep(time.Duration(v.ListBlobsRetryDelay))
473                         continue
474                 } else {
475                         break
476                 }
477         }
478         return
479 }
480
481 // Trash a Keep block.
482 func (v *AzureBlobVolume) Trash(loc string) error {
483         if v.volume.ReadOnly {
484                 return MethodDisabledError
485         }
486
487         // Ideally we would use If-Unmodified-Since, but that
488         // particular condition seems to be ignored by Azure. Instead,
489         // we get the Etag before checking Mtime, and use If-Match to
490         // ensure we don't delete data if Put() or Touch() happens
491         // between our calls to Mtime() and DeleteBlob().
492         props, err := v.container.GetBlobProperties(loc)
493         if err != nil {
494                 return err
495         }
496         if t, err := v.Mtime(loc); err != nil {
497                 return err
498         } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
499                 return nil
500         }
501
502         // If BlobTrashLifetime == 0, just delete it
503         if v.cluster.Collections.BlobTrashLifetime == 0 {
504                 return v.container.DeleteBlob(loc, &storage.DeleteBlobOptions{
505                         IfMatch: props.Etag,
506                 })
507         }
508
509         // Otherwise, mark as trash
510         return v.container.SetBlobMetadata(loc, storage.BlobMetadata{
511                 "expires_at": fmt.Sprintf("%d", time.Now().Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Unix()),
512         }, &storage.SetBlobMetadataOptions{
513                 IfMatch: props.Etag,
514         })
515 }
516
517 // Untrash a Keep block.
518 // Delete the expires_at metadata attribute
519 func (v *AzureBlobVolume) Untrash(loc string) error {
520         // if expires_at does not exist, return NotFoundError
521         metadata, err := v.container.GetBlobMetadata(loc)
522         if err != nil {
523                 return v.translateError(err)
524         }
525         if metadata["expires_at"] == "" {
526                 return os.ErrNotExist
527         }
528
529         // reset expires_at metadata attribute
530         metadata["expires_at"] = ""
531         err = v.container.SetBlobMetadata(loc, metadata, nil)
532         return v.translateError(err)
533 }
534
535 // Status returns a VolumeStatus struct with placeholder data.
536 func (v *AzureBlobVolume) Status() *VolumeStatus {
537         return &VolumeStatus{
538                 DeviceNum: 1,
539                 BytesFree: BlockSize * 1000,
540                 BytesUsed: 1,
541         }
542 }
543
544 // String returns a volume label, including the container name.
545 func (v *AzureBlobVolume) String() string {
546         return fmt.Sprintf("azure-storage-container:%+q", v.ContainerName)
547 }
548
549 // If possible, translate an Azure SDK error to a recognizable error
550 // like os.ErrNotExist.
551 func (v *AzureBlobVolume) translateError(err error) error {
552         switch {
553         case err == nil:
554                 return err
555         case strings.Contains(err.Error(), "StatusCode=503"):
556                 // "storage: service returned error: StatusCode=503, ErrorCode=ServerBusy, ErrorMessage=The server is busy" (See #14804)
557                 return VolumeBusyError
558         case strings.Contains(err.Error(), "Not Found"):
559                 // "storage: service returned without a response body (404 Not Found)"
560                 return os.ErrNotExist
561         default:
562                 return err
563         }
564 }
565
566 var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
567
568 func (v *AzureBlobVolume) isKeepBlock(s string) bool {
569         return keepBlockRegexp.MatchString(s)
570 }
571
572 // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
573 // and deletes them from the volume.
574 func (v *AzureBlobVolume) EmptyTrash() {
575         if v.cluster.Collections.BlobDeleteConcurrency < 1 {
576                 return
577         }
578
579         var bytesDeleted, bytesInTrash int64
580         var blocksDeleted, blocksInTrash int64
581
582         doBlob := func(b storage.Blob) {
583                 // Check whether the block is flagged as trash
584                 if b.Metadata["expires_at"] == "" {
585                         return
586                 }
587
588                 atomic.AddInt64(&blocksInTrash, 1)
589                 atomic.AddInt64(&bytesInTrash, b.Properties.ContentLength)
590
591                 expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
592                 if err != nil {
593                         v.logger.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
594                         return
595                 }
596
597                 if expiresAt > time.Now().Unix() {
598                         return
599                 }
600
601                 err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
602                         IfMatch: b.Properties.Etag,
603                 })
604                 if err != nil {
605                         v.logger.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
606                         return
607                 }
608                 atomic.AddInt64(&blocksDeleted, 1)
609                 atomic.AddInt64(&bytesDeleted, b.Properties.ContentLength)
610         }
611
612         var wg sync.WaitGroup
613         todo := make(chan storage.Blob, v.cluster.Collections.BlobDeleteConcurrency)
614         for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
615                 wg.Add(1)
616                 go func() {
617                         defer wg.Done()
618                         for b := range todo {
619                                 doBlob(b)
620                         }
621                 }()
622         }
623
624         params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
625         for page := 1; ; page++ {
626                 resp, err := v.listBlobs(page, params)
627                 if err != nil {
628                         v.logger.Printf("EmptyTrash: ListBlobs: %v", err)
629                         break
630                 }
631                 for _, b := range resp.Blobs {
632                         todo <- b
633                 }
634                 if resp.NextMarker == "" {
635                         break
636                 }
637                 params.Marker = resp.NextMarker
638         }
639         close(todo)
640         wg.Wait()
641
642         v.logger.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
643 }
644
645 // InternalStats returns bucket I/O and API call counters.
646 func (v *AzureBlobVolume) InternalStats() interface{} {
647         return &v.container.stats
648 }
649
650 type azureBlobStats struct {
651         statsTicker
652         Ops              uint64
653         GetOps           uint64
654         GetRangeOps      uint64
655         GetMetadataOps   uint64
656         GetPropertiesOps uint64
657         CreateOps        uint64
658         SetMetadataOps   uint64
659         DelOps           uint64
660         ListOps          uint64
661 }
662
663 func (s *azureBlobStats) TickErr(err error) {
664         if err == nil {
665                 return
666         }
667         errType := fmt.Sprintf("%T", err)
668         if err, ok := err.(storage.AzureStorageServiceError); ok {
669                 errType = errType + fmt.Sprintf(" %d (%s)", err.StatusCode, err.Code)
670         }
671         s.statsTicker.TickErr(err, errType)
672 }
673
674 // azureContainer wraps storage.Container in order to count I/O and
675 // API usage stats.
676 type azureContainer struct {
677         ctr   *storage.Container
678         stats azureBlobStats
679 }
680
681 func (c *azureContainer) Exists() (bool, error) {
682         c.stats.TickOps("exists")
683         c.stats.Tick(&c.stats.Ops)
684         ok, err := c.ctr.Exists()
685         c.stats.TickErr(err)
686         return ok, err
687 }
688
689 func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
690         c.stats.TickOps("get_metadata")
691         c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
692         b := c.ctr.GetBlobReference(bname)
693         err := b.GetMetadata(nil)
694         c.stats.TickErr(err)
695         return b.Metadata, err
696 }
697
698 func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
699         c.stats.TickOps("get_properties")
700         c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
701         b := c.ctr.GetBlobReference(bname)
702         err := b.GetProperties(nil)
703         c.stats.TickErr(err)
704         return &b.Properties, err
705 }
706
707 func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
708         c.stats.TickOps("get")
709         c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
710         b := c.ctr.GetBlobReference(bname)
711         rdr, err := b.Get(nil)
712         c.stats.TickErr(err)
713         return NewCountingReader(rdr, c.stats.TickInBytes), err
714 }
715
716 func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
717         c.stats.TickOps("get_range")
718         c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
719         b := c.ctr.GetBlobReference(bname)
720         rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
721                 Range: &storage.BlobRange{
722                         Start: uint64(start),
723                         End:   uint64(end),
724                 },
725                 GetBlobOptions: opts,
726         })
727         c.stats.TickErr(err)
728         return NewCountingReader(rdr, c.stats.TickInBytes), err
729 }
730
731 // If we give it an io.Reader that doesn't also have a Len() int
732 // method, the Azure SDK determines data size by copying the data into
733 // a new buffer, which is not a good use of memory.
734 type readerWithAzureLen struct {
735         io.Reader
736         len int
737 }
738
739 // Len satisfies the private lener interface in azure-sdk-for-go.
740 func (r *readerWithAzureLen) Len() int {
741         return r.len
742 }
743
744 func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
745         c.stats.TickOps("create")
746         c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
747         if size != 0 {
748                 rdr = &readerWithAzureLen{
749                         Reader: NewCountingReader(rdr, c.stats.TickOutBytes),
750                         len:    size,
751                 }
752         }
753         b := c.ctr.GetBlobReference(bname)
754         err := b.CreateBlockBlobFromReader(rdr, opts)
755         c.stats.TickErr(err)
756         return err
757 }
758
759 func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
760         c.stats.TickOps("set_metadata")
761         c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
762         b := c.ctr.GetBlobReference(bname)
763         b.Metadata = m
764         err := b.SetMetadata(opts)
765         c.stats.TickErr(err)
766         return err
767 }
768
769 func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
770         c.stats.TickOps("list")
771         c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
772         resp, err := c.ctr.ListBlobs(params)
773         c.stats.TickErr(err)
774         return resp, err
775 }
776
777 func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
778         c.stats.TickOps("delete")
779         c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
780         b := c.ctr.GetBlobReference(bname)
781         err := b.Delete(opts)
782         c.stats.TickErr(err)
783         return err
784 }