Merge branch '10468-blob-storage-timeouts' closes #10468
[arvados.git] / services / keepstore / azure_blob_volume.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "flag"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "log"
11         "net/http"
12         "os"
13         "regexp"
14         "strconv"
15         "strings"
16         "sync"
17         "time"
18
19         "git.curoverse.com/arvados.git/sdk/go/arvados"
20         "github.com/curoverse/azure-sdk-for-go/storage"
21 )
22
23 const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
24
25 var (
26         azureMaxGetBytes           int
27         azureStorageAccountName    string
28         azureStorageAccountKeyFile string
29         azureStorageReplication    int
30         azureWriteRaceInterval     = 15 * time.Second
31         azureWriteRacePollTime     = time.Second
32 )
33
34 func readKeyFromFile(file string) (string, error) {
35         buf, err := ioutil.ReadFile(file)
36         if err != nil {
37                 return "", errors.New("reading key from " + file + ": " + err.Error())
38         }
39         accountKey := strings.TrimSpace(string(buf))
40         if accountKey == "" {
41                 return "", errors.New("empty account key in " + file)
42         }
43         return accountKey, nil
44 }
45
46 type azureVolumeAdder struct {
47         *Config
48 }
49
50 // String implements flag.Value
51 func (s *azureVolumeAdder) String() string {
52         return "-"
53 }
54
55 func (s *azureVolumeAdder) Set(containerName string) error {
56         s.Config.Volumes = append(s.Config.Volumes, &AzureBlobVolume{
57                 ContainerName:         containerName,
58                 StorageAccountName:    azureStorageAccountName,
59                 StorageAccountKeyFile: azureStorageAccountKeyFile,
60                 AzureReplication:      azureStorageReplication,
61                 ReadOnly:              deprecated.flagReadonly,
62         })
63         return nil
64 }
65
66 func init() {
67         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &AzureBlobVolume{} })
68
69         flag.Var(&azureVolumeAdder{theConfig},
70                 "azure-storage-container-volume",
71                 "Use the given container as a storage volume. Can be given multiple times.")
72         flag.StringVar(
73                 &azureStorageAccountName,
74                 "azure-storage-account-name",
75                 "",
76                 "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
77         flag.StringVar(
78                 &azureStorageAccountKeyFile,
79                 "azure-storage-account-key-file",
80                 "",
81                 "`File` containing the account key used for subsequent --azure-storage-container-volume arguments.")
82         flag.IntVar(
83                 &azureStorageReplication,
84                 "azure-storage-replication",
85                 3,
86                 "Replication level to report to clients when data is stored in an Azure container.")
87         flag.IntVar(
88                 &azureMaxGetBytes,
89                 "azure-max-get-bytes",
90                 BlockSize,
91                 fmt.Sprintf("Maximum bytes to request in a single GET request. If smaller than %d, use multiple concurrent range requests to retrieve a block.", BlockSize))
92 }
93
94 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
95 // container.
96 type AzureBlobVolume struct {
97         StorageAccountName    string
98         StorageAccountKeyFile string
99         ContainerName         string
100         AzureReplication      int
101         ReadOnly              bool
102         RequestTimeout        arvados.Duration
103
104         azClient storage.Client
105         bsClient storage.BlobStorageClient
106 }
107
108 // Examples implements VolumeWithExamples.
109 func (*AzureBlobVolume) Examples() []Volume {
110         return []Volume{
111                 &AzureBlobVolume{
112                         StorageAccountName:    "example-account-name",
113                         StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
114                         ContainerName:         "example-container-name",
115                         AzureReplication:      3,
116                         RequestTimeout:        azureDefaultRequestTimeout,
117                 },
118         }
119 }
120
121 // Type implements Volume.
122 func (v *AzureBlobVolume) Type() string {
123         return "Azure"
124 }
125
126 // Start implements Volume.
127 func (v *AzureBlobVolume) Start() error {
128         if v.ContainerName == "" {
129                 return errors.New("no container name given")
130         }
131         if v.StorageAccountName == "" || v.StorageAccountKeyFile == "" {
132                 return errors.New("StorageAccountName and StorageAccountKeyFile must be given")
133         }
134         accountKey, err := readKeyFromFile(v.StorageAccountKeyFile)
135         if err != nil {
136                 return err
137         }
138         v.azClient, err = storage.NewBasicClient(v.StorageAccountName, accountKey)
139         if err != nil {
140                 return fmt.Errorf("creating Azure storage client: %s", err)
141         }
142
143         if v.RequestTimeout == 0 {
144                 v.RequestTimeout = azureDefaultRequestTimeout
145         }
146         v.azClient.HTTPClient = &http.Client{
147                 Timeout: time.Duration(v.RequestTimeout),
148         }
149         v.bsClient = v.azClient.GetBlobService()
150
151         ok, err := v.bsClient.ContainerExists(v.ContainerName)
152         if err != nil {
153                 return err
154         }
155         if !ok {
156                 return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
157         }
158         return nil
159 }
160
161 // Return true if expires_at metadata attribute is found on the block
162 func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
163         metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
164         if err != nil {
165                 return false, metadata, v.translateError(err)
166         }
167         if metadata["expires_at"] != "" {
168                 return true, metadata, nil
169         }
170         return false, metadata, nil
171 }
172
173 // Get reads a Keep block that has been stored as a block blob in the
174 // container.
175 //
176 // If the block is younger than azureWriteRaceInterval and is
177 // unexpectedly empty, assume a PutBlob operation is in progress, and
178 // wait for it to finish writing.
179 func (v *AzureBlobVolume) Get(loc string, buf []byte) (int, error) {
180         trashed, _, err := v.checkTrashed(loc)
181         if err != nil {
182                 return 0, err
183         }
184         if trashed {
185                 return 0, os.ErrNotExist
186         }
187         var deadline time.Time
188         haveDeadline := false
189         size, err := v.get(loc, buf)
190         for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
191                 // Seeing a brand new empty block probably means we're
192                 // in a race with CreateBlob, which under the hood
193                 // (apparently) does "CreateEmpty" and "CommitData"
194                 // with no additional transaction locking.
195                 if !haveDeadline {
196                         t, err := v.Mtime(loc)
197                         if err != nil {
198                                 log.Print("Got empty block (possible race) but Mtime failed: ", err)
199                                 break
200                         }
201                         deadline = t.Add(azureWriteRaceInterval)
202                         if time.Now().After(deadline) {
203                                 break
204                         }
205                         log.Printf("Race? Block %s is 0 bytes, %s old. Polling until %s", loc, time.Since(t), deadline)
206                         haveDeadline = true
207                 } else if time.Now().After(deadline) {
208                         break
209                 }
210                 time.Sleep(azureWriteRacePollTime)
211                 size, err = v.get(loc, buf)
212         }
213         if haveDeadline {
214                 log.Printf("Race ended with size==%d", size)
215         }
216         return size, err
217 }
218
219 func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
220         expectSize := len(buf)
221         if azureMaxGetBytes < BlockSize {
222                 // Unfortunately the handler doesn't tell us how long the blob
223                 // is expected to be, so we have to ask Azure.
224                 props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
225                 if err != nil {
226                         return 0, v.translateError(err)
227                 }
228                 if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
229                         return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
230                 }
231                 expectSize = int(props.ContentLength)
232         }
233
234         if expectSize == 0 {
235                 return 0, nil
236         }
237
238         // We'll update this actualSize if/when we get the last piece.
239         actualSize := -1
240         pieces := (expectSize + azureMaxGetBytes - 1) / azureMaxGetBytes
241         errors := make([]error, pieces)
242         var wg sync.WaitGroup
243         wg.Add(pieces)
244         for p := 0; p < pieces; p++ {
245                 go func(p int) {
246                         defer wg.Done()
247                         startPos := p * azureMaxGetBytes
248                         endPos := startPos + azureMaxGetBytes
249                         if endPos > expectSize {
250                                 endPos = expectSize
251                         }
252                         var rdr io.ReadCloser
253                         var err error
254                         if startPos == 0 && endPos == expectSize {
255                                 rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
256                         } else {
257                                 rdr, err = v.bsClient.GetBlobRange(v.ContainerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
258                         }
259                         if err != nil {
260                                 errors[p] = err
261                                 return
262                         }
263                         defer rdr.Close()
264                         n, err := io.ReadFull(rdr, buf[startPos:endPos])
265                         if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
266                                 // If we don't know the actual size,
267                                 // and just tried reading 64 MiB, it's
268                                 // normal to encounter EOF.
269                         } else if err != nil {
270                                 errors[p] = err
271                         }
272                         if p == pieces-1 {
273                                 actualSize = startPos + n
274                         }
275                 }(p)
276         }
277         wg.Wait()
278         for _, err := range errors {
279                 if err != nil {
280                         return 0, v.translateError(err)
281                 }
282         }
283         return actualSize, nil
284 }
285
286 // Compare the given data with existing stored data.
287 func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
288         trashed, _, err := v.checkTrashed(loc)
289         if err != nil {
290                 return err
291         }
292         if trashed {
293                 return os.ErrNotExist
294         }
295         rdr, err := v.bsClient.GetBlob(v.ContainerName, loc)
296         if err != nil {
297                 return v.translateError(err)
298         }
299         defer rdr.Close()
300         return compareReaderWithBuf(rdr, expect, loc[:32])
301 }
302
303 // Put stores a Keep block as a block blob in the container.
304 func (v *AzureBlobVolume) Put(loc string, block []byte) error {
305         if v.ReadOnly {
306                 return MethodDisabledError
307         }
308         return v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), bytes.NewReader(block), nil)
309 }
310
311 // Touch updates the last-modified property of a block blob.
312 func (v *AzureBlobVolume) Touch(loc string) error {
313         if v.ReadOnly {
314                 return MethodDisabledError
315         }
316         trashed, metadata, err := v.checkTrashed(loc)
317         if err != nil {
318                 return err
319         }
320         if trashed {
321                 return os.ErrNotExist
322         }
323
324         metadata["touch"] = fmt.Sprintf("%d", time.Now())
325         return v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
326 }
327
328 // Mtime returns the last-modified property of a block blob.
329 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
330         trashed, _, err := v.checkTrashed(loc)
331         if err != nil {
332                 return time.Time{}, err
333         }
334         if trashed {
335                 return time.Time{}, os.ErrNotExist
336         }
337
338         props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
339         if err != nil {
340                 return time.Time{}, err
341         }
342         return time.Parse(time.RFC1123, props.LastModified)
343 }
344
345 // IndexTo writes a list of Keep blocks that are stored in the
346 // container.
347 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
348         params := storage.ListBlobsParameters{
349                 Prefix:  prefix,
350                 Include: "metadata",
351         }
352         for {
353                 resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
354                 if err != nil {
355                         return err
356                 }
357                 for _, b := range resp.Blobs {
358                         t, err := time.Parse(time.RFC1123, b.Properties.LastModified)
359                         if err != nil {
360                                 return err
361                         }
362                         if !v.isKeepBlock(b.Name) {
363                                 continue
364                         }
365                         if b.Properties.ContentLength == 0 && t.Add(azureWriteRaceInterval).After(time.Now()) {
366                                 // A new zero-length blob is probably
367                                 // just a new non-empty blob that
368                                 // hasn't committed its data yet (see
369                                 // Get()), and in any case has no
370                                 // value.
371                                 continue
372                         }
373                         if b.Metadata["expires_at"] != "" {
374                                 // Trashed blob; exclude it from response
375                                 continue
376                         }
377                         fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.UnixNano())
378                 }
379                 if resp.NextMarker == "" {
380                         return nil
381                 }
382                 params.Marker = resp.NextMarker
383         }
384 }
385
386 // Trash a Keep block.
387 func (v *AzureBlobVolume) Trash(loc string) error {
388         if v.ReadOnly {
389                 return MethodDisabledError
390         }
391
392         // Ideally we would use If-Unmodified-Since, but that
393         // particular condition seems to be ignored by Azure. Instead,
394         // we get the Etag before checking Mtime, and use If-Match to
395         // ensure we don't delete data if Put() or Touch() happens
396         // between our calls to Mtime() and DeleteBlob().
397         props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
398         if err != nil {
399                 return err
400         }
401         if t, err := v.Mtime(loc); err != nil {
402                 return err
403         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
404                 return nil
405         }
406
407         // If TrashLifetime == 0, just delete it
408         if theConfig.TrashLifetime == 0 {
409                 return v.bsClient.DeleteBlob(v.ContainerName, loc, map[string]string{
410                         "If-Match": props.Etag,
411                 })
412         }
413
414         // Otherwise, mark as trash
415         return v.bsClient.SetBlobMetadata(v.ContainerName, loc, map[string]string{
416                 "expires_at": fmt.Sprintf("%d", time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()),
417         }, map[string]string{
418                 "If-Match": props.Etag,
419         })
420 }
421
422 // Untrash a Keep block.
423 // Delete the expires_at metadata attribute
424 func (v *AzureBlobVolume) Untrash(loc string) error {
425         // if expires_at does not exist, return NotFoundError
426         metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
427         if err != nil {
428                 return v.translateError(err)
429         }
430         if metadata["expires_at"] == "" {
431                 return os.ErrNotExist
432         }
433
434         // reset expires_at metadata attribute
435         metadata["expires_at"] = ""
436         err = v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
437         return v.translateError(err)
438 }
439
440 // Status returns a VolumeStatus struct with placeholder data.
441 func (v *AzureBlobVolume) Status() *VolumeStatus {
442         return &VolumeStatus{
443                 DeviceNum: 1,
444                 BytesFree: BlockSize * 1000,
445                 BytesUsed: 1,
446         }
447 }
448
449 // String returns a volume label, including the container name.
450 func (v *AzureBlobVolume) String() string {
451         return fmt.Sprintf("azure-storage-container:%+q", v.ContainerName)
452 }
453
454 // Writable returns true, unless the -readonly flag was on when the
455 // volume was added.
456 func (v *AzureBlobVolume) Writable() bool {
457         return !v.ReadOnly
458 }
459
460 // Replication returns the replication level of the container, as
461 // specified by the -azure-storage-replication argument.
462 func (v *AzureBlobVolume) Replication() int {
463         return v.AzureReplication
464 }
465
466 // If possible, translate an Azure SDK error to a recognizable error
467 // like os.ErrNotExist.
468 func (v *AzureBlobVolume) translateError(err error) error {
469         switch {
470         case err == nil:
471                 return err
472         case strings.Contains(err.Error(), "Not Found"):
473                 // "storage: service returned without a response body (404 Not Found)"
474                 return os.ErrNotExist
475         default:
476                 return err
477         }
478 }
479
480 var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
481
482 func (v *AzureBlobVolume) isKeepBlock(s string) bool {
483         return keepBlockRegexp.MatchString(s)
484 }
485
486 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
487 // and deletes them from the volume.
488 func (v *AzureBlobVolume) EmptyTrash() {
489         var bytesDeleted, bytesInTrash int64
490         var blocksDeleted, blocksInTrash int
491         params := storage.ListBlobsParameters{Include: "metadata"}
492
493         for {
494                 resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
495                 if err != nil {
496                         log.Printf("EmptyTrash: ListBlobs: %v", err)
497                         break
498                 }
499                 for _, b := range resp.Blobs {
500                         // Check if the block is expired
501                         if b.Metadata["expires_at"] == "" {
502                                 continue
503                         }
504
505                         blocksInTrash++
506                         bytesInTrash += b.Properties.ContentLength
507
508                         expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
509                         if err != nil {
510                                 log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
511                                 continue
512                         }
513
514                         if expiresAt > time.Now().Unix() {
515                                 continue
516                         }
517
518                         err = v.bsClient.DeleteBlob(v.ContainerName, b.Name, map[string]string{
519                                 "If-Match": b.Properties.Etag,
520                         })
521                         if err != nil {
522                                 log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
523                                 continue
524                         }
525                         blocksDeleted++
526                         bytesDeleted += b.Properties.ContentLength
527                 }
528                 if resp.NextMarker == "" {
529                         break
530                 }
531                 params.Marker = resp.NextMarker
532         }
533
534         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
535 }