3c17b3bd0641e2bee23007d775b1740e2c7a14d4
[arvados.git] / services / keepstore / azure_blob_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "errors"
11         "flag"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "net/http"
16         "os"
17         "regexp"
18         "strconv"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "time"
23
24         "git.curoverse.com/arvados.git/sdk/go/arvados"
25         "github.com/Azure/azure-sdk-for-go/storage"
26         "github.com/prometheus/client_golang/prometheus"
27 )
28
29 const (
30         azureDefaultRequestTimeout       = arvados.Duration(10 * time.Minute)
31         azureDefaultListBlobsMaxAttempts = 12
32         azureDefaultListBlobsRetryDelay  = arvados.Duration(10 * time.Second)
33 )
34
35 var (
36         azureMaxGetBytes           int
37         azureStorageAccountName    string
38         azureStorageAccountKeyFile string
39         azureStorageReplication    int
40         azureWriteRaceInterval     = 15 * time.Second
41         azureWriteRacePollTime     = time.Second
42 )
43
44 func readKeyFromFile(file string) (string, error) {
45         buf, err := ioutil.ReadFile(file)
46         if err != nil {
47                 return "", errors.New("reading key from " + file + ": " + err.Error())
48         }
49         accountKey := strings.TrimSpace(string(buf))
50         if accountKey == "" {
51                 return "", errors.New("empty account key in " + file)
52         }
53         return accountKey, nil
54 }
55
56 type azureVolumeAdder struct {
57         *Config
58 }
59
60 // String implements flag.Value
61 func (s *azureVolumeAdder) String() string {
62         return "-"
63 }
64
65 func (s *azureVolumeAdder) Set(containerName string) error {
66         s.Config.Volumes = append(s.Config.Volumes, &AzureBlobVolume{
67                 ContainerName:         containerName,
68                 StorageAccountName:    azureStorageAccountName,
69                 StorageAccountKeyFile: azureStorageAccountKeyFile,
70                 AzureReplication:      azureStorageReplication,
71                 ReadOnly:              deprecated.flagReadonly,
72         })
73         return nil
74 }
75
76 func init() {
77         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &AzureBlobVolume{} })
78
79         flag.Var(&azureVolumeAdder{theConfig},
80                 "azure-storage-container-volume",
81                 "Use the given container as a storage volume. Can be given multiple times.")
82         flag.StringVar(
83                 &azureStorageAccountName,
84                 "azure-storage-account-name",
85                 "",
86                 "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
87         flag.StringVar(
88                 &azureStorageAccountKeyFile,
89                 "azure-storage-account-key-file",
90                 "",
91                 "`File` containing the account key used for subsequent --azure-storage-container-volume arguments.")
92         flag.IntVar(
93                 &azureStorageReplication,
94                 "azure-storage-replication",
95                 3,
96                 "Replication level to report to clients when data is stored in an Azure container.")
97         flag.IntVar(
98                 &azureMaxGetBytes,
99                 "azure-max-get-bytes",
100                 BlockSize,
101                 fmt.Sprintf("Maximum bytes to request in a single GET request. If smaller than %d, use multiple concurrent range requests to retrieve a block.", BlockSize))
102 }
103
104 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
105 // container.
106 type AzureBlobVolume struct {
107         StorageAccountName    string
108         StorageAccountKeyFile string
109         StorageBaseURL        string // "" means default, "core.windows.net"
110         ContainerName         string
111         AzureReplication      int
112         ReadOnly              bool
113         RequestTimeout        arvados.Duration
114         StorageClasses        []string
115         ListBlobsRetryDelay   arvados.Duration
116         ListBlobsMaxAttempts  int
117
118         azClient  storage.Client
119         container *azureContainer
120 }
121
122 // singleSender is a single-attempt storage.Sender.
123 type singleSender struct{}
124
125 // Send performs req exactly once.
126 func (*singleSender) Send(c *storage.Client, req *http.Request) (resp *http.Response, err error) {
127         return c.HTTPClient.Do(req)
128 }
129
130 // Examples implements VolumeWithExamples.
131 func (*AzureBlobVolume) Examples() []Volume {
132         return []Volume{
133                 &AzureBlobVolume{
134                         StorageAccountName:    "example-account-name",
135                         StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
136                         ContainerName:         "example-container-name",
137                         AzureReplication:      3,
138                         RequestTimeout:        azureDefaultRequestTimeout,
139                 },
140                 &AzureBlobVolume{
141                         StorageAccountName:    "cn-account-name",
142                         StorageAccountKeyFile: "/etc/azure_cn_storage_account_key.txt",
143                         StorageBaseURL:        "core.chinacloudapi.cn",
144                         ContainerName:         "cn-container-name",
145                         AzureReplication:      3,
146                         RequestTimeout:        azureDefaultRequestTimeout,
147                 },
148         }
149 }
150
151 // Type implements Volume.
152 func (v *AzureBlobVolume) Type() string {
153         return "Azure"
154 }
155
156 // Start implements Volume.
157 func (v *AzureBlobVolume) Start(vm *volumeMetricsVecs) error {
158         if v.ListBlobsRetryDelay == 0 {
159                 v.ListBlobsRetryDelay = azureDefaultListBlobsRetryDelay
160         }
161         if v.ListBlobsMaxAttempts == 0 {
162                 v.ListBlobsMaxAttempts = azureDefaultListBlobsMaxAttempts
163         }
164         if v.ContainerName == "" {
165                 return errors.New("no container name given")
166         }
167         if v.StorageAccountName == "" || v.StorageAccountKeyFile == "" {
168                 return errors.New("StorageAccountName and StorageAccountKeyFile must be given")
169         }
170         accountKey, err := readKeyFromFile(v.StorageAccountKeyFile)
171         if err != nil {
172                 return err
173         }
174         if v.StorageBaseURL == "" {
175                 v.StorageBaseURL = storage.DefaultBaseURL
176         }
177         v.azClient, err = storage.NewClient(v.StorageAccountName, accountKey, v.StorageBaseURL, storage.DefaultAPIVersion, true)
178         if err != nil {
179                 return fmt.Errorf("creating Azure storage client: %s", err)
180         }
181         v.azClient.Sender = &singleSender{}
182
183         if v.RequestTimeout == 0 {
184                 v.RequestTimeout = azureDefaultRequestTimeout
185         }
186         v.azClient.HTTPClient = &http.Client{
187                 Timeout: time.Duration(v.RequestTimeout),
188         }
189         bs := v.azClient.GetBlobService()
190         v.container = &azureContainer{
191                 ctr: bs.GetContainerReference(v.ContainerName),
192         }
193
194         if ok, err := v.container.Exists(); err != nil {
195                 return err
196         } else if !ok {
197                 return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
198         }
199         // Set up prometheus metrics
200         lbls := prometheus.Labels{"device_id": v.DeviceID()}
201         v.container.stats.opsCounters, v.container.stats.errCounters, v.container.stats.ioBytes = vm.getCounterVecsFor(lbls)
202
203         return nil
204 }
205
206 // DeviceID returns a globally unique ID for the storage container.
207 func (v *AzureBlobVolume) DeviceID() string {
208         return "azure://" + v.StorageBaseURL + "/" + v.StorageAccountName + "/" + v.ContainerName
209 }
210
211 // Return true if expires_at metadata attribute is found on the block
212 func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
213         metadata, err := v.container.GetBlobMetadata(loc)
214         if err != nil {
215                 return false, metadata, v.translateError(err)
216         }
217         if metadata["expires_at"] != "" {
218                 return true, metadata, nil
219         }
220         return false, metadata, nil
221 }
222
223 // Get reads a Keep block that has been stored as a block blob in the
224 // container.
225 //
226 // If the block is younger than azureWriteRaceInterval and is
227 // unexpectedly empty, assume a PutBlob operation is in progress, and
228 // wait for it to finish writing.
229 func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
230         trashed, _, err := v.checkTrashed(loc)
231         if err != nil {
232                 return 0, err
233         }
234         if trashed {
235                 return 0, os.ErrNotExist
236         }
237         var deadline time.Time
238         haveDeadline := false
239         size, err := v.get(ctx, loc, buf)
240         for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
241                 // Seeing a brand new empty block probably means we're
242                 // in a race with CreateBlob, which under the hood
243                 // (apparently) does "CreateEmpty" and "CommitData"
244                 // with no additional transaction locking.
245                 if !haveDeadline {
246                         t, err := v.Mtime(loc)
247                         if err != nil {
248                                 log.Print("Got empty block (possible race) but Mtime failed: ", err)
249                                 break
250                         }
251                         deadline = t.Add(azureWriteRaceInterval)
252                         if time.Now().After(deadline) {
253                                 break
254                         }
255                         log.Printf("Race? Block %s is 0 bytes, %s old. Polling until %s", loc, time.Since(t), deadline)
256                         haveDeadline = true
257                 } else if time.Now().After(deadline) {
258                         break
259                 }
260                 select {
261                 case <-ctx.Done():
262                         return 0, ctx.Err()
263                 case <-time.After(azureWriteRacePollTime):
264                 }
265                 size, err = v.get(ctx, loc, buf)
266         }
267         if haveDeadline {
268                 log.Printf("Race ended with size==%d", size)
269         }
270         return size, err
271 }
272
273 func (v *AzureBlobVolume) get(ctx context.Context, loc string, buf []byte) (int, error) {
274         ctx, cancel := context.WithCancel(ctx)
275         defer cancel()
276         expectSize := len(buf)
277         if azureMaxGetBytes < BlockSize {
278                 // Unfortunately the handler doesn't tell us how long the blob
279                 // is expected to be, so we have to ask Azure.
280                 props, err := v.container.GetBlobProperties(loc)
281                 if err != nil {
282                         return 0, v.translateError(err)
283                 }
284                 if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
285                         return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
286                 }
287                 expectSize = int(props.ContentLength)
288         }
289
290         if expectSize == 0 {
291                 return 0, nil
292         }
293
294         // We'll update this actualSize if/when we get the last piece.
295         actualSize := -1
296         pieces := (expectSize + azureMaxGetBytes - 1) / azureMaxGetBytes
297         errors := make(chan error, pieces)
298         var wg sync.WaitGroup
299         wg.Add(pieces)
300         for p := 0; p < pieces; p++ {
301                 // Each goroutine retrieves one piece. If we hit an
302                 // error, it is sent to the errors chan so get() can
303                 // return it -- but only if the error happens before
304                 // ctx is done. This way, if ctx is done before we hit
305                 // any other error (e.g., requesting client has hung
306                 // up), we return the original ctx.Err() instead of
307                 // the secondary errors from the transfers that got
308                 // interrupted as a result.
309                 go func(p int) {
310                         defer wg.Done()
311                         startPos := p * azureMaxGetBytes
312                         endPos := startPos + azureMaxGetBytes
313                         if endPos > expectSize {
314                                 endPos = expectSize
315                         }
316                         var rdr io.ReadCloser
317                         var err error
318                         gotRdr := make(chan struct{})
319                         go func() {
320                                 defer close(gotRdr)
321                                 if startPos == 0 && endPos == expectSize {
322                                         rdr, err = v.container.GetBlob(loc)
323                                 } else {
324                                         rdr, err = v.container.GetBlobRange(loc, startPos, endPos-1, nil)
325                                 }
326                         }()
327                         select {
328                         case <-ctx.Done():
329                                 go func() {
330                                         <-gotRdr
331                                         if err == nil {
332                                                 rdr.Close()
333                                         }
334                                 }()
335                                 return
336                         case <-gotRdr:
337                         }
338                         if err != nil {
339                                 errors <- err
340                                 cancel()
341                                 return
342                         }
343                         go func() {
344                                 // Close the reader when the client
345                                 // hangs up or another piece fails
346                                 // (possibly interrupting ReadFull())
347                                 // or when all pieces succeed and
348                                 // get() returns.
349                                 <-ctx.Done()
350                                 rdr.Close()
351                         }()
352                         n, err := io.ReadFull(rdr, buf[startPos:endPos])
353                         if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
354                                 // If we don't know the actual size,
355                                 // and just tried reading 64 MiB, it's
356                                 // normal to encounter EOF.
357                         } else if err != nil {
358                                 if ctx.Err() == nil {
359                                         errors <- err
360                                 }
361                                 cancel()
362                                 return
363                         }
364                         if p == pieces-1 {
365                                 actualSize = startPos + n
366                         }
367                 }(p)
368         }
369         wg.Wait()
370         close(errors)
371         if len(errors) > 0 {
372                 return 0, v.translateError(<-errors)
373         }
374         if ctx.Err() != nil {
375                 return 0, ctx.Err()
376         }
377         return actualSize, nil
378 }
379
380 // Compare the given data with existing stored data.
381 func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte) error {
382         trashed, _, err := v.checkTrashed(loc)
383         if err != nil {
384                 return err
385         }
386         if trashed {
387                 return os.ErrNotExist
388         }
389         var rdr io.ReadCloser
390         gotRdr := make(chan struct{})
391         go func() {
392                 defer close(gotRdr)
393                 rdr, err = v.container.GetBlob(loc)
394         }()
395         select {
396         case <-ctx.Done():
397                 go func() {
398                         <-gotRdr
399                         if err == nil {
400                                 rdr.Close()
401                         }
402                 }()
403                 return ctx.Err()
404         case <-gotRdr:
405         }
406         if err != nil {
407                 return v.translateError(err)
408         }
409         defer rdr.Close()
410         return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
411 }
412
413 // Put stores a Keep block as a block blob in the container.
414 func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) error {
415         if v.ReadOnly {
416                 return MethodDisabledError
417         }
418         // Send the block data through a pipe, so that (if we need to)
419         // we can close the pipe early and abandon our
420         // CreateBlockBlobFromReader() goroutine, without worrying
421         // about CreateBlockBlobFromReader() accessing our block
422         // buffer after we release it.
423         bufr, bufw := io.Pipe()
424         go func() {
425                 io.Copy(bufw, bytes.NewReader(block))
426                 bufw.Close()
427         }()
428         errChan := make(chan error)
429         go func() {
430                 var body io.Reader = bufr
431                 if len(block) == 0 {
432                         // We must send a "Content-Length: 0" header,
433                         // but the http client interprets
434                         // ContentLength==0 as "unknown" unless it can
435                         // confirm by introspection that Body will
436                         // read 0 bytes.
437                         body = http.NoBody
438                         bufr.Close()
439                 }
440                 errChan <- v.container.CreateBlockBlobFromReader(loc, len(block), body, nil)
441         }()
442         select {
443         case <-ctx.Done():
444                 theConfig.debugLogf("%s: taking CreateBlockBlobFromReader's input away: %s", v, ctx.Err())
445                 // Our pipe might be stuck in Write(), waiting for
446                 // io.Copy() to read. If so, un-stick it. This means
447                 // CreateBlockBlobFromReader will get corrupt data,
448                 // but that's OK: the size won't match, so the write
449                 // will fail.
450                 go io.Copy(ioutil.Discard, bufr)
451                 // CloseWithError() will return once pending I/O is done.
452                 bufw.CloseWithError(ctx.Err())
453                 theConfig.debugLogf("%s: abandoning CreateBlockBlobFromReader goroutine", v)
454                 return ctx.Err()
455         case err := <-errChan:
456                 return err
457         }
458 }
459
460 // Touch updates the last-modified property of a block blob.
461 func (v *AzureBlobVolume) Touch(loc string) error {
462         if v.ReadOnly {
463                 return MethodDisabledError
464         }
465         trashed, metadata, err := v.checkTrashed(loc)
466         if err != nil {
467                 return err
468         }
469         if trashed {
470                 return os.ErrNotExist
471         }
472
473         metadata["touch"] = fmt.Sprintf("%d", time.Now().Unix())
474         return v.container.SetBlobMetadata(loc, metadata, nil)
475 }
476
477 // Mtime returns the last-modified property of a block blob.
478 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
479         trashed, _, err := v.checkTrashed(loc)
480         if err != nil {
481                 return time.Time{}, err
482         }
483         if trashed {
484                 return time.Time{}, os.ErrNotExist
485         }
486
487         props, err := v.container.GetBlobProperties(loc)
488         if err != nil {
489                 return time.Time{}, err
490         }
491         return time.Time(props.LastModified), nil
492 }
493
494 // IndexTo writes a list of Keep blocks that are stored in the
495 // container.
496 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
497         params := storage.ListBlobsParameters{
498                 Prefix:  prefix,
499                 Include: &storage.IncludeBlobDataset{Metadata: true},
500         }
501         for page := 1; ; page++ {
502                 resp, err := v.listBlobs(page, params)
503                 if err != nil {
504                         return err
505                 }
506                 for _, b := range resp.Blobs {
507                         if !v.isKeepBlock(b.Name) {
508                                 continue
509                         }
510                         modtime := time.Time(b.Properties.LastModified)
511                         if b.Properties.ContentLength == 0 && modtime.Add(azureWriteRaceInterval).After(time.Now()) {
512                                 // A new zero-length blob is probably
513                                 // just a new non-empty blob that
514                                 // hasn't committed its data yet (see
515                                 // Get()), and in any case has no
516                                 // value.
517                                 continue
518                         }
519                         if b.Metadata["expires_at"] != "" {
520                                 // Trashed blob; exclude it from response
521                                 continue
522                         }
523                         fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, modtime.UnixNano())
524                 }
525                 if resp.NextMarker == "" {
526                         return nil
527                 }
528                 params.Marker = resp.NextMarker
529         }
530 }
531
532 // call v.container.ListBlobs, retrying if needed.
533 func (v *AzureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters) (resp storage.BlobListResponse, err error) {
534         for i := 0; i < v.ListBlobsMaxAttempts; i++ {
535                 resp, err = v.container.ListBlobs(params)
536                 err = v.translateError(err)
537                 if err == VolumeBusyError {
538                         log.Printf("ListBlobs: will retry page %d in %s after error: %s", page, v.ListBlobsRetryDelay, err)
539                         time.Sleep(time.Duration(v.ListBlobsRetryDelay))
540                         continue
541                 } else {
542                         break
543                 }
544         }
545         return
546 }
547
548 // Trash a Keep block.
549 func (v *AzureBlobVolume) Trash(loc string) error {
550         if v.ReadOnly {
551                 return MethodDisabledError
552         }
553
554         // Ideally we would use If-Unmodified-Since, but that
555         // particular condition seems to be ignored by Azure. Instead,
556         // we get the Etag before checking Mtime, and use If-Match to
557         // ensure we don't delete data if Put() or Touch() happens
558         // between our calls to Mtime() and DeleteBlob().
559         props, err := v.container.GetBlobProperties(loc)
560         if err != nil {
561                 return err
562         }
563         if t, err := v.Mtime(loc); err != nil {
564                 return err
565         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
566                 return nil
567         }
568
569         // If TrashLifetime == 0, just delete it
570         if theConfig.TrashLifetime == 0 {
571                 return v.container.DeleteBlob(loc, &storage.DeleteBlobOptions{
572                         IfMatch: props.Etag,
573                 })
574         }
575
576         // Otherwise, mark as trash
577         return v.container.SetBlobMetadata(loc, storage.BlobMetadata{
578                 "expires_at": fmt.Sprintf("%d", time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()),
579         }, &storage.SetBlobMetadataOptions{
580                 IfMatch: props.Etag,
581         })
582 }
583
584 // Untrash a Keep block.
585 // Delete the expires_at metadata attribute
586 func (v *AzureBlobVolume) Untrash(loc string) error {
587         // if expires_at does not exist, return NotFoundError
588         metadata, err := v.container.GetBlobMetadata(loc)
589         if err != nil {
590                 return v.translateError(err)
591         }
592         if metadata["expires_at"] == "" {
593                 return os.ErrNotExist
594         }
595
596         // reset expires_at metadata attribute
597         metadata["expires_at"] = ""
598         err = v.container.SetBlobMetadata(loc, metadata, nil)
599         return v.translateError(err)
600 }
601
602 // Status returns a VolumeStatus struct with placeholder data.
603 func (v *AzureBlobVolume) Status() *VolumeStatus {
604         return &VolumeStatus{
605                 DeviceNum: 1,
606                 BytesFree: BlockSize * 1000,
607                 BytesUsed: 1,
608         }
609 }
610
611 // String returns a volume label, including the container name.
612 func (v *AzureBlobVolume) String() string {
613         return fmt.Sprintf("azure-storage-container:%+q", v.ContainerName)
614 }
615
616 // Writable returns true, unless the -readonly flag was on when the
617 // volume was added.
618 func (v *AzureBlobVolume) Writable() bool {
619         return !v.ReadOnly
620 }
621
622 // Replication returns the replication level of the container, as
623 // specified by the -azure-storage-replication argument.
624 func (v *AzureBlobVolume) Replication() int {
625         return v.AzureReplication
626 }
627
628 // GetStorageClasses implements Volume
629 func (v *AzureBlobVolume) GetStorageClasses() []string {
630         return v.StorageClasses
631 }
632
633 // If possible, translate an Azure SDK error to a recognizable error
634 // like os.ErrNotExist.
635 func (v *AzureBlobVolume) translateError(err error) error {
636         switch {
637         case err == nil:
638                 return err
639         case strings.Contains(err.Error(), "StatusCode=503"):
640                 // "storage: service returned error: StatusCode=503, ErrorCode=ServerBusy, ErrorMessage=The server is busy" (See #14804)
641                 return VolumeBusyError
642         case strings.Contains(err.Error(), "Not Found"):
643                 // "storage: service returned without a response body (404 Not Found)"
644                 return os.ErrNotExist
645         default:
646                 return err
647         }
648 }
649
650 var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
651
652 func (v *AzureBlobVolume) isKeepBlock(s string) bool {
653         return keepBlockRegexp.MatchString(s)
654 }
655
656 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
657 // and deletes them from the volume.
658 func (v *AzureBlobVolume) EmptyTrash() {
659         var bytesDeleted, bytesInTrash int64
660         var blocksDeleted, blocksInTrash int64
661
662         doBlob := func(b storage.Blob) {
663                 // Check whether the block is flagged as trash
664                 if b.Metadata["expires_at"] == "" {
665                         return
666                 }
667
668                 atomic.AddInt64(&blocksInTrash, 1)
669                 atomic.AddInt64(&bytesInTrash, b.Properties.ContentLength)
670
671                 expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
672                 if err != nil {
673                         log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
674                         return
675                 }
676
677                 if expiresAt > time.Now().Unix() {
678                         return
679                 }
680
681                 err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
682                         IfMatch: b.Properties.Etag,
683                 })
684                 if err != nil {
685                         log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
686                         return
687                 }
688                 atomic.AddInt64(&blocksDeleted, 1)
689                 atomic.AddInt64(&bytesDeleted, b.Properties.ContentLength)
690         }
691
692         var wg sync.WaitGroup
693         todo := make(chan storage.Blob, theConfig.EmptyTrashWorkers)
694         for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
695                 wg.Add(1)
696                 go func() {
697                         defer wg.Done()
698                         for b := range todo {
699                                 doBlob(b)
700                         }
701                 }()
702         }
703
704         params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
705         for page := 1; ; page++ {
706                 resp, err := v.listBlobs(page, params)
707                 if err != nil {
708                         log.Printf("EmptyTrash: ListBlobs: %v", err)
709                         break
710                 }
711                 for _, b := range resp.Blobs {
712                         todo <- b
713                 }
714                 if resp.NextMarker == "" {
715                         break
716                 }
717                 params.Marker = resp.NextMarker
718         }
719         close(todo)
720         wg.Wait()
721
722         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
723 }
724
725 // InternalStats returns bucket I/O and API call counters.
726 func (v *AzureBlobVolume) InternalStats() interface{} {
727         return &v.container.stats
728 }
729
730 type azureBlobStats struct {
731         statsTicker
732         Ops              uint64
733         GetOps           uint64
734         GetRangeOps      uint64
735         GetMetadataOps   uint64
736         GetPropertiesOps uint64
737         CreateOps        uint64
738         SetMetadataOps   uint64
739         DelOps           uint64
740         ListOps          uint64
741 }
742
743 func (s *azureBlobStats) TickErr(err error) {
744         if err == nil {
745                 return
746         }
747         errType := fmt.Sprintf("%T", err)
748         if err, ok := err.(storage.AzureStorageServiceError); ok {
749                 errType = errType + fmt.Sprintf(" %d (%s)", err.StatusCode, err.Code)
750         }
751         log.Printf("errType %T, err %s", err, err)
752         s.statsTicker.TickErr(err, errType)
753 }
754
755 // azureContainer wraps storage.Container in order to count I/O and
756 // API usage stats.
757 type azureContainer struct {
758         ctr   *storage.Container
759         stats azureBlobStats
760 }
761
762 func (c *azureContainer) Exists() (bool, error) {
763         c.stats.TickOps("exists")
764         c.stats.Tick(&c.stats.Ops)
765         ok, err := c.ctr.Exists()
766         c.stats.TickErr(err)
767         return ok, err
768 }
769
770 func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
771         c.stats.TickOps("get_metadata")
772         c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
773         b := c.ctr.GetBlobReference(bname)
774         err := b.GetMetadata(nil)
775         c.stats.TickErr(err)
776         return b.Metadata, err
777 }
778
779 func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
780         c.stats.TickOps("get_properties")
781         c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
782         b := c.ctr.GetBlobReference(bname)
783         err := b.GetProperties(nil)
784         c.stats.TickErr(err)
785         return &b.Properties, err
786 }
787
788 func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
789         c.stats.TickOps("get")
790         c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
791         b := c.ctr.GetBlobReference(bname)
792         rdr, err := b.Get(nil)
793         c.stats.TickErr(err)
794         return NewCountingReader(rdr, c.stats.TickInBytes), err
795 }
796
797 func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
798         c.stats.TickOps("get_range")
799         c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
800         b := c.ctr.GetBlobReference(bname)
801         rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
802                 Range: &storage.BlobRange{
803                         Start: uint64(start),
804                         End:   uint64(end),
805                 },
806                 GetBlobOptions: opts,
807         })
808         c.stats.TickErr(err)
809         return NewCountingReader(rdr, c.stats.TickInBytes), err
810 }
811
812 // If we give it an io.Reader that doesn't also have a Len() int
813 // method, the Azure SDK determines data size by copying the data into
814 // a new buffer, which is not a good use of memory.
815 type readerWithAzureLen struct {
816         io.Reader
817         len int
818 }
819
820 // Len satisfies the private lener interface in azure-sdk-for-go.
821 func (r *readerWithAzureLen) Len() int {
822         return r.len
823 }
824
825 func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
826         c.stats.TickOps("create")
827         c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
828         if size != 0 {
829                 rdr = &readerWithAzureLen{
830                         Reader: NewCountingReader(rdr, c.stats.TickOutBytes),
831                         len:    size,
832                 }
833         }
834         b := c.ctr.GetBlobReference(bname)
835         err := b.CreateBlockBlobFromReader(rdr, opts)
836         c.stats.TickErr(err)
837         return err
838 }
839
840 func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
841         c.stats.TickOps("set_metadata")
842         c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
843         b := c.ctr.GetBlobReference(bname)
844         b.Metadata = m
845         err := b.SetMetadata(opts)
846         c.stats.TickErr(err)
847         return err
848 }
849
850 func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
851         c.stats.TickOps("list")
852         c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
853         resp, err := c.ctr.ListBlobs(params)
854         c.stats.TickErr(err)
855         return resp, err
856 }
857
858 func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
859         c.stats.TickOps("delete")
860         c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
861         b := c.ctr.GetBlobReference(bname)
862         err := b.Delete(opts)
863         c.stats.TickErr(err)
864         return err
865 }