"git.curoverse.com/arvados.git/sdk/go/logger"
"git.curoverse.com/arvados.git/sdk/go/manifest"
"git.curoverse.com/arvados.git/sdk/go/util"
- "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
"log"
"os"
"runtime/pprof"
var (
HeapProfileFilename string
- // globals for debugging
- totalManifestSize uint64
- maxManifestSize uint64
)
// Collection representation
}
// GetCollectionsAndSummarize gets collections from api and summarizes
-func GetCollectionsAndSummarize(arvLogger *logger.Logger, params GetCollectionsParams) (results ReadCollections) {
- results, err := GetCollections(params)
+func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections, err error) {
+ results, err = GetCollections(params)
if err != nil {
- loggerutil.LogErrorMessage(arvLogger, fmt.Sprintf("Error during GetCollections with params %v: %v", params, err))
- results = ReadCollections{}
return
}
if &params.Client == nil {
err = fmt.Errorf("params.Client passed to GetCollections() should " +
"contain a valid ArvadosClient, but instead it is nil.")
+ return
}
fieldsWanted := []string{"manifest_text",
}
// Process collection and update our date filter.
- var latestModificationDate time.Time
- latestModificationDate, err = ProcessCollections(params.Logger,
+ latestModificationDate, maxManifestSize, totalManifestSize, err := ProcessCollections(params.Logger,
collections.Items,
defaultReplicationLevel,
results.UUIDToCollection)
if err != nil {
- return
+ return results, err
}
sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
}
}
+ // Make one final API request to verify that we have processed all collections available up to the latest modification date
+ sdkParams["filters"].([][]string)[0][1] = "<="
+ sdkParams["limit"] = 0
+ err = params.Client.List("collections", sdkParams, &collections)
+ if err != nil {
+ return
+ }
+ finalNumberOfCollectionsAvailable, err :=
+ util.NumberItemsAvailable(params.Client, "collections")
+ if err != nil {
+ return
+ }
+ if totalCollections < finalNumberOfCollectionsAvailable {
+ err = fmt.Errorf("API server indicates a total of %d collections "+
+ "available up to %v, but we only retrieved %d. "+
+ "Refusing to continue as this could indicate an "+
+ "otherwise undetected failure.",
+ finalNumberOfCollectionsAvailable,
+ sdkParams["filters"].([][]string)[0][2],
+ totalCollections)
+ return
+ }
+
// Write the heap profile for examining memory usage
err = WriteHeapProfile()
func ProcessCollections(arvLogger *logger.Logger,
receivedCollections []SdkCollectionInfo,
defaultReplicationLevel int,
- UUIDToCollection map[string]Collection) (latestModificationDate time.Time, err error) {
+ UUIDToCollection map[string]Collection,
+) (
+ latestModificationDate time.Time,
+ maxManifestSize, totalManifestSize uint64,
+ err error,
+) {
for _, sdkCollection := range receivedCollections {
collection := Collection{UUID: StrCopy(sdkCollection.UUID),
OwnerUUID: StrCopy(sdkCollection.OwnerUUID),
BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
if sdkCollection.ModifiedAt.IsZero() {
- return latestModificationDate, fmt.Errorf(
+ err = fmt.Errorf(
"Arvados SDK collection returned with unexpected zero "+
"modification date. This probably means that either we failed to "+
"parse the modification date or the API server has changed how "+
"it returns modification dates: %+v",
collection)
+ return
}
if sdkCollection.ModifiedAt.After(latestModificationDate) {
collection.ReplicationLevel = defaultReplicationLevel
}
- manifest := manifest.Manifest{sdkCollection.ManifestText}
+ manifest := manifest.Manifest{Text: sdkCollection.ManifestText}
manifestSize := uint64(len(sdkCollection.ManifestText))
if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen {
blockChannel := manifest.BlockIterWithDuplicates()
for block := range blockChannel {
if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
- err = fmt.Errorf(
+ log.Printf(
"Collection %s contains multiple sizes (%d and %d) for block %s",
collection.UUID,
storedSize,
block.Size,
block.Digest)
- return
}
collection.BlockDigestToSize[block.Digest] = block.Size
}
+ if manifest.Err != nil {
+ err = manifest.Err
+ return
+ }
+
collection.TotalSize = 0
for _, size := range collection.BlockDigestToSize {
collection.TotalSize += size