14199: Merge branch 'master' into 14199-copy-from-remote
[arvados.git] / services / keepstore / azure_blob_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "errors"
11         "flag"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "net/http"
16         "os"
17         "regexp"
18         "strconv"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "time"
23
24         "git.curoverse.com/arvados.git/sdk/go/arvados"
25         "github.com/Azure/azure-sdk-for-go/storage"
26 )
27
28 const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
29
30 var (
31         azureMaxGetBytes           int
32         azureStorageAccountName    string
33         azureStorageAccountKeyFile string
34         azureStorageReplication    int
35         azureWriteRaceInterval     = 15 * time.Second
36         azureWriteRacePollTime     = time.Second
37 )
38
39 func readKeyFromFile(file string) (string, error) {
40         buf, err := ioutil.ReadFile(file)
41         if err != nil {
42                 return "", errors.New("reading key from " + file + ": " + err.Error())
43         }
44         accountKey := strings.TrimSpace(string(buf))
45         if accountKey == "" {
46                 return "", errors.New("empty account key in " + file)
47         }
48         return accountKey, nil
49 }
50
51 type azureVolumeAdder struct {
52         *Config
53 }
54
55 // String implements flag.Value
56 func (s *azureVolumeAdder) String() string {
57         return "-"
58 }
59
60 func (s *azureVolumeAdder) Set(containerName string) error {
61         s.Config.Volumes = append(s.Config.Volumes, &AzureBlobVolume{
62                 ContainerName:         containerName,
63                 StorageAccountName:    azureStorageAccountName,
64                 StorageAccountKeyFile: azureStorageAccountKeyFile,
65                 AzureReplication:      azureStorageReplication,
66                 ReadOnly:              deprecated.flagReadonly,
67         })
68         return nil
69 }
70
71 func init() {
72         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &AzureBlobVolume{} })
73
74         flag.Var(&azureVolumeAdder{theConfig},
75                 "azure-storage-container-volume",
76                 "Use the given container as a storage volume. Can be given multiple times.")
77         flag.StringVar(
78                 &azureStorageAccountName,
79                 "azure-storage-account-name",
80                 "",
81                 "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
82         flag.StringVar(
83                 &azureStorageAccountKeyFile,
84                 "azure-storage-account-key-file",
85                 "",
86                 "`File` containing the account key used for subsequent --azure-storage-container-volume arguments.")
87         flag.IntVar(
88                 &azureStorageReplication,
89                 "azure-storage-replication",
90                 3,
91                 "Replication level to report to clients when data is stored in an Azure container.")
92         flag.IntVar(
93                 &azureMaxGetBytes,
94                 "azure-max-get-bytes",
95                 BlockSize,
96                 fmt.Sprintf("Maximum bytes to request in a single GET request. If smaller than %d, use multiple concurrent range requests to retrieve a block.", BlockSize))
97 }
98
99 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
100 // container.
101 type AzureBlobVolume struct {
102         StorageAccountName    string
103         StorageAccountKeyFile string
104         StorageBaseURL        string // "" means default, "core.windows.net"
105         ContainerName         string
106         AzureReplication      int
107         ReadOnly              bool
108         RequestTimeout        arvados.Duration
109         StorageClasses        []string
110
111         azClient  storage.Client
112         container *azureContainer
113 }
114
115 // singleSender is a single-attempt storage.Sender.
116 type singleSender struct{}
117
118 // Send performs req exactly once.
119 func (*singleSender) Send(c *storage.Client, req *http.Request) (resp *http.Response, err error) {
120         return c.HTTPClient.Do(req)
121 }
122
123 // Examples implements VolumeWithExamples.
124 func (*AzureBlobVolume) Examples() []Volume {
125         return []Volume{
126                 &AzureBlobVolume{
127                         StorageAccountName:    "example-account-name",
128                         StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
129                         ContainerName:         "example-container-name",
130                         AzureReplication:      3,
131                         RequestTimeout:        azureDefaultRequestTimeout,
132                 },
133                 &AzureBlobVolume{
134                         StorageAccountName:    "cn-account-name",
135                         StorageAccountKeyFile: "/etc/azure_cn_storage_account_key.txt",
136                         StorageBaseURL:        "core.chinacloudapi.cn",
137                         ContainerName:         "cn-container-name",
138                         AzureReplication:      3,
139                         RequestTimeout:        azureDefaultRequestTimeout,
140                 },
141         }
142 }
143
144 // Type implements Volume.
145 func (v *AzureBlobVolume) Type() string {
146         return "Azure"
147 }
148
149 // Start implements Volume.
150 func (v *AzureBlobVolume) Start() error {
151         if v.ContainerName == "" {
152                 return errors.New("no container name given")
153         }
154         if v.StorageAccountName == "" || v.StorageAccountKeyFile == "" {
155                 return errors.New("StorageAccountName and StorageAccountKeyFile must be given")
156         }
157         accountKey, err := readKeyFromFile(v.StorageAccountKeyFile)
158         if err != nil {
159                 return err
160         }
161         if v.StorageBaseURL == "" {
162                 v.StorageBaseURL = storage.DefaultBaseURL
163         }
164         v.azClient, err = storage.NewClient(v.StorageAccountName, accountKey, v.StorageBaseURL, storage.DefaultAPIVersion, true)
165         if err != nil {
166                 return fmt.Errorf("creating Azure storage client: %s", err)
167         }
168         v.azClient.Sender = &singleSender{}
169
170         if v.RequestTimeout == 0 {
171                 v.RequestTimeout = azureDefaultRequestTimeout
172         }
173         v.azClient.HTTPClient = &http.Client{
174                 Timeout: time.Duration(v.RequestTimeout),
175         }
176         bs := v.azClient.GetBlobService()
177         v.container = &azureContainer{
178                 ctr: bs.GetContainerReference(v.ContainerName),
179         }
180
181         if ok, err := v.container.Exists(); err != nil {
182                 return err
183         } else if !ok {
184                 return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
185         }
186         return nil
187 }
188
189 // DeviceID returns a globally unique ID for the storage container.
190 func (v *AzureBlobVolume) DeviceID() string {
191         return "azure://" + v.StorageBaseURL + "/" + v.StorageAccountName + "/" + v.ContainerName
192 }
193
194 // Return true if expires_at metadata attribute is found on the block
195 func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
196         metadata, err := v.container.GetBlobMetadata(loc)
197         if err != nil {
198                 return false, metadata, v.translateError(err)
199         }
200         if metadata["expires_at"] != "" {
201                 return true, metadata, nil
202         }
203         return false, metadata, nil
204 }
205
206 // Get reads a Keep block that has been stored as a block blob in the
207 // container.
208 //
209 // If the block is younger than azureWriteRaceInterval and is
210 // unexpectedly empty, assume a PutBlob operation is in progress, and
211 // wait for it to finish writing.
212 func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
213         trashed, _, err := v.checkTrashed(loc)
214         if err != nil {
215                 return 0, err
216         }
217         if trashed {
218                 return 0, os.ErrNotExist
219         }
220         var deadline time.Time
221         haveDeadline := false
222         size, err := v.get(ctx, loc, buf)
223         for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
224                 // Seeing a brand new empty block probably means we're
225                 // in a race with CreateBlob, which under the hood
226                 // (apparently) does "CreateEmpty" and "CommitData"
227                 // with no additional transaction locking.
228                 if !haveDeadline {
229                         t, err := v.Mtime(loc)
230                         if err != nil {
231                                 log.Print("Got empty block (possible race) but Mtime failed: ", err)
232                                 break
233                         }
234                         deadline = t.Add(azureWriteRaceInterval)
235                         if time.Now().After(deadline) {
236                                 break
237                         }
238                         log.Printf("Race? Block %s is 0 bytes, %s old. Polling until %s", loc, time.Since(t), deadline)
239                         haveDeadline = true
240                 } else if time.Now().After(deadline) {
241                         break
242                 }
243                 select {
244                 case <-ctx.Done():
245                         return 0, ctx.Err()
246                 case <-time.After(azureWriteRacePollTime):
247                 }
248                 size, err = v.get(ctx, loc, buf)
249         }
250         if haveDeadline {
251                 log.Printf("Race ended with size==%d", size)
252         }
253         return size, err
254 }
255
256 func (v *AzureBlobVolume) get(ctx context.Context, loc string, buf []byte) (int, error) {
257         ctx, cancel := context.WithCancel(ctx)
258         defer cancel()
259         expectSize := len(buf)
260         if azureMaxGetBytes < BlockSize {
261                 // Unfortunately the handler doesn't tell us how long the blob
262                 // is expected to be, so we have to ask Azure.
263                 props, err := v.container.GetBlobProperties(loc)
264                 if err != nil {
265                         return 0, v.translateError(err)
266                 }
267                 if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
268                         return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
269                 }
270                 expectSize = int(props.ContentLength)
271         }
272
273         if expectSize == 0 {
274                 return 0, nil
275         }
276
277         // We'll update this actualSize if/when we get the last piece.
278         actualSize := -1
279         pieces := (expectSize + azureMaxGetBytes - 1) / azureMaxGetBytes
280         errors := make(chan error, pieces)
281         var wg sync.WaitGroup
282         wg.Add(pieces)
283         for p := 0; p < pieces; p++ {
284                 // Each goroutine retrieves one piece. If we hit an
285                 // error, it is sent to the errors chan so get() can
286                 // return it -- but only if the error happens before
287                 // ctx is done. This way, if ctx is done before we hit
288                 // any other error (e.g., requesting client has hung
289                 // up), we return the original ctx.Err() instead of
290                 // the secondary errors from the transfers that got
291                 // interrupted as a result.
292                 go func(p int) {
293                         defer wg.Done()
294                         startPos := p * azureMaxGetBytes
295                         endPos := startPos + azureMaxGetBytes
296                         if endPos > expectSize {
297                                 endPos = expectSize
298                         }
299                         var rdr io.ReadCloser
300                         var err error
301                         gotRdr := make(chan struct{})
302                         go func() {
303                                 defer close(gotRdr)
304                                 if startPos == 0 && endPos == expectSize {
305                                         rdr, err = v.container.GetBlob(loc)
306                                 } else {
307                                         rdr, err = v.container.GetBlobRange(loc, startPos, endPos-1, nil)
308                                 }
309                         }()
310                         select {
311                         case <-ctx.Done():
312                                 go func() {
313                                         <-gotRdr
314                                         if err == nil {
315                                                 rdr.Close()
316                                         }
317                                 }()
318                                 return
319                         case <-gotRdr:
320                         }
321                         if err != nil {
322                                 errors <- err
323                                 cancel()
324                                 return
325                         }
326                         go func() {
327                                 // Close the reader when the client
328                                 // hangs up or another piece fails
329                                 // (possibly interrupting ReadFull())
330                                 // or when all pieces succeed and
331                                 // get() returns.
332                                 <-ctx.Done()
333                                 rdr.Close()
334                         }()
335                         n, err := io.ReadFull(rdr, buf[startPos:endPos])
336                         if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
337                                 // If we don't know the actual size,
338                                 // and just tried reading 64 MiB, it's
339                                 // normal to encounter EOF.
340                         } else if err != nil {
341                                 if ctx.Err() == nil {
342                                         errors <- err
343                                 }
344                                 cancel()
345                                 return
346                         }
347                         if p == pieces-1 {
348                                 actualSize = startPos + n
349                         }
350                 }(p)
351         }
352         wg.Wait()
353         close(errors)
354         if len(errors) > 0 {
355                 return 0, v.translateError(<-errors)
356         }
357         if ctx.Err() != nil {
358                 return 0, ctx.Err()
359         }
360         return actualSize, nil
361 }
362
363 // Compare the given data with existing stored data.
364 func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte) error {
365         trashed, _, err := v.checkTrashed(loc)
366         if err != nil {
367                 return err
368         }
369         if trashed {
370                 return os.ErrNotExist
371         }
372         var rdr io.ReadCloser
373         gotRdr := make(chan struct{})
374         go func() {
375                 defer close(gotRdr)
376                 rdr, err = v.container.GetBlob(loc)
377         }()
378         select {
379         case <-ctx.Done():
380                 go func() {
381                         <-gotRdr
382                         if err == nil {
383                                 rdr.Close()
384                         }
385                 }()
386                 return ctx.Err()
387         case <-gotRdr:
388         }
389         if err != nil {
390                 return v.translateError(err)
391         }
392         defer rdr.Close()
393         return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
394 }
395
396 // Put stores a Keep block as a block blob in the container.
397 func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) error {
398         if v.ReadOnly {
399                 return MethodDisabledError
400         }
401         // Send the block data through a pipe, so that (if we need to)
402         // we can close the pipe early and abandon our
403         // CreateBlockBlobFromReader() goroutine, without worrying
404         // about CreateBlockBlobFromReader() accessing our block
405         // buffer after we release it.
406         bufr, bufw := io.Pipe()
407         go func() {
408                 io.Copy(bufw, bytes.NewReader(block))
409                 bufw.Close()
410         }()
411         errChan := make(chan error)
412         go func() {
413                 var body io.Reader = bufr
414                 if len(block) == 0 {
415                         // We must send a "Content-Length: 0" header,
416                         // but the http client interprets
417                         // ContentLength==0 as "unknown" unless it can
418                         // confirm by introspection that Body will
419                         // read 0 bytes.
420                         body = http.NoBody
421                         bufr.Close()
422                 }
423                 errChan <- v.container.CreateBlockBlobFromReader(loc, len(block), body, nil)
424         }()
425         select {
426         case <-ctx.Done():
427                 theConfig.debugLogf("%s: taking CreateBlockBlobFromReader's input away: %s", v, ctx.Err())
428                 // Our pipe might be stuck in Write(), waiting for
429                 // io.Copy() to read. If so, un-stick it. This means
430                 // CreateBlockBlobFromReader will get corrupt data,
431                 // but that's OK: the size won't match, so the write
432                 // will fail.
433                 go io.Copy(ioutil.Discard, bufr)
434                 // CloseWithError() will return once pending I/O is done.
435                 bufw.CloseWithError(ctx.Err())
436                 theConfig.debugLogf("%s: abandoning CreateBlockBlobFromReader goroutine", v)
437                 return ctx.Err()
438         case err := <-errChan:
439                 return err
440         }
441 }
442
443 // Touch updates the last-modified property of a block blob.
444 func (v *AzureBlobVolume) Touch(loc string) error {
445         if v.ReadOnly {
446                 return MethodDisabledError
447         }
448         trashed, metadata, err := v.checkTrashed(loc)
449         if err != nil {
450                 return err
451         }
452         if trashed {
453                 return os.ErrNotExist
454         }
455
456         metadata["touch"] = fmt.Sprintf("%d", time.Now().Unix())
457         return v.container.SetBlobMetadata(loc, metadata, nil)
458 }
459
460 // Mtime returns the last-modified property of a block blob.
461 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
462         trashed, _, err := v.checkTrashed(loc)
463         if err != nil {
464                 return time.Time{}, err
465         }
466         if trashed {
467                 return time.Time{}, os.ErrNotExist
468         }
469
470         props, err := v.container.GetBlobProperties(loc)
471         if err != nil {
472                 return time.Time{}, err
473         }
474         return time.Time(props.LastModified), nil
475 }
476
477 // IndexTo writes a list of Keep blocks that are stored in the
478 // container.
479 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
480         params := storage.ListBlobsParameters{
481                 Prefix:  prefix,
482                 Include: &storage.IncludeBlobDataset{Metadata: true},
483         }
484         for {
485                 resp, err := v.container.ListBlobs(params)
486                 if err != nil {
487                         return err
488                 }
489                 for _, b := range resp.Blobs {
490                         if !v.isKeepBlock(b.Name) {
491                                 continue
492                         }
493                         modtime := time.Time(b.Properties.LastModified)
494                         if b.Properties.ContentLength == 0 && modtime.Add(azureWriteRaceInterval).After(time.Now()) {
495                                 // A new zero-length blob is probably
496                                 // just a new non-empty blob that
497                                 // hasn't committed its data yet (see
498                                 // Get()), and in any case has no
499                                 // value.
500                                 continue
501                         }
502                         if b.Metadata["expires_at"] != "" {
503                                 // Trashed blob; exclude it from response
504                                 continue
505                         }
506                         fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, modtime.UnixNano())
507                 }
508                 if resp.NextMarker == "" {
509                         return nil
510                 }
511                 params.Marker = resp.NextMarker
512         }
513 }
514
515 // Trash a Keep block.
516 func (v *AzureBlobVolume) Trash(loc string) error {
517         if v.ReadOnly {
518                 return MethodDisabledError
519         }
520
521         // Ideally we would use If-Unmodified-Since, but that
522         // particular condition seems to be ignored by Azure. Instead,
523         // we get the Etag before checking Mtime, and use If-Match to
524         // ensure we don't delete data if Put() or Touch() happens
525         // between our calls to Mtime() and DeleteBlob().
526         props, err := v.container.GetBlobProperties(loc)
527         if err != nil {
528                 return err
529         }
530         if t, err := v.Mtime(loc); err != nil {
531                 return err
532         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
533                 return nil
534         }
535
536         // If TrashLifetime == 0, just delete it
537         if theConfig.TrashLifetime == 0 {
538                 return v.container.DeleteBlob(loc, &storage.DeleteBlobOptions{
539                         IfMatch: props.Etag,
540                 })
541         }
542
543         // Otherwise, mark as trash
544         return v.container.SetBlobMetadata(loc, storage.BlobMetadata{
545                 "expires_at": fmt.Sprintf("%d", time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()),
546         }, &storage.SetBlobMetadataOptions{
547                 IfMatch: props.Etag,
548         })
549 }
550
551 // Untrash a Keep block.
552 // Delete the expires_at metadata attribute
553 func (v *AzureBlobVolume) Untrash(loc string) error {
554         // if expires_at does not exist, return NotFoundError
555         metadata, err := v.container.GetBlobMetadata(loc)
556         if err != nil {
557                 return v.translateError(err)
558         }
559         if metadata["expires_at"] == "" {
560                 return os.ErrNotExist
561         }
562
563         // reset expires_at metadata attribute
564         metadata["expires_at"] = ""
565         err = v.container.SetBlobMetadata(loc, metadata, nil)
566         return v.translateError(err)
567 }
568
569 // Status returns a VolumeStatus struct with placeholder data.
570 func (v *AzureBlobVolume) Status() *VolumeStatus {
571         return &VolumeStatus{
572                 DeviceNum: 1,
573                 BytesFree: BlockSize * 1000,
574                 BytesUsed: 1,
575         }
576 }
577
578 // String returns a volume label, including the container name.
579 func (v *AzureBlobVolume) String() string {
580         return fmt.Sprintf("azure-storage-container:%+q", v.ContainerName)
581 }
582
583 // Writable returns true, unless the -readonly flag was on when the
584 // volume was added.
585 func (v *AzureBlobVolume) Writable() bool {
586         return !v.ReadOnly
587 }
588
589 // Replication returns the replication level of the container, as
590 // specified by the -azure-storage-replication argument.
591 func (v *AzureBlobVolume) Replication() int {
592         return v.AzureReplication
593 }
594
595 // GetStorageClasses implements Volume
596 func (v *AzureBlobVolume) GetStorageClasses() []string {
597         return v.StorageClasses
598 }
599
600 // If possible, translate an Azure SDK error to a recognizable error
601 // like os.ErrNotExist.
602 func (v *AzureBlobVolume) translateError(err error) error {
603         switch {
604         case err == nil:
605                 return err
606         case strings.Contains(err.Error(), "Not Found"):
607                 // "storage: service returned without a response body (404 Not Found)"
608                 return os.ErrNotExist
609         default:
610                 return err
611         }
612 }
613
614 var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
615
616 func (v *AzureBlobVolume) isKeepBlock(s string) bool {
617         return keepBlockRegexp.MatchString(s)
618 }
619
620 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
621 // and deletes them from the volume.
622 func (v *AzureBlobVolume) EmptyTrash() {
623         var bytesDeleted, bytesInTrash int64
624         var blocksDeleted, blocksInTrash int64
625
626         doBlob := func(b storage.Blob) {
627                 // Check whether the block is flagged as trash
628                 if b.Metadata["expires_at"] == "" {
629                         return
630                 }
631
632                 atomic.AddInt64(&blocksInTrash, 1)
633                 atomic.AddInt64(&bytesInTrash, b.Properties.ContentLength)
634
635                 expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
636                 if err != nil {
637                         log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
638                         return
639                 }
640
641                 if expiresAt > time.Now().Unix() {
642                         return
643                 }
644
645                 err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
646                         IfMatch: b.Properties.Etag,
647                 })
648                 if err != nil {
649                         log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
650                         return
651                 }
652                 atomic.AddInt64(&blocksDeleted, 1)
653                 atomic.AddInt64(&bytesDeleted, b.Properties.ContentLength)
654         }
655
656         var wg sync.WaitGroup
657         todo := make(chan storage.Blob, theConfig.EmptyTrashWorkers)
658         for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
659                 wg.Add(1)
660                 go func() {
661                         defer wg.Done()
662                         for b := range todo {
663                                 doBlob(b)
664                         }
665                 }()
666         }
667
668         params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
669         for {
670                 resp, err := v.container.ListBlobs(params)
671                 if err != nil {
672                         log.Printf("EmptyTrash: ListBlobs: %v", err)
673                         break
674                 }
675                 for _, b := range resp.Blobs {
676                         todo <- b
677                 }
678                 if resp.NextMarker == "" {
679                         break
680                 }
681                 params.Marker = resp.NextMarker
682         }
683         close(todo)
684         wg.Wait()
685
686         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
687 }
688
689 // InternalStats returns bucket I/O and API call counters.
690 func (v *AzureBlobVolume) InternalStats() interface{} {
691         return &v.container.stats
692 }
693
694 type azureBlobStats struct {
695         statsTicker
696         Ops              uint64
697         GetOps           uint64
698         GetRangeOps      uint64
699         GetMetadataOps   uint64
700         GetPropertiesOps uint64
701         CreateOps        uint64
702         SetMetadataOps   uint64
703         DelOps           uint64
704         ListOps          uint64
705 }
706
707 func (s *azureBlobStats) TickErr(err error) {
708         if err == nil {
709                 return
710         }
711         errType := fmt.Sprintf("%T", err)
712         if err, ok := err.(storage.AzureStorageServiceError); ok {
713                 errType = errType + fmt.Sprintf(" %d (%s)", err.StatusCode, err.Code)
714         }
715         log.Printf("errType %T, err %s", err, err)
716         s.statsTicker.TickErr(err, errType)
717 }
718
719 // azureContainer wraps storage.Container in order to count I/O and
720 // API usage stats.
721 type azureContainer struct {
722         ctr   *storage.Container
723         stats azureBlobStats
724 }
725
726 func (c *azureContainer) Exists() (bool, error) {
727         c.stats.Tick(&c.stats.Ops)
728         ok, err := c.ctr.Exists()
729         c.stats.TickErr(err)
730         return ok, err
731 }
732
733 func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
734         c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
735         b := c.ctr.GetBlobReference(bname)
736         err := b.GetMetadata(nil)
737         c.stats.TickErr(err)
738         return b.Metadata, err
739 }
740
741 func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
742         c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
743         b := c.ctr.GetBlobReference(bname)
744         err := b.GetProperties(nil)
745         c.stats.TickErr(err)
746         return &b.Properties, err
747 }
748
749 func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
750         c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
751         b := c.ctr.GetBlobReference(bname)
752         rdr, err := b.Get(nil)
753         c.stats.TickErr(err)
754         return NewCountingReader(rdr, c.stats.TickInBytes), err
755 }
756
757 func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
758         c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
759         b := c.ctr.GetBlobReference(bname)
760         rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
761                 Range: &storage.BlobRange{
762                         Start: uint64(start),
763                         End:   uint64(end),
764                 },
765                 GetBlobOptions: opts,
766         })
767         c.stats.TickErr(err)
768         return NewCountingReader(rdr, c.stats.TickInBytes), err
769 }
770
771 // If we give it an io.Reader that doesn't also have a Len() int
772 // method, the Azure SDK determines data size by copying the data into
773 // a new buffer, which is not a good use of memory.
774 type readerWithAzureLen struct {
775         io.Reader
776         len int
777 }
778
779 // Len satisfies the private lener interface in azure-sdk-for-go.
780 func (r *readerWithAzureLen) Len() int {
781         return r.len
782 }
783
784 func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
785         c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
786         if size != 0 {
787                 rdr = &readerWithAzureLen{
788                         Reader: NewCountingReader(rdr, c.stats.TickOutBytes),
789                         len:    size,
790                 }
791         }
792         b := c.ctr.GetBlobReference(bname)
793         err := b.CreateBlockBlobFromReader(rdr, opts)
794         c.stats.TickErr(err)
795         return err
796 }
797
798 func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
799         c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
800         b := c.ctr.GetBlobReference(bname)
801         b.Metadata = m
802         err := b.SetMetadata(opts)
803         c.stats.TickErr(err)
804         return err
805 }
806
807 func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
808         c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
809         resp, err := c.ctr.ListBlobs(params)
810         c.stats.TickErr(err)
811         return resp, err
812 }
813
814 func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
815         c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
816         b := c.ctr.GetBlobReference(bname)
817         err := b.Delete(opts)
818         c.stats.TickErr(err)
819         return err
820 }