- results.UuidToCollection = make(map[string]Collection)
- for _, item := range items {
- item_map := item.(map[string]interface{})
- collection := Collection{Uuid: item_map["uuid"].(string),
- OwnerUuid: item_map["owner_uuid"].(string),
- BlockDigestToSize: make(map[string]int)}
- manifest := manifest.Manifest{item_map["manifest_text"].(string)}
- blockChannel := manifest.DuplicateBlockIter()
- for block := range blockChannel {
- if stored_size, stored := collection.BlockDigestToSize[block.Digest];
- stored && stored_size != block.Size {
- log.Fatalf(
- "Collection %s contains multiple sizes (%d and %d) for block %s",
- collection.Uuid,
- stored_size,
- block.Size,
- block.Digest)
- }
- collection.BlockDigestToSize[block.Digest] = block.Size
+ // Process collection and update our date filter.
+ sdkParams["filters"].([][]string)[0][2] =
+ ProcessCollections(params.Logger,
+ collections.Items,
+ results.UuidToCollection).Format(time.RFC3339)
+
+ // update counts
+ previousTotalCollections = totalCollections
+ totalCollections = len(results.UuidToCollection)
+
+ log.Printf("%d collections read, %d new in last batch, " +
+ "%s latest modified date, %.0f %d %d avg,max,total manifest size",
+ totalCollections,
+ totalCollections - previousTotalCollections,
+ sdkParams["filters"].([][]string)[0][2],
+ float32(totalManifestSize)/float32(totalCollections),
+ maxManifestSize, totalManifestSize)
+
+ if params.Logger != nil {
+ properties,_ := params.Logger.Edit()
+ collectionInfo := properties["collection_info"].(map[string]interface{})
+ collectionInfo["collections_read"] = totalCollections
+ collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
+ collectionInfo["total_manifest_size"] = totalManifestSize
+ collectionInfo["max_manifest_size"] = maxManifestSize
+ params.Logger.Record()
+ }
+ }
+
+ // Just in case this lowers the numbers reported in the heap profile.
+ runtime.GC()
+
+ // Write the heap profile for examining memory usage
+ WriteHeapProfile()
+
+ return
+}
+
+
+// StrCopy hands back a freshly allocated string with the same contents as s.
+// Copying a short slice this way lets the GC reuse the longer string's memory.
+func StrCopy(s string) string {
+	detached := []byte(s)
+	return string(detached)
+}
+
+
+func ProcessCollections(arvLogger *logger.Logger,
+ receivedCollections []SdkCollectionInfo,
+ uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
+ for _, sdkCollection := range receivedCollections {
+ collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
+ OwnerUuid: StrCopy(sdkCollection.OwnerUuid),
+ ReplicationLevel: sdkCollection.Redundancy,
+ BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
+
+ if sdkCollection.ModifiedAt.IsZero() {
+ fatalWithMessage(arvLogger,
+ fmt.Sprintf(
+ "Arvados SDK collection returned with unexpected zero " +
+ "modifcation date. This probably means that either we failed to " +
+ "parse the modification date or the API server has changed how " +
+ "it returns modification dates: %v",
+ collection))
+ }
+
+ if sdkCollection.ModifiedAt.After(latestModificationDate) {
+ latestModificationDate = sdkCollection.ModifiedAt
+ }
+ manifest := manifest.Manifest{sdkCollection.ManifestText}
+ manifestSize := uint64(len(sdkCollection.ManifestText))
+
+ if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen {
+ totalManifestSize += manifestSize
+ }
+ if manifestSize > maxManifestSize {
+ maxManifestSize = manifestSize
+ }
+
+ blockChannel := manifest.BlockIterWithDuplicates()
+ for block := range blockChannel {
+ if stored_size, stored := collection.BlockDigestToSize[block.Digest];
+ stored && stored_size != block.Size {
+ message := fmt.Sprintf(
+ "Collection %s contains multiple sizes (%d and %d) for block %s",
+ collection.Uuid,
+ stored_size,
+ block.Size,
+ block.Digest)
+ fatalWithMessage(arvLogger, message)