21126: Add AllowTrashWhenReadOnly flag.
[arvados.git] / services / keepstore / azure_blob_volume.go
index b52b706b26bbcfd4cf43df4e9b74c919f9417768..56a52c913a196149d3d4bbe03ec1f8f382018b72 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepstore
 
 import (
        "bytes"
@@ -21,8 +21,8 @@ import (
        "sync/atomic"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "github.com/Azure/azure-sdk-for-go/storage"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
@@ -34,7 +34,6 @@ func init() {
 
 func newAzureBlobVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
        v := &AzureBlobVolume{
-               StorageBaseURL:    storage.DefaultBaseURL,
                RequestTimeout:    azureDefaultRequestTimeout,
                WriteRaceInterval: azureDefaultWriteRaceInterval,
                WriteRacePollTime: azureDefaultWriteRacePollTime,
@@ -53,6 +52,9 @@ func newAzureBlobVolume(cluster *arvados.Cluster, volume arvados.Volume, logger
        if v.ListBlobsMaxAttempts == 0 {
                v.ListBlobsMaxAttempts = azureDefaultListBlobsMaxAttempts
        }
+       if v.StorageBaseURL == "" {
+               v.StorageBaseURL = storage.DefaultBaseURL
+       }
        if v.ContainerName == "" || v.StorageAccountName == "" || v.StorageAccountKey == "" {
                return nil, errors.New("DriverParameters: ContainerName, StorageAccountName, and StorageAccountKey must be provided")
        }
@@ -478,10 +480,9 @@ func (v *AzureBlobVolume) listBlobs(page int, params storage.ListBlobsParameters
 
 // Trash a Keep block.
 func (v *AzureBlobVolume) Trash(loc string) error {
-       if v.volume.ReadOnly {
+       if v.volume.ReadOnly && !v.volume.AllowTrashWhenReadOnly {
                return MethodDisabledError
        }
-
        // Ideally we would use If-Unmodified-Since, but that
        // particular condition seems to be ignored by Azure. Instead,
        // we get the Etag before checking Mtime, and use If-Match to
@@ -497,7 +498,7 @@ func (v *AzureBlobVolume) Trash(loc string) error {
                return nil
        }
 
-       // If TrashLifetime == 0, just delete it
+       // If BlobTrashLifetime == 0, just delete it
        if v.cluster.Collections.BlobTrashLifetime == 0 {
                return v.container.DeleteBlob(loc, &storage.DeleteBlobOptions{
                        IfMatch: props.Etag,
@@ -556,6 +557,9 @@ func (v *AzureBlobVolume) translateError(err error) error {
        case strings.Contains(err.Error(), "Not Found"):
                // "storage: service returned without a response body (404 Not Found)"
                return os.ErrNotExist
+       case strings.Contains(err.Error(), "ErrorCode=BlobNotFound"):
+               // "storage: service returned error: StatusCode=404, ErrorCode=BlobNotFound, ErrorMessage=The specified blob does not exist.\n..."
+               return os.ErrNotExist
        default:
                return err
        }
@@ -567,13 +571,9 @@ func (v *AzureBlobVolume) isKeepBlock(s string) bool {
        return keepBlockRegexp.MatchString(s)
 }
 
-// EmptyTrash looks for trashed blocks that exceeded TrashLifetime
+// EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
 // and deletes them from the volume.
 func (v *AzureBlobVolume) EmptyTrash() {
-       if v.cluster.Collections.BlobDeleteConcurrency < 1 {
-               return
-       }
-
        var bytesDeleted, bytesInTrash int64
        var blocksDeleted, blocksInTrash int64