-/* Deals with parsing Collection responses from API Server. */
+// Package collection deals with parsing Collection responses from the API server.
package collection
import (
"flag"
+ "fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/blockdigest"
"git.curoverse.com/arvados.git/sdk/go/logger"
"git.curoverse.com/arvados.git/sdk/go/manifest"
+ "git.curoverse.com/arvados.git/sdk/go/util"
"log"
"os"
- "runtime"
"runtime/pprof"
"time"
)
var (
- heap_profile_filename string
- // globals for debugging
- totalManifestSize uint64
- maxManifestSize uint64
+ HeapProfileFilename string
)
+// Collection representation
type Collection struct {
- Uuid string
- OwnerUuid string
- ReplicationLevel int
+ UUID string
+ OwnerUUID string
+ ReplicationLevel int
BlockDigestToSize map[blockdigest.BlockDigest]int
- TotalSize int
+ TotalSize int
}
+// ReadCollections holds information about collections from API server
type ReadCollections struct {
- ReadAllCollections bool
- UuidToCollection map[string]Collection
+ ReadAllCollections bool
+ UUIDToCollection map[string]Collection
+ OwnerToCollectionSize map[string]int
+ BlockToDesiredReplication map[blockdigest.DigestWithSize]int
+ CollectionUUIDToIndex map[string]int
+ CollectionIndexToUUID []string
+ BlockToCollectionIndices map[blockdigest.DigestWithSize][]int
}
+// GetCollectionsParams holds the parameters passed to GetCollections.
type GetCollectionsParams struct {
- Client arvadosclient.ArvadosClient
- Logger *logger.Logger
+ Client arvadosclient.ArvadosClient
+ Logger *logger.Logger
BatchSize int
}
+// SdkCollectionInfo holds collection info from api
type SdkCollectionInfo struct {
- Uuid string `json:"uuid"`
- OwnerUuid string `json:"owner_uuid"`
- Redundancy int `json:"redundancy"`
- ModifiedAt time.Time `json:"modified_at"`
- ManifestText string `json:"manifest_text"`
+ UUID string `json:"uuid"`
+ OwnerUUID string `json:"owner_uuid"`
+ Redundancy int `json:"redundancy"`
+ ModifiedAt time.Time `json:"modified_at"`
+ ManifestText string `json:"manifest_text"`
}
+// SdkCollectionList lists collections from api
type SdkCollectionList struct {
- ItemsAvailable int `json:"items_available"`
- Items []SdkCollectionInfo `json:"items"`
+ ItemsAvailable int `json:"items_available"`
+ Items []SdkCollectionInfo `json:"items"`
}
func init() {
- flag.StringVar(&heap_profile_filename,
+ flag.StringVar(&HeapProfileFilename,
"heap-profile",
"",
- "File to write the heap profiles to.")
+ "File to write the heap profiles to. Leave blank to skip profiling.")
}
-// // Methods to implement util.SdkListResponse Interface
-// func (s SdkCollectionList) NumItemsAvailable() (numAvailable int, err error) {
-// return s.ItemsAvailable, nil
-// }
-
-// func (s SdkCollectionList) NumItemsContained() (numContained int, err error) {
-// return len(s.Items), nil
-// }
-
-// Write the heap profile to a file for later review.
+// WriteHeapProfile writes the heap profile to a file for later review.
// Since a file is expected to only contain a single heap profile this
// function overwrites the previously written profile, so it is safe
// to call multiple times in a single run.
// Otherwise we would see cumulative numbers as explained here:
// https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
-func WriteHeapProfile() {
- if heap_profile_filename != "" {
-
- heap_profile, err := os.Create(heap_profile_filename)
+func WriteHeapProfile() error {
+ if HeapProfileFilename != "" {
+ heapProfile, err := os.Create(HeapProfileFilename)
if err != nil {
- log.Fatal(err)
+ return err
}
- defer heap_profile.Close()
+ defer heapProfile.Close()
- err = pprof.WriteHeapProfile(heap_profile)
- if err != nil {
- log.Fatal(err)
- }
+ err = pprof.WriteHeapProfile(heapProfile)
+ return err
}
+
+ return nil
}
+// GetCollectionsAndSummarize gets collections from the API server and summarizes them.
+func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections, err error) {
+ results, err = GetCollections(params)
+ if err != nil {
+ return
+ }
+
+ results.Summarize(params.Logger)
+
+ log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
+ log.Printf("Read and processed %d collections",
+ len(results.UUIDToCollection))
+
+ // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
+ // lots of behaviors can become warnings (and obviously we can't
+ // write anything).
+ // if !readCollections.ReadAllCollections {
+ // log.Fatalf("Did not read all collections")
+ // }
-func GetCollections(params GetCollectionsParams) (results ReadCollections) {
+ return
+}
+
+// GetCollections gets collections from the API server.
+func GetCollections(params GetCollectionsParams) (results ReadCollections, err error) {
if &params.Client == nil {
- log.Fatalf("params.Client passed to GetCollections() should " +
+ err = fmt.Errorf("params.Client passed to GetCollections() should " +
"contain a valid ArvadosClient, but instead it is nil.")
+ return
}
fieldsWanted := []string{"manifest_text",
"owner_uuid",
"uuid",
- // TODO(misha): Start using the redundancy field.
"redundancy",
"modified_at"}
sdkParams := arvadosclient.Dict{
- "select": fieldsWanted,
- "order": []string{"modified_at ASC"},
- "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
+ "select": fieldsWanted,
+ "order": []string{"modified_at ASC"},
+ "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}},
+ "offset": 0}
if params.BatchSize > 0 {
sdkParams["limit"] = params.BatchSize
}
- initialNumberOfCollectionsAvailable := NumberCollectionsAvailable(params.Client)
+ var defaultReplicationLevel int
+ {
+ var value interface{}
+ value, err = params.Client.Discovery("defaultCollectionReplication")
+ if err != nil {
+ return
+ }
+
+ defaultReplicationLevel = int(value.(float64))
+ if defaultReplicationLevel <= 0 {
+ err = fmt.Errorf("Default collection replication returned by arvados SDK "+
+ "should be a positive integer but instead it was %d.",
+ defaultReplicationLevel)
+ return
+ }
+ }
+
+ initialNumberOfCollectionsAvailable, err :=
+ util.NumberItemsAvailable(params.Client, "collections")
+ if err != nil {
+ return
+ }
// Include a 1% margin for collections added while we're reading so
// that we don't have to grow the map in most cases.
maxExpectedCollections := int(
float64(initialNumberOfCollectionsAvailable) * 1.01)
- results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
+ results.UUIDToCollection = make(map[string]Collection, maxExpectedCollections)
if params.Logger != nil {
- properties,_ := params.Logger.Edit()
- collectionInfo := make(map[string]interface{})
- collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
- properties["collection_info"] = collectionInfo
- params.Logger.Record()
+ params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+ collectionInfo := logger.GetOrCreateMap(p, "collection_info")
+ collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
+ collectionInfo["batch_size"] = params.BatchSize
+ collectionInfo["default_replication_level"] = defaultReplicationLevel
+ })
}
// These values are just for getting the loop to run the first time,
// afterwards they'll be set to real values.
- previousTotalCollections := -1
- totalCollections := 0
- for totalCollections > previousTotalCollections {
+ remainingCollections := 1
+ var totalCollections int
+ var previousTotalCollections int
+ for remainingCollections > 0 {
// We're still finding new collections
// Write the heap profile for examining memory usage
- WriteHeapProfile()
+ err = WriteHeapProfile()
+ if err != nil {
+ return
+ }
// Get next batch of collections.
var collections SdkCollectionList
- err := params.Client.List("collections", sdkParams, &collections)
+ err = params.Client.List("collections", sdkParams, &collections)
if err != nil {
- log.Fatalf("error querying collections: %+v", err)
+ return
+ }
+
+ // Update count of remaining collections
+ remainingCollections = collections.ItemsAvailable - params.BatchSize - sdkParams["offset"].(int)
+ if remainingCollections < 0 {
+ remainingCollections = 0
}
// Process collection and update our date filter.
- sdkParams["filters"].([][]string)[0][2] =
- ProcessCollections(collections.Items, results.UuidToCollection).Format(time.RFC3339)
+ latestModificationDate, maxManifestSize, totalManifestSize, err := ProcessCollections(params.Logger,
+ collections.Items,
+ defaultReplicationLevel,
+ results.UUIDToCollection)
+ if err != nil {
+ return results, err
+ }
+ if sdkParams["filters"].([][]string)[0][2] != latestModificationDate.Format(time.RFC3339) {
+ sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
+ sdkParams["offset"] = 0
+ } else {
+ sdkParams["offset"] = sdkParams["offset"].(int) + params.BatchSize
+ }
// update counts
previousTotalCollections = totalCollections
- totalCollections = len(results.UuidToCollection)
+ totalCollections = len(results.UUIDToCollection)
- log.Printf("%d collections read, %d new in last batch, " +
+ log.Printf("%d collections read, %d new in last batch, "+
+ "%d remaining, "+
"%s latest modified date, %.0f %d %d avg,max,total manifest size",
totalCollections,
- totalCollections - previousTotalCollections,
+ totalCollections-previousTotalCollections,
+ remainingCollections,
sdkParams["filters"].([][]string)[0][2],
float32(totalManifestSize)/float32(totalCollections),
maxManifestSize, totalManifestSize)
if params.Logger != nil {
- properties,_ := params.Logger.Edit()
- collectionInfo := properties["collection_info"].(map[string]interface{})
- collectionInfo["collections_read"] = totalCollections
- collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
- collectionInfo["total_manifest_size"] = totalManifestSize
- collectionInfo["max_manifest_size"] = maxManifestSize
- params.Logger.Record()
+ params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+ collectionInfo := logger.GetOrCreateMap(p, "collection_info")
+ collectionInfo["collections_read"] = totalCollections
+ collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
+ collectionInfo["total_manifest_size"] = totalManifestSize
+ collectionInfo["max_manifest_size"] = maxManifestSize
+ })
}
}
- // Just in case this lowers the numbers reported in the heap profile.
- runtime.GC()
-
// Write the heap profile for examining memory usage
- WriteHeapProfile()
+ err = WriteHeapProfile()
return
}
-
// StrCopy returns a newly allocated string.
// It is useful to copy slices so that the garbage collector can reuse
// the memory of the longer strings they came from.
return string([]byte(s))
}
-
-func ProcessCollections(receivedCollections []SdkCollectionInfo,
- uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
+// ProcessCollections processes collections read from the API server and stores them in UUIDToCollection.
+func ProcessCollections(arvLogger *logger.Logger,
+ receivedCollections []SdkCollectionInfo,
+ defaultReplicationLevel int,
+ UUIDToCollection map[string]Collection,
+) (
+ latestModificationDate time.Time,
+ maxManifestSize, totalManifestSize uint64,
+ err error,
+) {
for _, sdkCollection := range receivedCollections {
- collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
- OwnerUuid: StrCopy(sdkCollection.OwnerUuid),
- ReplicationLevel: sdkCollection.Redundancy,
+ collection := Collection{UUID: StrCopy(sdkCollection.UUID),
+ OwnerUUID: StrCopy(sdkCollection.OwnerUUID),
+ ReplicationLevel: sdkCollection.Redundancy,
BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
if sdkCollection.ModifiedAt.IsZero() {
- log.Fatalf(
- "Arvados SDK collection returned with unexpected zero modifcation " +
- "date. This probably means that either we failed to parse the " +
- "modification date or the API server has changed how it returns " +
- "modification dates: %v",
+ err = fmt.Errorf(
+ "Arvados SDK collection returned with unexpected zero "+
+ "modification date. This probably means that either we failed to "+
+ "parse the modification date or the API server has changed how "+
+ "it returns modification dates: %+v",
collection)
+ return
}
+
if sdkCollection.ModifiedAt.After(latestModificationDate) {
latestModificationDate = sdkCollection.ModifiedAt
}
- manifest := manifest.Manifest{sdkCollection.ManifestText}
+
+ if collection.ReplicationLevel == 0 {
+ collection.ReplicationLevel = defaultReplicationLevel
+ }
+
+ manifest := manifest.Manifest{Text: sdkCollection.ManifestText}
manifestSize := uint64(len(sdkCollection.ManifestText))
- if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen {
+ if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen {
totalManifestSize += manifestSize
}
if manifestSize > maxManifestSize {
maxManifestSize = manifestSize
}
-
+
blockChannel := manifest.BlockIterWithDuplicates()
for block := range blockChannel {
- if stored_size, stored := collection.BlockDigestToSize[block.Digest];
- stored && stored_size != block.Size {
- log.Fatalf(
+ if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
+ log.Printf(
"Collection %s contains multiple sizes (%d and %d) for block %s",
- collection.Uuid,
- stored_size,
+ collection.UUID,
+ storedSize,
block.Size,
block.Digest)
}
collection.BlockDigestToSize[block.Digest] = block.Size
}
+ if manifest.Err != nil {
+ err = manifest.Err
+ return
+ }
+
collection.TotalSize = 0
for _, size := range collection.BlockDigestToSize {
collection.TotalSize += size
}
- uuidToCollection[collection.Uuid] = collection
+ UUIDToCollection[collection.UUID] = collection
// Clear out all the manifest strings that we don't need anymore.
// These hopefully form the bulk of our memory usage.
return
}
+// Summarize the collections read
+func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
+ readCollections.OwnerToCollectionSize = make(map[string]int)
+ readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int)
+ numCollections := len(readCollections.UUIDToCollection)
+ readCollections.CollectionUUIDToIndex = make(map[string]int, numCollections)
+ readCollections.CollectionIndexToUUID = make([]string, 0, numCollections)
+ readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int)
+
+ for _, coll := range readCollections.UUIDToCollection {
+ collectionIndex := len(readCollections.CollectionIndexToUUID)
+ readCollections.CollectionIndexToUUID =
+ append(readCollections.CollectionIndexToUUID, coll.UUID)
+ readCollections.CollectionUUIDToIndex[coll.UUID] = collectionIndex
+
+ readCollections.OwnerToCollectionSize[coll.OwnerUUID] =
+ readCollections.OwnerToCollectionSize[coll.OwnerUUID] + coll.TotalSize
+
+ for block, size := range coll.BlockDigestToSize {
+ locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)}
+ readCollections.BlockToCollectionIndices[locator] =
+ append(readCollections.BlockToCollectionIndices[locator],
+ collectionIndex)
+ storedReplication := readCollections.BlockToDesiredReplication[locator]
+ if coll.ReplicationLevel > storedReplication {
+ readCollections.BlockToDesiredReplication[locator] =
+ coll.ReplicationLevel
+ }
+ }
+ }
-func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) (int) {
- var collections SdkCollectionList
- sdkParams := arvadosclient.Dict{"limit": 0}
- err := client.List("collections", sdkParams, &collections)
- if err != nil {
- log.Fatalf("error querying collections for items available: %v", err)
+ if arvLogger != nil {
+ arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+ collectionInfo := logger.GetOrCreateMap(p, "collection_info")
+ // Since maps are shallow copied, we run a risk of concurrent
+ // updates here. By copying readCollections.OwnerToCollectionSize
+ // into the log, we're assuming that it won't be updated.
+ collectionInfo["owner_to_collection_size"] =
+ readCollections.OwnerToCollectionSize
+ collectionInfo["distinct_blocks_named"] =
+ len(readCollections.BlockToDesiredReplication)
+ })
}
- return collections.ItemsAvailable
+ return
}