X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/53f785c298338645b6880f22f26b0c36a7cfab4d..6a59d473c1574eec4db1f83b5d1a963b4f976e5a:/services/datamanager/datamanager.go?ds=sidebyside diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go index d3efe62173..91612668b5 100644 --- a/services/datamanager/datamanager.go +++ b/services/datamanager/datamanager.go @@ -4,7 +4,9 @@ package main import ( "flag" + "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/keepclient" "git.curoverse.com/arvados.git/sdk/go/logger" "git.curoverse.com/arvados.git/sdk/go/util" "git.curoverse.com/arvados.git/services/datamanager/collection" @@ -77,30 +79,19 @@ func singlerun() { } var ( + dataFetcher summary.DataFetcher readCollections collection.ReadCollections keepServerInfo keep.ReadServers ) - if !summary.MaybeReadData(arvLogger, &readCollections, &keepServerInfo) { - collectionChannel := make(chan collection.ReadCollections) - - go func() { - collectionChannel <- collection.GetCollectionsAndSummarize( - collection.GetCollectionsParams{ - Client: arv, - Logger: arvLogger, - BatchSize: 50}) - }() - - keepServerInfo = keep.GetKeepServersAndSummarize( - keep.GetKeepServersParams{ - Client: arv, - Logger: arvLogger, - Limit: 1000}) - - readCollections = <-collectionChannel + if summary.ShouldReadData() { + dataFetcher = summary.ReadData + } else { + dataFetcher = BuildDataFetcher(arv) } + dataFetcher(arvLogger, &readCollections, &keepServerInfo) + summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo) buckets := summary.BucketReplication(readCollections, keepServerInfo) @@ -111,7 +102,7 @@ func singlerun() { log.Printf("Blocks In Collections: %d, "+ "\nBlocks In Keep: %d.", - len(readCollections.BlockToReplication), + len(readCollections.BlockToDesiredReplication), len(keepServerInfo.BlockToServers)) log.Println(replicationCounts.PrettyPrint()) @@ -122,6 +113,21 @@ func singlerun() { rlbss.Count) } + kc, err := keepclient.MakeKeepClient(&arv) + if err != nil { + loggerutil.FatalWithMessage(arvLogger, + fmt.Sprintf("Error setting up keep client %s", err.Error())) + } + + pullServers := summary.ComputePullServers(kc, + &keepServerInfo, + readCollections.BlockToDesiredReplication, + replicationSummary.UnderReplicatedBlocks) + + pullLists := summary.BuildPullLists(pullServers) + + summary.WritePullLists(arvLogger, pullLists) + // Log that we're finished. We force the recording, since go will // not wait for the write timer before exiting. if arvLogger != nil { @@ -135,3 +141,28 @@ func singlerun() { }) } } + +// Returns a data fetcher that fetches data from remote servers. +func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher { + return func(arvLogger *logger.Logger, + readCollections *collection.ReadCollections, + keepServerInfo *keep.ReadServers) { + collectionChannel := make(chan collection.ReadCollections) + + go func() { + collectionChannel <- collection.GetCollectionsAndSummarize( + collection.GetCollectionsParams{ + Client: arv, + Logger: arvLogger, + BatchSize: 50}) + }() + + *keepServerInfo = keep.GetKeepServersAndSummarize( + keep.GetKeepServersParams{ + Client: arv, + Logger: arvLogger, + Limit: 1000}) + + *readCollections = <-collectionChannel + } +}