19 "git.curoverse.com/arvados.git/sdk/go/arvados"
20 log "github.com/Sirupsen/logrus"
21 "github.com/curoverse/azure-sdk-for-go/storage"
24 const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
28 azureStorageAccountName string
29 azureStorageAccountKeyFile string
30 azureStorageReplication int
31 azureWriteRaceInterval = 15 * time.Second
32 azureWriteRacePollTime = time.Second
// readKeyFromFile returns the contents of file with leading and
// trailing whitespace removed. It returns an error if the file cannot
// be read, or if nothing but whitespace is left after trimming.
func readKeyFromFile(file string) (string, error) {
	raw, readErr := ioutil.ReadFile(file)
	if readErr != nil {
		return "", errors.New("reading key from " + file + ": " + readErr.Error())
	}
	if key := strings.TrimSpace(string(raw)); key != "" {
		return key, nil
	}
	return "", errors.New("empty account key in " + file)
}
47 type azureVolumeAdder struct {
51 // String implements flag.Value
52 func (s *azureVolumeAdder) String() string {
56 func (s *azureVolumeAdder) Set(containerName string) error {
57 s.Config.Volumes = append(s.Config.Volumes, &AzureBlobVolume{
58 ContainerName: containerName,
59 StorageAccountName: azureStorageAccountName,
60 StorageAccountKeyFile: azureStorageAccountKeyFile,
61 AzureReplication: azureStorageReplication,
62 ReadOnly: deprecated.flagReadonly,
68 VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &AzureBlobVolume{} })
70 flag.Var(&azureVolumeAdder{theConfig},
71 "azure-storage-container-volume",
72 "Use the given container as a storage volume. Can be given multiple times.")
74 &azureStorageAccountName,
75 "azure-storage-account-name",
77 "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
79 &azureStorageAccountKeyFile,
80 "azure-storage-account-key-file",
82 "`File` containing the account key used for subsequent --azure-storage-container-volume arguments.")
84 &azureStorageReplication,
85 "azure-storage-replication",
87 "Replication level to report to clients when data is stored in an Azure container.")
90 "azure-max-get-bytes",
92 fmt.Sprintf("Maximum bytes to request in a single GET request. If smaller than %d, use multiple concurrent range requests to retrieve a block.", BlockSize))
95 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
97 type AzureBlobVolume struct {
98 StorageAccountName string
99 StorageAccountKeyFile string
100 StorageBaseURL string // "" means default, "core.windows.net"
104 RequestTimeout arvados.Duration
106 azClient storage.Client
107 bsClient *azureBlobClient
110 // Examples implements VolumeWithExamples.
111 func (*AzureBlobVolume) Examples() []Volume {
114 StorageAccountName: "example-account-name",
115 StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
116 ContainerName: "example-container-name",
118 RequestTimeout: azureDefaultRequestTimeout,
121 StorageAccountName: "cn-account-name",
122 StorageAccountKeyFile: "/etc/azure_cn_storage_account_key.txt",
123 StorageBaseURL: "core.chinacloudapi.cn",
124 ContainerName: "cn-container-name",
126 RequestTimeout: azureDefaultRequestTimeout,
131 // Type implements Volume.
132 func (v *AzureBlobVolume) Type() string {
136 // Start implements Volume.
137 func (v *AzureBlobVolume) Start() error {
138 if v.ContainerName == "" {
139 return errors.New("no container name given")
141 if v.StorageAccountName == "" || v.StorageAccountKeyFile == "" {
142 return errors.New("StorageAccountName and StorageAccountKeyFile must be given")
144 accountKey, err := readKeyFromFile(v.StorageAccountKeyFile)
148 if v.StorageBaseURL == "" {
149 v.StorageBaseURL = storage.DefaultBaseURL
151 v.azClient, err = storage.NewClient(v.StorageAccountName, accountKey, v.StorageBaseURL, storage.DefaultAPIVersion, true)
153 return fmt.Errorf("creating Azure storage client: %s", err)
156 if v.RequestTimeout == 0 {
157 v.RequestTimeout = azureDefaultRequestTimeout
159 v.azClient.HTTPClient = &http.Client{
160 Timeout: time.Duration(v.RequestTimeout),
162 bs := v.azClient.GetBlobService()
163 v.bsClient = &azureBlobClient{
167 ok, err := v.bsClient.ContainerExists(v.ContainerName)
172 return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
177 // Return true if expires_at metadata attribute is found on the block
178 func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
179 metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
181 return false, metadata, v.translateError(err)
183 if metadata["expires_at"] != "" {
184 return true, metadata, nil
186 return false, metadata, nil
189 // Get reads a Keep block that has been stored as a block blob in the
192 // If the block is younger than azureWriteRaceInterval and is
193 // unexpectedly empty, assume a PutBlob operation is in progress, and
194 // wait for it to finish writing.
195 func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
196 trashed, _, err := v.checkTrashed(loc)
201 return 0, os.ErrNotExist
203 var deadline time.Time
204 haveDeadline := false
205 size, err := v.get(ctx, loc, buf)
206 for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
207 // Seeing a brand new empty block probably means we're
208 // in a race with CreateBlob, which under the hood
209 // (apparently) does "CreateEmpty" and "CommitData"
210 // with no additional transaction locking.
212 t, err := v.Mtime(loc)
214 log.Print("Got empty block (possible race) but Mtime failed: ", err)
217 deadline = t.Add(azureWriteRaceInterval)
218 if time.Now().After(deadline) {
221 log.Printf("Race? Block %s is 0 bytes, %s old. Polling until %s", loc, time.Since(t), deadline)
223 } else if time.Now().After(deadline) {
229 case <-time.After(azureWriteRacePollTime):
231 size, err = v.get(ctx, loc, buf)
234 log.Printf("Race ended with size==%d", size)
239 func (v *AzureBlobVolume) get(ctx context.Context, loc string, buf []byte) (int, error) {
240 ctx, cancel := context.WithCancel(ctx)
242 expectSize := len(buf)
243 if azureMaxGetBytes < BlockSize {
244 // Unfortunately the handler doesn't tell us how long the blob
245 // is expected to be, so we have to ask Azure.
246 props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
248 return 0, v.translateError(err)
250 if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
251 return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
253 expectSize = int(props.ContentLength)
260 // We'll update this actualSize if/when we get the last piece.
262 pieces := (expectSize + azureMaxGetBytes - 1) / azureMaxGetBytes
263 errors := make(chan error, pieces)
264 var wg sync.WaitGroup
266 for p := 0; p < pieces; p++ {
267 // Each goroutine retrieves one piece. If we hit an
268 // error, it is sent to the errors chan so get() can
269 // return it -- but only if the error happens before
270 // ctx is done. This way, if ctx is done before we hit
271 // any other error (e.g., requesting client has hung
272 // up), we return the original ctx.Err() instead of
273 // the secondary errors from the transfers that got
274 // interrupted as a result.
277 startPos := p * azureMaxGetBytes
278 endPos := startPos + azureMaxGetBytes
279 if endPos > expectSize {
282 var rdr io.ReadCloser
284 gotRdr := make(chan struct{})
287 if startPos == 0 && endPos == expectSize {
288 rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
290 rdr, err = v.bsClient.GetBlobRange(v.ContainerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
310 // Close the reader when the client
311 // hangs up or another piece fails
312 // (possibly interrupting ReadFull())
313 // or when all pieces succeed and
318 n, err := io.ReadFull(rdr, buf[startPos:endPos])
319 if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
320 // If we don't know the actual size,
321 // and just tried reading 64 MiB, it's
322 // normal to encounter EOF.
323 } else if err != nil {
324 if ctx.Err() == nil {
331 actualSize = startPos + n
338 return 0, v.translateError(<-errors)
340 if ctx.Err() != nil {
343 return actualSize, nil
346 // Compare the given data with existing stored data.
347 func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte) error {
348 trashed, _, err := v.checkTrashed(loc)
353 return os.ErrNotExist
355 var rdr io.ReadCloser
356 gotRdr := make(chan struct{})
359 rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
373 return v.translateError(err)
376 return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
379 // Put stores a Keep block as a block blob in the container.
380 func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) error {
382 return MethodDisabledError
384 // Send the block data through a pipe, so that (if we need to)
385 // we can close the pipe early and abandon our
386 // CreateBlockBlobFromReader() goroutine, without worrying
387 // about CreateBlockBlobFromReader() accessing our block
388 // buffer after we release it.
389 bufr, bufw := io.Pipe()
391 io.Copy(bufw, bytes.NewReader(block))
394 errChan := make(chan error)
396 errChan <- v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), bufr, nil)
400 theConfig.debugLogf("%s: taking CreateBlockBlobFromReader's input away: %s", v, ctx.Err())
401 // Our pipe might be stuck in Write(), waiting for
402 // io.Copy() to read. If so, un-stick it. This means
403 // CreateBlockBlobFromReader will get corrupt data,
404 // but that's OK: the size won't match, so the write
406 go io.Copy(ioutil.Discard, bufr)
407 // CloseWithError() will return once pending I/O is done.
408 bufw.CloseWithError(ctx.Err())
409 theConfig.debugLogf("%s: abandoning CreateBlockBlobFromReader goroutine", v)
411 case err := <-errChan:
416 // Touch updates the last-modified property of a block blob.
417 func (v *AzureBlobVolume) Touch(loc string) error {
419 return MethodDisabledError
421 trashed, metadata, err := v.checkTrashed(loc)
426 return os.ErrNotExist
429 metadata["touch"] = fmt.Sprintf("%d", time.Now())
430 return v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
433 // Mtime returns the last-modified property of a block blob.
434 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
435 trashed, _, err := v.checkTrashed(loc)
437 return time.Time{}, err
440 return time.Time{}, os.ErrNotExist
443 props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
445 return time.Time{}, err
447 return time.Parse(time.RFC1123, props.LastModified)
450 // IndexTo writes a list of Keep blocks that are stored in the
452 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
453 params := storage.ListBlobsParameters{
458 resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
462 for _, b := range resp.Blobs {
463 t, err := time.Parse(time.RFC1123, b.Properties.LastModified)
467 if !v.isKeepBlock(b.Name) {
470 if b.Properties.ContentLength == 0 && t.Add(azureWriteRaceInterval).After(time.Now()) {
471 // A new zero-length blob is probably
472 // just a new non-empty blob that
473 // hasn't committed its data yet (see
474 // Get()), and in any case has no
478 if b.Metadata["expires_at"] != "" {
479 // Trashed blob; exclude it from response
482 fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.UnixNano())
484 if resp.NextMarker == "" {
487 params.Marker = resp.NextMarker
491 // Trash a Keep block.
492 func (v *AzureBlobVolume) Trash(loc string) error {
494 return MethodDisabledError
497 // Ideally we would use If-Unmodified-Since, but that
498 // particular condition seems to be ignored by Azure. Instead,
499 // we get the Etag before checking Mtime, and use If-Match to
500 // ensure we don't delete data if Put() or Touch() happens
501 // between our calls to Mtime() and DeleteBlob().
502 props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
506 if t, err := v.Mtime(loc); err != nil {
508 } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
512 // If TrashLifetime == 0, just delete it
513 if theConfig.TrashLifetime == 0 {
514 return v.bsClient.DeleteBlob(v.ContainerName, loc, map[string]string{
515 "If-Match": props.Etag,
519 // Otherwise, mark as trash
520 return v.bsClient.SetBlobMetadata(v.ContainerName, loc, map[string]string{
521 "expires_at": fmt.Sprintf("%d", time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()),
522 }, map[string]string{
523 "If-Match": props.Etag,
527 // Untrash a Keep block.
528 // Delete the expires_at metadata attribute
529 func (v *AzureBlobVolume) Untrash(loc string) error {
530 // if expires_at does not exist, return NotFoundError
531 metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
533 return v.translateError(err)
535 if metadata["expires_at"] == "" {
536 return os.ErrNotExist
539 // reset expires_at metadata attribute
540 metadata["expires_at"] = ""
541 err = v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
542 return v.translateError(err)
545 // Status returns a VolumeStatus struct with placeholder data.
546 func (v *AzureBlobVolume) Status() *VolumeStatus {
547 return &VolumeStatus{
549 BytesFree: BlockSize * 1000,
554 // String returns a volume label, including the container name.
555 func (v *AzureBlobVolume) String() string {
556 return fmt.Sprintf("azure-storage-container:%+q", v.ContainerName)
559 // Writable returns true, unless the -readonly flag was on when the
561 func (v *AzureBlobVolume) Writable() bool {
565 // Replication returns the replication level of the container, as
566 // specified by the -azure-storage-replication argument.
567 func (v *AzureBlobVolume) Replication() int {
568 return v.AzureReplication
571 // If possible, translate an Azure SDK error to a recognizable error
572 // like os.ErrNotExist.
573 func (v *AzureBlobVolume) translateError(err error) error {
577 case strings.Contains(err.Error(), "Not Found"):
578 // "storage: service returned without a response body (404 Not Found)"
579 return os.ErrNotExist
585 var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
587 func (v *AzureBlobVolume) isKeepBlock(s string) bool {
588 return keepBlockRegexp.MatchString(s)
591 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
592 // and deletes them from the volume.
593 func (v *AzureBlobVolume) EmptyTrash() {
594 var bytesDeleted, bytesInTrash int64
595 var blocksDeleted, blocksInTrash int
596 params := storage.ListBlobsParameters{Include: "metadata"}
599 resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
601 log.Printf("EmptyTrash: ListBlobs: %v", err)
604 for _, b := range resp.Blobs {
605 // Check if the block is expired
606 if b.Metadata["expires_at"] == "" {
611 bytesInTrash += b.Properties.ContentLength
613 expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
615 log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
619 if expiresAt > time.Now().Unix() {
623 err = v.bsClient.DeleteBlob(v.ContainerName, b.Name, map[string]string{
624 "If-Match": b.Properties.Etag,
627 log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
631 bytesDeleted += b.Properties.ContentLength
633 if resp.NextMarker == "" {
636 params.Marker = resp.NextMarker
639 log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
642 // InternalStats returns bucket I/O and API call counters.
643 func (v *AzureBlobVolume) InternalStats() interface{} {
644 return &v.bsClient.stats
647 type azureBlobStats struct {
652 GetMetadataOps uint64
653 GetPropertiesOps uint64
655 SetMetadataOps uint64
660 func (s *azureBlobStats) TickErr(err error) {
664 errType := fmt.Sprintf("%T", err)
665 if err, ok := err.(storage.AzureStorageServiceError); ok {
666 errType = errType + fmt.Sprintf(" %d (%s)", err.StatusCode, err.Code)
668 log.Printf("errType %T, err %s", err, err)
669 s.statsTicker.TickErr(err, errType)
672 // azureBlobClient wraps storage.BlobStorageClient in order to count
673 // I/O and API usage stats.
674 type azureBlobClient struct {
675 client *storage.BlobStorageClient
679 func (c *azureBlobClient) ContainerExists(cname string) (bool, error) {
680 c.stats.Tick(&c.stats.Ops)
681 ok, err := c.client.ContainerExists(cname)
686 func (c *azureBlobClient) GetBlobMetadata(cname, bname string) (map[string]string, error) {
687 c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
688 m, err := c.client.GetBlobMetadata(cname, bname)
693 func (c *azureBlobClient) GetBlobProperties(cname, bname string) (*storage.BlobProperties, error) {
694 c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
695 p, err := c.client.GetBlobProperties(cname, bname)
700 func (c *azureBlobClient) GetBlob(cname, bname string) (io.ReadCloser, error) {
701 c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
702 rdr, err := c.client.GetBlob(cname, bname)
704 return NewCountingReader(rdr, c.stats.TickInBytes), err
707 func (c *azureBlobClient) GetBlobRange(cname, bname, byterange string, hdrs map[string]string) (io.ReadCloser, error) {
708 c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
709 rdr, err := c.client.GetBlobRange(cname, bname, byterange, hdrs)
711 return NewCountingReader(rdr, c.stats.TickInBytes), err
714 func (c *azureBlobClient) CreateBlockBlobFromReader(cname, bname string, size uint64, rdr io.Reader, hdrs map[string]string) error {
715 c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
716 rdr = NewCountingReader(rdr, c.stats.TickOutBytes)
717 err := c.client.CreateBlockBlobFromReader(cname, bname, size, rdr, hdrs)
722 func (c *azureBlobClient) SetBlobMetadata(cname, bname string, m, hdrs map[string]string) error {
723 c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
724 err := c.client.SetBlobMetadata(cname, bname, m, hdrs)
729 func (c *azureBlobClient) ListBlobs(cname string, params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
730 c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
731 resp, err := c.client.ListBlobs(cname, params)
736 func (c *azureBlobClient) DeleteBlob(cname, bname string, hdrs map[string]string) error {
737 c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
738 err := c.client.DeleteBlob(cname, bname, hdrs)