package collection
import (
+ "flag"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
+ //"git.curoverse.com/arvados.git/sdk/go/util"
"log"
+ "os"
+ "runtime/pprof"
+)
+
var (
	// heap_profile_filename holds the value of the -heap-profile flag
	// registered in init. When it is empty GetCollections does not create
	// a profile file.
	heap_profile_filename string
	// heap_profile is the file GetCollections creates from
	// heap_profile_filename; it stays nil until that happens.
	heap_profile *os.File
)
// Collection is the in-memory summary kept for one collection read from
// the API server.
type Collection struct {
	// Uuid is the collection's "uuid" field.
	Uuid string
	// OwnerUuid is the collection's "owner_uuid" field.
	OwnerUuid string
	// ReplicationLevel is copied from the collection's "redundancy" field.
	ReplicationLevel int
	// BlockDigestToSize maps each distinct block digest found in the
	// collection's manifest to the size recorded for that block.
	BlockDigestToSize map[string]int
	// TotalSize is the sum of the values in BlockDigestToSize, so a block
	// referenced several times by the manifest is counted once.
	TotalSize int
}
type ReadCollections struct {
type GetCollectionsParams struct {
Client arvadosclient.ArvadosClient
- Limit int
- LogEveryNthCollectionProcessed int // 0 means don't report any
+ BatchSize int
}
// SdkCollectionInfo is the JSON shape of a single collection record as
// decoded from a "collections" list response. Only the fields requested
// by GetCollections are declared.
type SdkCollectionInfo struct {
	Uuid      string `json:"uuid"`
	OwnerUuid string `json:"owner_uuid"`
	// Redundancy becomes Collection.ReplicationLevel.
	Redundancy int `json:"redundancy"`
	// ModifiedAt is kept as the raw string from the response;
	// ProcessCollections compares these values as plain strings.
	ModifiedAt   string `json:"modified_at"`
	ManifestText string `json:"manifest_text"`
}

// SdkCollectionList is the JSON shape of a "collections" list response:
// the server-side item count plus the page of items actually returned.
type SdkCollectionList struct {
	ItemsAvailable int                 `json:"items_available"`
	Items          []SdkCollectionInfo `json:"items"`
}
+func init() {
+ flag.StringVar(&heap_profile_filename,
+ "heap-profile",
+ "",
+ "File to write the heap profiles to.")
+}
+
+// // Methods to implement util.SdkListResponse Interface
+// func (s SdkCollectionList) NumItemsAvailable() (numAvailable int, err error) {
+// return s.ItemsAvailable, nil
+// }
+
+// func (s SdkCollectionList) NumItemsContained() (numContained int, err error) {
+// return len(s.Items), nil
+// }
+
func GetCollections(params GetCollectionsParams) (results ReadCollections) {
if ¶ms.Client == nil {
- log.Fatalf("Received params.Client passed to GetCollections() should " +
+ log.Fatalf("params.Client passed to GetCollections() should " +
"contain a valid ArvadosClient, but instead it is nil.")
}
+ // TODO(misha): move this code somewhere better and make sure it's
+ // only run once
+ if heap_profile_filename != "" {
+ var err error
+ heap_profile, err = os.Create(heap_profile_filename)
+ if err != nil {
+ log.Fatal(err)
+ }
+ }
+
+
+
+
+
fieldsWanted := []string{"manifest_text",
"owner_uuid",
"uuid",
// TODO(misha): Start using the redundancy field.
- "redundancy"}
+ "redundancy",
+ "modified_at"}
- sdkParams := arvadosclient.Dict{"select": fieldsWanted}
- if params.Limit > 0 {
- sdkParams["limit"] = params.Limit
- }
+ sdkParams := arvadosclient.Dict{
+ "select": fieldsWanted,
+ "order": []string{"modified_at ASC"},
+ "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
+ // MISHA UNDO THIS TEMPORARY HACK TO FIND BUG!
+ //"filters": [][]string{[]string{"modified_at", ">=", "2014-11-05T20:44:50Z"}}}
- var collections map[string]interface{}
- err := params.Client.List("collections", sdkParams, &collections)
- if err != nil {
- log.Fatalf("error querying collections: %v", err)
+ if params.BatchSize > 0 {
+ sdkParams["limit"] = params.BatchSize
}
- {
- var numReceived, numAvailable int
- results.ReadAllCollections, numReceived, numAvailable =
- SdkListResponseContainsAllAvailableItems(collections)
+ // MISHA UNDO THIS TEMPORARY HACK TO FIND BUG!
+ sdkParams["limit"] = 50
+
+ // {
+ // var numReceived, numAvailable int
+ // results.ReadAllCollections, numReceived, numAvailable =
+ // util.ContainsAllAvailableItems(collections)
+
+ // if (!results.ReadAllCollections) {
+ // log.Printf("ERROR: Did not receive all collections.")
+ // }
+ // log.Printf("Received %d of %d available collections.",
+ // numReceived,
+ // numAvailable)
+ // }
+
+ initialNumberOfCollectionsAvailable := NumberCollectionsAvailable(params.Client)
+ // Include a 1% margin for collections added while we're reading so
+ // that we don't have to grow the map in most cases.
+ maxExpectedCollections := int(
+ float64(initialNumberOfCollectionsAvailable) * 1.01)
+ results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
+
+ previousTotalCollections := -1
+ for len(results.UuidToCollection) > previousTotalCollections {
+ // We're still finding new collections
+ log.Printf("previous, current: %d %d", previousTotalCollections, len(results.UuidToCollection))
+
+ // update count
+ previousTotalCollections = len(results.UuidToCollection)
- if (!results.ReadAllCollections) {
- log.Printf("ERROR: Did not receive all collections.")
+ // Write the heap profile for examining memory usage
+ {
+ err := pprof.WriteHeapProfile(heap_profile)
+ if err != nil {
+ log.Fatal(err)
+ }
+ }
+
+ // Get next batch of collections.
+ var collections SdkCollectionList
+ log.Printf("Running with SDK Params: %v", sdkParams)
+ err := params.Client.List("collections", sdkParams, &collections)
+ if err != nil {
+ log.Fatalf("error querying collections: %+v", err)
}
- log.Printf("Received %d of %d available collections.",
- numReceived,
- numAvailable)
+
+ // Process collection and update our date filter.
+ sdkParams["filters"].([][]string)[0][2] = ProcessCollections(collections.Items, results.UuidToCollection)
+ log.Printf("Latest date seen %s", sdkParams["filters"].([][]string)[0][2])
}
+ log.Printf("previous, current: %d %d", previousTotalCollections, len(results.UuidToCollection))
+
+ return
+}
- if value, ok := collections["items"]; ok {
- items := value.([]interface{})
- results.UuidToCollection = make(map[string]Collection)
- for index, item := range items {
- if m := params.LogEveryNthCollectionProcessed; m >0 && (index % m) == 0 {
- log.Printf("Processing collection #%d", index)
- }
- item_map := item.(map[string]interface{})
- collection := Collection{Uuid: item_map["uuid"].(string),
- OwnerUuid: item_map["owner_uuid"].(string),
- BlockDigestToSize: make(map[string]int)}
- manifest := manifest.Manifest{item_map["manifest_text"].(string)}
- blockChannel := manifest.BlockIterWithDuplicates()
- for block := range blockChannel {
- if stored_size, stored := collection.BlockDigestToSize[block.Digest];
- stored && stored_size != block.Size {
- log.Fatalf(
- "Collection %s contains multiple sizes (%d and %d) for block %s",
- collection.Uuid,
- stored_size,
- block.Size,
- block.Digest)
- }
- collection.BlockDigestToSize[block.Digest] = block.Size
+func ProcessCollections(receivedCollections []SdkCollectionInfo,
+ uuidToCollection map[string]Collection) (latestModificationDate string) {
+ for _, sdkCollection := range receivedCollections {
+ collection := Collection{Uuid: sdkCollection.Uuid,
+ OwnerUuid: sdkCollection.OwnerUuid,
+ ReplicationLevel: sdkCollection.Redundancy,
+ BlockDigestToSize: make(map[string]int)}
+ // log.Printf("Seeing modification date, owner_uuid: %s %s",
+ // sdkCollection.ModifiedAt,
+ // sdkCollection.OwnerUuid)
+ if sdkCollection.ModifiedAt > latestModificationDate {
+ latestModificationDate = sdkCollection.ModifiedAt
+ }
+ manifest := manifest.Manifest{sdkCollection.ManifestText}
+ blockChannel := manifest.BlockIterWithDuplicates()
+ for block := range blockChannel {
+ if stored_size, stored := collection.BlockDigestToSize[block.Digest];
+ stored && stored_size != block.Size {
+ log.Fatalf(
+ "Collection %s contains multiple sizes (%d and %d) for block %s",
+ collection.Uuid,
+ stored_size,
+ block.Size,
+ block.Digest)
}
- results.UuidToCollection[collection.Uuid] = collection
+ collection.BlockDigestToSize[block.Digest] = block.Size
+ }
+ collection.TotalSize = 0
+ for _, size := range collection.BlockDigestToSize {
+ collection.TotalSize += size
}
+ uuidToCollection[collection.Uuid] = collection
}
+
return
}
+
+
+func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) (int) {
+ var collections SdkCollectionList
+ sdkParams := arvadosclient.Dict{"limit": 0}
+ err := client.List("collections", sdkParams, &collections)
+ if err != nil {
+ log.Fatalf("error querying collections for items available: %v", err)
+ }
+
+ return collections.ItemsAvailable
+}