+ // Get next batch of collections.
+ var collections SdkCollectionList
+ err := params.Client.List("collections", sdkParams, &collections)
+ if err != nil {
+ loggerutil.FatalWithMessage(params.Logger,
+ fmt.Sprintf("Error querying collections: %v", err))
+ }
+
+ // Process collection and update our date filter.
+ sdkParams["filters"].([][]string)[0][2] =
+ ProcessCollections(params.Logger,
+ collections.Items,
+ results.UuidToCollection).Format(time.RFC3339)
+
+ // update counts
+ previousTotalCollections = totalCollections
+ totalCollections = len(results.UuidToCollection)
+
+ log.Printf("%d collections read, %d new in last batch, "+
+ "%s latest modified date, %.0f %d %d avg,max,total manifest size",
+ totalCollections,
+ totalCollections-previousTotalCollections,
+ sdkParams["filters"].([][]string)[0][2],
+ float32(totalManifestSize)/float32(totalCollections),
+ maxManifestSize, totalManifestSize)
+
+ if params.Logger != nil {
+ params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+ collectionInfo := logger.GetOrCreateMap(p, "collection_info")
+ collectionInfo["collections_read"] = totalCollections
+ collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
+ collectionInfo["total_manifest_size"] = totalManifestSize
+ collectionInfo["max_manifest_size"] = maxManifestSize
+ })
+ }
+ }
+
+ // Write the heap profile for examining memory usage
+ WriteHeapProfile()
+
+ return
+}
+
+ // StrCopy returns a copy of s that does not share backing memory with s.
+ // Use it when holding on to a substring (slice) of a much larger string:
+ // the substring alone would keep the whole larger string reachable, while
+ // the copy lets the garbage collector reclaim the larger string once
+ // nothing else references it.
+ func StrCopy(s string) string {
+ buf := make([]byte, len(s))
+ copy(buf, s)
+ return string(buf)
+}
+
+func ProcessCollections(arvLogger *logger.Logger,
+ receivedCollections []SdkCollectionInfo,
+ uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
+ for _, sdkCollection := range receivedCollections {
+ collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
+ OwnerUuid: StrCopy(sdkCollection.OwnerUuid),
+ ReplicationLevel: sdkCollection.Redundancy,
+ BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
+
+ if sdkCollection.ModifiedAt.IsZero() {
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf(
+ "Arvados SDK collection returned with unexpected zero "+
+ "modification date. This probably means that either we failed to "+
+ "parse the modification date or the API server has changed how "+
+ "it returns modification dates: %+v",
+ collection))
+ }
+
+ if sdkCollection.ModifiedAt.After(latestModificationDate) {
+ latestModificationDate = sdkCollection.ModifiedAt
+ }
+
+ if collection.ReplicationLevel == 0 {
+ collection.ReplicationLevel = DefaultReplicationLevel
+ }
+
+ manifest := manifest.Manifest{sdkCollection.ManifestText}
+ manifestSize := uint64(len(sdkCollection.ManifestText))
+
+ if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen {
+ totalManifestSize += manifestSize
+ }
+ if manifestSize > maxManifestSize {
+ maxManifestSize = manifestSize
+ }
+
+ blockChannel := manifest.BlockIterWithDuplicates()
+ for block := range blockChannel {
+ if stored_size, stored := collection.BlockDigestToSize[block.Digest]; stored && stored_size != block.Size {
+ message := fmt.Sprintf(
+ "Collection %s contains multiple sizes (%d and %d) for block %s",
+ collection.Uuid,
+ stored_size,
+ block.Size,
+ block.Digest)
+ loggerutil.FatalWithMessage(arvLogger, message)