"git.curoverse.com/arvados.git/sdk/go/blockdigest"
"git.curoverse.com/arvados.git/sdk/go/logger"
"git.curoverse.com/arvados.git/sdk/go/manifest"
+ "git.curoverse.com/arvados.git/sdk/go/util"
"git.curoverse.com/arvados.git/services/datamanager/loggerutil"
"log"
"os"
)
var (
- heap_profile_filename string
+ // heapProfileFilename is set by the -heap-profile flag (see init); an
+ // empty value means no heap profile is written.
+ heapProfileFilename string
// globals for debugging
totalManifestSize uint64
maxManifestSize uint64
)
+const (
+ // DefaultReplicationLevel is the replication level assigned to a
+ // collection whose ReplicationLevel is zero.
+ DefaultReplicationLevel = 2
+)
+
type Collection struct {
Uuid string
OwnerUuid string
ReadAllCollections bool
UuidToCollection map[string]Collection
OwnerToCollectionSize map[string]int
+ // BlockToReplication maps each block digest to the highest
+ // ReplicationLevel of any collection that contains the block; it is
+ // filled in by Summarize.
+ BlockToReplication map[blockdigest.BlockDigest]int
}
+// GetCollectionsParams holds the options passed to GetCollections and
+// GetCollectionsAndSummarize.
type GetCollectionsParams struct {
}
+// init defines the -heap-profile command-line flag, whose value is stored
+// in heapProfileFilename.
func init() {
- flag.StringVar(&heap_profile_filename,
+ flag.StringVar(&heapProfileFilename,
"heap-profile",
"",
"File to write the heap profiles to. Leave blank to skip profiling.")
}
-// // Methods to implement util.SdkListResponse Interface
-// func (s SdkCollectionList) NumItemsAvailable() (numAvailable int, err error) {
-// return s.ItemsAvailable, nil
-// }
-
-// func (s SdkCollectionList) NumItemsContained() (numContained int, err error) {
-// return len(s.Items), nil
-// }
-
// WriteHeapProfile writes the heap profile to a file for later review.
// Since a file is expected to contain only a single heap profile, this
// function overwrites the previously written profile, so it is safe to
// call repeatedly. Otherwise we would see cumulative numbers as explained here:
// https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
func WriteHeapProfile() {
- if heap_profile_filename != "" {
+ if heapProfileFilename != "" {
- heap_profile, err := os.Create(heap_profile_filename)
+ heap_profile, err := os.Create(heapProfileFilename)
if err != nil {
log.Fatal(err)
}
func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
results = GetCollections(params)
- ComputeSizeOfOwnedCollections(&results)
+ Summarize(&results)
if params.Logger != nil {
- properties, _ := params.Logger.Edit()
- collectionInfo := properties["collection_info"].(map[string]interface{})
- collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize
- params.Logger.Record()
+ params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+ collectionInfo := p["collection_info"].(map[string]interface{})
+ // Maps are reference types, so the log now holds a reference to
+ // results.OwnerToCollectionSize rather than a copy. Any later
+ // (possibly concurrent) update to that map would also change the
+ // logged value, so we assume it is not modified after this point.
+ collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize
+ })
}
log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
sdkParams["limit"] = params.BatchSize
}
- initialNumberOfCollectionsAvailable := NumberCollectionsAvailable(params.Client)
+ initialNumberOfCollectionsAvailable, err :=
+ util.NumberItemsAvailable(params.Client, "collections")
+ if err != nil {
+ loggerutil.FatalWithMessage(params.Logger,
+ fmt.Sprintf("Error querying collection count: %v", err))
+ }
// Include a 1% margin for collections added while we're reading so
// that we don't have to grow the map in most cases.
maxExpectedCollections := int(
results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
if params.Logger != nil {
- properties, _ := params.Logger.Edit()
- collectionInfo := make(map[string]interface{})
- collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
- collectionInfo["batch_size"] = params.BatchSize
- properties["collection_info"] = collectionInfo
- params.Logger.Record()
+ params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+ collectionInfo := make(map[string]interface{})
+ collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
+ collectionInfo["batch_size"] = params.BatchSize
+ p["collection_info"] = collectionInfo
+ })
}
// These values are just for getting the loop to run the first time,
maxManifestSize, totalManifestSize)
if params.Logger != nil {
- properties, _ := params.Logger.Edit()
- collectionInfo := properties["collection_info"].(map[string]interface{})
- collectionInfo["collections_read"] = totalCollections
- collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
- collectionInfo["total_manifest_size"] = totalManifestSize
- collectionInfo["max_manifest_size"] = maxManifestSize
- params.Logger.Record()
+ params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+ collectionInfo := p["collection_info"].(map[string]interface{})
+ collectionInfo["collections_read"] = totalCollections
+ collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
+ collectionInfo["total_manifest_size"] = totalManifestSize
+ collectionInfo["max_manifest_size"] = maxManifestSize
+ })
}
}
"Arvados SDK collection returned with unexpected zero "+
"modifcation date. This probably means that either we failed to "+
"parse the modification date or the API server has changed how "+
- "it returns modification dates: %v",
+ "it returns modification dates: %+v",
collection))
}
if sdkCollection.ModifiedAt.After(latestModificationDate) {
latestModificationDate = sdkCollection.ModifiedAt
}
+
+ if collection.ReplicationLevel == 0 {
+ collection.ReplicationLevel = DefaultReplicationLevel
+ }
+
manifest := manifest.Manifest{sdkCollection.ManifestText}
manifestSize := uint64(len(sdkCollection.ManifestText))
return
}
-func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) int {
- var collections SdkCollectionList
- sdkParams := arvadosclient.Dict{"limit": 0}
- err := client.List("collections", sdkParams, &collections)
- if err != nil {
- log.Fatalf("error querying collections for items available: %v", err)
- }
-
- return collections.ItemsAvailable
-}
-
-func ComputeSizeOfOwnedCollections(readCollections *ReadCollections) {
+// Summarize rebuilds the aggregate maps of readCollections from its
+// UuidToCollection entries:
+//   - OwnerToCollectionSize: for each OwnerUuid, the sum of TotalSize over
+//     the collections it owns.
+//   - BlockToReplication: for each block digest, the highest
+//     ReplicationLevel among the collections that contain that block.
+// Both maps are replaced with freshly allocated ones on every call.
+func Summarize(readCollections *ReadCollections) {
readCollections.OwnerToCollectionSize = make(map[string]int)
+ readCollections.BlockToReplication = make(map[blockdigest.BlockDigest]int)
+
for _, coll := range readCollections.UuidToCollection {
readCollections.OwnerToCollectionSize[coll.OwnerUuid] =
readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
+
+ // A block shared by several collections needs the strongest
+ // replication any of them asks for, so keep the maximum level seen.
+ for block, _ := range coll.BlockDigestToSize {
+ storedReplication := readCollections.BlockToReplication[block]
+ if coll.ReplicationLevel > storedReplication {
+ readCollections.BlockToReplication[block] = coll.ReplicationLevel
+ }
+ }
}
return