var (
HeapProfileFilename string
- // globals for debugging
- totalManifestSize uint64
- maxManifestSize uint64
)
// Collection representation
CollectionUUIDToIndex map[string]int
CollectionIndexToUUID []string
BlockToCollectionIndices map[blockdigest.DigestWithSize][]int
- Err error
}
// GetCollectionsParams params
}
// GetCollectionsAndSummarize gets collections from api and summarizes
-func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
- results, err := GetCollections(params)
+func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections, err error) {
+ results, err = GetCollections(params)
if err != nil {
- results.Err = err
return
}
sdkParams := arvadosclient.Dict{
"select": fieldsWanted,
- "order": []string{"modified_at ASC"},
- "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
+ "order": []string{"modified_at ASC", "uuid ASC"},
+ "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}},
+ "offset": 0}
if params.BatchSize > 0 {
sdkParams["limit"] = params.BatchSize
// These values are just for getting the loop to run the first time,
// afterwards they'll be set to real values.
- previousTotalCollections := -1
- totalCollections := 0
- for totalCollections > previousTotalCollections {
+ remainingCollections := 1
+ var totalCollections int
+ var previousTotalCollections int
+ for remainingCollections > 0 {
// We're still finding new collections
// Write the heap profile for examining memory usage
if err != nil {
return
}
+ batchCollections := len(collections.Items)
+
+ // We must always have at least one collection in the batch
+ if batchCollections < 1 {
+ err = fmt.Errorf("API query returned no collections for %+v", sdkParams)
+ return
+ }
+
+ // Update count of remaining collections
+ remainingCollections = collections.ItemsAvailable - sdkParams["offset"].(int) - batchCollections
// Process collection and update our date filter.
- var latestModificationDate time.Time
- latestModificationDate, err = ProcessCollections(params.Logger,
+ latestModificationDate, maxManifestSize, totalManifestSize, err := ProcessCollections(params.Logger,
collections.Items,
defaultReplicationLevel,
results.UUIDToCollection)
if err != nil {
- return
+ return results, err
+ }
+ if sdkParams["filters"].([][]string)[0][2] != latestModificationDate.Format(time.RFC3339) {
+ sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
+ sdkParams["offset"] = 0
+ } else {
+ sdkParams["offset"] = sdkParams["offset"].(int) + batchCollections
}
- sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
// update counts
previousTotalCollections = totalCollections
totalCollections = len(results.UUIDToCollection)
- log.Printf("%d collections read, %d new in last batch, "+
+ log.Printf("%d collections read, %d (%d new) in last batch, "+
+ "%d remaining, "+
"%s latest modified date, %.0f %d %d avg,max,total manifest size",
totalCollections,
+ batchCollections,
totalCollections-previousTotalCollections,
+ remainingCollections,
sdkParams["filters"].([][]string)[0][2],
float32(totalManifestSize)/float32(totalCollections),
maxManifestSize, totalManifestSize)
func ProcessCollections(arvLogger *logger.Logger,
receivedCollections []SdkCollectionInfo,
defaultReplicationLevel int,
- UUIDToCollection map[string]Collection) (latestModificationDate time.Time, err error) {
+ UUIDToCollection map[string]Collection,
+) (
+ latestModificationDate time.Time,
+ maxManifestSize, totalManifestSize uint64,
+ err error,
+) {
for _, sdkCollection := range receivedCollections {
collection := Collection{UUID: StrCopy(sdkCollection.UUID),
OwnerUUID: StrCopy(sdkCollection.OwnerUUID),
BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
if sdkCollection.ModifiedAt.IsZero() {
- return latestModificationDate, fmt.Errorf(
+ err = fmt.Errorf(
"Arvados SDK collection returned with unexpected zero "+
"modification date. This probably means that either we failed to "+
"parse the modification date or the API server has changed how "+
"it returns modification dates: %+v",
collection)
+ return
}
if sdkCollection.ModifiedAt.After(latestModificationDate) {
collection.ReplicationLevel = defaultReplicationLevel
}
- manifest := manifest.Manifest{sdkCollection.ManifestText}
+ manifest := manifest.Manifest{Text: sdkCollection.ManifestText}
manifestSize := uint64(len(sdkCollection.ManifestText))
if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen {
blockChannel := manifest.BlockIterWithDuplicates()
for block := range blockChannel {
- if block.Err != nil {
- err = block.Err
- return
- }
- if storedSize, stored := collection.BlockDigestToSize[block.Locator.Digest]; stored && storedSize != block.Locator.Size {
- err = fmt.Errorf(
+ if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
+ log.Printf(
"Collection %s contains multiple sizes (%d and %d) for block %s",
collection.UUID,
storedSize,
- block.Locator.Size,
- block.Locator.Digest)
- return
+ block.Size,
+ block.Digest)
}
- collection.BlockDigestToSize[block.Locator.Digest] = block.Locator.Size
+ collection.BlockDigestToSize[block.Digest] = block.Size
}
+ if manifest.Err != nil {
+ err = manifest.Err
+ return
+ }
+
collection.TotalSize = 0
for _, size := range collection.BlockDigestToSize {
collection.TotalSize += size