21943: Add test for files appearing multiple times in output
[arvados.git] / services / keepstore / azure_blob_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "context"
9         "encoding/json"
10         "errors"
11         "fmt"
12         "io"
13         "net/http"
14         "os"
15         "regexp"
16         "strconv"
17         "strings"
18         "sync"
19         "sync/atomic"
20         "time"
21
22         "git.arvados.org/arvados.git/sdk/go/arvados"
23         "git.arvados.org/arvados.git/sdk/go/ctxlog"
24         "github.com/Azure/azure-sdk-for-go/storage"
25         "github.com/prometheus/client_golang/prometheus"
26         "github.com/sirupsen/logrus"
27 )
28
29 func init() {
30         driver["Azure"] = newAzureBlobVolume
31 }
32
33 func newAzureBlobVolume(params newVolumeParams) (volume, error) {
34         v := &azureBlobVolume{
35                 RequestTimeout:    azureDefaultRequestTimeout,
36                 WriteRaceInterval: azureDefaultWriteRaceInterval,
37                 WriteRacePollTime: azureDefaultWriteRacePollTime,
38                 cluster:           params.Cluster,
39                 volume:            params.ConfigVolume,
40                 logger:            params.Logger,
41                 metrics:           params.MetricsVecs,
42                 bufferPool:        params.BufferPool,
43         }
44         err := json.Unmarshal(params.ConfigVolume.DriverParameters, &v)
45         if err != nil {
46                 return nil, err
47         }
48         if v.ListBlobsRetryDelay == 0 {
49                 v.ListBlobsRetryDelay = azureDefaultListBlobsRetryDelay
50         }
51         if v.ListBlobsMaxAttempts == 0 {
52                 v.ListBlobsMaxAttempts = azureDefaultListBlobsMaxAttempts
53         }
54         if v.StorageBaseURL == "" {
55                 v.StorageBaseURL = storage.DefaultBaseURL
56         }
57         if v.ContainerName == "" || v.StorageAccountName == "" || v.StorageAccountKey == "" {
58                 return nil, errors.New("DriverParameters: ContainerName, StorageAccountName, and StorageAccountKey must be provided")
59         }
60         azc, err := storage.NewClient(v.StorageAccountName, v.StorageAccountKey, v.StorageBaseURL, storage.DefaultAPIVersion, true)
61         if err != nil {
62                 return nil, fmt.Errorf("creating Azure storage client: %s", err)
63         }
64         v.azClient = azc
65         v.azClient.Sender = &singleSender{}
66         v.azClient.HTTPClient = &http.Client{
67                 Timeout: time.Duration(v.RequestTimeout),
68         }
69         bs := v.azClient.GetBlobService()
70         v.container = &azureContainer{
71                 ctr: bs.GetContainerReference(v.ContainerName),
72         }
73
74         if ok, err := v.container.Exists(); err != nil {
75                 return nil, err
76         } else if !ok {
77                 return nil, fmt.Errorf("Azure container %q does not exist: %s", v.ContainerName, err)
78         }
79         return v, v.check()
80 }
81
82 func (v *azureBlobVolume) check() error {
83         lbls := prometheus.Labels{"device_id": v.DeviceID()}
84         v.container.stats.opsCounters, v.container.stats.errCounters, v.container.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
85         return nil
86 }
87
88 const (
89         azureDefaultRequestTimeout       = arvados.Duration(10 * time.Minute)
90         azureDefaultListBlobsMaxAttempts = 12
91         azureDefaultListBlobsRetryDelay  = arvados.Duration(10 * time.Second)
92         azureDefaultWriteRaceInterval    = arvados.Duration(15 * time.Second)
93         azureDefaultWriteRacePollTime    = arvados.Duration(time.Second)
94 )
95
96 // An azureBlobVolume stores and retrieves blocks in an Azure Blob
97 // container.
98 type azureBlobVolume struct {
99         StorageAccountName   string
100         StorageAccountKey    string
101         StorageBaseURL       string // "" means default, "core.windows.net"
102         ContainerName        string
103         RequestTimeout       arvados.Duration
104         ListBlobsRetryDelay  arvados.Duration
105         ListBlobsMaxAttempts int
106         MaxGetBytes          int
107         WriteRaceInterval    arvados.Duration
108         WriteRacePollTime    arvados.Duration
109
110         cluster    *arvados.Cluster
111         volume     arvados.Volume
112         logger     logrus.FieldLogger
113         metrics    *volumeMetricsVecs
114         bufferPool *bufferPool
115         azClient   storage.Client
116         container  *azureContainer
117 }
118
119 // singleSender is a single-attempt storage.Sender.
120 type singleSender struct{}
121
122 // Send performs req exactly once.
123 func (*singleSender) Send(c *storage.Client, req *http.Request) (resp *http.Response, err error) {
124         return c.HTTPClient.Do(req)
125 }
126
127 // DeviceID returns a globally unique ID for the storage container.
128 func (v *azureBlobVolume) DeviceID() string {
129         return "azure://" + v.StorageBaseURL + "/" + v.StorageAccountName + "/" + v.ContainerName
130 }
131
132 // Return true if expires_at metadata attribute is found on the block
133 func (v *azureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
134         metadata, err := v.container.GetBlobMetadata(loc)
135         if err != nil {
136                 return false, metadata, v.translateError(err)
137         }
138         if metadata["expires_at"] != "" {
139                 return true, metadata, nil
140         }
141         return false, metadata, nil
142 }
143
144 // BlockRead reads a Keep block that has been stored as a block blob
145 // in the container.
146 //
147 // If the block is younger than azureWriteRaceInterval and is
148 // unexpectedly empty, assume a BlockWrite operation is in progress,
149 // and wait for it to finish writing.
150 func (v *azureBlobVolume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
151         trashed, _, err := v.checkTrashed(hash)
152         if err != nil {
153                 return err
154         }
155         if trashed {
156                 return os.ErrNotExist
157         }
158         buf, err := v.bufferPool.GetContext(ctx)
159         if err != nil {
160                 return err
161         }
162         defer v.bufferPool.Put(buf)
163         var deadline time.Time
164         wrote, err := v.get(ctx, hash, w)
165         for err == nil && wrote == 0 && hash != "d41d8cd98f00b204e9800998ecf8427e" {
166                 // Seeing a brand new empty block probably means we're
167                 // in a race with CreateBlob, which under the hood
168                 // (apparently) does "CreateEmpty" and "CommitData"
169                 // with no additional transaction locking.
170                 if deadline.IsZero() {
171                         t, err := v.Mtime(hash)
172                         if err != nil {
173                                 ctxlog.FromContext(ctx).Print("Got empty block (possible race) but Mtime failed: ", err)
174                                 break
175                         }
176                         deadline = t.Add(v.WriteRaceInterval.Duration())
177                         if time.Now().After(deadline) {
178                                 break
179                         }
180                         ctxlog.FromContext(ctx).Printf("Race? Block %s is 0 bytes, %s old. Polling until %s", hash, time.Since(t), deadline)
181                 } else if time.Now().After(deadline) {
182                         break
183                 }
184                 select {
185                 case <-ctx.Done():
186                         return ctx.Err()
187                 case <-time.After(v.WriteRacePollTime.Duration()):
188                 }
189                 wrote, err = v.get(ctx, hash, w)
190         }
191         if !deadline.IsZero() {
192                 ctxlog.FromContext(ctx).Printf("Race ended with size==%d", wrote)
193         }
194         return err
195 }
196
197 func (v *azureBlobVolume) get(ctx context.Context, hash string, dst io.WriterAt) (int, error) {
198         ctx, cancel := context.WithCancel(ctx)
199         defer cancel()
200
201         pieceSize := BlockSize
202         if v.MaxGetBytes > 0 && v.MaxGetBytes < BlockSize {
203                 pieceSize = v.MaxGetBytes
204         }
205
206         pieces := 1
207         expectSize := BlockSize
208         sizeKnown := false
209         if pieceSize < BlockSize {
210                 // Unfortunately the handler doesn't tell us how long
211                 // the blob is expected to be, so we have to ask
212                 // Azure.
213                 props, err := v.container.GetBlobProperties(hash)
214                 if err != nil {
215                         return 0, v.translateError(err)
216                 }
217                 if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
218                         return 0, fmt.Errorf("block %s invalid size %d (max %d)", hash, props.ContentLength, BlockSize)
219                 }
220                 expectSize = int(props.ContentLength)
221                 pieces = (expectSize + pieceSize - 1) / pieceSize
222                 sizeKnown = true
223         }
224
225         if expectSize == 0 {
226                 return 0, nil
227         }
228
229         errors := make(chan error, pieces)
230         var wrote atomic.Int64
231         var wg sync.WaitGroup
232         wg.Add(pieces)
233         for p := 0; p < pieces; p++ {
234                 // Each goroutine retrieves one piece. If we hit an
235                 // error, it is sent to the errors chan so get() can
236                 // return it -- but only if the error happens before
237                 // ctx is done. This way, if ctx is done before we hit
238                 // any other error (e.g., requesting client has hung
239                 // up), we return the original ctx.Err() instead of
240                 // the secondary errors from the transfers that got
241                 // interrupted as a result.
242                 go func(p int) {
243                         defer wg.Done()
244                         startPos := p * pieceSize
245                         endPos := startPos + pieceSize
246                         if endPos > expectSize {
247                                 endPos = expectSize
248                         }
249                         var rdr io.ReadCloser
250                         var err error
251                         gotRdr := make(chan struct{})
252                         go func() {
253                                 defer close(gotRdr)
254                                 if startPos == 0 && endPos == expectSize {
255                                         rdr, err = v.container.GetBlob(hash)
256                                 } else {
257                                         rdr, err = v.container.GetBlobRange(hash, startPos, endPos-1, nil)
258                                 }
259                         }()
260                         select {
261                         case <-ctx.Done():
262                                 go func() {
263                                         <-gotRdr
264                                         if err == nil {
265                                                 rdr.Close()
266                                         }
267                                 }()
268                                 return
269                         case <-gotRdr:
270                         }
271                         if err != nil {
272                                 errors <- err
273                                 cancel()
274                                 return
275                         }
276                         go func() {
277                                 // Close the reader when the client
278                                 // hangs up or another piece fails
279                                 // (possibly interrupting ReadFull())
280                                 // or when all pieces succeed and
281                                 // get() returns.
282                                 <-ctx.Done()
283                                 rdr.Close()
284                         }()
285                         n, err := io.CopyN(io.NewOffsetWriter(dst, int64(startPos)), rdr, int64(endPos-startPos))
286                         wrote.Add(n)
287                         if pieces == 1 && !sizeKnown && (err == io.ErrUnexpectedEOF || err == io.EOF) {
288                                 // If we don't know the actual size,
289                                 // and just tried reading 64 MiB, it's
290                                 // normal to encounter EOF.
291                         } else if err != nil {
292                                 errors <- err
293                                 cancel()
294                                 return
295                         }
296                 }(p)
297         }
298         wg.Wait()
299         close(errors)
300         if len(errors) > 0 {
301                 return int(wrote.Load()), v.translateError(<-errors)
302         }
303         return int(wrote.Load()), ctx.Err()
304 }
305
306 // BlockWrite stores a block on the volume. If it already exists, its
307 // timestamp is updated.
308 func (v *azureBlobVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
309         // Send the block data through a pipe, so that (if we need to)
310         // we can close the pipe early and abandon our
311         // CreateBlockBlobFromReader() goroutine, without worrying
312         // about CreateBlockBlobFromReader() accessing our data
313         // buffer after we release it.
314         bufr, bufw := io.Pipe()
315         go func() {
316                 bufw.Write(data)
317                 bufw.Close()
318         }()
319         errChan := make(chan error, 1)
320         go func() {
321                 var body io.Reader = bufr
322                 if len(data) == 0 {
323                         // We must send a "Content-Length: 0" header,
324                         // but the http client interprets
325                         // ContentLength==0 as "unknown" unless it can
326                         // confirm by introspection that Body will
327                         // read 0 bytes.
328                         body = http.NoBody
329                         bufr.Close()
330                 }
331                 errChan <- v.container.CreateBlockBlobFromReader(hash, len(data), body, nil)
332         }()
333         select {
334         case <-ctx.Done():
335                 ctxlog.FromContext(ctx).Debugf("%s: taking CreateBlockBlobFromReader's input away: %s", v, ctx.Err())
336                 // bufw.CloseWithError() interrupts bufw.Write() if
337                 // necessary, ensuring CreateBlockBlobFromReader can't
338                 // read any more of our data slice via bufr after we
339                 // return.
340                 bufw.CloseWithError(ctx.Err())
341                 ctxlog.FromContext(ctx).Debugf("%s: abandoning CreateBlockBlobFromReader goroutine", v)
342                 return ctx.Err()
343         case err := <-errChan:
344                 return err
345         }
346 }
347
348 // BlockTouch updates the last-modified property of a block blob.
349 func (v *azureBlobVolume) BlockTouch(hash string) error {
350         trashed, metadata, err := v.checkTrashed(hash)
351         if err != nil {
352                 return err
353         }
354         if trashed {
355                 return os.ErrNotExist
356         }
357
358         metadata["touch"] = fmt.Sprintf("%d", time.Now().Unix())
359         return v.container.SetBlobMetadata(hash, metadata, nil)
360 }
361
362 // Mtime returns the last-modified property of a block blob.
363 func (v *azureBlobVolume) Mtime(hash string) (time.Time, error) {
364         trashed, _, err := v.checkTrashed(hash)
365         if err != nil {
366                 return time.Time{}, err
367         }
368         if trashed {
369                 return time.Time{}, os.ErrNotExist
370         }
371
372         props, err := v.container.GetBlobProperties(hash)
373         if err != nil {
374                 return time.Time{}, err
375         }
376         return time.Time(props.LastModified), nil
377 }
378
379 // Index writes a list of Keep blocks that are stored in the
380 // container.
381 func (v *azureBlobVolume) Index(ctx context.Context, prefix string, writer io.Writer) error {
382         params := storage.ListBlobsParameters{
383                 Prefix:  prefix,
384                 Include: &storage.IncludeBlobDataset{Metadata: true},
385         }
386         for page := 1; ; page++ {
387                 err := ctx.Err()
388                 if err != nil {
389                         return err
390                 }
391                 resp, err := v.listBlobs(page, params)
392                 if err != nil {
393                         return err
394                 }
395                 for _, b := range resp.Blobs {
396                         if !v.isKeepBlock(b.Name) {
397                                 continue
398                         }
399                         modtime := time.Time(b.Properties.LastModified)
400                         if b.Properties.ContentLength == 0 && modtime.Add(v.WriteRaceInterval.Duration()).After(time.Now()) {
401                                 // A new zero-length blob is probably
402                                 // just a new non-empty blob that
403                                 // hasn't committed its data yet (see
404                                 // Get()), and in any case has no
405                                 // value.
406                                 continue
407                         }
408                         if b.Metadata["expires_at"] != "" {
409                                 // Trashed blob; exclude it from response
410                                 continue
411                         }
412                         fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, modtime.UnixNano())
413                 }
414                 if resp.NextMarker == "" {
415                         return nil
416                 }
417                 params.Marker = resp.NextMarker
418         }
419 }
420
421 // call v.container.ListBlobs, retrying if needed.
422 func (v *azureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters) (resp storage.BlobListResponse, err error) {
423         for i := 0; i < v.ListBlobsMaxAttempts; i++ {
424                 resp, err = v.container.ListBlobs(params)
425                 err = v.translateError(err)
426                 if err == errVolumeUnavailable {
427                         v.logger.Printf("ListBlobs: will retry page %d in %s after error: %s", page, v.ListBlobsRetryDelay, err)
428                         time.Sleep(time.Duration(v.ListBlobsRetryDelay))
429                         continue
430                 } else {
431                         break
432                 }
433         }
434         return
435 }
436
437 // Trash a Keep block.
438 func (v *azureBlobVolume) BlockTrash(loc string) error {
439         // Ideally we would use If-Unmodified-Since, but that
440         // particular condition seems to be ignored by Azure. Instead,
441         // we get the Etag before checking Mtime, and use If-Match to
442         // ensure we don't delete data if Put() or Touch() happens
443         // between our calls to Mtime() and DeleteBlob().
444         props, err := v.container.GetBlobProperties(loc)
445         if err != nil {
446                 return err
447         }
448         if t, err := v.Mtime(loc); err != nil {
449                 return err
450         } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
451                 return nil
452         }
453
454         // If BlobTrashLifetime == 0, just delete it
455         if v.cluster.Collections.BlobTrashLifetime == 0 {
456                 return v.container.DeleteBlob(loc, &storage.DeleteBlobOptions{
457                         IfMatch: props.Etag,
458                 })
459         }
460
461         // Otherwise, mark as trash
462         return v.container.SetBlobMetadata(loc, storage.BlobMetadata{
463                 "expires_at": fmt.Sprintf("%d", time.Now().Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Unix()),
464         }, &storage.SetBlobMetadataOptions{
465                 IfMatch: props.Etag,
466         })
467 }
468
469 // BlockUntrash deletes the expires_at metadata attribute for the
470 // specified block blob.
471 func (v *azureBlobVolume) BlockUntrash(hash string) error {
472         // if expires_at does not exist, return NotFoundError
473         metadata, err := v.container.GetBlobMetadata(hash)
474         if err != nil {
475                 return v.translateError(err)
476         }
477         if metadata["expires_at"] == "" {
478                 return os.ErrNotExist
479         }
480
481         // reset expires_at metadata attribute
482         metadata["expires_at"] = ""
483         err = v.container.SetBlobMetadata(hash, metadata, nil)
484         return v.translateError(err)
485 }
486
487 // If possible, translate an Azure SDK error to a recognizable error
488 // like os.ErrNotExist.
489 func (v *azureBlobVolume) translateError(err error) error {
490         switch {
491         case err == nil:
492                 return err
493         case strings.Contains(err.Error(), "StatusCode=503"):
494                 // "storage: service returned error: StatusCode=503, ErrorCode=ServerBusy, ErrorMessage=The server is busy" (See #14804)
495                 return errVolumeUnavailable
496         case strings.Contains(err.Error(), "Not Found"):
497                 // "storage: service returned without a response body (404 Not Found)"
498                 return os.ErrNotExist
499         case strings.Contains(err.Error(), "ErrorCode=BlobNotFound"):
500                 // "storage: service returned error: StatusCode=404, ErrorCode=BlobNotFound, ErrorMessage=The specified blob does not exist.\n..."
501                 return os.ErrNotExist
502         default:
503                 return err
504         }
505 }
506
507 var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
508
509 func (v *azureBlobVolume) isKeepBlock(s string) bool {
510         return keepBlockRegexp.MatchString(s)
511 }
512
513 // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
514 // and deletes them from the volume.
515 func (v *azureBlobVolume) EmptyTrash() {
516         var bytesDeleted, bytesInTrash int64
517         var blocksDeleted, blocksInTrash int64
518
519         doBlob := func(b storage.Blob) {
520                 // Check whether the block is flagged as trash
521                 if b.Metadata["expires_at"] == "" {
522                         return
523                 }
524
525                 atomic.AddInt64(&blocksInTrash, 1)
526                 atomic.AddInt64(&bytesInTrash, b.Properties.ContentLength)
527
528                 expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
529                 if err != nil {
530                         v.logger.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
531                         return
532                 }
533
534                 if expiresAt > time.Now().Unix() {
535                         return
536                 }
537
538                 err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
539                         IfMatch: b.Properties.Etag,
540                 })
541                 if err != nil {
542                         v.logger.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
543                         return
544                 }
545                 atomic.AddInt64(&blocksDeleted, 1)
546                 atomic.AddInt64(&bytesDeleted, b.Properties.ContentLength)
547         }
548
549         var wg sync.WaitGroup
550         todo := make(chan storage.Blob, v.cluster.Collections.BlobDeleteConcurrency)
551         for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
552                 wg.Add(1)
553                 go func() {
554                         defer wg.Done()
555                         for b := range todo {
556                                 doBlob(b)
557                         }
558                 }()
559         }
560
561         params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
562         for page := 1; ; page++ {
563                 resp, err := v.listBlobs(page, params)
564                 if err != nil {
565                         v.logger.Printf("EmptyTrash: ListBlobs: %v", err)
566                         break
567                 }
568                 for _, b := range resp.Blobs {
569                         todo <- b
570                 }
571                 if resp.NextMarker == "" {
572                         break
573                 }
574                 params.Marker = resp.NextMarker
575         }
576         close(todo)
577         wg.Wait()
578
579         v.logger.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.DeviceID(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
580 }
581
582 // InternalStats returns bucket I/O and API call counters.
583 func (v *azureBlobVolume) InternalStats() interface{} {
584         return &v.container.stats
585 }
586
587 type azureBlobStats struct {
588         statsTicker
589         Ops              uint64
590         GetOps           uint64
591         GetRangeOps      uint64
592         GetMetadataOps   uint64
593         GetPropertiesOps uint64
594         CreateOps        uint64
595         SetMetadataOps   uint64
596         DelOps           uint64
597         ListOps          uint64
598 }
599
600 func (s *azureBlobStats) TickErr(err error) {
601         if err == nil {
602                 return
603         }
604         errType := fmt.Sprintf("%T", err)
605         if err, ok := err.(storage.AzureStorageServiceError); ok {
606                 errType = errType + fmt.Sprintf(" %d (%s)", err.StatusCode, err.Code)
607         }
608         s.statsTicker.TickErr(err, errType)
609 }
610
611 // azureContainer wraps storage.Container in order to count I/O and
612 // API usage stats.
613 type azureContainer struct {
614         ctr   *storage.Container
615         stats azureBlobStats
616 }
617
618 func (c *azureContainer) Exists() (bool, error) {
619         c.stats.TickOps("exists")
620         c.stats.Tick(&c.stats.Ops)
621         ok, err := c.ctr.Exists()
622         c.stats.TickErr(err)
623         return ok, err
624 }
625
626 func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
627         c.stats.TickOps("get_metadata")
628         c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
629         b := c.ctr.GetBlobReference(bname)
630         err := b.GetMetadata(nil)
631         c.stats.TickErr(err)
632         return b.Metadata, err
633 }
634
635 func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
636         c.stats.TickOps("get_properties")
637         c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
638         b := c.ctr.GetBlobReference(bname)
639         err := b.GetProperties(nil)
640         c.stats.TickErr(err)
641         return &b.Properties, err
642 }
643
644 func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
645         c.stats.TickOps("get")
646         c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
647         b := c.ctr.GetBlobReference(bname)
648         rdr, err := b.Get(nil)
649         c.stats.TickErr(err)
650         return newCountingReader(rdr, c.stats.TickInBytes), err
651 }
652
653 func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
654         c.stats.TickOps("get_range")
655         c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
656         b := c.ctr.GetBlobReference(bname)
657         rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
658                 Range: &storage.BlobRange{
659                         Start: uint64(start),
660                         End:   uint64(end),
661                 },
662                 GetBlobOptions: opts,
663         })
664         c.stats.TickErr(err)
665         return newCountingReader(rdr, c.stats.TickInBytes), err
666 }
667
668 // If we give it an io.Reader that doesn't also have a Len() int
669 // method, the Azure SDK determines data size by copying the data into
670 // a new buffer, which is not a good use of memory.
671 type readerWithAzureLen struct {
672         io.Reader
673         len int
674 }
675
676 // Len satisfies the private lener interface in azure-sdk-for-go.
677 func (r *readerWithAzureLen) Len() int {
678         return r.len
679 }
680
681 func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
682         c.stats.TickOps("create")
683         c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
684         if size != 0 {
685                 rdr = &readerWithAzureLen{
686                         Reader: newCountingReader(rdr, c.stats.TickOutBytes),
687                         len:    size,
688                 }
689         }
690         b := c.ctr.GetBlobReference(bname)
691         err := b.CreateBlockBlobFromReader(rdr, opts)
692         c.stats.TickErr(err)
693         return err
694 }
695
696 func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
697         c.stats.TickOps("set_metadata")
698         c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
699         b := c.ctr.GetBlobReference(bname)
700         b.Metadata = m
701         err := b.SetMetadata(opts)
702         c.stats.TickErr(err)
703         return err
704 }
705
706 func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
707         c.stats.TickOps("list")
708         c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
709         resp, err := c.ctr.ListBlobs(params)
710         c.stats.TickErr(err)
711         return resp, err
712 }
713
714 func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
715         c.stats.TickOps("delete")
716         c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
717         b := c.ctr.GetBlobReference(bname)
718         err := b.Delete(opts)
719         c.stats.TickErr(err)
720         return err
721 }