-/* Deals with parsing Collection responses from API Server. */
+// Package collection deals with parsing Collection responses from the API server.
package collection
"git.curoverse.com/arvados.git/services/datamanager/loggerutil"
"log"
"os"
- "runtime"
"runtime/pprof"
"time"
)
var (
- heap_profile_filename string
+ heapProfileFilename string
// globals for debugging
totalManifestSize uint64
maxManifestSize uint64
}
+// ReadCollections holds the collections read from the API server together
+// with the per-owner, per-block and per-collection indices that Summarize
+// derives from them.
type ReadCollections struct {
- ReadAllCollections bool
- UuidToCollection map[string]Collection
- OwnerToCollectionSize map[string]int
+ // ReadAllCollections: NOTE(review) presumably reports whether every
+ // collection was read; confirm where it is set.
+ ReadAllCollections bool
+ // UuidToCollection holds the collections filled in by ProcessCollections;
+ // presumably keyed by collection UUID (verify against ProcessCollections).
+ UuidToCollection map[string]Collection
+ // OwnerToCollectionSize maps an owner UUID to the sum of TotalSize over
+ // the collections it owns. Computed by Summarize.
+ OwnerToCollectionSize map[string]int
+ // BlockToDesiredReplication maps each block locator to the highest
+ // ReplicationLevel among the collections that reference it. Computed by
+ // Summarize.
+ BlockToDesiredReplication map[blockdigest.DigestWithSize]int
+ // CollectionUuidToIndex is the inverse of CollectionIndexToUuid.
+ CollectionUuidToIndex map[string]int
+ // CollectionIndexToUuid assigns each collection a dense integer index.
+ // Indices follow map iteration order over UuidToCollection, so they are
+ // not stable between runs.
+ CollectionIndexToUuid []string
+ // BlockToCollectionIndices maps each block locator to the indices (into
+ // CollectionIndexToUuid) of the collections that reference it.
+ BlockToCollectionIndices map[blockdigest.DigestWithSize][]int
}
type GetCollectionsParams struct {
}
func init() {
- flag.StringVar(&heap_profile_filename,
+ flag.StringVar(&heapProfileFilename,
"heap-profile",
"",
"File to write the heap profiles to. Leave blank to skip profiling.")
// Otherwise we would see cumulative numbers as explained here:
// https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
func WriteHeapProfile() {
- if heap_profile_filename != "" {
+ if heapProfileFilename != "" {
- heap_profile, err := os.Create(heap_profile_filename)
+ heap_profile, err := os.Create(heapProfileFilename)
if err != nil {
log.Fatal(err)
}
func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
results = GetCollections(params)
- ComputeSizeOfOwnedCollections(&results)
-
- if params.Logger != nil {
- params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- collectionInfo := p["collection_info"].(map[string]interface{})
- // Since maps are shallow copied, we run a risk of concurrent
- // updates here. By copying results.OwnerToCollectionSize into
- // the log, we're assuming that it won't be updated.
- collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize
- })
- }
+ results.Summarize(params.Logger)
log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
log.Printf("Read and processed %d collections",
fieldsWanted := []string{"manifest_text",
"owner_uuid",
"uuid",
- // TODO(misha): Start using the redundancy field.
"redundancy",
"modified_at"}
sdkParams["limit"] = params.BatchSize
}
+ var defaultReplicationLevel int
+ {
+ value, err := params.Client.Discovery("defaultCollectionReplication")
+ if err != nil {
+ loggerutil.FatalWithMessage(params.Logger,
+ fmt.Sprintf("Error querying default collection replication: %v", err))
+ }
+
+ defaultReplicationLevel = int(value.(float64))
+ if defaultReplicationLevel <= 0 {
+ loggerutil.FatalWithMessage(params.Logger,
+ fmt.Sprintf("Default collection replication returned by arvados SDK "+
+ "should be a positive integer but instead it was %d.",
+ defaultReplicationLevel))
+ }
+ }
+
initialNumberOfCollectionsAvailable, err :=
util.NumberItemsAvailable(params.Client, "collections")
if err != nil {
if params.Logger != nil {
params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- collectionInfo := make(map[string]interface{})
+ collectionInfo := logger.GetOrCreateMap(p, "collection_info")
collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
collectionInfo["batch_size"] = params.BatchSize
- p["collection_info"] = collectionInfo
+ collectionInfo["default_replication_level"] = defaultReplicationLevel
})
}
sdkParams["filters"].([][]string)[0][2] =
ProcessCollections(params.Logger,
collections.Items,
+ defaultReplicationLevel,
results.UuidToCollection).Format(time.RFC3339)
// update counts
if params.Logger != nil {
params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- collectionInfo := p["collection_info"].(map[string]interface{})
+ collectionInfo := logger.GetOrCreateMap(p, "collection_info")
collectionInfo["collections_read"] = totalCollections
collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
collectionInfo["total_manifest_size"] = totalManifestSize
}
}
- // Just in case this lowers the numbers reported in the heap profile.
- runtime.GC()
-
// Write the heap profile for examining memory usage
WriteHeapProfile()
func ProcessCollections(arvLogger *logger.Logger,
receivedCollections []SdkCollectionInfo,
+ defaultReplicationLevel int,
uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
for _, sdkCollection := range receivedCollections {
collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
loggerutil.FatalWithMessage(arvLogger,
fmt.Sprintf(
"Arvados SDK collection returned with unexpected zero "+
- "modifcation date. This probably means that either we failed to "+
+ "modification date. This probably means that either we failed to "+
"parse the modification date or the API server has changed how "+
- "it returns modification dates: %v",
+ "it returns modification dates: %+v",
collection))
}
if sdkCollection.ModifiedAt.After(latestModificationDate) {
latestModificationDate = sdkCollection.ModifiedAt
}
+
+ if collection.ReplicationLevel == 0 {
+ collection.ReplicationLevel = defaultReplicationLevel
+ }
+
manifest := manifest.Manifest{sdkCollection.ManifestText}
manifestSize := uint64(len(sdkCollection.ManifestText))
return
}
-func ComputeSizeOfOwnedCollections(readCollections *ReadCollections) {
+func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
readCollections.OwnerToCollectionSize = make(map[string]int)
+ readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int)
+ numCollections := len(readCollections.UuidToCollection)
+ readCollections.CollectionUuidToIndex = make(map[string]int, numCollections)
+ readCollections.CollectionIndexToUuid = make([]string, 0, numCollections)
+ readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int)
+
for _, coll := range readCollections.UuidToCollection {
+ collectionIndex := len(readCollections.CollectionIndexToUuid)
+ readCollections.CollectionIndexToUuid =
+ append(readCollections.CollectionIndexToUuid, coll.Uuid)
+ readCollections.CollectionUuidToIndex[coll.Uuid] = collectionIndex
+
readCollections.OwnerToCollectionSize[coll.OwnerUuid] =
readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
+
+ for block, size := range coll.BlockDigestToSize {
+ locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)}
+ readCollections.BlockToCollectionIndices[locator] =
+ append(readCollections.BlockToCollectionIndices[locator],
+ collectionIndex)
+ storedReplication := readCollections.BlockToDesiredReplication[locator]
+ if coll.ReplicationLevel > storedReplication {
+ readCollections.BlockToDesiredReplication[locator] =
+ coll.ReplicationLevel
+ }
+ }
+ }
+
+ if arvLogger != nil {
+ arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+ collectionInfo := logger.GetOrCreateMap(p, "collection_info")
+ // Since maps are shallow copied, we run a risk of concurrent
+ // updates here. By copying results.OwnerToCollectionSize into
+ // the log, we're assuming that it won't be updated.
+ collectionInfo["owner_to_collection_size"] =
+ readCollections.OwnerToCollectionSize
+ collectionInfo["distinct_blocks_named"] =
+ len(readCollections.BlockToDesiredReplication)
+ })
}
return