var (
// HeapProfileFilename is presumably the name of the file the heap profile is
// written to — NOTE(review): inferred from the identifier; its use is not
// visible in this fragment, confirm against the code that writes the profile.
HeapProfileFilename string
- // globals for debugging
- totalManifestSize uint64
- maxManifestSize uint64
)
// Collection representation
CollectionUUIDToIndex map[string]int
CollectionIndexToUUID []string
BlockToCollectionIndices map[blockdigest.DigestWithSize][]int
- Err error
}
// GetCollectionsParams holds the parameters passed to GetCollections and GetCollectionsAndSummarize.
}
// GetCollectionsAndSummarize gets collections from the API server and summarizes them.
-func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
- results, err := GetCollections(params)
+func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections, err error) {
+ results, err = GetCollections(params)
if err != nil {
- results.Err = err
return
}
sdkParams := arvadosclient.Dict{
"select": fieldsWanted,
"order": []string{"modified_at ASC"},
- "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
+ "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}},
+ "offset": 0}
if params.BatchSize > 0 {
sdkParams["limit"] = params.BatchSize
// These values are just for getting the loop to run the first time,
// afterwards they'll be set to real values.
- previousTotalCollections := -1
- totalCollections := 0
- for totalCollections > previousTotalCollections {
+ remainingCollections := 1
+ var totalCollections int
+ var previousTotalCollections int
+ for remainingCollections > 0 {
// We're still finding new collections
// Write the heap profile for examining memory usage
return
}
+ // Update count of remaining collections
+ remainingCollections = collections.ItemsAvailable - params.BatchSize - sdkParams["offset"].(int)
+ if remainingCollections < 0 {
+ remainingCollections = 0
+ }
+
// Process collection and update our date filter.
- var latestModificationDate time.Time
- latestModificationDate, err = ProcessCollections(params.Logger,
+ latestModificationDate, maxManifestSize, totalManifestSize, err := ProcessCollections(params.Logger,
collections.Items,
defaultReplicationLevel,
results.UUIDToCollection)
if err != nil {
- return
+ return results, err
+ }
+ if sdkParams["filters"].([][]string)[0][2] != latestModificationDate.Format(time.RFC3339) {
+ sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
+ sdkParams["offset"] = 0
+ } else {
+ sdkParams["offset"] = sdkParams["offset"].(int) + params.BatchSize
}
- sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
// update counts
previousTotalCollections = totalCollections
totalCollections = len(results.UUIDToCollection)
log.Printf("%d collections read, %d new in last batch, "+
+ "%d remaining, "+
"%s latest modified date, %.0f %d %d avg,max,total manifest size",
totalCollections,
totalCollections-previousTotalCollections,
+ remainingCollections,
sdkParams["filters"].([][]string)[0][2],
float32(totalManifestSize)/float32(totalCollections),
maxManifestSize, totalManifestSize)
func ProcessCollections(arvLogger *logger.Logger,
receivedCollections []SdkCollectionInfo,
defaultReplicationLevel int,
- UUIDToCollection map[string]Collection) (latestModificationDate time.Time, err error) {
+ UUIDToCollection map[string]Collection,
+) (
+ latestModificationDate time.Time,
+ maxManifestSize, totalManifestSize uint64,
+ err error,
+) {
for _, sdkCollection := range receivedCollections {
collection := Collection{UUID: StrCopy(sdkCollection.UUID),
OwnerUUID: StrCopy(sdkCollection.OwnerUUID),
BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
if sdkCollection.ModifiedAt.IsZero() {
- return latestModificationDate, fmt.Errorf(
+ err = fmt.Errorf(
"Arvados SDK collection returned with unexpected zero "+
"modification date. This probably means that either we failed to "+
"parse the modification date or the API server has changed how "+
"it returns modification dates: %+v",
collection)
+ return
}
if sdkCollection.ModifiedAt.After(latestModificationDate) {
blockChannel := manifest.BlockIterWithDuplicates()
for block := range blockChannel {
if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
- err = fmt.Errorf(
+ log.Printf(
"Collection %s contains multiple sizes (%d and %d) for block %s",
collection.UUID,
storedSize,
block.Size,
block.Digest)
- return
}
collection.BlockDigestToSize[block.Digest] = block.Size
}