+ // Make one final API request to verify that we have processed all collections available up to the latest modification date
+ sdkParams["filters"].([][]string)[0][1] = "<="
+ sdkParams["limit"] = 0
+ err = params.Client.List("collections", sdkParams, &collections)
+ if err != nil {
+ return
+ }
+ finalNumberOfCollectionsAvailable, err :=
+ util.NumberItemsAvailable(params.Client, "collections")
+ if err != nil {
+ return
+ }
+ if totalCollections < finalNumberOfCollectionsAvailable {
+ err = fmt.Errorf("API server indicates a total of %d collections "+
+ "available up to %v, but we only retrieved %d. "+
+ "Refusing to continue as this could indicate an "+
+ "otherwise undetected failure.",
+ finalNumberOfCollectionsAvailable,
+ sdkParams["filters"].([][]string)[0][2],
+ totalCollections)
+ return
+ }
+
+ // Write the heap profile for examining memory usage
+ err = WriteHeapProfile()
+
+ return
+}
+
+// StrCopy duplicates s into freshly allocated storage and returns the copy.
+// A substring shares memory with the longer string it was sliced from, so
+// keeping a small piece around pins the whole original; copying the piece
+// lets the garbage collector reclaim the longer string.
+func StrCopy(s string) string {
+	detached := make([]byte, len(s))
+	copy(detached, s)
+	return string(detached)
+}
+
+// ProcessCollections processes the collections received from the API server.
+func ProcessCollections(arvLogger *logger.Logger,
+ receivedCollections []SdkCollectionInfo,
+ defaultReplicationLevel int,
+ UUIDToCollection map[string]Collection,
+) (
+ latestModificationDate time.Time,
+ maxManifestSize, totalManifestSize uint64,
+ err error,
+) {
+ for _, sdkCollection := range receivedCollections {
+ collection := Collection{UUID: StrCopy(sdkCollection.UUID),
+ OwnerUUID: StrCopy(sdkCollection.OwnerUUID),
+ ReplicationLevel: sdkCollection.Redundancy,
+ BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
+
+ if sdkCollection.ModifiedAt.IsZero() {
+ err = fmt.Errorf(
+ "Arvados SDK collection returned with unexpected zero "+
+ "modification date. This probably means that either we failed to "+
+ "parse the modification date or the API server has changed how "+
+ "it returns modification dates: %+v",
+ collection)
+ return
+ }
+
+ if sdkCollection.ModifiedAt.After(latestModificationDate) {
+ latestModificationDate = sdkCollection.ModifiedAt
+ }
+
+ if collection.ReplicationLevel == 0 {
+ collection.ReplicationLevel = defaultReplicationLevel
+ }
+
+ manifest := manifest.Manifest{Text: sdkCollection.ManifestText}
+ manifestSize := uint64(len(sdkCollection.ManifestText))
+
+ if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen {
+ totalManifestSize += manifestSize
+ }
+ if manifestSize > maxManifestSize {
+ maxManifestSize = manifestSize
+ }
+
+ blockChannel := manifest.BlockIterWithDuplicates()
+ for block := range blockChannel {
+ if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
+ log.Printf(
+ "Collection %s contains multiple sizes (%d and %d) for block %s",
+ collection.UUID,
+ storedSize,
+ block.Size,
+ block.Digest)