From: radhika Date: Tue, 15 Sep 2015 16:24:30 +0000 (-0400) Subject: Merge branch 'master' into 6260-test-datamanager X-Git-Tag: 1.1.0~1354^2~1 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/558574e31f56db08c82ca7c2b955e74df04242dd?hp=3acd3536404f889e259ed3634015ed66b173ff3d Merge branch 'master' into 6260-test-datamanager Conflicts: services/keepstore/keepstore.go --- diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go index c1f6a3e6f9..e4e459e83a 100644 --- a/sdk/go/keepclient/keepclient_test.go +++ b/sdk/go/keepclient/keepclient_test.go @@ -743,7 +743,7 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) { } { hash2, replicas, err := kc.PutB(content) - c.Check(hash2, Equals, fmt.Sprintf("%s+%d", hash, len(content))) + c.Check(hash2, Matches, fmt.Sprintf(`%s\+%d\b.*`, hash, len(content))) c.Check(replicas, Equals, 2) c.Check(err, Equals, nil) } diff --git a/sdk/python/tests/run_test_server.py b/sdk/python/tests/run_test_server.py index 1c5162b97d..5d0c42ad21 100644 --- a/sdk/python/tests/run_test_server.py +++ b/sdk/python/tests/run_test_server.py @@ -310,8 +310,9 @@ def _start_keep(n, keep_args): for arg, val in keep_args.iteritems(): keep_cmd.append("{}={}".format(arg, val)) + logf = open(os.path.join(TEST_TMPDIR, 'keep{}.log'.format(n)), 'a+') kp0 = subprocess.Popen( - keep_cmd, stdin=open('/dev/null'), stdout=sys.stderr) + keep_cmd, stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True) with open(_pidfile('keep{}'.format(n)), 'w') as f: f.write(str(kp0.pid)) @@ -326,12 +327,17 @@ def run_keep(blob_signing_key=None, enforce_permissions=False): stop_keep() keep_args = {} - if blob_signing_key: - with open(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"), "w") as f: - keep_args['--permission-key-file'] = f.name - f.write(blob_signing_key) + if not blob_signing_key: + blob_signing_key = 'zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc' + with open(os.path.join(TEST_TMPDIR, "keep.blob_signing_key"), "w") as f: + keep_args['-blob-signing-key-file'] = f.name + f.write(blob_signing_key) if enforce_permissions: - keep_args['--enforce-permissions'] = 'true' + keep_args['-enforce-permissions'] = 'true' + with open(os.path.join(TEST_TMPDIR, "keep.data-manager-token-file"), "w") as f: + keep_args['-data-manager-token-file'] = f.name + f.write(os.environ['ARVADOS_API_TOKEN']) + keep_args['-never-delete'] = 'false' api = arvados.api( version='v1', diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py index 13fc88def3..ac7dd1b9f6 100644 --- a/sdk/python/tests/test_collections.py +++ b/sdk/python/tests/test_collections.py @@ -1144,7 +1144,7 @@ class CollectionCreateUpdateTest(run_test_server.TestCaseWithServers): c2.save() c1.update() - self.assertRegexpMatches(c1.manifest_text(), r"\. e65075d550f9b5bf9992fa1d71a131be\+3 7ac66c0f148de9519b8bd264312c4d64\+7\+A[a-f0-9]{40}@[a-f0-9]{8} 0:3:count\.txt 3:7:count\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$") + self.assertRegexpMatches(c1.manifest_text(), r"\. 
e65075d550f9b5bf9992fa1d71a131be\+3\S* 7ac66c0f148de9519b8bd264312c4d64\+7\S* 0:3:count\.txt 3:7:count\.txt~\d\d\d\d\d\d\d\d-\d\d\d\d\d\d~conflict~$") if __name__ == '__main__': diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go index 5519ad8670..ca03627405 100644 --- a/services/datamanager/collection/collection.go +++ b/services/datamanager/collection/collection.go @@ -24,38 +24,43 @@ var ( maxManifestSize uint64 ) +// Collection representation type Collection struct { - Uuid string - OwnerUuid string + UUID string + OwnerUUID string ReplicationLevel int BlockDigestToSize map[blockdigest.BlockDigest]int TotalSize int } +// ReadCollections holds information about collections from API server type ReadCollections struct { ReadAllCollections bool - UuidToCollection map[string]Collection + UUIDToCollection map[string]Collection OwnerToCollectionSize map[string]int BlockToDesiredReplication map[blockdigest.DigestWithSize]int - CollectionUuidToIndex map[string]int - CollectionIndexToUuid []string + CollectionUUIDToIndex map[string]int + CollectionIndexToUUID []string BlockToCollectionIndices map[blockdigest.DigestWithSize][]int } +// GetCollectionsParams params type GetCollectionsParams struct { Client arvadosclient.ArvadosClient Logger *logger.Logger BatchSize int } +// SdkCollectionInfo holds collection info from api type SdkCollectionInfo struct { - Uuid string `json:"uuid"` - OwnerUuid string `json:"owner_uuid"` + UUID string `json:"uuid"` + OwnerUUID string `json:"owner_uuid"` Redundancy int `json:"redundancy"` ModifiedAt time.Time `json:"modified_at"` ManifestText string `json:"manifest_text"` } +// SdkCollectionList lists collections from api type SdkCollectionList struct { ItemsAvailable int `json:"items_available"` Items []SdkCollectionInfo `json:"items"` @@ -68,7 +73,7 @@ func init() { "File to write the heap profiles to. Leave blank to skip profiling.") } -// Write the heap profile to a file for later review. +// WriteHeapProfile writes the heap profile to a file for later review. // Since a file is expected to only contain a single heap profile this // function overwrites the previously written profile, so it is safe // to call multiple times in a single run. @@ -77,27 +82,28 @@ func init() { func WriteHeapProfile() { if heapProfileFilename != "" { - heap_profile, err := os.Create(heapProfileFilename) + heapProfile, err := os.Create(heapProfileFilename) if err != nil { log.Fatal(err) } - defer heap_profile.Close() + defer heapProfile.Close() - err = pprof.WriteHeapProfile(heap_profile) + err = pprof.WriteHeapProfile(heapProfile) if err != nil { log.Fatal(err) } } } +// GetCollectionsAndSummarize gets collections from api and summarizes func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) { results = GetCollections(params) results.Summarize(params.Logger) log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize) log.Printf("Read and processed %d collections", - len(results.UuidToCollection)) + len(results.UUIDToCollection)) // TODO(misha): Add a "readonly" flag. 
If we're in readonly mode, // lots of behaviors can become warnings (and obviously we can't @@ -109,6 +115,7 @@ func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollec return } +// GetCollections gets collections from api func GetCollections(params GetCollectionsParams) (results ReadCollections) { if ¶ms.Client == nil { log.Fatalf("params.Client passed to GetCollections() should " + @@ -157,7 +164,7 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { // that we don't have to grow the map in most cases. maxExpectedCollections := int( float64(initialNumberOfCollectionsAvailable) * 1.01) - results.UuidToCollection = make(map[string]Collection, maxExpectedCollections) + results.UUIDToCollection = make(map[string]Collection, maxExpectedCollections) if params.Logger != nil { params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { @@ -191,11 +198,11 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { ProcessCollections(params.Logger, collections.Items, defaultReplicationLevel, - results.UuidToCollection).Format(time.RFC3339) + results.UUIDToCollection).Format(time.RFC3339) // update counts previousTotalCollections = totalCollections - totalCollections = len(results.UuidToCollection) + totalCollections = len(results.UUIDToCollection) log.Printf("%d collections read, %d new in last batch, "+ "%s latest modified date, %.0f %d %d avg,max,total manifest size", @@ -229,13 +236,14 @@ func StrCopy(s string) string { return string([]byte(s)) } +// ProcessCollections read from api server func ProcessCollections(arvLogger *logger.Logger, receivedCollections []SdkCollectionInfo, defaultReplicationLevel int, - uuidToCollection map[string]Collection) (latestModificationDate time.Time) { + UUIDToCollection map[string]Collection) (latestModificationDate time.Time) { for _, sdkCollection := range receivedCollections { - collection := Collection{Uuid: StrCopy(sdkCollection.Uuid), - OwnerUuid: StrCopy(sdkCollection.OwnerUuid), + collection := Collection{UUID: StrCopy(sdkCollection.UUID), + OwnerUUID: StrCopy(sdkCollection.OwnerUUID), ReplicationLevel: sdkCollection.Redundancy, BlockDigestToSize: make(map[blockdigest.BlockDigest]int)} @@ -260,7 +268,7 @@ func ProcessCollections(arvLogger *logger.Logger, manifest := manifest.Manifest{sdkCollection.ManifestText} manifestSize := uint64(len(sdkCollection.ManifestText)) - if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen { + if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen { totalManifestSize += manifestSize } if manifestSize > maxManifestSize { @@ -269,11 +277,11 @@ func ProcessCollections(arvLogger *logger.Logger, blockChannel := manifest.BlockIterWithDuplicates() for block := range blockChannel { - if stored_size, stored := collection.BlockDigestToSize[block.Digest]; stored && stored_size != block.Size { + if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size { message := fmt.Sprintf( "Collection %s contains multiple sizes (%d and %d) for block %s", - collection.Uuid, - stored_size, + collection.UUID, + storedSize, block.Size, block.Digest) loggerutil.FatalWithMessage(arvLogger, message) @@ -284,7 +292,7 @@ func ProcessCollections(arvLogger *logger.Logger, for _, size := range collection.BlockDigestToSize { collection.TotalSize += size } - uuidToCollection[collection.Uuid] = collection + UUIDToCollection[collection.UUID] = collection // Clear out all the manifest strings that 
we don't need anymore. // These hopefully form the bulk of our memory usage. @@ -295,22 +303,23 @@ func ProcessCollections(arvLogger *logger.Logger, return } +// Summarize the collections read func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) { readCollections.OwnerToCollectionSize = make(map[string]int) readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int) - numCollections := len(readCollections.UuidToCollection) - readCollections.CollectionUuidToIndex = make(map[string]int, numCollections) - readCollections.CollectionIndexToUuid = make([]string, 0, numCollections) + numCollections := len(readCollections.UUIDToCollection) + readCollections.CollectionUUIDToIndex = make(map[string]int, numCollections) + readCollections.CollectionIndexToUUID = make([]string, 0, numCollections) readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int) - for _, coll := range readCollections.UuidToCollection { - collectionIndex := len(readCollections.CollectionIndexToUuid) - readCollections.CollectionIndexToUuid = - append(readCollections.CollectionIndexToUuid, coll.Uuid) - readCollections.CollectionUuidToIndex[coll.Uuid] = collectionIndex + for _, coll := range readCollections.UUIDToCollection { + collectionIndex := len(readCollections.CollectionIndexToUUID) + readCollections.CollectionIndexToUUID = + append(readCollections.CollectionIndexToUUID, coll.UUID) + readCollections.CollectionUUIDToIndex[coll.UUID] = collectionIndex - readCollections.OwnerToCollectionSize[coll.OwnerUuid] = - readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize + readCollections.OwnerToCollectionSize[coll.OwnerUUID] = + readCollections.OwnerToCollectionSize[coll.OwnerUUID] + coll.TotalSize for block, size := range coll.BlockDigestToSize { locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)} diff --git a/services/datamanager/collection/collection_test.go b/services/datamanager/collection/collection_test.go index 1669bb7784..07c82e1abc 100644 --- a/services/datamanager/collection/collection_test.go +++ b/services/datamanager/collection/collection_test.go @@ -16,7 +16,7 @@ type MySuite struct{} var _ = Suite(&MySuite{}) // This captures the result we expect from -// ReadCollections.Summarize(). Because CollectionUuidToIndex is +// ReadCollections.Summarize(). Because CollectionUUIDToIndex is // indeterminate, we replace BlockToCollectionIndices with // BlockToCollectionUuids. 
type ExpectedSummary struct { @@ -41,7 +41,7 @@ func CompareSummarizedReadCollections(c *C, uuidSet := make(map[string]struct{}) summarizedBlockToCollectionUuids[digest] = uuidSet for _, index := range indices { - uuidSet[summarized.CollectionIndexToUuid[index]] = struct{}{} + uuidSet[summarized.CollectionIndexToUUID[index]] = struct{}{} } } @@ -67,15 +67,15 @@ func (s *MySuite) TestSummarizeSimple(checker *C) { rc.Summarize(nil) - c := rc.UuidToCollection["col0"] + c := rc.UUIDToCollection["col0"] blockDigest1 := blockdigest.MakeTestDigestWithSize(1) blockDigest2 := blockdigest.MakeTestDigestWithSize(2) expected := ExpectedSummary{ - OwnerToCollectionSize: map[string]int{c.OwnerUuid: c.TotalSize}, + OwnerToCollectionSize: map[string]int{c.OwnerUUID: c.TotalSize}, BlockToDesiredReplication: map[blockdigest.DigestWithSize]int{blockDigest1: 5, blockDigest2: 5}, - BlockToCollectionUuids: map[blockdigest.DigestWithSize][]string{blockDigest1: []string{c.Uuid}, blockDigest2: []string{c.Uuid}}, + BlockToCollectionUuids: map[blockdigest.DigestWithSize][]string{blockDigest1: []string{c.UUID}, blockDigest2: []string{c.UUID}}, } CompareSummarizedReadCollections(checker, rc, expected) @@ -95,8 +95,8 @@ func (s *MySuite) TestSummarizeOverlapping(checker *C) { rc.Summarize(nil) - c0 := rc.UuidToCollection["col0"] - c1 := rc.UuidToCollection["col1"] + c0 := rc.UUIDToCollection["col0"] + c1 := rc.UUIDToCollection["col1"] blockDigest1 := blockdigest.MakeTestDigestWithSize(1) blockDigest2 := blockdigest.MakeTestDigestWithSize(2) @@ -104,8 +104,8 @@ func (s *MySuite) TestSummarizeOverlapping(checker *C) { expected := ExpectedSummary{ OwnerToCollectionSize: map[string]int{ - c0.OwnerUuid: c0.TotalSize, - c1.OwnerUuid: c1.TotalSize, + c0.OwnerUUID: c0.TotalSize, + c1.OwnerUUID: c1.TotalSize, }, BlockToDesiredReplication: map[blockdigest.DigestWithSize]int{ blockDigest1: 5, @@ -113,9 +113,9 @@ func (s *MySuite) TestSummarizeOverlapping(checker *C) { blockDigest3: 8, }, BlockToCollectionUuids: map[blockdigest.DigestWithSize][]string{ - blockDigest1: []string{c0.Uuid}, - blockDigest2: []string{c0.Uuid, c1.Uuid}, - blockDigest3: []string{c1.Uuid}, + blockDigest1: []string{c0.UUID}, + blockDigest2: []string{c0.UUID, c1.UUID}, + blockDigest3: []string{c1.UUID}, }, } diff --git a/services/datamanager/collection/testing.go b/services/datamanager/collection/testing.go index f3c1f47664..2238433722 100644 --- a/services/datamanager/collection/testing.go +++ b/services/datamanager/collection/testing.go @@ -7,6 +7,7 @@ import ( "git.curoverse.com/arvados.git/sdk/go/blockdigest" ) +// TestCollectionSpec with test blocks and desired replication level type TestCollectionSpec struct { // The desired replication level ReplicationLevel int @@ -15,23 +16,23 @@ type TestCollectionSpec struct { Blocks []int } -// Creates a ReadCollections object for testing based on the give -// specs. Only the ReadAllCollections and UuidToCollection fields are -// populated. To populate other fields call rc.Summarize(). +// MakeTestReadCollections creates a ReadCollections object for testing +// based on the give specs. Only the ReadAllCollections and UUIDToCollection +// fields are populated. To populate other fields call rc.Summarize(). 
func MakeTestReadCollections(specs []TestCollectionSpec) (rc ReadCollections) { rc = ReadCollections{ ReadAllCollections: true, - UuidToCollection: map[string]Collection{}, + UUIDToCollection: map[string]Collection{}, } for i, spec := range specs { c := Collection{ - Uuid: fmt.Sprintf("col%d", i), - OwnerUuid: fmt.Sprintf("owner%d", i), + UUID: fmt.Sprintf("col%d", i), + OwnerUUID: fmt.Sprintf("owner%d", i), ReplicationLevel: spec.ReplicationLevel, BlockDigestToSize: map[blockdigest.BlockDigest]int{}, } - rc.UuidToCollection[c.Uuid] = c + rc.UUIDToCollection[c.UUID] = c for _, j := range spec.Blocks { c.BlockDigestToSize[blockdigest.MakeTestBlockDigest(j)] = j } @@ -45,16 +46,16 @@ func MakeTestReadCollections(specs []TestCollectionSpec) (rc ReadCollections) { return } -// Returns a slice giving the collection index of each collection that -// was passed in to MakeTestReadCollections. rc.Summarize() must be -// called before this method, since Summarize() assigns an index to -// each collection. +// CollectionIndicesForTesting returns a slice giving the collection +// index of each collection that was passed in to MakeTestReadCollections. +// rc.Summarize() must be called before this method, since Summarize() +// assigns an index to each collection. func (rc ReadCollections) CollectionIndicesForTesting() (indices []int) { // TODO(misha): Assert that rc.Summarize() has been called. - numCollections := len(rc.CollectionIndexToUuid) + numCollections := len(rc.CollectionIndexToUUID) indices = make([]int, numCollections) for i := 0; i < numCollections; i++ { - indices[i] = rc.CollectionUuidToIndex[fmt.Sprintf("col%d", i)] + indices[i] = rc.CollectionUUIDToIndex[fmt.Sprintf("col%d", i)] } return } diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go index 70a9ae7859..a9306ce83a 100644 --- a/services/datamanager/datamanager.go +++ b/services/datamanager/datamanager.go @@ -3,6 +3,7 @@ package main import ( + "errors" "flag" "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" @@ -41,17 +42,17 @@ func init() { func main() { flag.Parse() if minutesBetweenRuns == 0 { - err := singlerun() + err := singlerun(makeArvadosClient()) if err != nil { - log.Fatalf("Got an error: %v", err) + log.Fatalf("singlerun: %v", err) } } else { waitTime := time.Minute * time.Duration(minutesBetweenRuns) for { log.Println("Beginning Run") - err := singlerun() + err := singlerun(makeArvadosClient()) if err != nil { - log.Printf("Got an error: %v", err) + log.Printf("singlerun: %v", err) } log.Printf("Sleeping for %d minutes", minutesBetweenRuns) time.Sleep(waitTime) @@ -59,16 +60,20 @@ func main() { } } -func singlerun() error { +func makeArvadosClient() arvadosclient.ArvadosClient { arv, err := arvadosclient.MakeArvadosClient() if err != nil { - log.Fatalf("Error setting up arvados client %s", err.Error()) + log.Fatalf("Error setting up arvados client: %s", err) } + return arv +} - if is_admin, err := util.UserIsAdmin(arv); err != nil { - log.Fatalf("Error querying current arvados user %s", err.Error()) - } else if !is_admin { - log.Fatalf("Current user is not an admin. Datamanager can only be run by admins.") +func singlerun(arv arvadosclient.ArvadosClient) error { + var err error + if isAdmin, err := util.UserIsAdmin(arv); err != nil { + return errors.New("Error verifying admin token: " + err.Error()) + } else if !isAdmin { + return errors.New("Current user is not an admin. 
Datamanager requires a privileged token.") } var arvLogger *logger.Logger @@ -153,14 +158,13 @@ func singlerun() error { if trashErr != nil { return err - } else { - keep.SendTrashLists(keep.GetDataManagerToken(arvLogger), kc, trashLists) } + keep.SendTrashLists(kc, trashLists) return nil } -// Returns a data fetcher that fetches data from remote servers. +// BuildDataFetcher returns a data fetcher that fetches data from remote servers. func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher { return func(arvLogger *logger.Logger, readCollections *collection.ReadCollections, diff --git a/services/datamanager/datamanager_test.go b/services/datamanager/datamanager_test.go new file mode 100644 index 0000000000..3d9bb3da90 --- /dev/null +++ b/services/datamanager/datamanager_test.go @@ -0,0 +1,513 @@ +package main + +import ( + "encoding/json" + "fmt" + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/arvadostest" + "git.curoverse.com/arvados.git/sdk/go/keepclient" + "io/ioutil" + "net/http" + "os" + "os/exec" + "regexp" + "strings" + "testing" + "time" +) + +const ( + ActiveUserToken = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi" + AdminToken = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h" +) + +var arv arvadosclient.ArvadosClient +var keepClient *keepclient.KeepClient +var keepServers []string + +func SetupDataManagerTest(t *testing.T) { + os.Setenv("ARVADOS_API_HOST_INSECURE", "true") + + // start api and keep servers + arvadostest.ResetEnv() + arvadostest.StartAPI() + arvadostest.StartKeep() + + arv = makeArvadosClient() + + // keep client + keepClient = &keepclient.KeepClient{ + Arvados: &arv, + Want_replicas: 2, + Using_proxy: true, + Client: &http.Client{}, + } + + // discover keep services + if err := keepClient.DiscoverKeepServers(); err != nil { + t.Fatalf("Error discovering keep services: %s", err) + } + keepServers = []string{} + for _, host := range keepClient.LocalRoots() { + keepServers = append(keepServers, host) + } +} + +func TearDownDataManagerTest(t *testing.T) { + arvadostest.StopKeep() + arvadostest.StopAPI() +} + +func putBlock(t *testing.T, data string) string { + locator, _, err := keepClient.PutB([]byte(data)) + if err != nil { + t.Fatalf("Error putting test data for %s %s %v", data, locator, err) + } + if locator == "" { + t.Fatalf("No locator found after putting test data") + } + + splits := strings.Split(locator, "+") + return splits[0] + "+" + splits[1] +} + +func getBlock(t *testing.T, locator string, data string) { + reader, blocklen, _, err := keepClient.Get(locator) + if err != nil { + t.Fatalf("Error getting test data in setup for %s %s %v", data, locator, err) + } + if reader == nil { + t.Fatalf("No reader found after putting test data") + } + if blocklen != int64(len(data)) { + t.Fatalf("blocklen %d did not match data len %d", blocklen, len(data)) + } + + all, err := ioutil.ReadAll(reader) + if string(all) != data { + t.Fatalf("Data read %s did not match expected data %s", string(all), data) + } +} + +// Create a collection using arv-put +func createCollection(t *testing.T, data string) string { + tempfile, err := ioutil.TempFile(os.TempDir(), "temp-test-file") + defer os.Remove(tempfile.Name()) + + _, err = tempfile.Write([]byte(data)) + if err != nil { + t.Fatalf("Error writing to tempfile %v", err) + } + + // arv-put + output, err := exec.Command("arv-put", "--use-filename", "test.txt", tempfile.Name()).Output() + if err != nil { + t.Fatalf("Error running arv-put %s", err) + } 
+ + uuid := string(output[0:27]) // trim terminating char + return uuid +} + +// Get collection locator +var locatorMatcher = regexp.MustCompile(`^([0-9a-f]{32})\+(\d*)(.*)$`) + +func getFirstLocatorFromCollection(t *testing.T, uuid string) string { + manifest := getCollection(t, uuid)["manifest_text"].(string) + + locator := strings.Split(manifest, " ")[1] + match := locatorMatcher.FindStringSubmatch(locator) + if match == nil { + t.Fatalf("No locator found in collection manifest %s", manifest) + } + + return match[1] + "+" + match[2] +} + +func getCollection(t *testing.T, uuid string) Dict { + getback := make(Dict) + err := arv.Get("collections", uuid, nil, &getback) + if err != nil { + t.Fatalf("Error getting collection %s", err) + } + if getback["uuid"] != uuid { + t.Fatalf("Get collection uuid did not match original: $s, result: $s", uuid, getback["uuid"]) + } + + return getback +} + +func updateCollection(t *testing.T, uuid string, paramName string, paramValue string) { + err := arv.Update("collections", uuid, arvadosclient.Dict{ + "collection": arvadosclient.Dict{ + paramName: paramValue, + }, + }, &arvadosclient.Dict{}) + + if err != nil { + t.Fatalf("Error updating collection %s", err) + } +} + +type Dict map[string]interface{} + +func deleteCollection(t *testing.T, uuid string) { + getback := make(Dict) + err := arv.Delete("collections", uuid, nil, &getback) + if err != nil { + t.Fatalf("Error deleting collection %s", err) + } + if getback["uuid"] != uuid { + t.Fatalf("Delete collection uuid did not match original: $s, result: $s", uuid, getback["uuid"]) + } +} + +func dataManagerSingleRun(t *testing.T) { + err := singlerun(arv) + if err != nil { + t.Fatalf("Error during singlerun %s", err) + } +} + +func getBlockIndexesForServer(t *testing.T, i int) []string { + var indexes []string + + path := keepServers[i] + "/index" + client := http.Client{} + req, err := http.NewRequest("GET", path, nil) + req.Header.Add("Authorization", "OAuth2 "+AdminToken) + req.Header.Add("Content-Type", "application/octet-stream") + resp, err := client.Do(req) + defer resp.Body.Close() + + if err != nil { + t.Fatalf("Error during %s %s", path, err) + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Error reading response from %s %s", path, err) + } + + lines := strings.Split(string(body), "\n") + for _, line := range lines { + indexes = append(indexes, strings.Split(line, " ")...) + } + + return indexes +} + +func getBlockIndexes(t *testing.T) [][]string { + var indexes [][]string + + for i := 0; i < len(keepServers); i++ { + indexes = append(indexes, getBlockIndexesForServer(t, i)) + } + return indexes +} + +func verifyBlocks(t *testing.T, notExpected []string, expected []string, minReplication int) { + blocks := getBlockIndexes(t) + + for _, block := range notExpected { + for _, idx := range blocks { + if valueInArray(block, idx) { + t.Fatalf("Found unexpected block %s", block) + } + } + } + + for _, block := range expected { + nFound := 0 + for _, idx := range blocks { + if valueInArray(block, idx) { + nFound++ + } + } + if nFound < minReplication { + t.Fatalf("Found %d replicas of block %s, expected >= %d", nFound, block, minReplication) + } + } +} + +func valueInArray(value string, list []string) bool { + for _, v := range list { + if value == v { + return true + } + } + return false +} + +/* +Test env uses two keep volumes. 
The volume names can be found by reading the files + ARVADOS_HOME/tmp/keep0.volume and ARVADOS_HOME/tmp/keep1.volume + +The keep volumes are of the dir structure: + volumeN/subdir/locator +*/ +func backdateBlocks(t *testing.T, oldUnusedBlockLocators []string) { + // First get rid of any size hints in the locators + var trimmedBlockLocators []string + for _, block := range oldUnusedBlockLocators { + trimmedBlockLocators = append(trimmedBlockLocators, strings.Split(block, "+")[0]) + } + + // Get the working dir so that we can read keep{n}.volume files + wd, err := os.Getwd() + if err != nil { + t.Fatalf("Error getting working dir %s", err) + } + + // Now cycle through the two keep volumes + oldTime := time.Now().AddDate(0, -2, 0) + for i := 0; i < 2; i++ { + filename := fmt.Sprintf("%s/../../tmp/keep%d.volume", wd, i) + volumeDir, err := ioutil.ReadFile(filename) + if err != nil { + t.Fatalf("Error reading keep volume file %s %s", filename, err) + } + + // Read the keep volume dir structure + volumeContents, err := ioutil.ReadDir(string(volumeDir)) + if err != nil { + t.Fatalf("Error reading keep dir %s %s", string(volumeDir), err) + } + + // Read each subdir for each of the keep volume dir + for _, subdir := range volumeContents { + subdirName := fmt.Sprintf("%s/%s", volumeDir, subdir.Name()) + subdirContents, err := ioutil.ReadDir(string(subdirName)) + if err != nil { + t.Fatalf("Error reading keep dir %s %s", string(subdirName), err) + } + + // Now we got to the files. The files are names are the block locators + for _, fileInfo := range subdirContents { + blockName := fileInfo.Name() + myname := fmt.Sprintf("%s/%s", subdirName, blockName) + if valueInArray(blockName, trimmedBlockLocators) { + err = os.Chtimes(myname, oldTime, oldTime) + } + } + } + } +} + +func getStatus(t *testing.T, path string) interface{} { + client := http.Client{} + req, err := http.NewRequest("GET", path, nil) + req.Header.Add("Authorization", "OAuth2 "+AdminToken) + req.Header.Add("Content-Type", "application/octet-stream") + resp, err := client.Do(req) + if err != nil { + t.Fatalf("Error during %s %s", path, err) + } + defer resp.Body.Close() + + var s interface{} + json.NewDecoder(resp.Body).Decode(&s) + + return s +} + +// Wait until PullQueue and TrashQueue are empty on all keepServers. +func waitUntilQueuesFinishWork(t *testing.T) { + for _, ks := range keepServers { + for done := false; !done; { + time.Sleep(100 * time.Millisecond) + s := getStatus(t, ks+"/status.json") + for _, qName := range []string{"PullQueue", "TrashQueue"} { + qStatus := s.(map[string]interface{})[qName].(map[string]interface{}) + if qStatus["Queued"].(float64)+qStatus["InProgress"].(float64) == 0 { + done = true + } + } + } + } +} + +/* +Create some blocks and backdate some of them. +Also create some collections and delete some of them. +Verify block indexes. +*/ +func TestPutAndGetBlocks(t *testing.T) { + defer TearDownDataManagerTest(t) + SetupDataManagerTest(t) + + // Put some blocks which will be backdated later on + // The first one will also be used in a collection and hence should not be deleted when datamanager runs. + // The rest will be old and unreferenced and hence should be deleted when datamanager runs. 
+ var oldUnusedBlockLocators []string + oldUnusedBlockData := "this block will have older mtime" + for i := 0; i < 5; i++ { + oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i))) + } + for i := 0; i < 5; i++ { + getBlock(t, oldUnusedBlockLocators[i], fmt.Sprintf("%s%d", oldUnusedBlockData, i)) + } + + // The rest will be old and unreferenced and hence should be deleted when datamanager runs. + oldUsedBlockData := "this collection block will have older mtime" + oldUsedBlockLocator := putBlock(t, oldUsedBlockData) + getBlock(t, oldUsedBlockLocator, oldUsedBlockData) + + // Put some more blocks which will not be backdated; hence they are still new, but not in any collection. + // Hence, even though unreferenced, these should not be deleted when datamanager runs. + var newBlockLocators []string + newBlockData := "this block is newer" + for i := 0; i < 5; i++ { + newBlockLocators = append(newBlockLocators, putBlock(t, fmt.Sprintf("%s%d", newBlockData, i))) + } + for i := 0; i < 5; i++ { + getBlock(t, newBlockLocators[i], fmt.Sprintf("%s%d", newBlockData, i)) + } + + // Create a collection that would be deleted later on + toBeDeletedCollectionUUID := createCollection(t, "some data for collection creation") + toBeDeletedCollectionLocator := getFirstLocatorFromCollection(t, toBeDeletedCollectionUUID) + + // Create another collection that has the same data as the one of the old blocks + oldUsedBlockCollectionUUID := createCollection(t, oldUsedBlockData) + oldUsedBlockCollectionLocator := getFirstLocatorFromCollection(t, oldUsedBlockCollectionUUID) + if oldUsedBlockCollectionLocator != oldUsedBlockLocator { + t.Fatalf("Locator of the collection with the same data as old block is different %s", oldUsedBlockCollectionLocator) + } + + // Create another collection whose replication level will be changed + replicationCollectionUUID := createCollection(t, "replication level on this collection will be reduced") + replicationCollectionLocator := getFirstLocatorFromCollection(t, replicationCollectionUUID) + + // Create two collections with same data; one will be deleted later on + dataForTwoCollections := "one of these collections will be deleted" + oneOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections) + oneOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, oneOfTwoWithSameDataUUID) + secondOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections) + secondOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, secondOfTwoWithSameDataUUID) + if oneOfTwoWithSameDataLocator != secondOfTwoWithSameDataLocator { + t.Fatalf("Locators for both these collections expected to be same: %s %s", oneOfTwoWithSameDataLocator, secondOfTwoWithSameDataLocator) + } + + // Verify blocks before doing any backdating / deleting. + var expected []string + expected = append(expected, oldUnusedBlockLocators...) + expected = append(expected, newBlockLocators...) 
+ expected = append(expected, toBeDeletedCollectionLocator) + expected = append(expected, replicationCollectionLocator) + expected = append(expected, oneOfTwoWithSameDataLocator) + expected = append(expected, secondOfTwoWithSameDataLocator) + + verifyBlocks(t, nil, expected, 2) + + // Run datamanager in singlerun mode + dataManagerSingleRun(t) + waitUntilQueuesFinishWork(t) + + verifyBlocks(t, nil, expected, 2) + + // Backdate the to-be old blocks and delete the collections + backdateBlocks(t, oldUnusedBlockLocators) + deleteCollection(t, toBeDeletedCollectionUUID) + deleteCollection(t, secondOfTwoWithSameDataUUID) + + // Run data manager again + dataManagerSingleRun(t) + waitUntilQueuesFinishWork(t) + + // Get block indexes and verify that all backdated blocks except the first one used in collection are not included. + expected = expected[:0] + expected = append(expected, oldUsedBlockLocator) + expected = append(expected, newBlockLocators...) + expected = append(expected, toBeDeletedCollectionLocator) + expected = append(expected, oneOfTwoWithSameDataLocator) + expected = append(expected, secondOfTwoWithSameDataLocator) + + verifyBlocks(t, oldUnusedBlockLocators, expected, 2) + + // Reduce desired replication on replicationCollectionUUID + // collection, and verify that Data Manager does not reduce + // actual replication any further than that. (It might not + // reduce actual replication at all; that's OK for this test.) + + // Reduce desired replication level. + updateCollection(t, replicationCollectionUUID, "replication_desired", "1") + collection := getCollection(t, replicationCollectionUUID) + if collection["replication_desired"].(interface{}) != float64(1) { + t.Fatalf("After update replication_desired is not 1; instead it is %v", collection["replication_desired"]) + } + + // Verify data is currently overreplicated. + verifyBlocks(t, nil, []string{replicationCollectionLocator}, 2) + + // Run data manager again + dataManagerSingleRun(t) + waitUntilQueuesFinishWork(t) + + // Verify data is not underreplicated. + verifyBlocks(t, nil, []string{replicationCollectionLocator}, 1) + + // Verify *other* collections' data is not underreplicated. 
+ verifyBlocks(t, oldUnusedBlockLocators, expected, 2) +} + +func TestDatamanagerSingleRunRepeatedly(t *testing.T) { + defer TearDownDataManagerTest(t) + SetupDataManagerTest(t) + + for i := 0; i < 10; i++ { + err := singlerun(arv) + if err != nil { + t.Fatalf("Got an error during datamanager singlerun: %v", err) + } + } +} + +func TestGetStatusRepeatedly(t *testing.T) { + defer TearDownDataManagerTest(t) + SetupDataManagerTest(t) + + for i := 0; i < 10; i++ { + for j := 0; j < 2; j++ { + s := getStatus(t, keepServers[j]+"/status.json") + + var pullQueueStatus interface{} + pullQueueStatus = s.(map[string]interface{})["PullQueue"] + var trashQueueStatus interface{} + trashQueueStatus = s.(map[string]interface{})["TrashQueue"] + + if pullQueueStatus.(map[string]interface{})["Queued"] == nil || + pullQueueStatus.(map[string]interface{})["InProgress"] == nil || + trashQueueStatus.(map[string]interface{})["Queued"] == nil || + trashQueueStatus.(map[string]interface{})["InProgress"] == nil { + t.Fatalf("PullQueue and TrashQueue status not found") + } + + time.Sleep(100 * time.Millisecond) + } + } +} + +func TestRunDatamanagerWithBogusServer(t *testing.T) { + defer TearDownDataManagerTest(t) + SetupDataManagerTest(t) + + arv.ApiServer = "bogus-server" + + err := singlerun(arv) + if err == nil { + t.Fatalf("Expected error during singlerun with bogus server") + } +} + +func TestRunDatamanagerAsNonAdminUser(t *testing.T) { + defer TearDownDataManagerTest(t) + SetupDataManagerTest(t) + + arv.ApiToken = ActiveUserToken + + err := singlerun(arv) + if err == nil { + t.Fatalf("Expected error during singlerun as non-admin user") + } +} diff --git a/services/datamanager/keep/keep.go b/services/datamanager/keep/keep.go index 0e3cc1d44e..5b855dc61e 100644 --- a/services/datamanager/keep/keep.go +++ b/services/datamanager/keep/keep.go @@ -6,7 +6,6 @@ import ( "bufio" "encoding/json" "errors" - "flag" "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/blockdigest" @@ -19,38 +18,41 @@ import ( "net/http" "strconv" "strings" - "sync" "time" ) +// ServerAddress struct type ServerAddress struct { SSL bool `json:service_ssl_flag` Host string `json:"service_host"` Port int `json:"service_port"` - Uuid string `json:"uuid"` + UUID string `json:"uuid"` } -// Info about a particular block returned by the server +// BlockInfo is info about a particular block returned by the server type BlockInfo struct { Digest blockdigest.DigestWithSize Mtime int64 // TODO(misha): Replace this with a timestamp. } -// Info about a specified block given by a server +// BlockServerInfo is info about a specified block given by a server type BlockServerInfo struct { ServerIndex int Mtime int64 // TODO(misha): Replace this with a timestamp. 
} +// ServerContents struct type ServerContents struct { BlockDigestToInfo map[blockdigest.DigestWithSize]BlockInfo } +// ServerResponse struct type ServerResponse struct { Address ServerAddress Contents ServerContents } +// ReadServers struct type ReadServers struct { ReadAllServers bool KeepServerIndexToAddress []ServerAddress @@ -60,67 +62,34 @@ type ReadServers struct { BlockReplicationCounts map[int]int } +// GetKeepServersParams struct type GetKeepServersParams struct { Client arvadosclient.ArvadosClient Logger *logger.Logger Limit int } -type KeepServiceList struct { +// ServiceList consists of the addresses of all the available kee servers +type ServiceList struct { ItemsAvailable int `json:"items_available"` KeepServers []ServerAddress `json:"items"` } -var ( - // Don't access the token directly, use getDataManagerToken() to - // make sure it's been read. - dataManagerToken string - dataManagerTokenFile string - dataManagerTokenFileReadOnce sync.Once -) - -func init() { - flag.StringVar(&dataManagerTokenFile, - "data-manager-token-file", - "", - "File with the API token we should use to contact keep servers.") -} - +// String // TODO(misha): Change this to include the UUID as well. func (s ServerAddress) String() string { return s.URL() } +// URL of the keep server func (s ServerAddress) URL() string { if s.SSL { return fmt.Sprintf("https://%s:%d", s.Host, s.Port) - } else { - return fmt.Sprintf("http://%s:%d", s.Host, s.Port) } + return fmt.Sprintf("http://%s:%d", s.Host, s.Port) } -func GetDataManagerToken(arvLogger *logger.Logger) string { - readDataManagerToken := func() { - if dataManagerTokenFile == "" { - flag.Usage() - loggerutil.FatalWithMessage(arvLogger, - "Data Manager Token needed, but data manager token file not specified.") - } else { - rawRead, err := ioutil.ReadFile(dataManagerTokenFile) - if err != nil { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf("Unexpected error reading token file %s: %v", - dataManagerTokenFile, - err)) - } - dataManagerToken = strings.TrimSpace(string(rawRead)) - } - } - - dataManagerTokenFileReadOnce.Do(readDataManagerToken) - return dataManagerToken -} - +// GetKeepServersAndSummarize gets keep servers from api func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServers) { results = GetKeepServers(params) log.Printf("Returned %d keep disks", len(results.ServerToContents)) @@ -132,12 +101,8 @@ func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServer return } +// GetKeepServers from api server func GetKeepServers(params GetKeepServersParams) (results ReadServers) { - if ¶ms.Client == nil { - log.Fatalf("params.Client passed to GetKeepServers() should " + - "contain a valid ArvadosClient, but instead it is nil.") - } - sdkParams := arvadosclient.Dict{ "filters": [][]string{[]string{"service_type", "=", "disk"}}, } @@ -145,7 +110,7 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) { sdkParams["limit"] = params.Limit } - var sdkResponse KeepServiceList + var sdkResponse ServiceList err := params.Client.List("keep_services", sdkParams, &sdkResponse) if err != nil { @@ -177,9 +142,6 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) { log.Printf("Got Server Addresses: %v", results) - // This is safe for concurrent use - client := http.Client{} - // Send off all the index requests concurrently responseChan := make(chan ServerResponse) for _, keepServer := range sdkResponse.KeepServers { @@ -192,7 +154,7 @@ func GetKeepServers(params 
GetKeepServersParams) (results ReadServers) { go func(keepServer ServerAddress) { responseChan <- GetServerContents(params.Logger, keepServer, - client) + params.Client) }(keepServer) } @@ -218,14 +180,15 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) { return } +// GetServerContents of the keep server func GetServerContents(arvLogger *logger.Logger, keepServer ServerAddress, - client http.Client) (response ServerResponse) { + arv arvadosclient.ArvadosClient) (response ServerResponse) { - GetServerStatus(arvLogger, keepServer, client) + GetServerStatus(arvLogger, keepServer, arv) - req := CreateIndexRequest(arvLogger, keepServer) - resp, err := client.Do(req) + req := CreateIndexRequest(arvLogger, keepServer, arv) + resp, err := arv.Client.Do(req) if err != nil { loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error fetching %s: %v. Response was %+v", @@ -237,9 +200,10 @@ func GetServerContents(arvLogger *logger.Logger, return ReadServerResponse(arvLogger, keepServer, resp) } +// GetServerStatus get keep server status by invoking /status.json func GetServerStatus(arvLogger *logger.Logger, keepServer ServerAddress, - client http.Client) { + arv arvadosclient.ArvadosClient) { url := fmt.Sprintf("http://%s:%d/status.json", keepServer.Host, keepServer.Port) @@ -253,11 +217,11 @@ func GetServerStatus(arvLogger *logger.Logger, serverInfo["host"] = keepServer.Host serverInfo["port"] = keepServer.Port - keepInfo[keepServer.Uuid] = serverInfo + keepInfo[keepServer.UUID] = serverInfo }) } - resp, err := client.Get(url) + resp, err := arv.Client.Get(url) if err != nil { loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error getting keep status from %s: %v", url, err)) @@ -281,15 +245,17 @@ func GetServerStatus(arvLogger *logger.Logger, now := time.Now() arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) { keepInfo := logger.GetOrCreateMap(p, "keep_info") - serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{}) + serverInfo := keepInfo[keepServer.UUID].(map[string]interface{}) serverInfo["status_response_processed_at"] = now serverInfo["status"] = keepStatus }) } } +// CreateIndexRequest to the keep server func CreateIndexRequest(arvLogger *logger.Logger, - keepServer ServerAddress) (req *http.Request) { + keepServer ServerAddress, + arv arvadosclient.ArvadosClient) (req *http.Request) { url := fmt.Sprintf("http://%s:%d/index", keepServer.Host, keepServer.Port) log.Println("About to fetch keep server contents from " + url) @@ -297,7 +263,7 @@ func CreateIndexRequest(arvLogger *logger.Logger, now := time.Now() arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) { keepInfo := logger.GetOrCreateMap(p, "keep_info") - serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{}) + serverInfo := keepInfo[keepServer.UUID].(map[string]interface{}) serverInfo["index_request_sent_at"] = now }) } @@ -308,11 +274,11 @@ func CreateIndexRequest(arvLogger *logger.Logger, fmt.Sprintf("Error building http request for %s: %v", url, err)) } - req.Header.Add("Authorization", - fmt.Sprintf("OAuth2 %s", GetDataManagerToken(arvLogger))) + req.Header.Add("Authorization", "OAuth2 "+arv.ApiToken) return } +// ReadServerResponse reads reasponse from keep server func ReadServerResponse(arvLogger *logger.Logger, keepServer ServerAddress, resp *http.Response) (response ServerResponse) { @@ -328,7 +294,7 @@ func ReadServerResponse(arvLogger *logger.Logger, now := time.Now() arvLogger.Update(func(p map[string]interface{}, e 
map[string]interface{}) { keepInfo := logger.GetOrCreateMap(p, "keep_info") - serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{}) + serverInfo := keepInfo[keepServer.UUID].(map[string]interface{}) serverInfo["index_response_received_at"] = now }) } @@ -375,7 +341,7 @@ func ReadServerResponse(arvLogger *logger.Logger, if storedBlock, ok := response.Contents.BlockDigestToInfo[blockInfo.Digest]; ok { // This server returned multiple lines containing the same block digest. - numDuplicates += 1 + numDuplicates++ // Keep the block that's newer. if storedBlock.Mtime < blockInfo.Mtime { response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo @@ -396,7 +362,7 @@ func ReadServerResponse(arvLogger *logger.Logger, now := time.Now() arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) { keepInfo := logger.GetOrCreateMap(p, "keep_info") - serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{}) + serverInfo := keepInfo[keepServer.UUID].(map[string]interface{}) serverInfo["processing_finished_at"] = now serverInfo["lines_received"] = numLines @@ -439,11 +405,12 @@ func parseBlockInfoFromIndexLine(indexLine string) (blockInfo BlockInfo, err err return } +// Summarize results from keep server func (readServers *ReadServers) Summarize(arvLogger *logger.Logger) { readServers.BlockReplicationCounts = make(map[int]int) for _, infos := range readServers.BlockToServers { replication := len(infos) - readServers.BlockReplicationCounts[replication] += 1 + readServers.BlockReplicationCounts[replication]++ } if arvLogger != nil { @@ -452,24 +419,26 @@ func (readServers *ReadServers) Summarize(arvLogger *logger.Logger) { keepInfo["distinct_blocks_stored"] = len(readServers.BlockToServers) }) } - } +// TrashRequest struct type TrashRequest struct { Locator string `json:"locator"` BlockMtime int64 `json:"block_mtime"` } +// TrashList is an array of TrashRequest objects type TrashList []TrashRequest -func SendTrashLists(dataManagerToken string, kc *keepclient.KeepClient, spl map[string]TrashList) (errs []error) { +// SendTrashLists to trash queue +func SendTrashLists(kc *keepclient.KeepClient, spl map[string]TrashList) (errs []error) { count := 0 barrier := make(chan error) client := kc.Client for url, v := range spl { - count += 1 + count++ log.Printf("Sending trash list to %v", url) go (func(url string, v TrashList) { @@ -487,8 +456,7 @@ func SendTrashLists(dataManagerToken string, kc *keepclient.KeepClient, spl map[ return } - // Add api token header - req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", dataManagerToken)) + req.Header.Add("Authorization", "OAuth2 "+kc.Arvados.ApiToken) // Make the request var resp *http.Response @@ -512,7 +480,7 @@ func SendTrashLists(dataManagerToken string, kc *keepclient.KeepClient, spl map[ } - for i := 0; i < count; i += 1 { + for i := 0; i < count; i++ { b := <-barrier if b != nil { errs = append(errs, b) diff --git a/services/datamanager/keep/keep_test.go b/services/datamanager/keep/keep_test.go index f39463ed62..2ccf17d45f 100644 --- a/services/datamanager/keep/keep_test.go +++ b/services/datamanager/keep/keep_test.go @@ -22,9 +22,9 @@ type TestHandler struct { request TrashList } -func (this *TestHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request) { +func (ts *TestHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request) { r := json.NewDecoder(req.Body) - r.Decode(&this.request) + r.Decode(&ts.request) } func (s *KeepSuite) TestSendTrashLists(c *C) { @@ -53,7 +53,7 @@ func (s *KeepSuite) 
TestSendTrashLists(c *C) { type TestHandlerError struct { } -func (this *TestHandlerError) ServeHTTP(writer http.ResponseWriter, req *http.Request) { +func (tse *TestHandlerError) ServeHTTP(writer http.ResponseWriter, req *http.Request) { http.Error(writer, "I'm a teapot", 418) } diff --git a/services/datamanager/summary/canonical_string.go b/services/datamanager/summary/canonical_string.go index 94f06764a1..152314cf6f 100644 --- a/services/datamanager/summary/canonical_string.go +++ b/services/datamanager/summary/canonical_string.go @@ -1,13 +1,16 @@ /* Ensures that we only have one copy of each unique string. This is /* not designed for concurrent access. */ + package summary // This code should probably be moved somewhere more universal. +// CanonicalString struct type CanonicalString struct { m map[string]string } +// Get a CanonicalString func (cs *CanonicalString) Get(s string) (r string) { if cs.m == nil { cs.m = make(map[string]string) diff --git a/services/datamanager/summary/file.go b/services/datamanager/summary/file.go index 8c37e99ade..18b3aec819 100644 --- a/services/datamanager/summary/file.go +++ b/services/datamanager/summary/file.go @@ -26,6 +26,7 @@ var ( readDataFrom string ) +// DataFetcher to fetch data from keep servers type DataFetcher func(arvLogger *logger.Logger, readCollections *collection.ReadCollections, keepServerInfo *keep.ReadServers) @@ -41,7 +42,7 @@ func init() { "Avoid network i/o and read summary data from this file instead. Used for development only.") } -// Writes data we've read to a file. +// MaybeWriteData writes data we've read to a file. // // This is useful for development, so that we don't need to read all // our data from the network every time we tweak something. @@ -53,33 +54,33 @@ func MaybeWriteData(arvLogger *logger.Logger, keepServerInfo keep.ReadServers) bool { if writeDataTo == "" { return false - } else { - summaryFile, err := os.Create(writeDataTo) - if err != nil { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf("Failed to open %s: %v", writeDataTo, err)) - } - defer summaryFile.Close() + } + summaryFile, err := os.Create(writeDataTo) + if err != nil { + loggerutil.FatalWithMessage(arvLogger, + fmt.Sprintf("Failed to open %s: %v", writeDataTo, err)) + } + defer summaryFile.Close() - enc := gob.NewEncoder(summaryFile) - data := serializedData{ - ReadCollections: readCollections, - KeepServerInfo: keepServerInfo} - err = enc.Encode(data) - if err != nil { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf("Failed to write summary data: %v", err)) - } - log.Printf("Wrote summary data to: %s", writeDataTo) - return true + enc := gob.NewEncoder(summaryFile) + data := serializedData{ + ReadCollections: readCollections, + KeepServerInfo: keepServerInfo} + err = enc.Encode(data) + if err != nil { + loggerutil.FatalWithMessage(arvLogger, + fmt.Sprintf("Failed to write summary data: %v", err)) } + log.Printf("Wrote summary data to: %s", writeDataTo) + return true } +// ShouldReadData should not be used outside of development func ShouldReadData() bool { return readDataFrom != "" } -// Reads data that we've written to a file. +// ReadData reads data that we've written to a file. // // This is useful for development, so that we don't need to read all // our data from the network every time we tweak something. 
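The MaybeWriteData and ReadData helpers above persist the summarized collection and keep-server data with encoding/gob, so development runs can skip the network reads whenever ShouldReadData() reports that a data file was supplied. A minimal, self-contained sketch of that encode/decode round trip follows; the payload struct, its field names, and the file name are illustrative stand-ins for the actual serializedData type, not part of this patch:

package main

import (
	"encoding/gob"
	"fmt"
	"log"
	"os"
)

// payload is a hypothetical stand-in for the serializedData struct in file.go.
// gob only serializes exported fields, so both fields are capitalized.
type payload struct {
	ReadCollections map[string]int // e.g. collection UUID -> total size
	KeepServerInfo  []string       // e.g. keep server addresses
}

func main() {
	// Encode to a file, in the same way MaybeWriteData writes its summary data.
	out, err := os.Create("summary.gob")
	if err != nil {
		log.Fatal(err)
	}
	if err := gob.NewEncoder(out).Encode(payload{
		ReadCollections: map[string]int{"col0": 10},
		KeepServerInfo:  []string{"keep0:25107", "keep1:25108"},
	}); err != nil {
		log.Fatal(err)
	}
	out.Close()

	// Decode it back, as ReadData would on a later development run.
	in, err := os.Open("summary.gob")
	if err != nil {
		log.Fatal(err)
	}
	defer in.Close()
	var p payload
	if err := gob.NewDecoder(in).Decode(&p); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("read back: %+v\n", p)
}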
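The pull_list.go changes below only touch doc comments around PullRequest, PullList, and the custom Locator marshaller. For reference, a simplified sketch of how one pull-list entry serializes through such a marshaller; the string-backed Locator and the sample server URLs here are assumptions for illustration, not the real blockdigest.DigestWithSize-based type:

package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// Locator is a plain-string stand-in for the digest-with-size locator type.
type Locator string

// MarshalJSON renders the locator as a bare JSON string, mirroring the
// custom marshaller documented in pull_list.go.
func (l Locator) MarshalJSON() ([]byte, error) {
	return []byte(`"` + string(l) + `"`), nil
}

// PullRequest mirrors the shape of one pull-list entry.
type PullRequest struct {
	Locator Locator  `json:"locator"`
	Servers []string `json:"servers"`
}

func main() {
	list := []PullRequest{{
		Locator: "e65075d550f9b5bf9992fa1d71a131be+3",
		Servers: []string{"http://keep0:25107", "http://keep1:25108"},
	}}
	out, err := json.Marshal(list)
	if err != nil {
		log.Fatal(err)
	}
	// Prints the locator as a quoted string inside each entry:
	// [{"locator":"e65075d550f9b5bf9992fa1d71a131be+3","servers":["http://keep0:25107","http://keep1:25108"]}]
	fmt.Println(string(out))
}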
diff --git a/services/datamanager/summary/pull_list.go b/services/datamanager/summary/pull_list.go index b326c9521a..cc01249a62 100644 --- a/services/datamanager/summary/pull_list.go +++ b/services/datamanager/summary/pull_list.go @@ -1,4 +1,5 @@ // Code for generating pull lists as described in https://arvados.org/projects/arvados/wiki/Keep_Design_Doc#Pull-List + package summary import ( @@ -14,19 +15,21 @@ import ( "strings" ) +// Locator is a block digest type Locator blockdigest.DigestWithSize +// MarshalJSON encoding func (l Locator) MarshalJSON() ([]byte, error) { return []byte("\"" + blockdigest.DigestWithSize(l).String() + "\""), nil } -// One entry in the Pull List +// PullRequest represents one entry in the Pull List type PullRequest struct { Locator Locator `json:"locator"` Servers []string `json:"servers"` } -// The Pull List for a particular server +// PullList for a particular server type PullList []PullRequest // PullListByLocator implements sort.Interface for PullList based on @@ -49,6 +52,7 @@ func (a PullListByLocator) Less(i, j int) bool { return false } +// PullServers struct // For a given under-replicated block, this structure represents which // servers should pull the specified block and which servers they can // pull it from. @@ -57,8 +61,8 @@ type PullServers struct { From []string // Servers that already contain the specified block } -// Creates a map from block locator to PullServers with one entry for -// each under-replicated block. +// ComputePullServers creates a map from block locator to PullServers +// with one entry for each under-replicated block. // // This method ignores zero-replica blocks since there are no servers // to pull them from, so callers should feel free to omit them, but @@ -78,7 +82,7 @@ func ComputePullServers(kc *keepclient.KeepClient, writableServers[cs.Get(url)] = struct{}{} } - for block, _ := range underReplicated { + for block := range underReplicated { serversStoringBlock := keepServerInfo.BlockToServers[block] numCopies := len(serversStoringBlock) numCopiesMissing := blockToDesiredReplication[block] - numCopies @@ -109,9 +113,9 @@ func ComputePullServers(kc *keepclient.KeepClient, return m } -// Creates a pull list in which the To and From fields preserve the -// ordering of sorted servers and the contents are all canonical -// strings. +// CreatePullServers creates a pull list in which the To and From +// fields preserve the ordering of sorted servers and the contents +// are all canonical strings. func CreatePullServers(cs CanonicalString, serverHasBlock map[string]struct{}, writableServers map[string]struct{}, @@ -142,12 +146,12 @@ func CreatePullServers(cs CanonicalString, return } -// Strips the protocol prefix from a url. +// RemoveProtocolPrefix strips the protocol prefix from a url. func RemoveProtocolPrefix(url string) string { return url[(strings.LastIndex(url, "/") + 1):] } -// Produces a PullList for each keep server. +// BuildPullLists produces a PullList for each keep server. func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) { spl = map[string]PullList{} // We don't worry about canonicalizing our strings here, because we @@ -166,7 +170,7 @@ func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) { return } -// Writes each pull list to a file. +// WritePullLists writes each pull list to a file. // The filename is based on the hostname. 
// // This is just a hack for prototyping, it is not expected to be used diff --git a/services/datamanager/summary/summary.go b/services/datamanager/summary/summary.go index edd760b035..9fb0316b73 100644 --- a/services/datamanager/summary/summary.go +++ b/services/datamanager/summary/summary.go @@ -1,4 +1,5 @@ // Summarizes Collection Data and Keep Server Contents. + package summary // TODO(misha): Check size of blocks as well as their digest. @@ -11,31 +12,33 @@ import ( "sort" ) +// BlockSet is a map of blocks type BlockSet map[blockdigest.DigestWithSize]struct{} -// Adds a single block to the set. +// Insert adds a single block to the set. func (bs BlockSet) Insert(digest blockdigest.DigestWithSize) { bs[digest] = struct{}{} } -// Adds a set of blocks to the set. +// Union adds a set of blocks to the set. func (bs BlockSet) Union(obs BlockSet) { for k, v := range obs { bs[k] = v } } -// We use the collection index to save space. To convert to and from +// CollectionIndexSet is used to save space. To convert to and from // the uuid, use collection.ReadCollections' fields -// CollectionIndexToUuid and CollectionUuidToIndex. +// CollectionIndexToUUID and CollectionUUIDToIndex. type CollectionIndexSet map[int]struct{} -// Adds a single collection to the set. The collection is specified by +// Insert adds a single collection to the set. The collection is specified by // its index. func (cis CollectionIndexSet) Insert(collectionIndex int) { cis[collectionIndex] = struct{}{} } +// ToCollectionIndexSet gets block to collection indices func (bs BlockSet) ToCollectionIndexSet( readCollections collection.ReadCollections, collectionIndexSet *CollectionIndexSet) { @@ -46,6 +49,7 @@ func (bs BlockSet) ToCollectionIndexSet( } } +// ReplicationLevels struct // Keeps track of the requested and actual replication levels. // Currently this is only used for blocks but could easily be used for // collections as well. @@ -59,18 +63,20 @@ type ReplicationLevels struct { Actual int } -// Maps from replication levels to their blocks. +// ReplicationLevelBlockSetMap maps from replication levels to their blocks. type ReplicationLevelBlockSetMap map[ReplicationLevels]BlockSet -// An individual entry from ReplicationLevelBlockSetMap which only reports the number of blocks, not which blocks. +// ReplicationLevelBlockCount is an individual entry from ReplicationLevelBlockSetMap +// which only reports the number of blocks, not which blocks. type ReplicationLevelBlockCount struct { Levels ReplicationLevels Count int } -// An ordered list of ReplicationLevelBlockCount useful for reporting. +// ReplicationLevelBlockSetSlice is an ordered list of ReplicationLevelBlockCount useful for reporting. type ReplicationLevelBlockSetSlice []ReplicationLevelBlockCount +// ReplicationSummary sturct type ReplicationSummary struct { CollectionBlocksNotInKeep BlockSet UnderReplicatedBlocks BlockSet @@ -84,7 +90,7 @@ type ReplicationSummary struct { CorrectlyReplicatedCollections CollectionIndexSet } -// This struct counts the elements in each set in ReplicationSummary. +// ReplicationSummaryCounts struct counts the elements in each set in ReplicationSummary. type ReplicationSummaryCounts struct { CollectionBlocksNotInKeep int UnderReplicatedBlocks int @@ -97,8 +103,8 @@ type ReplicationSummaryCounts struct { CorrectlyReplicatedCollections int } -// Gets the BlockSet for a given set of ReplicationLevels, creating it -// if it doesn't already exist. 
+// GetOrCreate gets the BlockSet for a given set of ReplicationLevels, +// creating it if it doesn't already exist. func (rlbs ReplicationLevelBlockSetMap) GetOrCreate( repLevels ReplicationLevels) (bs BlockSet) { bs, exists := rlbs[repLevels] @@ -109,21 +115,21 @@ func (rlbs ReplicationLevelBlockSetMap) GetOrCreate( return } -// Adds a block to the set for a given replication level. +// Insert adds a block to the set for a given replication level. func (rlbs ReplicationLevelBlockSetMap) Insert( repLevels ReplicationLevels, block blockdigest.DigestWithSize) { rlbs.GetOrCreate(repLevels).Insert(block) } -// Adds a set of blocks to the set for a given replication level. +// Union adds a set of blocks to the set for a given replication level. func (rlbs ReplicationLevelBlockSetMap) Union( repLevels ReplicationLevels, bs BlockSet) { rlbs.GetOrCreate(repLevels).Union(bs) } -// Outputs a sorted list of ReplicationLevelBlockCounts. +// Counts outputs a sorted list of ReplicationLevelBlockCounts. func (rlbs ReplicationLevelBlockSetMap) Counts() ( sorted ReplicationLevelBlockSetSlice) { sorted = make(ReplicationLevelBlockSetSlice, len(rlbs)) @@ -153,6 +159,7 @@ func (rlbss ReplicationLevelBlockSetSlice) Swap(i, j int) { rlbss[i], rlbss[j] = rlbss[j], rlbss[i] } +// ComputeCounts returns ReplicationSummaryCounts func (rs ReplicationSummary) ComputeCounts() (rsc ReplicationSummaryCounts) { // TODO(misha): Consider rewriting this method to iterate through // the fields using reflection, instead of explicitly listing the @@ -169,6 +176,7 @@ func (rs ReplicationSummary) ComputeCounts() (rsc ReplicationSummaryCounts) { return rsc } +// PrettyPrint ReplicationSummaryCounts func (rsc ReplicationSummaryCounts) PrettyPrint() string { return fmt.Sprintf("Replication Block Counts:"+ "\n Missing From Keep: %d, "+ @@ -192,12 +200,13 @@ func (rsc ReplicationSummaryCounts) PrettyPrint() string { rsc.CorrectlyReplicatedCollections) } +// BucketReplication returns ReplicationLevelBlockSetMap func BucketReplication(readCollections collection.ReadCollections, - keepServerInfo keep.ReadServers) (rlbsm ReplicationLevelBlockSetMap) { - rlbsm = make(ReplicationLevelBlockSetMap) + keepServerInfo keep.ReadServers) (rlbs ReplicationLevelBlockSetMap) { + rlbs = make(ReplicationLevelBlockSetMap) for block, requestedReplication := range readCollections.BlockToDesiredReplication { - rlbsm.Insert( + rlbs.Insert( ReplicationLevels{ Requested: requestedReplication, Actual: len(keepServerInfo.BlockToServers[block])}, @@ -206,7 +215,7 @@ func BucketReplication(readCollections collection.ReadCollections, for block, servers := range keepServerInfo.BlockToServers { if 0 == readCollections.BlockToDesiredReplication[block] { - rlbsm.Insert( + rlbs.Insert( ReplicationLevels{Requested: 0, Actual: len(servers)}, block) } @@ -214,7 +223,8 @@ func BucketReplication(readCollections collection.ReadCollections, return } -func (rlbsm ReplicationLevelBlockSetMap) SummarizeBuckets( +// SummarizeBuckets reads collections and summarizes +func (rlbs ReplicationLevelBlockSetMap) SummarizeBuckets( readCollections collection.ReadCollections) ( rs ReplicationSummary) { rs.CollectionBlocksNotInKeep = make(BlockSet) @@ -228,7 +238,7 @@ func (rlbsm ReplicationLevelBlockSetMap) SummarizeBuckets( rs.OverReplicatedCollections = make(CollectionIndexSet) rs.CorrectlyReplicatedCollections = make(CollectionIndexSet) - for levels, bs := range rlbsm { + for levels, bs := range rlbs { if levels.Actual == 0 { rs.CollectionBlocksNotInKeep.Union(bs) } else if
levels.Requested == 0 { @@ -254,7 +264,7 @@ func (rlbsm ReplicationLevelBlockSetMap) SummarizeBuckets( rs.OverReplicatedBlocks.ToCollectionIndexSet(readCollections, &rs.OverReplicatedCollections) - for i := range readCollections.CollectionIndexToUuid { + for i := range readCollections.CollectionIndexToUUID { if _, notInKeep := rs.CollectionsNotFullyInKeep[i]; notInKeep { } else if _, underReplicated := rs.UnderReplicatedCollections[i]; underReplicated { } else if _, overReplicated := rs.OverReplicatedCollections[i]; overReplicated { diff --git a/services/datamanager/summary/summary_test.go b/services/datamanager/summary/summary_test.go index ea76df4d34..cc4eb92560 100644 --- a/services/datamanager/summary/summary_test.go +++ b/services/datamanager/summary/summary_test.go @@ -215,6 +215,6 @@ func TestMixedReplication(t *testing.T) { returnedSummary := SummarizeReplication(rc, keepInfo) if !reflect.DeepEqual(returnedSummary, expectedSummary) { - t.Fatalf("Expected returnedSummary to look like: \n%+v but instead it is: \n%+v. Index to UUID is %v. BlockToCollectionIndices is %v.", expectedSummary, returnedSummary, rc.CollectionIndexToUuid, rc.BlockToCollectionIndices) + t.Fatalf("Expected returnedSummary to look like: \n%+v but instead it is: \n%+v. Index to UUID is %v. BlockToCollectionIndices is %v.", expectedSummary, returnedSummary, rc.CollectionIndexToUUID, rc.BlockToCollectionIndices) } } diff --git a/services/datamanager/summary/trash_list.go b/services/datamanager/summary/trash_list.go index 0bedc9cc3a..b6ceacecde 100644 --- a/services/datamanager/summary/trash_list.go +++ b/services/datamanager/summary/trash_list.go @@ -1,4 +1,5 @@ // Code for generating trash lists + package summary import ( @@ -9,6 +10,7 @@ import ( "time" ) +// BuildTrashLists builds list of blocks to be sent to trash queue func BuildTrashLists(kc *keepclient.KeepClient, keepServerInfo *keep.ReadServers, keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList, err error) { @@ -40,19 +42,19 @@ func buildTrashListsInternal(writableServers map[string]struct{}, m = make(map[string]keep.TrashList) for block := range keepBlocksNotInCollections { - for _, block_on_server := range keepServerInfo.BlockToServers[block] { - if block_on_server.Mtime >= expiry { + for _, blockOnServer := range keepServerInfo.BlockToServers[block] { + if blockOnServer.Mtime >= expiry { continue } // block is older than expire cutoff - srv := keepServerInfo.KeepServerIndexToAddress[block_on_server.ServerIndex].String() + srv := keepServerInfo.KeepServerIndexToAddress[blockOnServer.ServerIndex].String() if _, writable := writableServers[srv]; !writable { continue } - m[srv] = append(m[srv], keep.TrashRequest{Locator: block.Digest.String(), BlockMtime: block_on_server.Mtime}) + m[srv] = append(m[srv], keep.TrashRequest{Locator: block.Digest.String(), BlockMtime: blockOnServer.Mtime}) } } return diff --git a/services/datamanager/summary/trash_list_test.go b/services/datamanager/summary/trash_list_test.go index 7620631a15..555211fe02 100644 --- a/services/datamanager/summary/trash_list_test.go +++ b/services/datamanager/summary/trash_list_test.go @@ -34,7 +34,7 @@ func (s *TrashSuite) TestBuildTrashLists(c *C) { keep.BlockServerInfo{1, 101}}}} // only block0 is in delete set - var bs BlockSet = make(BlockSet) + var bs = make(BlockSet) bs[block0] = struct{}{} // Test trash list where only sv0 is on writable list. 
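For orientation, the trash-list change above reduces to one rule: for each block that no collection references, a trash request is queued on every writable server whose copy is older than the expiry cutoff, and copies that are too new or that sit on read-only servers are skipped. The standalone Go sketch below mirrors that shape only; blockCopy, trashRequest, the sample server URLs, and the mtime/expiry values are illustrative stand-ins, not the real keep.* types or data from this commit.

package main

import "fmt"

// trashRequest is a simplified stand-in for keep.TrashRequest.
type trashRequest struct {
	Locator    string
	BlockMtime int64
}

// blockCopy records where one copy of a block lives and when it was written.
type blockCopy struct {
	Server string
	Mtime  int64
}

// buildTrashLists keeps a copy only if it is older than expiry and its server
// is writable, then groups the resulting trash requests by server address.
func buildTrashLists(unreferenced map[string][]blockCopy, writable map[string]struct{}, expiry int64) map[string][]trashRequest {
	m := make(map[string][]trashRequest)
	for locator, copies := range unreferenced {
		for _, c := range copies {
			if c.Mtime >= expiry {
				continue // copy is newer than the expiry cutoff
			}
			if _, ok := writable[c.Server]; !ok {
				continue // read-only server, leave the copy alone
			}
			m[c.Server] = append(m[c.Server], trashRequest{Locator: locator, BlockMtime: c.Mtime})
		}
	}
	return m
}

func main() {
	writable := map[string]struct{}{"http://keep0:25107": {}}
	unreferenced := map[string][]blockCopy{
		"b0": {
			{Server: "http://keep0:25107", Mtime: 90},
			{Server: "http://keep1:25107", Mtime: 91},
		},
	}
	// Only the copy on writable keep0 is queued; the keep1 copy is skipped.
	fmt.Println(buildTrashLists(unreferenced, writable, 100))
}

In the commit itself the copies come from keep.ReadServers.BlockToServers and the writable set is derived from the keepclient, but the two filters and the per-server grouping follow the same pattern.
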
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index c4a12c5a5c..ec11af5cf5 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -36,6 +36,10 @@ const BlockSize = 64 * 1024 * 1024 // in order to permit writes. const MinFreeKilobytes = BlockSize / 1024 +// Until #6221 is resolved, never_delete must be true. +// However, allow it to be false in testing with TEST_DATA_MANAGER_TOKEN +const TEST_DATA_MANAGER_TOKEN = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h" + // ProcMounts /proc/mounts var ProcMounts = "/proc/mounts" @@ -293,10 +297,6 @@ func main() { flag.Parse() - if neverDelete != true { - log.Fatal("neverDelete must be true, see #6221") - } - if maxBuffers < 0 { log.Fatal("-max-buffers must be greater than zero.") } @@ -347,6 +347,11 @@ func main() { log.Fatalf("reading data manager token: %s\n", err) } } + + if neverDelete != true && dataManagerToken != TEST_DATA_MANAGER_TOKEN { + log.Fatal("never_delete must be true, see #6221") + } + if blobSigningKeyFile != "" { if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil { PermissionSecret = bytes.TrimSpace(buf)