Long overdue checkin of data manager. Current code runs, but uses way too much memory...
authormishaz <misha@curoverse.com>
Tue, 23 Dec 2014 19:33:07 +0000 (19:33 +0000)
committerTom Clegg <tom@curoverse.com>
Fri, 13 Feb 2015 21:23:50 +0000 (16:23 -0500)
sdk/go/manifest/manifest_test.go
services/datamanager/collection/collection.go
services/datamanager/datamanager.go

index b108870d612e1c19557e0cb73a967806141d67d8..7a0a641c4fcf3594d85af77e0b218cb3ee9c296a 100644 (file)
@@ -115,7 +115,7 @@ func TestParseManifestLineSimple(t *testing.T) {
 }
 
 func TestParseBlockLocatorSimple(t *testing.T) {
-       b, err := parseBlockLocator("365f83f5f808896ec834c8b595288735+2310+K@qr1hi+Af0c9a66381f3b028677411926f0be1c6282fe67c@542b5ddf")
+       b, err := ParseBlockLocator("365f83f5f808896ec834c8b595288735+2310+K@qr1hi+Af0c9a66381f3b028677411926f0be1c6282fe67c@542b5ddf")
        if err != nil {
                t.Fatalf("Unexpected error parsing block locator: %v", err)
        }
index 07e8b7ed4c20ef81f3b1a3a9ef0b7cf50629144b..159b5684ba992ccae0d3a4f416871f189d98954b 100644 (file)
@@ -3,10 +3,18 @@
 package collection
 
 import (
+       "flag"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
-       "git.curoverse.com/arvados.git/sdk/go/util"
+       //"git.curoverse.com/arvados.git/sdk/go/util"
        "log"
+       "os"
+       "runtime/pprof"
+)
+
+var (
+       heap_profile_filename string
+       heap_profile *os.File
 )
 
 type Collection struct {
@@ -24,14 +32,14 @@ type ReadCollections struct {
 
 type GetCollectionsParams struct {
        Client arvadosclient.ArvadosClient
-       Limit int
-       LogEveryNthCollectionProcessed int  // 0 means don't report any
+       BatchSize int
 }
 
 type SdkCollectionInfo struct {
        Uuid           string   `json:"uuid"`
        OwnerUuid      string   `json:"owner_uuid"`
        Redundancy     int      `json:"redundancy"`
+       ModifiedAt     string   `json:"modified_at"`
        ManifestText   string   `json:"manifest_text"`
 }
 
@@ -40,14 +48,21 @@ type SdkCollectionList struct {
        Items            []SdkCollectionInfo   `json:"items"`
 }
 
-// Methods to implement util.SdkListResponse Interface
-func (s SdkCollectionList) NumItemsAvailable() (numAvailable int, err error) {
-       return s.ItemsAvailable, nil
+func init() {
+       flag.StringVar(&heap_profile_filename, 
+               "heap-profile",
+               "",
+               "File to write the heap profiles to.")
 }
 
-func (s SdkCollectionList) NumItemsContained() (numContained int, err error) {
-       return len(s.Items), nil
-}
+// // Methods to implement util.SdkListResponse Interface
+// func (s SdkCollectionList) NumItemsAvailable() (numAvailable int, err error) {
+//     return s.ItemsAvailable, nil
+// }
+
+// func (s SdkCollectionList) NumItemsContained() (numContained int, err error) {
+//     return len(s.Items), nil
+// }
 
 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
        if &params.Client == nil {
@@ -55,46 +70,108 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                        "contain a valid ArvadosClient, but instead it is nil.")
        }
 
+       // TODO(misha): move this code somewhere better and make sure it's
+       // only run once
+       if heap_profile_filename != "" {
+               var err error
+               heap_profile, err = os.Create(heap_profile_filename)
+               if err != nil {
+                       log.Fatal(err)
+               }
+       }
+
+
+
+
+
        fieldsWanted := []string{"manifest_text",
                "owner_uuid",
                "uuid",
                // TODO(misha): Start using the redundancy field.
-               "redundancy"}
-
-       sdkParams := arvadosclient.Dict{"select": fieldsWanted}
-       if params.Limit > 0 {
-               sdkParams["limit"] = params.Limit
-       }
-
-       var collections SdkCollectionList
-       err := params.Client.List("collections", sdkParams, &collections)
-       if err != nil {
-               log.Fatalf("error querying collections: %v", err)
+               "redundancy",
+               "modified_at"}
+
+       sdkParams := arvadosclient.Dict{
+               "select": fieldsWanted,
+               "order": []string{"modified_at ASC"},
+               "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
+               // MISHA UNDO THIS TEMPORARY HACK TO FIND BUG!
+               //"filters": [][]string{[]string{"modified_at", ">=", "2014-11-05T20:44:50Z"}}}
+
+       if params.BatchSize > 0 {
+               sdkParams["limit"] = params.BatchSize
        }
 
-       {
-               var numReceived, numAvailable int
-               results.ReadAllCollections, numReceived, numAvailable =
-                       util.ContainsAllAvailableItems(collections)
+       // MISHA UNDO THIS TEMPORARY HACK TO FIND BUG!
+       sdkParams["limit"] = 50
+
+       // {
+       //      var numReceived, numAvailable int
+       //      results.ReadAllCollections, numReceived, numAvailable =
+       //              util.ContainsAllAvailableItems(collections)
+
+       //      if (!results.ReadAllCollections) {
+       //              log.Printf("ERROR: Did not receive all collections.")
+       //      }
+       //      log.Printf("Received %d of %d available collections.",
+       //              numReceived,
+       //              numAvailable)
+       // }
+
+       initialNumberOfCollectionsAvailable := NumberCollectionsAvailable(params.Client)
+       // Include a 1% margin for collections added while we're reading so
+       // that we don't have to grow the map in most cases.
+       maxExpectedCollections := int(
+               float64(initialNumberOfCollectionsAvailable) * 1.01)
+       results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
+
+       previousTotalCollections := -1
+       for len(results.UuidToCollection) > previousTotalCollections {
+               // We're still finding new collections
+               log.Printf("previous, current: %d %d", previousTotalCollections, len(results.UuidToCollection))
+
+               // update count
+               previousTotalCollections = len(results.UuidToCollection)
+
+               // Write the heap profile for examining memory usage
+               {
+                       err := pprof.WriteHeapProfile(heap_profile)
+                       if err != nil {
+                               log.Fatal(err)
+                       }
+               }
 
-               if (!results.ReadAllCollections) {
-                       log.Printf("ERROR: Did not receive all collections.")
+               // Get next batch of collections.
+               var collections SdkCollectionList
+               log.Printf("Running with SDK Params: %v", sdkParams)
+               err := params.Client.List("collections", sdkParams, &collections)
+               if err != nil {
+                       log.Fatalf("error querying collections: %+v", err)
                }
-               log.Printf("Received %d of %d available collections.",
-                       numReceived,
-                       numAvailable)
+
+               // Process collection and update our date filter.
+               sdkParams["filters"].([][]string)[0][2] = ProcessCollections(collections.Items, results.UuidToCollection)
+               log.Printf("Latest date seen %s", sdkParams["filters"].([][]string)[0][2])
        }
+       log.Printf("previous, current: %d %d", previousTotalCollections, len(results.UuidToCollection))
 
-       results.UuidToCollection = make(map[string]Collection)
-       for i, sdkCollection := range collections.Items {
-               count := i + 1
-               if m := params.LogEveryNthCollectionProcessed; m >0 && (count % m) == 0 {
-                       log.Printf("Processing collection #%d", count)
-               }
+       return
+}
+
+
+func ProcessCollections(receivedCollections []SdkCollectionInfo,
+       uuidToCollection map[string]Collection) (latestModificationDate string) {
+       for _, sdkCollection := range receivedCollections {
                collection := Collection{Uuid: sdkCollection.Uuid,
                        OwnerUuid: sdkCollection.OwnerUuid,
                        ReplicationLevel: sdkCollection.Redundancy,
                        BlockDigestToSize: make(map[string]int)}
+               // log.Printf("Seeing modification date, owner_uuid: %s %s",
+               //      sdkCollection.ModifiedAt,
+               //      sdkCollection.OwnerUuid)
+               if sdkCollection.ModifiedAt > latestModificationDate {
+                       latestModificationDate = sdkCollection.ModifiedAt
+               }
                manifest := manifest.Manifest{sdkCollection.ManifestText}
                blockChannel := manifest.BlockIterWithDuplicates()
                for block := range blockChannel {
@@ -113,8 +190,20 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                for _, size := range collection.BlockDigestToSize {
                        collection.TotalSize += size
                }
-               results.UuidToCollection[collection.Uuid] = collection
+               uuidToCollection[collection.Uuid] = collection
        }
 
        return
 }
+
+
+func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) (int) {
+       var collections SdkCollectionList
+       sdkParams := arvadosclient.Dict{"limit": 0}
+       err := client.List("collections", sdkParams, &collections)
+       if err != nil {
+               log.Fatalf("error querying collections for items available: %v", err)
+       }
+
+       return collections.ItemsAvailable
+}
index cd3e010c4639cde5bc7e2f981d7b2dd555e4d8f7..6393787e01748d2cffbc5897fbb67c248dcfb229 100644 (file)
@@ -7,7 +7,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/util"
        "git.curoverse.com/arvados.git/services/datamanager/collection"
-       "git.curoverse.com/arvados.git/services/datamanager/keep"
+//     "git.curoverse.com/arvados.git/services/datamanager/keep"
        "log"
 )
 
@@ -29,9 +29,9 @@ func main() {
 
        readCollections := collection.GetCollections(
                collection.GetCollectionsParams{
-                       Client: arv, Limit: 50, LogEveryNthCollectionProcessed: 10})
+                       Client: arv, BatchSize: 500})
 
-       log.Printf("Read Collections: %v", readCollections)
+       //log.Printf("Read Collections: %v", readCollections)
 
        UserUsage := ComputeSizeOfOwnedCollections(readCollections)
        log.Printf("Uuid to Size used: %v", UserUsage)
@@ -46,10 +46,10 @@ func main() {
        log.Printf("Read and processed %d collections",
                len(readCollections.UuidToCollection))
 
-       readServers := keep.GetKeepServers(
-               keep.GetKeepServersParams{Client: arv, Limit: 1000})
+       // readServers := keep.GetKeepServers(
+       //      keep.GetKeepServersParams{Client: arv, Limit: 1000})
 
-       log.Printf("Returned %d keep disks", len(readServers.AddressToContents))
+       // log.Printf("Returned %d keep disks", len(readServers.AddressToContents))
 }
 
 func ComputeSizeOfOwnedCollections(readCollections collection.ReadCollections) (