From 08fbba821251a58cc99921cb477fbbc076a1bfee Mon Sep 17 00:00:00 2001 From: mishaz Date: Wed, 7 Jan 2015 04:16:40 +0000 Subject: [PATCH] Started focusing on Keep Server responses again. Switched to using blockdigest instead of strings. Added per block info so that we can track block replication across servers. --- services/datamanager/datamanager.go | 42 ++++++++++++++++------------- services/datamanager/keep/keep.go | 42 ++++++++++++++++++++++++----- 2 files changed, 60 insertions(+), 24 deletions(-) diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go index 6393787e01..7b79f0f5bc 100644 --- a/services/datamanager/datamanager.go +++ b/services/datamanager/datamanager.go @@ -7,7 +7,7 @@ import ( "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/util" "git.curoverse.com/arvados.git/services/datamanager/collection" -// "git.curoverse.com/arvados.git/services/datamanager/keep" + "git.curoverse.com/arvados.git/services/datamanager/keep" "log" ) @@ -27,29 +27,35 @@ func main() { // TODO(misha): Read Collections and Keep Contents concurrently as goroutines. - readCollections := collection.GetCollections( - collection.GetCollectionsParams{ - Client: arv, BatchSize: 500}) + // readCollections := collection.GetCollections( + // collection.GetCollectionsParams{ + // Client: arv, BatchSize: 500}) - //log.Printf("Read Collections: %v", readCollections) + // UserUsage := ComputeSizeOfOwnedCollections(readCollections) + // log.Printf("Uuid to Size used: %v", UserUsage) - UserUsage := ComputeSizeOfOwnedCollections(readCollections) - log.Printf("Uuid to Size used: %v", UserUsage) + // // TODO(misha): Add a "readonly" flag. If we're in readonly mode, + // // lots of behaviors can become warnings (and obviously we can't + // // write anything). + // // if !readCollections.ReadAllCollections { + // // log.Fatalf("Did not read all collections") + // // } - // TODO(misha): Add a "readonly" flag. If we're in readonly mode, - // lots of behaviors can become warnings (and obviously we can't - // write anything). - // if !readCollections.ReadAllCollections { - // log.Fatalf("Did not read all collections") - // } + // log.Printf("Read and processed %d collections", + // len(readCollections.UuidToCollection)) - log.Printf("Read and processed %d collections", - len(readCollections.UuidToCollection)) + readServers := keep.GetKeepServers( + keep.GetKeepServersParams{Client: arv, Limit: 1000}) - // readServers := keep.GetKeepServers( - // keep.GetKeepServersParams{Client: arv, Limit: 1000}) + log.Printf("Returned %d keep disks", len(readServers.ServerToContents)) - // log.Printf("Returned %d keep disks", len(readServers.AddressToContents)) + blockReplicationCounts := make(map[int]int) + for _, infos := range readServers.BlockToServers { + replication := len(infos) + blockReplicationCounts[replication] += 1 + } + + log.Printf("Replication level distribution: %v", blockReplicationCounts) } func ComputeSizeOfOwnedCollections(readCollections collection.ReadCollections) ( diff --git a/services/datamanager/keep/keep.go b/services/datamanager/keep/keep.go index e600c4ac07..91af2014ae 100644 --- a/services/datamanager/keep/keep.go +++ b/services/datamanager/keep/keep.go @@ -8,6 +8,7 @@ import ( "fmt" //"git.curoverse.com/arvados.git/sdk/go/keepclient" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/blockdigest" "git.curoverse.com/arvados.git/sdk/go/manifest" "git.curoverse.com/arvados.git/sdk/go/util" "log" @@ -23,14 +24,22 @@ type ServerAddress struct { Port int `json:"service_port"` } +// Info about a particular block returned by the server type BlockInfo struct { - Digest string + Digest blockdigest.BlockDigest Size int Mtime int // TODO(misha): Replace this with a timestamp. } +// Info about a specified block given by a server +type BlockServerInfo struct { + ServerIndex int + Size int + Mtime int // TODO(misha): Replace this with a timestamp. +} + type ServerContents struct { - BlockDigestToInfo map[string]BlockInfo + BlockDigestToInfo map[blockdigest.BlockDigest]BlockInfo } type ServerResponse struct { @@ -40,7 +49,10 @@ type ServerResponse struct { type ReadServers struct { ReadAllServers bool - AddressToContents map[ServerAddress]ServerContents + KeepServerIndexToAddress []ServerAddress + KeepServerAddressToIndex map[ServerAddress]int + ServerToContents map[ServerAddress]ServerContents + BlockToServers map[blockdigest.BlockDigest][]BlockServerInfo } type GetKeepServersParams struct { @@ -130,6 +142,14 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) { numAvailable) } + results.KeepServerIndexToAddress = sdkResponse.KeepServers + results.KeepServerAddressToIndex = make(map[ServerAddress]int) + for i, address := range results.KeepServerIndexToAddress { + results.KeepServerAddressToIndex[address] = i + } + + log.Printf("Got Server Addresses: %v", results) + // This is safe for concurrent use client := http.Client{} @@ -139,7 +159,8 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) { go GetServerContents(keepServer, client, responseChan) } - results.AddressToContents = make(map[ServerAddress]ServerContents) + results.ServerToContents = make(map[ServerAddress]ServerContents) + results.BlockToServers = make(map[blockdigest.BlockDigest][]BlockServerInfo) // Read all the responses for i := range sdkResponse.KeepServers { @@ -148,7 +169,15 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) { log.Printf("Received channel response from %v containing %d files", response.Address, len(response.Contents.BlockDigestToInfo)) - results.AddressToContents[response.Address] = response.Contents + results.ServerToContents[response.Address] = response.Contents + serverIndex := results.KeepServerAddressToIndex[response.Address] + for _, blockInfo := range response.Contents.BlockDigestToInfo { + results.BlockToServers[blockInfo.Digest] = append( + results.BlockToServers[blockInfo.Digest], + BlockServerInfo{ServerIndex: serverIndex, + Size: blockInfo.Size, + Mtime: blockInfo.Mtime}) + } } return } @@ -183,7 +212,8 @@ func GetServerContents(keepServer ServerAddress, response := ServerResponse{} response.Address = keepServer - response.Contents.BlockDigestToInfo = make(map[string]BlockInfo) + response.Contents.BlockDigestToInfo = + make(map[blockdigest.BlockDigest]BlockInfo) scanner := bufio.NewScanner(resp.Body) numLines, numDuplicates, numSizeDisagreements := 0, 0, 0 for scanner.Scan() { -- 2.30.2