import (
"flag"
+ "fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/blockdigest"
+ "git.curoverse.com/arvados.git/sdk/go/logger"
"git.curoverse.com/arvados.git/sdk/go/manifest"
- //"git.curoverse.com/arvados.git/sdk/go/util"
"log"
"os"
+ "runtime"
"runtime/pprof"
+ "time"
)
var (
+ // heap_profile_filename is bound to the "heap-profile" command line flag.
+ // WriteHeapProfile does nothing while it is the empty string.
heap_profile_filename string
- heap_profile *os.File
+ // globals for debugging
+ // totalManifestSize is the summed manifest text length, in bytes, of the
+ // collections ProcessCollections had not already stored in its map.
+ totalManifestSize uint64
+ // maxManifestSize is the largest single manifest text length, in bytes,
+ // that ProcessCollections has seen.
+ maxManifestSize uint64
)
type Collection struct {
+// ReadCollections is the result returned by GetCollections and
+// GetCollectionsAndSummarize.
type ReadCollections struct {
+ // NOTE(review): ReadAllCollections is not assigned anywhere in the code
+ // visible here; confirm where (or whether) it is set.
ReadAllCollections bool
+ // UuidToCollection is looked up by Collection.Uuid in ProcessCollections.
UuidToCollection map[string]Collection
+ // OwnerToCollectionSize maps an OwnerUuid to the sum of TotalSize over the
+ // collections in UuidToCollection with that owner. It is filled in by
+ // ComputeSizeOfOwnedCollections.
+ OwnerToCollectionSize map[string]int
}
+// GetCollectionsParams carries the inputs for GetCollections and
+// GetCollectionsAndSummarize.
type GetCollectionsParams struct {
+ // Client is used to list "collections" and to count the available ones.
Client arvadosclient.ArvadosClient
+ // Logger is optional; every use of it is guarded by a nil check.
+ Logger *logger.Logger
+ // BatchSize, when greater than zero, is sent as the "limit" of each list
+ // request. Otherwise no limit is set by this code.
BatchSize int
}
+// SdkCollectionInfo is one collection record as decoded from JSON, using the
+// field names given in the struct tags. ProcessCollections receives a slice of
+// these and treats a zero ModifiedAt as a fatal error.
type SdkCollectionInfo struct {
- Uuid string `json:"uuid"`
- OwnerUuid string `json:"owner_uuid"`
- Redundancy int `json:"redundancy"`
- ModifiedAt string `json:"modified_at"`
- ManifestText string `json:"manifest_text"`
+ Uuid string `json:"uuid"`
+ OwnerUuid string `json:"owner_uuid"`
+ Redundancy int `json:"redundancy"`
+ ModifiedAt time.Time `json:"modified_at"`
+ ManifestText string `json:"manifest_text"`
}
type SdkCollectionList struct {
flag.StringVar(&heap_profile_filename,
"heap-profile",
"",
- "File to write the heap profiles to.")
+ "File to write the heap profiles to. Leave blank to skip profiling.")
}
// // Methods to implement util.SdkListResponse Interface
// return len(s.Items), nil
// }
-func GetCollections(params GetCollectionsParams) (results ReadCollections) {
- if ¶ms.Client == nil {
- log.Fatalf("params.Client passed to GetCollections() should " +
- "contain a valid ArvadosClient, but instead it is nil.")
- }
-
- // TODO(misha): move this code somewhere better and make sure it's
- // only run once
+// WriteHeapProfile writes the heap profile to the file named by
+// heap_profile_filename for later review. It does nothing when that
+// name is empty.
+// Since a file is expected to only contain a single heap profile this
+// function overwrites the previously written profile, so it is safe
+// to call multiple times in a single run.
+// Otherwise we would see cumulative numbers as explained here:
+// https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
+// Any error creating or writing the file ends the program via log.Fatal.
+func WriteHeapProfile() {
if heap_profile_filename != "" {
- var err error
- heap_profile, err = os.Create(heap_profile_filename)
+
+ // os.Create truncates an existing file, which is what replaces the
+ // profile written by an earlier call.
+ heap_profile, err := os.Create(heap_profile_filename)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ // The error returned by Close is not checked.
+ defer heap_profile.Close()
+
+ err = pprof.WriteHeapProfile(heap_profile)
+ // log.Fatal exits through os.Exit, so the deferred Close above does not
+ // run on this path.
if err != nil {
log.Fatal(err)
}
}
+}
+
+
+// GetCollectionsAndSummarize reads the collections with GetCollections, fills
+// in the per-owner totals with ComputeSizeOfOwnedCollections, and reports
+// those totals to params.Logger (when it is non-nil) and to the standard log.
+func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
+	results = GetCollections(params)
+	ComputeSizeOfOwnedCollections(&results)
+
+	sizeByOwner := results.OwnerToCollectionSize
+
+	if arvLogger := params.Logger; arvLogger != nil {
+		props, _ := arvLogger.Edit()
+		// GetCollections stores a map under "collection_info" when a logger
+		// is supplied; the assertion below relies on that.
+		info := props["collection_info"].(map[string]interface{})
+		info["owner_to_collection_size"] = sizeByOwner
+		arvLogger.Record()
+	}
+
+	log.Printf("Uuid to Size used: %v", sizeByOwner)
+	log.Printf("Read and processed %d collections", len(results.UuidToCollection))
+
+	// TODO(misha): Add a "readonly" flag. If we're in readonly mode,
+	// lots of behaviors can become warnings (and obviously we can't
+	// write anything).
+	// if !readCollections.ReadAllCollections {
+	// 	log.Fatalf("Did not read all collections")
+	// }
+
+	return results
+}
+
+func GetCollections(params GetCollectionsParams) (results ReadCollections) {
+ if ¶ms.Client == nil {
+ log.Fatalf("params.Client passed to GetCollections() should " +
+ "contain a valid ArvadosClient, but instead it is nil.")
+ }
fieldsWanted := []string{"manifest_text",
"owner_uuid",
"select": fieldsWanted,
"order": []string{"modified_at ASC"},
"filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
- // MISHA UNDO THIS TEMPORARY HACK TO FIND BUG!
- //"filters": [][]string{[]string{"modified_at", ">=", "2014-11-05T20:44:50Z"}}}
if params.BatchSize > 0 {
sdkParams["limit"] = params.BatchSize
}
- // MISHA UNDO THIS TEMPORARY HACK TO FIND BUG!
- sdkParams["limit"] = 50
-
- // {
- // var numReceived, numAvailable int
- // results.ReadAllCollections, numReceived, numAvailable =
- // util.ContainsAllAvailableItems(collections)
-
- // if (!results.ReadAllCollections) {
- // log.Printf("ERROR: Did not receive all collections.")
- // }
- // log.Printf("Received %d of %d available collections.",
- // numReceived,
- // numAvailable)
- // }
-
initialNumberOfCollectionsAvailable := NumberCollectionsAvailable(params.Client)
// Include a 1% margin for collections added while we're reading so
// that we don't have to grow the map in most cases.
float64(initialNumberOfCollectionsAvailable) * 1.01)
results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
+ if params.Logger != nil {
+ properties,_ := params.Logger.Edit()
+ collectionInfo := make(map[string]interface{})
+ collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
+ collectionInfo["batch_size"] = params.BatchSize
+ properties["collection_info"] = collectionInfo
+ params.Logger.Record()
+ }
+
+ // These values are just for getting the loop to run the first time,
+ // afterwards they'll be set to real values.
previousTotalCollections := -1
- for len(results.UuidToCollection) > previousTotalCollections {
+ totalCollections := 0
+ for totalCollections > previousTotalCollections {
// We're still finding new collections
- log.Printf("previous, current: %d %d", previousTotalCollections, len(results.UuidToCollection))
-
- // update count
- previousTotalCollections = len(results.UuidToCollection)
// Write the heap profile for examining memory usage
- if heap_profile != nil {
- err := pprof.WriteHeapProfile(heap_profile)
- if err != nil {
- log.Fatal(err)
- }
- }
+ WriteHeapProfile()
// Get next batch of collections.
var collections SdkCollectionList
- log.Printf("Running with SDK Params: %v", sdkParams)
err := params.Client.List("collections", sdkParams, &collections)
if err != nil {
- log.Fatalf("error querying collections: %+v", err)
+ fatalWithMessage(params.Logger,
+ fmt.Sprintf("Error querying collections: %v", err))
}
// Process collection and update our date filter.
- sdkParams["filters"].([][]string)[0][2] = ProcessCollections(collections.Items, results.UuidToCollection)
- log.Printf("Latest date seen %s", sdkParams["filters"].([][]string)[0][2])
+ sdkParams["filters"].([][]string)[0][2] =
+ ProcessCollections(params.Logger,
+ collections.Items,
+ results.UuidToCollection).Format(time.RFC3339)
+
+ // update counts
+ previousTotalCollections = totalCollections
+ totalCollections = len(results.UuidToCollection)
+
+ log.Printf("%d collections read, %d new in last batch, " +
+ "%s latest modified date, %.0f %d %d avg,max,total manifest size",
+ totalCollections,
+ totalCollections - previousTotalCollections,
+ sdkParams["filters"].([][]string)[0][2],
+ float32(totalManifestSize)/float32(totalCollections),
+ maxManifestSize, totalManifestSize)
+
+ if params.Logger != nil {
+ properties,_ := params.Logger.Edit()
+ collectionInfo := properties["collection_info"].(map[string]interface{})
+ collectionInfo["collections_read"] = totalCollections
+ collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
+ collectionInfo["total_manifest_size"] = totalManifestSize
+ collectionInfo["max_manifest_size"] = maxManifestSize
+ params.Logger.Record()
+ }
}
- log.Printf("previous, current: %d %d", previousTotalCollections, len(results.UuidToCollection))
+
+ // Just in case this lowers the numbers reported in the heap profile.
+ runtime.GC()
// Write the heap profile for examining memory usage
- if heap_profile != nil {
- err := pprof.WriteHeapProfile(heap_profile)
- if err != nil {
- log.Fatal(err)
- }
- }
+ WriteHeapProfile()
return
}
-func ProcessCollections(receivedCollections []SdkCollectionInfo,
- uuidToCollection map[string]Collection) (latestModificationDate string) {
+// StrCopy returns a string with the same contents as s, built from a
+// separate byte buffer.
+// Copying a short slice of a much longer string this way lets the garbage
+// collector reclaim the longer string's memory.
+func StrCopy(s string) string {
+	buf := make([]byte, len(s))
+	copy(buf, s)
+	return string(buf)
+}
+
+
+func ProcessCollections(arvLogger *logger.Logger,
+ receivedCollections []SdkCollectionInfo,
+ uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
for _, sdkCollection := range receivedCollections {
- collection := Collection{Uuid: sdkCollection.Uuid,
- OwnerUuid: sdkCollection.OwnerUuid,
+ collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
+ OwnerUuid: StrCopy(sdkCollection.OwnerUuid),
ReplicationLevel: sdkCollection.Redundancy,
BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
- // log.Printf("Seeing modification date, owner_uuid: %s %s",
- // sdkCollection.ModifiedAt,
- // sdkCollection.OwnerUuid)
- if sdkCollection.ModifiedAt > latestModificationDate {
+
+ if sdkCollection.ModifiedAt.IsZero() {
+ fatalWithMessage(arvLogger,
+ fmt.Sprintf(
+ "Arvados SDK collection returned with unexpected zero " +
+ "modifcation date. This probably means that either we failed to " +
+ "parse the modification date or the API server has changed how " +
+ "it returns modification dates: %v",
+ collection))
+ }
+
+ if sdkCollection.ModifiedAt.After(latestModificationDate) {
latestModificationDate = sdkCollection.ModifiedAt
}
manifest := manifest.Manifest{sdkCollection.ManifestText}
+ manifestSize := uint64(len(sdkCollection.ManifestText))
+
+ if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen {
+ totalManifestSize += manifestSize
+ }
+ if manifestSize > maxManifestSize {
+ maxManifestSize = manifestSize
+ }
+
blockChannel := manifest.BlockIterWithDuplicates()
for block := range blockChannel {
if stored_size, stored := collection.BlockDigestToSize[block.Digest];
stored && stored_size != block.Size {
- log.Fatalf(
+ message := fmt.Sprintf(
"Collection %s contains multiple sizes (%d and %d) for block %s",
collection.Uuid,
stored_size,
block.Size,
block.Digest)
+ fatalWithMessage(arvLogger, message)
}
collection.BlockDigestToSize[block.Digest] = block.Size
}
return collections.ItemsAvailable
}
+
+
+// ComputeSizeOfOwnedCollections replaces readCollections.OwnerToCollectionSize
+// with a fresh map from OwnerUuid to the sum of TotalSize over every
+// collection in readCollections.UuidToCollection.
+func ComputeSizeOfOwnedCollections(readCollections *ReadCollections) {
+	sizeByOwner := make(map[string]int)
+	for _, c := range readCollections.UuidToCollection {
+		sizeByOwner[c.OwnerUuid] += c.TotalSize
+	}
+	readCollections.OwnerToCollectionSize = sizeByOwner
+}
+
+
+// fatalWithMessage stores message under the "FATAL" key of arvLogger's
+// properties (when arvLogger is non-nil), stamps run_info's "end_time" if a
+// run_info map is present, forces the record out, and then ends the program
+// through log.Fatal.
+//
+// Assumes you haven't already called arvLogger.Edit()!
+// If you have called arvLogger.Edit() this method will hang waiting
+// for the lock you're already holding.
+func fatalWithMessage(arvLogger *logger.Logger, message string) {
+	if arvLogger != nil {
+		properties, _ := arvLogger.Edit()
+		properties["FATAL"] = message
+		// Comma-ok form: a missing or differently typed "run_info" entry must
+		// not panic here, or the fatal message would never be recorded.
+		if runInfo, ok := properties["run_info"].(map[string]interface{}); ok {
+			runInfo["end_time"] = time.Now()
+		}
+		arvLogger.ForceRecord()
+	}
+
+	// message is already fully formatted. log.Fatal prints it verbatim, whereas
+	// log.Fatalf would interpret any '%' inside it as a formatting verb.
+	log.Fatal(message)
+}