+
+// createMultiStreamBlockCollection creates a collection whose manifest has
+// numStreams streams ("./stream0", "./stream1", ...) with numBlocks blocks
+// in each stream.
+//
+// Every block is written with keepClient.PutB and contains the text
+// "<data> in stream <s> and block <b>". Each stream line ends with the file
+// token "0:1:dummyfile.txt".
+//
+// It returns the "uuid" field of the created collection together with the
+// distinct block locators, each cut off at the first "+A". The locators are
+// read back out of a map, so their order is unspecified. Any failure aborts
+// the test through t.Fatalf.
+func createMultiStreamBlockCollection(t *testing.T, data string, numStreams, numBlocks int) (string, []string) {
+	defer switchToken(arvadostest.AdminToken)()
+
+	// Stream lines are assembled from tokens and joined once, instead of
+	// growing a single string with += for every block.
+	var streamLines []string
+	locators := make(map[string]bool)
+	for s := 0; s < numStreams; s++ {
+		tokens := []string{fmt.Sprintf("./stream%d", s)}
+		for b := 0; b < numBlocks; b++ {
+			locator, _, err := keepClient.PutB([]byte(fmt.Sprintf("%s in stream %d and block %d", data, s, b)))
+			if err != nil {
+				t.Fatalf("Error creating block %d in stream %d: %v", b, s, err)
+			}
+			// Key the set by the locator text before "+A" so callers get
+			// locators without that suffix.
+			locators[strings.Split(locator, "+A")[0]] = true
+			tokens = append(tokens, locator)
+		}
+		tokens = append(tokens, "0:1:dummyfile.txt")
+		streamLines = append(streamLines, strings.Join(tokens, " ")+"\n")
+	}
+	manifest := strings.Join(streamLines, "")
+
+	collection := make(Dict)
+	err := arv.Create("collections",
+		arvadosclient.Dict{"collection": arvadosclient.Dict{"manifest_text": manifest}},
+		&collection)
+	if err != nil {
+		t.Fatalf("Error creating collection %v", err)
+	}
+
+	// A checked assertion turns a missing or non-string uuid into a test
+	// failure instead of a panic.
+	uuid, ok := collection["uuid"].(string)
+	if !ok {
+		t.Fatalf("Error creating collection: no string uuid in response %v", collection)
+	}
+
+	var locs []string
+	for k := range locators {
+		locs = append(locs, k)
+	}
+
+	return uuid, locs
+}
+
+// Create collection with multiple streams and blocks; backdate the blocks and but do not delete the collection.
+// Also, create stray block and backdate it.
+// After datamanager run: expect blocks from the collection, but not the stray block.
+func TestManifestWithMultipleStreamsAndBlocks(t *testing.T) {
+ testManifestWithMultipleStreamsAndBlocks(t, 100, 10, "", false)
+}
+
+// Same test as TestManifestWithMultipleStreamsAndBlocks with an additional
+// keepstore of a service type other than "disk". Only the "disk" type services
+// will be indexed by datamanager and hence should work the same way.
+func TestManifestWithMultipleStreamsAndBlocks_WithOneUnsupportedKeepServer(t *testing.T) {
+ testManifestWithMultipleStreamsAndBlocks(t, 2, 2, "testblobstore", false)
+}
+
+// Test datamanager with dry-run. Expect no block to be deleted.
+func TestManifestWithMultipleStreamsAndBlocks_DryRun(t *testing.T) {
+ testManifestWithMultipleStreamsAndBlocks(t, 2, 2, "", true)
+}
+
+func testManifestWithMultipleStreamsAndBlocks(t *testing.T, numStreams, numBlocks int, createExtraKeepServerWithType string, isDryRun bool) {
+ defer TearDownDataManagerTest(t)
+ SetupDataManagerTest(t)
+
+ // create collection whose blocks will be backdated
+ collectionWithOldBlocks, oldBlocks := createMultiStreamBlockCollection(t, "old block", numStreams, numBlocks)
+ if collectionWithOldBlocks == "" {
+ t.Fatalf("Failed to create collection with %d blocks", numStreams*numBlocks)
+ }
+ if len(oldBlocks) != numStreams*numBlocks {
+ t.Fatalf("Not all blocks are created: expected %v, found %v", 1000, len(oldBlocks))
+ }
+
+ // create a stray block that will be backdated
+ strayOldBlock := putBlock(t, "this stray block is old")
+
+ expected := []string{strayOldBlock}
+ expected = append(expected, oldBlocks...)
+ verifyBlocks(t, nil, expected, 2)
+
+ // Backdate old blocks; but the collection still references these blocks
+ backdateBlocks(t, oldBlocks)
+
+ // also backdate the stray old block
+ backdateBlocks(t, []string{strayOldBlock})
+
+ // If requested, create an extra keepserver with the given type
+ // This should be ignored during indexing and hence not change the datamanager outcome
+ var extraKeepServerUUID string
+ if createExtraKeepServerWithType != "" {
+ extraKeepServerUUID = addExtraKeepServer(t, createExtraKeepServerWithType)
+ defer deleteExtraKeepServer(extraKeepServerUUID)
+ }
+
+ // run datamanager
+ dryRun = isDryRun
+ dataManagerSingleRun(t)
+
+ if dryRun {
+ // verify that all blocks, including strayOldBlock, are still to be found
+ verifyBlocks(t, nil, expected, 2)
+ } else {
+ // verify that strayOldBlock is not to be found, but the collections blocks are still there
+ verifyBlocks(t, []string{strayOldBlock}, oldBlocks, 2)
+ }
+}
+
+// addExtraKeepServer adds one more keepstore with the given service type.
+//
+// It creates a "keep_services" record (host "localhost", port "21321", SSL
+// flag false) after calling switchToken with arvadostest.AdminToken; the
+// function returned by switchToken is deferred. It returns the "uuid" field
+// of the created record. The test is aborted if the create call fails or the
+// response carries no string uuid.
+func addExtraKeepServer(t *testing.T, serviceType string) string {
+	defer switchToken(arvadostest.AdminToken)()
+
+	extraKeepService := make(arvadosclient.Dict)
+	err := arv.Create("keep_services",
+		arvadosclient.Dict{"keep_service": arvadosclient.Dict{
+			"service_host":     "localhost",
+			"service_port":     "21321",
+			"service_ssl_flag": false,
+			"service_type":     serviceType}},
+		&extraKeepService)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// A checked assertion turns a missing or non-string uuid into a test
+	// failure instead of a panic.
+	uuid, ok := extraKeepService["uuid"].(string)
+	if !ok {
+		t.Fatalf("keep_services create response has no string uuid: %v", extraKeepService)
+	}
+
+	return uuid
+}
+
+// deleteExtraKeepServer deletes the "keep_services" record with the given
+// uuid. The result of the delete call is discarded, so a failed delete is not
+// reported.
+func deleteExtraKeepServer(uuid string) {
+	restoreToken := switchToken(arvadostest.AdminToken)
+	defer restoreToken()
+
+	_ = arv.Delete("keep_services", uuid, nil, nil)
+}