From: mishaz Date: Tue, 23 Dec 2014 19:33:07 +0000 (+0000) Subject: Long overdue checkin of data manager. Current code runs, but uses way too much memory... X-Git-Tag: 1.1.0~1505^2~73 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/fb62ab318be2202b9d403e65d6dc86a9d7e72a3a Long overdue checkin of data manager. Current code runs, but uses way too much memory and eventually crashes. This checkin includes heap profiling to track down memory usage. --- diff --git a/sdk/go/manifest/manifest_test.go b/sdk/go/manifest/manifest_test.go index b108870d61..7a0a641c4f 100644 --- a/sdk/go/manifest/manifest_test.go +++ b/sdk/go/manifest/manifest_test.go @@ -115,7 +115,7 @@ func TestParseManifestLineSimple(t *testing.T) { } func TestParseBlockLocatorSimple(t *testing.T) { - b, err := parseBlockLocator("365f83f5f808896ec834c8b595288735+2310+K@qr1hi+Af0c9a66381f3b028677411926f0be1c6282fe67c@542b5ddf") + b, err := ParseBlockLocator("365f83f5f808896ec834c8b595288735+2310+K@qr1hi+Af0c9a66381f3b028677411926f0be1c6282fe67c@542b5ddf") if err != nil { t.Fatalf("Unexpected error parsing block locator: %v", err) } diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go index 07e8b7ed4c..159b5684ba 100644 --- a/services/datamanager/collection/collection.go +++ b/services/datamanager/collection/collection.go @@ -3,10 +3,18 @@ package collection import ( + "flag" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/manifest" - "git.curoverse.com/arvados.git/sdk/go/util" + //"git.curoverse.com/arvados.git/sdk/go/util" "log" + "os" + "runtime/pprof" +) + +var ( + heap_profile_filename string + heap_profile *os.File ) type Collection struct { @@ -24,14 +32,14 @@ type ReadCollections struct { type GetCollectionsParams struct { Client arvadosclient.ArvadosClient - Limit int - LogEveryNthCollectionProcessed int // 0 means don't report any + BatchSize int } type SdkCollectionInfo struct { Uuid 
string `json:"uuid"` OwnerUuid string `json:"owner_uuid"` Redundancy int `json:"redundancy"` + ModifiedAt string `json:"modified_at"` ManifestText string `json:"manifest_text"` } @@ -40,14 +48,21 @@ type SdkCollectionList struct { Items []SdkCollectionInfo `json:"items"` } -// Methods to implement util.SdkListResponse Interface -func (s SdkCollectionList) NumItemsAvailable() (numAvailable int, err error) { - return s.ItemsAvailable, nil +func init() { + flag.StringVar(&heap_profile_filename, + "heap-profile", + "", + "File to write the heap profiles to.") } -func (s SdkCollectionList) NumItemsContained() (numContained int, err error) { - return len(s.Items), nil -} +// // Methods to implement util.SdkListResponse Interface +// func (s SdkCollectionList) NumItemsAvailable() (numAvailable int, err error) { +// return s.ItemsAvailable, nil +// } + +// func (s SdkCollectionList) NumItemsContained() (numContained int, err error) { +// return len(s.Items), nil +// } func GetCollections(params GetCollectionsParams) (results ReadCollections) { if &params.Client == nil { @@ -55,46 +70,108 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { "contain a valid ArvadosClient, but instead it is nil.") } + // TODO(misha): move this code somewhere better and make sure it's + // only run once + if heap_profile_filename != "" { + var err error + heap_profile, err = os.Create(heap_profile_filename) + if err != nil { + log.Fatal(err) + } + } + + + + + fieldsWanted := []string{"manifest_text", "owner_uuid", "uuid", // TODO(misha): Start using the redundancy field. 
- "redundancy"} - - sdkParams := arvadosclient.Dict{"select": fieldsWanted} - if params.Limit > 0 { - sdkParams["limit"] = params.Limit - } - - var collections SdkCollectionList - err := params.Client.List("collections", sdkParams, &collections) - if err != nil { - log.Fatalf("error querying collections: %v", err) + "redundancy", + "modified_at"} + + sdkParams := arvadosclient.Dict{ + "select": fieldsWanted, + "order": []string{"modified_at ASC"}, + "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}} + // MISHA UNDO THIS TEMPORARY HACK TO FIND BUG! + //"filters": [][]string{[]string{"modified_at", ">=", "2014-11-05T20:44:50Z"}}} + + if params.BatchSize > 0 { + sdkParams["limit"] = params.BatchSize } - { - var numReceived, numAvailable int - results.ReadAllCollections, numReceived, numAvailable = - util.ContainsAllAvailableItems(collections) + // MISHA UNDO THIS TEMPORARY HACK TO FIND BUG! + sdkParams["limit"] = 50 + + // { + // var numReceived, numAvailable int + // results.ReadAllCollections, numReceived, numAvailable = + // util.ContainsAllAvailableItems(collections) + + // if (!results.ReadAllCollections) { + // log.Printf("ERROR: Did not receive all collections.") + // } + // log.Printf("Received %d of %d available collections.", + // numReceived, + // numAvailable) + // } + + initialNumberOfCollectionsAvailable := NumberCollectionsAvailable(params.Client) + // Include a 1% margin for collections added while we're reading so + // that we don't have to grow the map in most cases. 
+ maxExpectedCollections := int( + float64(initialNumberOfCollectionsAvailable) * 1.01) + results.UuidToCollection = make(map[string]Collection, maxExpectedCollections) + + previousTotalCollections := -1 + for len(results.UuidToCollection) > previousTotalCollections { + // We're still finding new collections + log.Printf("previous, current: %d %d", previousTotalCollections, len(results.UuidToCollection)) + + // update count + previousTotalCollections = len(results.UuidToCollection) + + // Write the heap profile for examining memory usage + { + err := pprof.WriteHeapProfile(heap_profile) + if err != nil { + log.Fatal(err) + } + } - if (!results.ReadAllCollections) { - log.Printf("ERROR: Did not receive all collections.") + // Get next batch of collections. + var collections SdkCollectionList + log.Printf("Running with SDK Params: %v", sdkParams) + err := params.Client.List("collections", sdkParams, &collections) + if err != nil { + log.Fatalf("error querying collections: %+v", err) } - log.Printf("Received %d of %d available collections.", - numReceived, - numAvailable) + + // Process collection and update our date filter. 
+ sdkParams["filters"].([][]string)[0][2] = ProcessCollections(collections.Items, results.UuidToCollection) + log.Printf("Latest date seen %s", sdkParams["filters"].([][]string)[0][2]) } + log.Printf("previous, current: %d %d", previousTotalCollections, len(results.UuidToCollection)) - results.UuidToCollection = make(map[string]Collection) - for i, sdkCollection := range collections.Items { - count := i + 1 - if m := params.LogEveryNthCollectionProcessed; m >0 && (count % m) == 0 { - log.Printf("Processing collection #%d", count) - } + return +} + + +func ProcessCollections(receivedCollections []SdkCollectionInfo, + uuidToCollection map[string]Collection) (latestModificationDate string) { + for _, sdkCollection := range receivedCollections { collection := Collection{Uuid: sdkCollection.Uuid, OwnerUuid: sdkCollection.OwnerUuid, ReplicationLevel: sdkCollection.Redundancy, BlockDigestToSize: make(map[string]int)} + // log.Printf("Seeing modification date, owner_uuid: %s %s", + // sdkCollection.ModifiedAt, + // sdkCollection.OwnerUuid) + if sdkCollection.ModifiedAt > latestModificationDate { + latestModificationDate = sdkCollection.ModifiedAt + } manifest := manifest.Manifest{sdkCollection.ManifestText} blockChannel := manifest.BlockIterWithDuplicates() for block := range blockChannel { @@ -113,8 +190,20 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) { for _, size := range collection.BlockDigestToSize { collection.TotalSize += size } - results.UuidToCollection[collection.Uuid] = collection + uuidToCollection[collection.Uuid] = collection } return } + + +func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) (int) { + var collections SdkCollectionList + sdkParams := arvadosclient.Dict{"limit": 0} + err := client.List("collections", sdkParams, &collections) + if err != nil { + log.Fatalf("error querying collections for items available: %v", err) + } + + return collections.ItemsAvailable +} diff --git 
a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go index cd3e010c46..6393787e01 100644 --- a/services/datamanager/datamanager.go +++ b/services/datamanager/datamanager.go @@ -7,7 +7,7 @@ import ( "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/util" "git.curoverse.com/arvados.git/services/datamanager/collection" - "git.curoverse.com/arvados.git/services/datamanager/keep" +// "git.curoverse.com/arvados.git/services/datamanager/keep" "log" ) @@ -29,9 +29,9 @@ func main() { readCollections := collection.GetCollections( collection.GetCollectionsParams{ - Client: arv, Limit: 50, LogEveryNthCollectionProcessed: 10}) + Client: arv, BatchSize: 500}) - log.Printf("Read Collections: %v", readCollections) + //log.Printf("Read Collections: %v", readCollections) UserUsage := ComputeSizeOfOwnedCollections(readCollections) log.Printf("Uuid to Size used: %v", UserUsage) @@ -46,10 +46,10 @@ func main() { log.Printf("Read and processed %d collections", len(readCollections.UuidToCollection)) - readServers := keep.GetKeepServers( - keep.GetKeepServersParams{Client: arv, Limit: 1000}) + // readServers := keep.GetKeepServers( + // keep.GetKeepServersParams{Client: arv, Limit: 1000}) - log.Printf("Returned %d keep disks", len(readServers.AddressToContents)) + // log.Printf("Returned %d keep disks", len(readServers.AddressToContents)) } func ComputeSizeOfOwnedCollections(readCollections collection.ReadCollections) (