Merge branch 'master' into 13937-keepstore-prometheus
[arvados.git] / services / keepstore / azure_blob_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "errors"
11         "flag"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "net/http"
16         "os"
17         "regexp"
18         "strconv"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "time"
23
24         "git.curoverse.com/arvados.git/sdk/go/arvados"
25         "github.com/Azure/azure-sdk-for-go/storage"
26         "github.com/prometheus/client_golang/prometheus"
27 )
28
29 const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
30
31 var (
32         azureMaxGetBytes           int
33         azureStorageAccountName    string
34         azureStorageAccountKeyFile string
35         azureStorageReplication    int
36         azureWriteRaceInterval     = 15 * time.Second
37         azureWriteRacePollTime     = time.Second
38 )
39
40 func readKeyFromFile(file string) (string, error) {
41         buf, err := ioutil.ReadFile(file)
42         if err != nil {
43                 return "", errors.New("reading key from " + file + ": " + err.Error())
44         }
45         accountKey := strings.TrimSpace(string(buf))
46         if accountKey == "" {
47                 return "", errors.New("empty account key in " + file)
48         }
49         return accountKey, nil
50 }
51
52 type azureVolumeAdder struct {
53         *Config
54 }
55
56 // String implements flag.Value
57 func (s *azureVolumeAdder) String() string {
58         return "-"
59 }
60
61 func (s *azureVolumeAdder) Set(containerName string) error {
62         s.Config.Volumes = append(s.Config.Volumes, &AzureBlobVolume{
63                 ContainerName:         containerName,
64                 StorageAccountName:    azureStorageAccountName,
65                 StorageAccountKeyFile: azureStorageAccountKeyFile,
66                 AzureReplication:      azureStorageReplication,
67                 ReadOnly:              deprecated.flagReadonly,
68         })
69         return nil
70 }
71
72 func init() {
73         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &AzureBlobVolume{} })
74
75         flag.Var(&azureVolumeAdder{theConfig},
76                 "azure-storage-container-volume",
77                 "Use the given container as a storage volume. Can be given multiple times.")
78         flag.StringVar(
79                 &azureStorageAccountName,
80                 "azure-storage-account-name",
81                 "",
82                 "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
83         flag.StringVar(
84                 &azureStorageAccountKeyFile,
85                 "azure-storage-account-key-file",
86                 "",
87                 "`File` containing the account key used for subsequent --azure-storage-container-volume arguments.")
88         flag.IntVar(
89                 &azureStorageReplication,
90                 "azure-storage-replication",
91                 3,
92                 "Replication level to report to clients when data is stored in an Azure container.")
93         flag.IntVar(
94                 &azureMaxGetBytes,
95                 "azure-max-get-bytes",
96                 BlockSize,
97                 fmt.Sprintf("Maximum bytes to request in a single GET request. If smaller than %d, use multiple concurrent range requests to retrieve a block.", BlockSize))
98 }
99
100 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
101 // container.
102 type AzureBlobVolume struct {
103         StorageAccountName    string
104         StorageAccountKeyFile string
105         StorageBaseURL        string // "" means default, "core.windows.net"
106         ContainerName         string
107         AzureReplication      int
108         ReadOnly              bool
109         RequestTimeout        arvados.Duration
110         StorageClasses        []string
111
112         azClient  storage.Client
113         container *azureContainer
114 }
115
116 // singleSender is a single-attempt storage.Sender.
117 type singleSender struct{}
118
119 // Send performs req exactly once.
120 func (*singleSender) Send(c *storage.Client, req *http.Request) (resp *http.Response, err error) {
121         return c.HTTPClient.Do(req)
122 }
123
124 // Examples implements VolumeWithExamples.
125 func (*AzureBlobVolume) Examples() []Volume {
126         return []Volume{
127                 &AzureBlobVolume{
128                         StorageAccountName:    "example-account-name",
129                         StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
130                         ContainerName:         "example-container-name",
131                         AzureReplication:      3,
132                         RequestTimeout:        azureDefaultRequestTimeout,
133                 },
134                 &AzureBlobVolume{
135                         StorageAccountName:    "cn-account-name",
136                         StorageAccountKeyFile: "/etc/azure_cn_storage_account_key.txt",
137                         StorageBaseURL:        "core.chinacloudapi.cn",
138                         ContainerName:         "cn-container-name",
139                         AzureReplication:      3,
140                         RequestTimeout:        azureDefaultRequestTimeout,
141                 },
142         }
143 }
144
145 // Type implements Volume.
146 func (v *AzureBlobVolume) Type() string {
147         return "Azure"
148 }
149
150 // Start implements Volume.
151 func (v *AzureBlobVolume) Start(opsCounters, errCounters, ioBytes *prometheus.CounterVec) error {
152         if v.ContainerName == "" {
153                 return errors.New("no container name given")
154         }
155         if v.StorageAccountName == "" || v.StorageAccountKeyFile == "" {
156                 return errors.New("StorageAccountName and StorageAccountKeyFile must be given")
157         }
158         accountKey, err := readKeyFromFile(v.StorageAccountKeyFile)
159         if err != nil {
160                 return err
161         }
162         if v.StorageBaseURL == "" {
163                 v.StorageBaseURL = storage.DefaultBaseURL
164         }
165         v.azClient, err = storage.NewClient(v.StorageAccountName, accountKey, v.StorageBaseURL, storage.DefaultAPIVersion, true)
166         if err != nil {
167                 return fmt.Errorf("creating Azure storage client: %s", err)
168         }
169         v.azClient.Sender = &singleSender{}
170
171         if v.RequestTimeout == 0 {
172                 v.RequestTimeout = azureDefaultRequestTimeout
173         }
174         v.azClient.HTTPClient = &http.Client{
175                 Timeout: time.Duration(v.RequestTimeout),
176         }
177         bs := v.azClient.GetBlobService()
178         v.container = &azureContainer{
179                 ctr: bs.GetContainerReference(v.ContainerName),
180         }
181
182         if ok, err := v.container.Exists(); err != nil {
183                 return err
184         } else if !ok {
185                 return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
186         }
187         // Set up prometheus metrics
188         lbls := prometheus.Labels{"device_id": v.DeviceID()}
189         v.container.stats.opsCounters = opsCounters.MustCurryWith(lbls)
190         v.container.stats.errCounters = errCounters.MustCurryWith(lbls)
191         v.container.stats.ioBytes = ioBytes.MustCurryWith(lbls)
192
193         return nil
194 }
195
196 // DeviceID returns a globally unique ID for the storage container.
197 func (v *AzureBlobVolume) DeviceID() string {
198         return "azure://" + v.StorageBaseURL + "/" + v.StorageAccountName + "/" + v.ContainerName
199 }
200
201 // Return true if expires_at metadata attribute is found on the block
202 func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
203         metadata, err := v.container.GetBlobMetadata(loc)
204         if err != nil {
205                 return false, metadata, v.translateError(err)
206         }
207         if metadata["expires_at"] != "" {
208                 return true, metadata, nil
209         }
210         return false, metadata, nil
211 }
212
213 // Get reads a Keep block that has been stored as a block blob in the
214 // container.
215 //
216 // If the block is younger than azureWriteRaceInterval and is
217 // unexpectedly empty, assume a PutBlob operation is in progress, and
218 // wait for it to finish writing.
219 func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
220         trashed, _, err := v.checkTrashed(loc)
221         if err != nil {
222                 return 0, err
223         }
224         if trashed {
225                 return 0, os.ErrNotExist
226         }
227         var deadline time.Time
228         haveDeadline := false
229         size, err := v.get(ctx, loc, buf)
230         for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
231                 // Seeing a brand new empty block probably means we're
232                 // in a race with CreateBlob, which under the hood
233                 // (apparently) does "CreateEmpty" and "CommitData"
234                 // with no additional transaction locking.
235                 if !haveDeadline {
236                         t, err := v.Mtime(loc)
237                         if err != nil {
238                                 log.Print("Got empty block (possible race) but Mtime failed: ", err)
239                                 break
240                         }
241                         deadline = t.Add(azureWriteRaceInterval)
242                         if time.Now().After(deadline) {
243                                 break
244                         }
245                         log.Printf("Race? Block %s is 0 bytes, %s old. Polling until %s", loc, time.Since(t), deadline)
246                         haveDeadline = true
247                 } else if time.Now().After(deadline) {
248                         break
249                 }
250                 select {
251                 case <-ctx.Done():
252                         return 0, ctx.Err()
253                 case <-time.After(azureWriteRacePollTime):
254                 }
255                 size, err = v.get(ctx, loc, buf)
256         }
257         if haveDeadline {
258                 log.Printf("Race ended with size==%d", size)
259         }
260         return size, err
261 }
262
263 func (v *AzureBlobVolume) get(ctx context.Context, loc string, buf []byte) (int, error) {
264         ctx, cancel := context.WithCancel(ctx)
265         defer cancel()
266         expectSize := len(buf)
267         if azureMaxGetBytes < BlockSize {
268                 // Unfortunately the handler doesn't tell us how long the blob
269                 // is expected to be, so we have to ask Azure.
270                 props, err := v.container.GetBlobProperties(loc)
271                 if err != nil {
272                         return 0, v.translateError(err)
273                 }
274                 if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
275                         return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
276                 }
277                 expectSize = int(props.ContentLength)
278         }
279
280         if expectSize == 0 {
281                 return 0, nil
282         }
283
284         // We'll update this actualSize if/when we get the last piece.
285         actualSize := -1
286         pieces := (expectSize + azureMaxGetBytes - 1) / azureMaxGetBytes
287         errors := make(chan error, pieces)
288         var wg sync.WaitGroup
289         wg.Add(pieces)
290         for p := 0; p < pieces; p++ {
291                 // Each goroutine retrieves one piece. If we hit an
292                 // error, it is sent to the errors chan so get() can
293                 // return it -- but only if the error happens before
294                 // ctx is done. This way, if ctx is done before we hit
295                 // any other error (e.g., requesting client has hung
296                 // up), we return the original ctx.Err() instead of
297                 // the secondary errors from the transfers that got
298                 // interrupted as a result.
299                 go func(p int) {
300                         defer wg.Done()
301                         startPos := p * azureMaxGetBytes
302                         endPos := startPos + azureMaxGetBytes
303                         if endPos > expectSize {
304                                 endPos = expectSize
305                         }
306                         var rdr io.ReadCloser
307                         var err error
308                         gotRdr := make(chan struct{})
309                         go func() {
310                                 defer close(gotRdr)
311                                 if startPos == 0 && endPos == expectSize {
312                                         rdr, err = v.container.GetBlob(loc)
313                                 } else {
314                                         rdr, err = v.container.GetBlobRange(loc, startPos, endPos-1, nil)
315                                 }
316                         }()
317                         select {
318                         case <-ctx.Done():
319                                 go func() {
320                                         <-gotRdr
321                                         if err == nil {
322                                                 rdr.Close()
323                                         }
324                                 }()
325                                 return
326                         case <-gotRdr:
327                         }
328                         if err != nil {
329                                 errors <- err
330                                 cancel()
331                                 return
332                         }
333                         go func() {
334                                 // Close the reader when the client
335                                 // hangs up or another piece fails
336                                 // (possibly interrupting ReadFull())
337                                 // or when all pieces succeed and
338                                 // get() returns.
339                                 <-ctx.Done()
340                                 rdr.Close()
341                         }()
342                         n, err := io.ReadFull(rdr, buf[startPos:endPos])
343                         if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
344                                 // If we don't know the actual size,
345                                 // and just tried reading 64 MiB, it's
346                                 // normal to encounter EOF.
347                         } else if err != nil {
348                                 if ctx.Err() == nil {
349                                         errors <- err
350                                 }
351                                 cancel()
352                                 return
353                         }
354                         if p == pieces-1 {
355                                 actualSize = startPos + n
356                         }
357                 }(p)
358         }
359         wg.Wait()
360         close(errors)
361         if len(errors) > 0 {
362                 return 0, v.translateError(<-errors)
363         }
364         if ctx.Err() != nil {
365                 return 0, ctx.Err()
366         }
367         return actualSize, nil
368 }
369
370 // Compare the given data with existing stored data.
371 func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte) error {
372         trashed, _, err := v.checkTrashed(loc)
373         if err != nil {
374                 return err
375         }
376         if trashed {
377                 return os.ErrNotExist
378         }
379         var rdr io.ReadCloser
380         gotRdr := make(chan struct{})
381         go func() {
382                 defer close(gotRdr)
383                 rdr, err = v.container.GetBlob(loc)
384         }()
385         select {
386         case <-ctx.Done():
387                 go func() {
388                         <-gotRdr
389                         if err == nil {
390                                 rdr.Close()
391                         }
392                 }()
393                 return ctx.Err()
394         case <-gotRdr:
395         }
396         if err != nil {
397                 return v.translateError(err)
398         }
399         defer rdr.Close()
400         return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
401 }
402
403 // Put stores a Keep block as a block blob in the container.
404 func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) error {
405         if v.ReadOnly {
406                 return MethodDisabledError
407         }
408         // Send the block data through a pipe, so that (if we need to)
409         // we can close the pipe early and abandon our
410         // CreateBlockBlobFromReader() goroutine, without worrying
411         // about CreateBlockBlobFromReader() accessing our block
412         // buffer after we release it.
413         bufr, bufw := io.Pipe()
414         go func() {
415                 io.Copy(bufw, bytes.NewReader(block))
416                 bufw.Close()
417         }()
418         errChan := make(chan error)
419         go func() {
420                 var body io.Reader = bufr
421                 if len(block) == 0 {
422                         // We must send a "Content-Length: 0" header,
423                         // but the http client interprets
424                         // ContentLength==0 as "unknown" unless it can
425                         // confirm by introspection that Body will
426                         // read 0 bytes.
427                         body = http.NoBody
428                         bufr.Close()
429                 }
430                 errChan <- v.container.CreateBlockBlobFromReader(loc, len(block), body, nil)
431         }()
432         select {
433         case <-ctx.Done():
434                 theConfig.debugLogf("%s: taking CreateBlockBlobFromReader's input away: %s", v, ctx.Err())
435                 // Our pipe might be stuck in Write(), waiting for
436                 // io.Copy() to read. If so, un-stick it. This means
437                 // CreateBlockBlobFromReader will get corrupt data,
438                 // but that's OK: the size won't match, so the write
439                 // will fail.
440                 go io.Copy(ioutil.Discard, bufr)
441                 // CloseWithError() will return once pending I/O is done.
442                 bufw.CloseWithError(ctx.Err())
443                 theConfig.debugLogf("%s: abandoning CreateBlockBlobFromReader goroutine", v)
444                 return ctx.Err()
445         case err := <-errChan:
446                 return err
447         }
448 }
449
450 // Touch updates the last-modified property of a block blob.
451 func (v *AzureBlobVolume) Touch(loc string) error {
452         if v.ReadOnly {
453                 return MethodDisabledError
454         }
455         trashed, metadata, err := v.checkTrashed(loc)
456         if err != nil {
457                 return err
458         }
459         if trashed {
460                 return os.ErrNotExist
461         }
462
463         metadata["touch"] = fmt.Sprintf("%d", time.Now().Unix())
464         return v.container.SetBlobMetadata(loc, metadata, nil)
465 }
466
467 // Mtime returns the last-modified property of a block blob.
468 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
469         trashed, _, err := v.checkTrashed(loc)
470         if err != nil {
471                 return time.Time{}, err
472         }
473         if trashed {
474                 return time.Time{}, os.ErrNotExist
475         }
476
477         props, err := v.container.GetBlobProperties(loc)
478         if err != nil {
479                 return time.Time{}, err
480         }
481         return time.Time(props.LastModified), nil
482 }
483
484 // IndexTo writes a list of Keep blocks that are stored in the
485 // container.
486 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
487         params := storage.ListBlobsParameters{
488                 Prefix:  prefix,
489                 Include: &storage.IncludeBlobDataset{Metadata: true},
490         }
491         for {
492                 resp, err := v.container.ListBlobs(params)
493                 if err != nil {
494                         return err
495                 }
496                 for _, b := range resp.Blobs {
497                         if !v.isKeepBlock(b.Name) {
498                                 continue
499                         }
500                         modtime := time.Time(b.Properties.LastModified)
501                         if b.Properties.ContentLength == 0 && modtime.Add(azureWriteRaceInterval).After(time.Now()) {
502                                 // A new zero-length blob is probably
503                                 // just a new non-empty blob that
504                                 // hasn't committed its data yet (see
505                                 // Get()), and in any case has no
506                                 // value.
507                                 continue
508                         }
509                         if b.Metadata["expires_at"] != "" {
510                                 // Trashed blob; exclude it from response
511                                 continue
512                         }
513                         fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, modtime.UnixNano())
514                 }
515                 if resp.NextMarker == "" {
516                         return nil
517                 }
518                 params.Marker = resp.NextMarker
519         }
520 }
521
522 // Trash a Keep block.
523 func (v *AzureBlobVolume) Trash(loc string) error {
524         if v.ReadOnly {
525                 return MethodDisabledError
526         }
527
528         // Ideally we would use If-Unmodified-Since, but that
529         // particular condition seems to be ignored by Azure. Instead,
530         // we get the Etag before checking Mtime, and use If-Match to
531         // ensure we don't delete data if Put() or Touch() happens
532         // between our calls to Mtime() and DeleteBlob().
533         props, err := v.container.GetBlobProperties(loc)
534         if err != nil {
535                 return err
536         }
537         if t, err := v.Mtime(loc); err != nil {
538                 return err
539         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
540                 return nil
541         }
542
543         // If TrashLifetime == 0, just delete it
544         if theConfig.TrashLifetime == 0 {
545                 return v.container.DeleteBlob(loc, &storage.DeleteBlobOptions{
546                         IfMatch: props.Etag,
547                 })
548         }
549
550         // Otherwise, mark as trash
551         return v.container.SetBlobMetadata(loc, storage.BlobMetadata{
552                 "expires_at": fmt.Sprintf("%d", time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()),
553         }, &storage.SetBlobMetadataOptions{
554                 IfMatch: props.Etag,
555         })
556 }
557
558 // Untrash a Keep block.
559 // Delete the expires_at metadata attribute
560 func (v *AzureBlobVolume) Untrash(loc string) error {
561         // if expires_at does not exist, return NotFoundError
562         metadata, err := v.container.GetBlobMetadata(loc)
563         if err != nil {
564                 return v.translateError(err)
565         }
566         if metadata["expires_at"] == "" {
567                 return os.ErrNotExist
568         }
569
570         // reset expires_at metadata attribute
571         metadata["expires_at"] = ""
572         err = v.container.SetBlobMetadata(loc, metadata, nil)
573         return v.translateError(err)
574 }
575
576 // Status returns a VolumeStatus struct with placeholder data.
577 func (v *AzureBlobVolume) Status() *VolumeStatus {
578         return &VolumeStatus{
579                 DeviceNum: 1,
580                 BytesFree: BlockSize * 1000,
581                 BytesUsed: 1,
582         }
583 }
584
585 // String returns a volume label, including the container name.
586 func (v *AzureBlobVolume) String() string {
587         return fmt.Sprintf("azure-storage-container:%+q", v.ContainerName)
588 }
589
590 // Writable returns true, unless the -readonly flag was on when the
591 // volume was added.
592 func (v *AzureBlobVolume) Writable() bool {
593         return !v.ReadOnly
594 }
595
596 // Replication returns the replication level of the container, as
597 // specified by the -azure-storage-replication argument.
598 func (v *AzureBlobVolume) Replication() int {
599         return v.AzureReplication
600 }
601
602 // GetStorageClasses implements Volume
603 func (v *AzureBlobVolume) GetStorageClasses() []string {
604         return v.StorageClasses
605 }
606
607 // If possible, translate an Azure SDK error to a recognizable error
608 // like os.ErrNotExist.
609 func (v *AzureBlobVolume) translateError(err error) error {
610         switch {
611         case err == nil:
612                 return err
613         case strings.Contains(err.Error(), "StatusCode=503"):
614                 // "storage: service returned error: StatusCode=503, ErrorCode=ServerBusy, ErrorMessage=The server is busy" (See #14804)
615                 return VolumeBusyError
616         case strings.Contains(err.Error(), "Not Found"):
617                 // "storage: service returned without a response body (404 Not Found)"
618                 return os.ErrNotExist
619         default:
620                 return err
621         }
622 }
623
624 var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
625
626 func (v *AzureBlobVolume) isKeepBlock(s string) bool {
627         return keepBlockRegexp.MatchString(s)
628 }
629
630 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
631 // and deletes them from the volume.
632 func (v *AzureBlobVolume) EmptyTrash() {
633         var bytesDeleted, bytesInTrash int64
634         var blocksDeleted, blocksInTrash int64
635
636         doBlob := func(b storage.Blob) {
637                 // Check whether the block is flagged as trash
638                 if b.Metadata["expires_at"] == "" {
639                         return
640                 }
641
642                 atomic.AddInt64(&blocksInTrash, 1)
643                 atomic.AddInt64(&bytesInTrash, b.Properties.ContentLength)
644
645                 expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
646                 if err != nil {
647                         log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
648                         return
649                 }
650
651                 if expiresAt > time.Now().Unix() {
652                         return
653                 }
654
655                 err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
656                         IfMatch: b.Properties.Etag,
657                 })
658                 if err != nil {
659                         log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
660                         return
661                 }
662                 atomic.AddInt64(&blocksDeleted, 1)
663                 atomic.AddInt64(&bytesDeleted, b.Properties.ContentLength)
664         }
665
666         var wg sync.WaitGroup
667         todo := make(chan storage.Blob, theConfig.EmptyTrashWorkers)
668         for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
669                 wg.Add(1)
670                 go func() {
671                         defer wg.Done()
672                         for b := range todo {
673                                 doBlob(b)
674                         }
675                 }()
676         }
677
678         params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
679         for {
680                 resp, err := v.container.ListBlobs(params)
681                 if err != nil {
682                         log.Printf("EmptyTrash: ListBlobs: %v", err)
683                         break
684                 }
685                 for _, b := range resp.Blobs {
686                         todo <- b
687                 }
688                 if resp.NextMarker == "" {
689                         break
690                 }
691                 params.Marker = resp.NextMarker
692         }
693         close(todo)
694         wg.Wait()
695
696         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
697 }
698
699 // InternalStats returns bucket I/O and API call counters.
700 func (v *AzureBlobVolume) InternalStats() interface{} {
701         return &v.container.stats
702 }
703
704 type azureBlobStats struct {
705         statsTicker
706         Ops              uint64
707         GetOps           uint64
708         GetRangeOps      uint64
709         GetMetadataOps   uint64
710         GetPropertiesOps uint64
711         CreateOps        uint64
712         SetMetadataOps   uint64
713         DelOps           uint64
714         ListOps          uint64
715 }
716
717 func (s *azureBlobStats) TickErr(err error) {
718         if err == nil {
719                 return
720         }
721         errType := fmt.Sprintf("%T", err)
722         if err, ok := err.(storage.AzureStorageServiceError); ok {
723                 errType = errType + fmt.Sprintf(" %d (%s)", err.StatusCode, err.Code)
724         }
725         log.Printf("errType %T, err %s", err, err)
726         s.statsTicker.TickErr(err, errType)
727 }
728
729 // azureContainer wraps storage.Container in order to count I/O and
730 // API usage stats.
731 type azureContainer struct {
732         ctr   *storage.Container
733         stats azureBlobStats
734 }
735
736 func (c *azureContainer) Exists() (bool, error) {
737         c.stats.TickOps("exists")
738         c.stats.Tick(&c.stats.Ops)
739         ok, err := c.ctr.Exists()
740         c.stats.TickErr(err)
741         return ok, err
742 }
743
744 func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
745         c.stats.TickOps("get_metadata")
746         c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
747         b := c.ctr.GetBlobReference(bname)
748         err := b.GetMetadata(nil)
749         c.stats.TickErr(err)
750         return b.Metadata, err
751 }
752
753 func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
754         c.stats.TickOps("get_properties")
755         c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
756         b := c.ctr.GetBlobReference(bname)
757         err := b.GetProperties(nil)
758         c.stats.TickErr(err)
759         return &b.Properties, err
760 }
761
762 func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
763         c.stats.TickOps("get")
764         c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
765         b := c.ctr.GetBlobReference(bname)
766         rdr, err := b.Get(nil)
767         c.stats.TickErr(err)
768         return NewCountingReader(rdr, c.stats.TickInBytes), err
769 }
770
771 func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
772         c.stats.TickOps("get_range")
773         c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
774         b := c.ctr.GetBlobReference(bname)
775         rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
776                 Range: &storage.BlobRange{
777                         Start: uint64(start),
778                         End:   uint64(end),
779                 },
780                 GetBlobOptions: opts,
781         })
782         c.stats.TickErr(err)
783         return NewCountingReader(rdr, c.stats.TickInBytes), err
784 }
785
786 // If we give it an io.Reader that doesn't also have a Len() int
787 // method, the Azure SDK determines data size by copying the data into
788 // a new buffer, which is not a good use of memory.
789 type readerWithAzureLen struct {
790         io.Reader
791         len int
792 }
793
794 // Len satisfies the private lener interface in azure-sdk-for-go.
795 func (r *readerWithAzureLen) Len() int {
796         return r.len
797 }
798
799 func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
800         c.stats.TickOps("create")
801         c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
802         if size != 0 {
803                 rdr = &readerWithAzureLen{
804                         Reader: NewCountingReader(rdr, c.stats.TickOutBytes),
805                         len:    size,
806                 }
807         }
808         b := c.ctr.GetBlobReference(bname)
809         err := b.CreateBlockBlobFromReader(rdr, opts)
810         c.stats.TickErr(err)
811         return err
812 }
813
814 func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
815         c.stats.TickOps("set_metadata")
816         c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
817         b := c.ctr.GetBlobReference(bname)
818         b.Metadata = m
819         err := b.SetMetadata(opts)
820         c.stats.TickErr(err)
821         return err
822 }
823
824 func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
825         c.stats.TickOps("list")
826         c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
827         resp, err := c.ctr.ListBlobs(params)
828         c.stats.TickErr(err)
829         return resp, err
830 }
831
832 func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
833         c.stats.TickOps("delete")
834         c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
835         b := c.ctr.GetBlobReference(bname)
836         err := b.Delete(opts)
837         c.stats.TickErr(err)
838         return err
839 }