Refactor the multi-host salt install page.
[arvados.git] / services / keepstore / s3_volume_test.go
index b8c4458a5b363626289d2718a2507a450da18a5c..a82098356859cb3cc481d20df453efb97e1726d0 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepstore
 
 import (
        "bytes"
@@ -10,15 +10,15 @@ import (
        "crypto/md5"
        "encoding/json"
        "fmt"
-       "log"
+       "io"
        "net/http"
        "net/http/httptest"
        "os"
        "strings"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "github.com/AdRoll/goamz/s3"
        "github.com/AdRoll/goamz/s3/s3test"
        "github.com/prometheus/client_golang/prometheus"
@@ -45,6 +45,7 @@ var _ = check.Suite(&StubbedS3Suite{})
 
 type StubbedS3Suite struct {
        s3server *httptest.Server
+       metadata *httptest.Server
        cluster  *arvados.Cluster
        handler  *handler
        volumes  []*TestableS3Volume
@@ -52,6 +53,7 @@ type StubbedS3Suite struct {
 
 func (s *StubbedS3Suite) SetUpTest(c *check.C) {
        s.s3server = nil
+       s.metadata = nil
        s.cluster = testCluster(c)
        s.cluster.Volumes = map[string]arvados.Volume{
                "zzzzz-nyw5e-000000000000000": {Driver: "S3"},
@@ -74,6 +76,14 @@ func (s *StubbedS3Suite) TestGenericReadOnly(c *check.C) {
        })
 }
 
+func (s *StubbedS3Suite) TestGenericWithPrefix(c *check.C) {
+       DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+               tv := s.newTestableVolume(c, cluster, volume, metrics, -2*time.Second)
+               tv.PrefixLength = 3 // objects stored as loc[:3]+"/"+loc (cf. TestBackendStates)
+               return tv
+       })
+}
+
 func (s *StubbedS3Suite) TestIndex(c *check.C) {
        v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 0)
        v.IndexPageSize = 3
@@ -99,6 +109,89 @@ func (s *StubbedS3Suite) TestIndex(c *check.C) {
        }
 }
 
+func (s *StubbedS3Suite) TestSignatureVersion(c *check.C) {
+       // The stub records the headers of every request it receives,
+       // so after a Put, header holds those of the volume's last
+       // request.
+       var header http.Header
+       stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               header = r.Header
+       }))
+       defer stub.Close()
+
+       // putFoo builds a fresh volume aimed at the stub (forcing V2
+       // signatures when forceV2 is true), stores the "foo" block
+       // through it, and returns the Authorization header that the
+       // stub saw.
+       putFoo := func(forceV2 bool) string {
+               params := arvados.S3VolumeDriverParameters{
+                       AccessKeyID:     "xxx",
+                       SecretAccessKey: "xxx",
+                       Endpoint:        stub.URL,
+                       Region:          "test-region-1",
+                       Bucket:          "test-bucket-name",
+                       V2Signature:     forceV2,
+               }
+               vol := S3Volume{
+                       S3VolumeDriverParameters: params,
+                       cluster:                  s.cluster,
+                       logger:                   ctxlog.TestLogger(c),
+                       metrics:                  newVolumeMetricsVecs(prometheus.NewRegistry()),
+               }
+               // The configuration is expected to pass validation
+               // before anything is stored.
+               err := vol.check()
+               c.Check(err, check.IsNil)
+               err = vol.Put(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
+               c.Check(err, check.IsNil)
+               return header.Get("Authorization")
+       }
+
+       // Unless V2Signature is set, requests carry an AWS Signature
+       // Version 4 Authorization header.
+       c.Check(putFoo(false), check.Matches, `AWS4-HMAC-SHA256 .*`)
+
+       // With V2Signature set, the older "AWS <key>:<signature>" form
+       // is used instead.
+       c.Check(putFoo(true), check.Matches, `AWS xxx:.*`)
+}
+
+func (s *StubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
+       s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
+               exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
+               // Literal example from
+               // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
+               // but with updated timestamps
+               io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
+       }))
+       defer s.metadata.Close()
+
+       v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+       c.Check(v.AccessKeyID, check.Equals, "ASIAIOSFODNN7EXAMPLE")
+       c.Check(v.SecretAccessKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+       c.Check(v.bucket.bucket.S3.Auth.AccessKey, check.Equals, "ASIAIOSFODNN7EXAMPLE")
+       c.Check(v.bucket.bucket.S3.Auth.SecretKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+
+       // Second metadata service: every request gets 404 Not Found.
+       s.metadata = httptest.NewServer(http.NotFoundHandler())
+       defer s.metadata.Close() // the defer above closes only the first server
+       deadv := &S3Volume{
+               S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+                       IAMRole:  s.metadata.URL + "/fake-metadata/test-role",
+                       Endpoint: "http://localhost:12345",
+                       Region:   "test-region-1",
+                       Bucket:   "test-bucket-name",
+               },
+               cluster: s.cluster,
+               logger:  ctxlog.TestLogger(c),
+               metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+       }
+       err := deadv.check()
+       c.Check(err, check.ErrorMatches, `.*/fake-metadata/test-role.*`)
+       c.Check(err, check.ErrorMatches, `.*404.*`)
+}
+
 func (s *StubbedS3Suite) TestStats(c *check.C) {
        v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
        stats := func() string {
@@ -229,7 +322,7 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
                        return
                }
                v.serverClock.now = &t
-               v.bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
+               v.bucket.Bucket().Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
        }
 
        t0 := time.Now()
@@ -316,12 +409,12 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
                        false, false, false, true, false, false,
                },
                {
-                       "Erroneously trashed during a race, detected before TrashLifetime",
+                       "Erroneously trashed during a race, detected before BlobTrashLifetime",
                        none, t0.Add(-30 * time.Minute), t0.Add(-29 * time.Minute),
                        true, false, true, true, true, false,
                },
                {
-                       "Erroneously trashed during a race, rescue during EmptyTrash despite reaching TrashLifetime",
+                       "Erroneously trashed during a race, rescue during EmptyTrash despite reaching BlobTrashLifetime",
                        none, t0.Add(-90 * time.Minute), t0.Add(-89 * time.Minute),
                        true, false, true, true, true, false,
                },
@@ -331,81 +424,88 @@ func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
                        false, false, false, true, true, true,
                },
        } {
-               c.Log("Scenario: ", scenario.label)
-
-               // We have a few tests to run for each scenario, and
-               // the tests are expected to change state. By calling
-               // this setup func between tests, we (re)create the
-               // scenario as specified, using a new unique block
-               // locator to prevent interference from previous
-               // tests.
-
-               setupScenario := func() (string, []byte) {
-                       nextKey++
-                       blk := []byte(fmt.Sprintf("%d", nextKey))
-                       loc := fmt.Sprintf("%x", md5.Sum(blk))
-                       c.Log("\t", loc)
-                       putS3Obj(scenario.dataT, loc, blk)
-                       putS3Obj(scenario.recentT, "recent/"+loc, nil)
-                       putS3Obj(scenario.trashT, "trash/"+loc, blk)
-                       v.serverClock.now = &t0
-                       return loc, blk
-               }
-
-               // Check canGet
-               loc, blk := setupScenario()
-               buf := make([]byte, len(blk))
-               _, err := v.Get(context.Background(), loc, buf)
-               c.Check(err == nil, check.Equals, scenario.canGet)
-               if err != nil {
-                       c.Check(os.IsNotExist(err), check.Equals, true)
-               }
-
-               // Call Trash, then check canTrash and canGetAfterTrash
-               loc, _ = setupScenario()
-               err = v.Trash(loc)
-               c.Check(err == nil, check.Equals, scenario.canTrash)
-               _, err = v.Get(context.Background(), loc, buf)
-               c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
-               if err != nil {
-                       c.Check(os.IsNotExist(err), check.Equals, true)
-               }
-
-               // Call Untrash, then check canUntrash
-               loc, _ = setupScenario()
-               err = v.Untrash(loc)
-               c.Check(err == nil, check.Equals, scenario.canUntrash)
-               if scenario.dataT != none || scenario.trashT != none {
-                       // In all scenarios where the data exists, we
-                       // should be able to Get after Untrash --
-                       // regardless of timestamps, errors, race
-                       // conditions, etc.
+               for _, prefixLength := range []int{0, 3} {
+                       v.PrefixLength = prefixLength
+                       c.Logf("Scenario: %q (prefixLength=%d)", scenario.label, prefixLength)
+
+                       // We have a few tests to run for each scenario, and
+                       // the tests are expected to change state. By calling
+                       // this setup func between tests, we (re)create the
+                       // scenario as specified, using a new unique block
+                       // locator to prevent interference from previous
+                       // tests.
+
+                       setupScenario := func() (string, []byte) {
+                               nextKey++
+                               blk := []byte(fmt.Sprintf("%d", nextKey))
+                               loc := fmt.Sprintf("%x", md5.Sum(blk))
+                               key := loc
+                               if prefixLength > 0 {
+                                       key = loc[:prefixLength] + "/" + loc
+                               }
+                               c.Log("\t", loc)
+                               putS3Obj(scenario.dataT, key, blk)
+                               putS3Obj(scenario.recentT, "recent/"+key, nil)
+                               putS3Obj(scenario.trashT, "trash/"+key, blk)
+                               v.serverClock.now = &t0
+                               return loc, blk
+                       }
+
+                       // Check canGet
+                       loc, blk := setupScenario()
+                       buf := make([]byte, len(blk))
+                       _, err := v.Get(context.Background(), loc, buf)
+                       c.Check(err == nil, check.Equals, scenario.canGet)
+                       if err != nil {
+                               c.Check(os.IsNotExist(err), check.Equals, true)
+                       }
+
+                       // Call Trash, then check canTrash and canGetAfterTrash
+                       loc, _ = setupScenario()
+                       err = v.Trash(loc)
+                       c.Check(err == nil, check.Equals, scenario.canTrash)
                        _, err = v.Get(context.Background(), loc, buf)
+                       c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
+                       if err != nil {
+                               c.Check(os.IsNotExist(err), check.Equals, true)
+                       }
+
+                       // Call Untrash, then check canUntrash
+                       loc, _ = setupScenario()
+                       err = v.Untrash(loc)
+                       c.Check(err == nil, check.Equals, scenario.canUntrash)
+                       if scenario.dataT != none || scenario.trashT != none {
+                               // In all scenarios where the data exists, we
+                               // should be able to Get after Untrash --
+                               // regardless of timestamps, errors, race
+                               // conditions, etc.
+                               _, err = v.Get(context.Background(), loc, buf)
+                               c.Check(err, check.IsNil)
+                       }
+
+                       // Call EmptyTrash, then check haveTrashAfterEmpty and
+                       // freshAfterEmpty
+                       loc, _ = setupScenario()
+                       v.EmptyTrash()
+                       _, err = v.bucket.Head("trash/"+v.key(loc), nil)
+                       c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
+                       if scenario.freshAfterEmpty {
+                               t, err := v.Mtime(loc)
+                               c.Check(err, check.IsNil)
+                               // new mtime must be current (with an
+                               // allowance for 1s timestamp precision)
+                               c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
+                       }
+
+                       // Check for current Mtime after Put (applies to all
+                       // scenarios)
+                       loc, blk = setupScenario()
+                       err = v.Put(context.Background(), loc, blk)
                        c.Check(err, check.IsNil)
-               }
-
-               // Call EmptyTrash, then check haveTrashAfterEmpty and
-               // freshAfterEmpty
-               loc, _ = setupScenario()
-               v.EmptyTrash()
-               _, err = v.bucket.Head("trash/"+loc, nil)
-               c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
-               if scenario.freshAfterEmpty {
                        t, err := v.Mtime(loc)
                        c.Check(err, check.IsNil)
-                       // new mtime must be current (with an
-                       // allowance for 1s timestamp precision)
                        c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
                }
-
-               // Check for current Mtime after Put (applies to all
-               // scenarios)
-               loc, blk = setupScenario()
-               err = v.Put(context.Background(), loc, blk)
-               c.Check(err, check.IsNil)
-               t, err := v.Mtime(loc)
-               c.Check(err, check.IsNil)
-               c.Check(t.After(t0.Add(-time.Second)), check.Equals, true)
        }
 }
 
@@ -425,27 +525,35 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster,
                endpoint = s.s3server.URL
        }
 
+       iamRole, accessKey, secretKey := "", "xxx", "xxx"
+       if s.metadata != nil {
+               iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
+       }
+
        v := &TestableS3Volume{
                S3Volume: &S3Volume{
-                       AccessKey:          "xxx",
-                       SecretKey:          "xxx",
-                       Bucket:             TestBucketName,
-                       Endpoint:           endpoint,
-                       Region:             "test-region-1",
-                       LocationConstraint: true,
-                       UnsafeDelete:       true,
-                       IndexPageSize:      1000,
-                       cluster:            cluster,
-                       volume:             volume,
-                       logger:             ctxlog.TestLogger(c),
-                       metrics:            metrics,
+                       S3VolumeDriverParameters: arvados.S3VolumeDriverParameters{
+                               IAMRole:            iamRole,
+                               AccessKeyID:        accessKey,
+                               SecretAccessKey:    secretKey,
+                               Bucket:             TestBucketName,
+                               Endpoint:           endpoint,
+                               Region:             "test-region-1",
+                               LocationConstraint: true,
+                               UnsafeDelete:       true,
+                               IndexPageSize:      1000,
+                       },
+                       cluster: cluster,
+                       volume:  volume,
+                       logger:  ctxlog.TestLogger(c),
+                       metrics: metrics,
                },
                c:           c,
                server:      srv,
                serverClock: clock,
        }
        c.Assert(v.S3Volume.check(), check.IsNil)
-       c.Assert(v.bucket.PutBucket(s3.ACL("private")), check.IsNil)
+       c.Assert(v.bucket.Bucket().PutBucket(s3.ACL("private")), check.IsNil)
        // We couldn't set RaceWindow until now because check()
        // rejects negative values.
        v.S3Volume.RaceWindow = arvados.Duration(raceWindow)
@@ -454,13 +562,14 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster,
 
 // PutRaw skips the ContentMD5 test
 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
-       err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
+       key := v.key(loc)
+       err := v.bucket.Bucket().Put(key, block, "application/octet-stream", s3ACL, s3.Options{})
        if err != nil {
-               log.Printf("PutRaw: %s: %+v", loc, err)
+               v.logger.Printf("PutRaw: %s: %+v", loc, err)
        }
-       err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+       err = v.bucket.Bucket().Put("recent/"+key, nil, "application/octet-stream", s3ACL, s3.Options{})
        if err != nil {
-               log.Printf("PutRaw: recent/%s: %+v", loc, err)
+               v.logger.Printf("PutRaw: recent/%s: %+v", key, err)
        }
 }
 
@@ -469,7 +578,7 @@ func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
 // while we do this.
 func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
        v.serverClock.now = &lastPut
-       err := v.bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
+       err := v.bucket.Bucket().Put("recent/"+v.key(locator), nil, "application/octet-stream", s3ACL, s3.Options{})
        if err != nil {
                panic(err)
        }