Merge branch 'master' into 9998-unsigned_manifest
[arvados.git] / services / datamanager / datamanager_test.go
index 5ed2b4fb56a076a23e9fd2809de62371aeb50860..7a8fff5c32a30d3a79926305df5db1ef81e48f6a 100644 (file)
@@ -12,13 +12,14 @@ import (
        "net/http"
        "os"
        "os/exec"
+       "path"
        "regexp"
        "strings"
        "testing"
        "time"
 )
 
-var arv arvadosclient.ArvadosClient
+var arv *arvadosclient.ArvadosClient
 var keepClient *keepclient.KeepClient
 var keepServers []string
 
@@ -39,9 +40,8 @@ func SetupDataManagerTest(t *testing.T) {
 
        // keep client
        keepClient = &keepclient.KeepClient{
-               Arvados:       &arv,
+               Arvados:       arv,
                Want_replicas: 2,
-               Using_proxy:   true,
                Client:        &http.Client{},
        }
 
@@ -258,13 +258,10 @@ func valueInArray(value string, list []string) bool {
        return false
 }
 
-/*
-Test env uses two keep volumes. The volume names can be found by reading the files
-  ARVADOS_HOME/tmp/keep0.volume and ARVADOS_HOME/tmp/keep1.volume
-
-The keep volumes are of the dir structure:
-  volumeN/subdir/locator
-*/
+// Test env uses two keep volumes. The volume names can be found by reading the files
+// ARVADOS_HOME/tmp/keep0.volume and ARVADOS_HOME/tmp/keep1.volume
+//
+// The keep volumes are of the dir structure: volumeN/subdir/locator
 func backdateBlocks(t *testing.T, oldUnusedBlockLocators []string) {
        // First get rid of any size hints in the locators
        var trimmedBlockLocators []string
@@ -346,11 +343,9 @@ func waitUntilQueuesFinishWork(t *testing.T) {
        }
 }
 
-/*
-Create some blocks and backdate some of them.
-Also create some collections and delete some of them.
-Verify block indexes.
-*/
+// Create some blocks and backdate some of them.
+// Also create some collections and delete some of them.
+// Verify block indexes.
 func TestPutAndGetBlocks(t *testing.T) {
        defer TearDownDataManagerTest(t)
        SetupDataManagerTest(t)
@@ -544,18 +539,36 @@ func TestPutAndGetBlocks_NoErrorDuringSingleRun(t *testing.T) {
 }
 
 func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadWriteTo(t *testing.T) {
-       testOldBlocksNotDeletedOnDataManagerError(t, "/badwritetofile", "", true, true)
+       badpath, err := arvadostest.CreateBadPath()
+       if err != nil {
+               t.Fatalf(err.Error())
+       }
+       defer func() {
+               err = arvadostest.DestroyBadPath(badpath)
+               if err != nil {
+                       t.Fatalf(err.Error())
+               }
+       }()
+       testOldBlocksNotDeletedOnDataManagerError(t, path.Join(badpath, "writetofile"), "", true, true)
 }
 
 func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadHeapProfileFilename(t *testing.T) {
-       testOldBlocksNotDeletedOnDataManagerError(t, "", "/badheapprofilefile", true, true)
+       badpath, err := arvadostest.CreateBadPath()
+       if err != nil {
+               t.Fatalf(err.Error())
+       }
+       defer func() {
+               err = arvadostest.DestroyBadPath(badpath)
+               if err != nil {
+                       t.Fatalf(err.Error())
+               }
+       }()
+       testOldBlocksNotDeletedOnDataManagerError(t, "", path.Join(badpath, "heapprofilefile"), true, true)
 }
 
-/*
-  Create some blocks and backdate some of them.
-  Run datamanager while producing an error condition.
-  Verify that the blocks are hence not deleted.
-*/
+// Create some blocks and backdate some of them.
+// Run datamanager while producing an error condition.
+// Verify that the blocks are hence not deleted.
 func testOldBlocksNotDeletedOnDataManagerError(t *testing.T, writeDataTo string, heapProfileFile string, expectError bool, expectOldBlocks bool) {
        defer TearDownDataManagerTest(t)
        SetupDataManagerTest(t)
@@ -621,28 +634,42 @@ func createMultiStreamBlockCollection(t *testing.T, data string, numStreams, num
        }
 
        var locs []string
-       for k, _ := range locators {
+       for k := range locators {
                locs = append(locs, k)
        }
 
        return collection["uuid"].(string), locs
 }
 
-/*
-  Create collection with multiple streams and blocks; backdate the blocks and but do not delete the collection.
-  Also, create stray block and backdate it.
-  After datamanager run: expect blocks from the collection, but not the stray block.
-*/
-func TestPutAndGetCollectionsWithMultipleStreamsAndBlocks(t *testing.T) {
+// Create collection with multiple streams and blocks; backdate the blocks and but do not delete the collection.
+// Also, create stray block and backdate it.
+// After datamanager run: expect blocks from the collection, but not the stray block.
+func TestManifestWithMultipleStreamsAndBlocks(t *testing.T) {
+       testManifestWithMultipleStreamsAndBlocks(t, 100, 10, "", false)
+}
+
+// Same test as TestManifestWithMultipleStreamsAndBlocks with an additional
+// keepstore of a service type other than "disk". Only the "disk" type services
+// will be indexed by datamanager and hence should work the same way.
+func TestManifestWithMultipleStreamsAndBlocks_WithOneUnsupportedKeepServer(t *testing.T) {
+       testManifestWithMultipleStreamsAndBlocks(t, 2, 2, "testblobstore", false)
+}
+
+// Test datamanager with dry-run. Expect no block to be deleted.
+func TestManifestWithMultipleStreamsAndBlocks_DryRun(t *testing.T) {
+       testManifestWithMultipleStreamsAndBlocks(t, 2, 2, "", true)
+}
+
+func testManifestWithMultipleStreamsAndBlocks(t *testing.T, numStreams, numBlocks int, createExtraKeepServerWithType string, isDryRun bool) {
        defer TearDownDataManagerTest(t)
        SetupDataManagerTest(t)
 
        // create collection whose blocks will be backdated
-       collectionWithOldBlocks, oldBlocks := createMultiStreamBlockCollection(t, "old block", 100, 10)
+       collectionWithOldBlocks, oldBlocks := createMultiStreamBlockCollection(t, "old block", numStreams, numBlocks)
        if collectionWithOldBlocks == "" {
-               t.Fatalf("Failed to create collection with 1000 blocks")
+               t.Fatalf("Failed to create collection with %d blocks", numStreams*numBlocks)
        }
-       if len(oldBlocks) != 1000 {
+       if len(oldBlocks) != numStreams*numBlocks {
                t.Fatalf("Not all blocks are created: expected %v, found %v", 1000, len(oldBlocks))
        }
 
@@ -659,9 +686,47 @@ func TestPutAndGetCollectionsWithMultipleStreamsAndBlocks(t *testing.T) {
        // also backdate the stray old block
        backdateBlocks(t, []string{strayOldBlock})
 
+       // If requested, create an extra keepserver with the given type
+       // This should be ignored during indexing and hence not change the datamanager outcome
+       var extraKeepServerUUID string
+       if createExtraKeepServerWithType != "" {
+               extraKeepServerUUID = addExtraKeepServer(t, createExtraKeepServerWithType)
+               defer deleteExtraKeepServer(extraKeepServerUUID)
+       }
+
        // run datamanager
+       dryRun = isDryRun
        dataManagerSingleRun(t)
 
-       // verify that strayOldBlock is not to be found, but the collections blocks are still there
-       verifyBlocks(t, []string{strayOldBlock}, oldBlocks, 2)
+       if dryRun {
+               // verify that all blocks, including strayOldBlock, are still to be found
+               verifyBlocks(t, nil, expected, 2)
+       } else {
+               // verify that strayOldBlock is not to be found, but the collections blocks are still there
+               verifyBlocks(t, []string{strayOldBlock}, oldBlocks, 2)
+       }
+}
+
+// Add one more keepstore with the given service type
+func addExtraKeepServer(t *testing.T, serviceType string) string {
+       defer switchToken(arvadostest.AdminToken)()
+
+       extraKeepService := make(arvadosclient.Dict)
+       err := arv.Create("keep_services",
+               arvadosclient.Dict{"keep_service": arvadosclient.Dict{
+                       "service_host":     "localhost",
+                       "service_port":     "21321",
+                       "service_ssl_flag": false,
+                       "service_type":     serviceType}},
+               &extraKeepService)
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       return extraKeepService["uuid"].(string)
+}
+
+func deleteExtraKeepServer(uuid string) {
+       defer switchToken(arvadostest.AdminToken)()
+       arv.Delete("keep_services", uuid, nil, nil)
 }