"git.curoverse.com/arvados.git/sdk/go/logger"
"git.curoverse.com/arvados.git/sdk/go/manifest"
"git.curoverse.com/arvados.git/sdk/go/util"
- "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
"log"
"os"
"runtime/pprof"
)
var (
- heapProfileFilename string
- // globals for debugging
- totalManifestSize uint64
- maxManifestSize uint64
+ HeapProfileFilename string
)
// Collection representation
// SdkCollectionInfo holds collection info from api
type SdkCollectionInfo struct {
- UUID string `json:"uuid"`
- OwnerUUID string `json:"owner_uuid"`
- Redundancy int `json:"redundancy"`
- ModifiedAt time.Time `json:"modified_at"`
- ManifestText string `json:"manifest_text"`
+ UUID string `json:"uuid"`
+ OwnerUUID string `json:"owner_uuid"`
+ ReplicationDesired int `json:"replication_desired"`
+ ModifiedAt time.Time `json:"modified_at"`
+ ManifestText string `json:"manifest_text"`
}
// SdkCollectionList lists collections from api
}
func init() {
- flag.StringVar(&heapProfileFilename,
+ flag.StringVar(&HeapProfileFilename,
"heap-profile",
"",
"File to write the heap profiles to. Leave blank to skip profiling.")
// to call multiple times in a single run.
// Otherwise we would see cumulative numbers as explained here:
// https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
-func WriteHeapProfile() {
- if heapProfileFilename != "" {
-
- heapProfile, err := os.Create(heapProfileFilename)
+func WriteHeapProfile() error {
+ if HeapProfileFilename != "" {
+ heapProfile, err := os.Create(HeapProfileFilename)
if err != nil {
- log.Fatal(err)
+ return err
}
defer heapProfile.Close()
err = pprof.WriteHeapProfile(heapProfile)
- if err != nil {
- log.Fatal(err)
- }
+ return err
}
+
+ return nil
}
// GetCollectionsAndSummarize gets collections from api and summarizes
-func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
- results = GetCollections(params)
+func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections, err error) {
+ results, err = GetCollections(params)
+ if err != nil {
+ return
+ }
+
results.Summarize(params.Logger)
log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
}
// GetCollections gets collections from api
-func GetCollections(params GetCollectionsParams) (results ReadCollections) {
+func GetCollections(params GetCollectionsParams) (results ReadCollections, err error) {
if &params.Client == nil {
- log.Fatalf("params.Client passed to GetCollections() should " +
+ err = fmt.Errorf("params.Client passed to GetCollections() should " +
"contain a valid ArvadosClient, but instead it is nil.")
+ return
}
fieldsWanted := []string{"manifest_text",
"owner_uuid",
"uuid",
- "redundancy",
+ "replication_desired",
"modified_at"}
sdkParams := arvadosclient.Dict{
"select": fieldsWanted,
- "order": []string{"modified_at ASC"},
- "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
+ "order": []string{"modified_at ASC", "uuid ASC"},
+ "filters": [][]string{{"modified_at", ">=", "1900-01-01T00:00:00Z"}},
+ "offset": 0}
if params.BatchSize > 0 {
sdkParams["limit"] = params.BatchSize
var defaultReplicationLevel int
{
- value, err := params.Client.Discovery("defaultCollectionReplication")
+ var value interface{}
+ value, err = params.Client.Discovery("defaultCollectionReplication")
if err != nil {
- loggerutil.FatalWithMessage(params.Logger,
- fmt.Sprintf("Error querying default collection replication: %v", err))
+ return
}
defaultReplicationLevel = int(value.(float64))
if defaultReplicationLevel <= 0 {
- loggerutil.FatalWithMessage(params.Logger,
- fmt.Sprintf("Default collection replication returned by arvados SDK "+
- "should be a positive integer but instead it was %d.",
- defaultReplicationLevel))
+ err = fmt.Errorf("Default collection replication returned by arvados SDK "+
+ "should be a positive integer but instead it was %d.",
+ defaultReplicationLevel)
+ return
}
}
initialNumberOfCollectionsAvailable, err :=
util.NumberItemsAvailable(params.Client, "collections")
if err != nil {
- loggerutil.FatalWithMessage(params.Logger,
- fmt.Sprintf("Error querying collection count: %v", err))
+ return
}
// Include a 1% margin for collections added while we're reading so
// that we don't have to grow the map in most cases.
// These values are just for getting the loop to run the first time,
// afterwards they'll be set to real values.
- previousTotalCollections := -1
- totalCollections := 0
- for totalCollections > previousTotalCollections {
+ remainingCollections := 1
+ var totalCollections int
+ var previousTotalCollections int
+ for remainingCollections > 0 {
// We're still finding new collections
// Write the heap profile for examining memory usage
- WriteHeapProfile()
+ err = WriteHeapProfile()
+ if err != nil {
+ return
+ }
// Get next batch of collections.
var collections SdkCollectionList
- err := params.Client.List("collections", sdkParams, &collections)
+ err = params.Client.List("collections", sdkParams, &collections)
if err != nil {
- loggerutil.FatalWithMessage(params.Logger,
- fmt.Sprintf("Error querying collections: %v", err))
+ return
+ }
+ batchCollections := len(collections.Items)
+
+ // We must always have at least one collection in the batch
+ if batchCollections < 1 {
+ err = fmt.Errorf("API query returned no collections for %+v", sdkParams)
+ return
}
+ // Update count of remaining collections
+ remainingCollections = collections.ItemsAvailable - sdkParams["offset"].(int) - batchCollections
+
// Process collection and update our date filter.
- sdkParams["filters"].([][]string)[0][2] =
- ProcessCollections(params.Logger,
- collections.Items,
- defaultReplicationLevel,
- results.UUIDToCollection).Format(time.RFC3339)
+ latestModificationDate, maxManifestSize, totalManifestSize, err := ProcessCollections(params.Logger,
+ collections.Items,
+ defaultReplicationLevel,
+ results.UUIDToCollection)
+ if err != nil {
+ return results, err
+ }
+ if sdkParams["filters"].([][]string)[0][2] != latestModificationDate.Format(time.RFC3339) {
+ sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
+ sdkParams["offset"] = 0
+ } else {
+ sdkParams["offset"] = sdkParams["offset"].(int) + batchCollections
+ }
// update counts
previousTotalCollections = totalCollections
totalCollections = len(results.UUIDToCollection)
- log.Printf("%d collections read, %d new in last batch, "+
+ log.Printf("%d collections read, %d (%d new) in last batch, "+
+ "%d remaining, "+
"%s latest modified date, %.0f %d %d avg,max,total manifest size",
totalCollections,
+ batchCollections,
totalCollections-previousTotalCollections,
+ remainingCollections,
sdkParams["filters"].([][]string)[0][2],
float32(totalManifestSize)/float32(totalCollections),
maxManifestSize, totalManifestSize)
}
}
+ // Make one final API request to verify that we have processed all collections available up to the latest modification date
+ var collections SdkCollectionList
+ sdkParams["filters"].([][]string)[0][1] = "<="
+ sdkParams["limit"] = 0
+ err = params.Client.List("collections", sdkParams, &collections)
+ if err != nil {
+ return
+ }
+ finalNumberOfCollectionsAvailable, err :=
+ util.NumberItemsAvailable(params.Client, "collections")
+ if err != nil {
+ return
+ }
+ if totalCollections < finalNumberOfCollectionsAvailable {
+ err = fmt.Errorf("API server indicates a total of %d collections "+
+ "available up to %v, but we only retrieved %d. "+
+ "Refusing to continue as this could indicate an "+
+ "otherwise undetected failure.",
+ finalNumberOfCollectionsAvailable,
+ sdkParams["filters"].([][]string)[0][2],
+ totalCollections)
+ return
+ }
+
// Write the heap profile for examining memory usage
- WriteHeapProfile()
+ err = WriteHeapProfile()
return
}
func ProcessCollections(arvLogger *logger.Logger,
receivedCollections []SdkCollectionInfo,
defaultReplicationLevel int,
- UUIDToCollection map[string]Collection) (latestModificationDate time.Time) {
+ UUIDToCollection map[string]Collection,
+) (
+ latestModificationDate time.Time,
+ maxManifestSize, totalManifestSize uint64,
+ err error,
+) {
for _, sdkCollection := range receivedCollections {
collection := Collection{UUID: StrCopy(sdkCollection.UUID),
OwnerUUID: StrCopy(sdkCollection.OwnerUUID),
- ReplicationLevel: sdkCollection.Redundancy,
+ ReplicationLevel: sdkCollection.ReplicationDesired,
BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
if sdkCollection.ModifiedAt.IsZero() {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf(
- "Arvados SDK collection returned with unexpected zero "+
- "modification date. This probably means that either we failed to "+
- "parse the modification date or the API server has changed how "+
- "it returns modification dates: %+v",
- collection))
+ err = fmt.Errorf(
+ "Arvados SDK collection returned with unexpected zero "+
+ "modification date. This probably means that either we failed to "+
+ "parse the modification date or the API server has changed how "+
+ "it returns modification dates: %+v",
+ collection)
+ return
}
if sdkCollection.ModifiedAt.After(latestModificationDate) {
collection.ReplicationLevel = defaultReplicationLevel
}
- manifest := manifest.Manifest{sdkCollection.ManifestText}
+ manifest := manifest.Manifest{Text: sdkCollection.ManifestText}
manifestSize := uint64(len(sdkCollection.ManifestText))
if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen {
blockChannel := manifest.BlockIterWithDuplicates()
for block := range blockChannel {
if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
- message := fmt.Sprintf(
+ log.Printf(
"Collection %s contains multiple sizes (%d and %d) for block %s",
collection.UUID,
storedSize,
block.Size,
block.Digest)
- loggerutil.FatalWithMessage(arvLogger, message)
}
collection.BlockDigestToSize[block.Digest] = block.Size
}
+ if manifest.Err != nil {
+ err = manifest.Err
+ return
+ }
+
collection.TotalSize = 0
for _, size := range collection.BlockDigestToSize {
collection.TotalSize += size