Merge branch 'master' into 9998-unsigned_manifest
[arvados.git] / services / datamanager / datamanager_test.go
index 28faf989ce956c667e51fd00fc19954e18e91794..7a8fff5c32a30d3a79926305df5db1ef81e48f6a 100644 (file)
@@ -12,13 +12,14 @@ import (
        "net/http"
        "os"
        "os/exec"
+       "path"
        "regexp"
        "strings"
        "testing"
        "time"
 )
 
-var arv arvadosclient.ArvadosClient
+var arv *arvadosclient.ArvadosClient
 var keepClient *keepclient.KeepClient
 var keepServers []string
 
@@ -39,9 +40,8 @@ func SetupDataManagerTest(t *testing.T) {
 
        // keep client
        keepClient = &keepclient.KeepClient{
-               Arvados:       &arv,
+               Arvados:       arv,
                Want_replicas: 2,
-               Using_proxy:   true,
                Client:        &http.Client{},
        }
 
@@ -539,11 +539,31 @@ func TestPutAndGetBlocks_NoErrorDuringSingleRun(t *testing.T) {
 }
 
 func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadWriteTo(t *testing.T) {
-       testOldBlocksNotDeletedOnDataManagerError(t, "/badwritetofile", "", true, true)
+       badpath, err := arvadostest.CreateBadPath()
+       if err != nil {
+               t.Fatalf(err.Error())
+       }
+       defer func() {
+               err = arvadostest.DestroyBadPath(badpath)
+               if err != nil {
+                       t.Fatalf(err.Error())
+               }
+       }()
+       testOldBlocksNotDeletedOnDataManagerError(t, path.Join(badpath, "writetofile"), "", true, true)
 }
 
 func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadHeapProfileFilename(t *testing.T) {
-       testOldBlocksNotDeletedOnDataManagerError(t, "", "/badheapprofilefile", true, true)
+       badpath, err := arvadostest.CreateBadPath()
+       if err != nil {
+               t.Fatalf(err.Error())
+       }
+       defer func() {
+               err = arvadostest.DestroyBadPath(badpath)
+               if err != nil {
+                       t.Fatalf(err.Error())
+               }
+       }()
+       testOldBlocksNotDeletedOnDataManagerError(t, "", path.Join(badpath, "heapprofilefile"), true, true)
 }
 
 // Create some blocks and backdate some of them.
@@ -614,7 +634,7 @@ func createMultiStreamBlockCollection(t *testing.T, data string, numStreams, num
        }
 
        var locs []string
-       for k, _ := range locators {
+       for k := range locators {
                locs = append(locs, k)
        }
 
@@ -625,15 +645,31 @@ func createMultiStreamBlockCollection(t *testing.T, data string, numStreams, num
 // Also, create stray block and backdate it.
 // After datamanager run: expect blocks from the collection, but not the stray block.
 func TestManifestWithMultipleStreamsAndBlocks(t *testing.T) {
+       testManifestWithMultipleStreamsAndBlocks(t, 100, 10, "", false)
+}
+
+// Same test as TestManifestWithMultipleStreamsAndBlocks with an additional
+// keepstore of a service type other than "disk". Only the "disk" type services
+// will be indexed by datamanager and hence should work the same way.
+func TestManifestWithMultipleStreamsAndBlocks_WithOneUnsupportedKeepServer(t *testing.T) {
+       testManifestWithMultipleStreamsAndBlocks(t, 2, 2, "testblobstore", false)
+}
+
+// Test datamanager with dry-run. Expect no block to be deleted.
+func TestManifestWithMultipleStreamsAndBlocks_DryRun(t *testing.T) {
+       testManifestWithMultipleStreamsAndBlocks(t, 2, 2, "", true)
+}
+
+func testManifestWithMultipleStreamsAndBlocks(t *testing.T, numStreams, numBlocks int, createExtraKeepServerWithType string, isDryRun bool) {
        defer TearDownDataManagerTest(t)
        SetupDataManagerTest(t)
 
        // create collection whose blocks will be backdated
-       collectionWithOldBlocks, oldBlocks := createMultiStreamBlockCollection(t, "old block", 100, 10)
+       collectionWithOldBlocks, oldBlocks := createMultiStreamBlockCollection(t, "old block", numStreams, numBlocks)
        if collectionWithOldBlocks == "" {
-               t.Fatalf("Failed to create collection with 1000 blocks")
+               t.Fatalf("Failed to create collection with %d blocks", numStreams*numBlocks)
        }
-       if len(oldBlocks) != 1000 {
+       if len(oldBlocks) != numStreams*numBlocks {
                t.Fatalf("Not all blocks are created: expected %v, found %v", 1000, len(oldBlocks))
        }
 
@@ -650,9 +686,47 @@ func TestManifestWithMultipleStreamsAndBlocks(t *testing.T) {
        // also backdate the stray old block
        backdateBlocks(t, []string{strayOldBlock})
 
+       // If requested, create an extra keepserver with the given type
+       // This should be ignored during indexing and hence not change the datamanager outcome
+       var extraKeepServerUUID string
+       if createExtraKeepServerWithType != "" {
+               extraKeepServerUUID = addExtraKeepServer(t, createExtraKeepServerWithType)
+               defer deleteExtraKeepServer(extraKeepServerUUID)
+       }
+
        // run datamanager
+       dryRun = isDryRun
        dataManagerSingleRun(t)
 
-       // verify that strayOldBlock is not to be found, but the collections blocks are still there
-       verifyBlocks(t, []string{strayOldBlock}, oldBlocks, 2)
+       if dryRun {
+               // verify that all blocks, including strayOldBlock, are still to be found
+               verifyBlocks(t, nil, expected, 2)
+       } else {
+               // verify that strayOldBlock is not to be found, but the collections blocks are still there
+               verifyBlocks(t, []string{strayOldBlock}, oldBlocks, 2)
+       }
+}
+
+// Add one more keepstore with the given service type
+func addExtraKeepServer(t *testing.T, serviceType string) string {
+       defer switchToken(arvadostest.AdminToken)()
+
+       extraKeepService := make(arvadosclient.Dict)
+       err := arv.Create("keep_services",
+               arvadosclient.Dict{"keep_service": arvadosclient.Dict{
+                       "service_host":     "localhost",
+                       "service_port":     "21321",
+                       "service_ssl_flag": false,
+                       "service_type":     serviceType}},
+               &extraKeepService)
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       return extraKeepService["uuid"].(string)
+}
+
+func deleteExtraKeepServer(uuid string) {
+       defer switchToken(arvadostest.AdminToken)()
+       arv.Delete("keep_services", uuid, nil, nil)
 }