Started reading collections and keep data in parallel. Moved some logic from datamana...
[arvados.git] / services / datamanager / collection / collection.go
index 927b88819d7869a2ec4f03e203181f35d193f27c..f9e2a3dc82110a35c254e6ea6475bbebdd941dfa 100644 (file)
@@ -4,18 +4,23 @@ package collection
 
 import (
        "flag"
+       "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/blockdigest"
+       "git.curoverse.com/arvados.git/sdk/go/logger"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
-       //"git.curoverse.com/arvados.git/sdk/go/util"
        "log"
        "os"
+       "runtime"
        "runtime/pprof"
+       "time"
 )
 
 var (
        heap_profile_filename string
-       heap_profile *os.File
+       // globals for debugging
+       totalManifestSize uint64
+       maxManifestSize uint64
 )
 
 type Collection struct {
@@ -29,19 +34,21 @@ type Collection struct {
 type ReadCollections struct {
        ReadAllCollections bool
        UuidToCollection map[string]Collection
+       OwnerToCollectionSize map[string]int
 }
 
 type GetCollectionsParams struct {
        Client arvadosclient.ArvadosClient
+       Logger *logger.Logger
        BatchSize int
 }
 
 type SdkCollectionInfo struct {
-       Uuid           string   `json:"uuid"`
-       OwnerUuid      string   `json:"owner_uuid"`
-       Redundancy     int      `json:"redundancy"`
-       ModifiedAt     string   `json:"modified_at"`
-       ManifestText   string   `json:"manifest_text"`
+       Uuid           string     `json:"uuid"`
+       OwnerUuid      string     `json:"owner_uuid"`
+       Redundancy     int        `json:"redundancy"`
+       ModifiedAt     time.Time  `json:"modified_at"`
+       ManifestText   string     `json:"manifest_text"`
 }
 
 type SdkCollectionList struct {
@@ -53,7 +60,7 @@ func init() {
        flag.StringVar(&heap_profile_filename, 
                "heap-profile",
                "",
-               "File to write the heap profiles to.")
+               "File to write the heap profiles to. Leave blank to skip profiling.")
 }
 
 // // Methods to implement util.SdkListResponse Interface
@@ -65,21 +72,60 @@ func init() {
 //     return len(s.Items), nil
 // }
 
-func GetCollections(params GetCollectionsParams) (results ReadCollections) {
-       if &params.Client == nil {
-               log.Fatalf("params.Client passed to GetCollections() should " +
-                       "contain a valid ArvadosClient, but instead it is nil.")
-       }
-
-       // TODO(misha): move this code somewhere better and make sure it's
-       // only run once
+// Write the heap profile to a file for later review.
+// Since a file is expected to only contain a single heap profile this
+// function overwrites the previously written profile, so it is safe
+// to call multiple times in a single run.
+// Otherwise we would see cumulative numbers as explained here:
+// https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
+func WriteHeapProfile() {
        if heap_profile_filename != "" {
-               var err error
-               heap_profile, err = os.Create(heap_profile_filename)
+
+               heap_profile, err := os.Create(heap_profile_filename)
+               if err != nil {
+                       log.Fatal(err)
+               }
+
+               defer heap_profile.Close()
+
+               err = pprof.WriteHeapProfile(heap_profile)
                if err != nil {
                        log.Fatal(err)
                }
        }
+}
+
+
+func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
+       results = GetCollections(params)
+       ComputeSizeOfOwnedCollections(&results)
+
+       if params.Logger != nil {
+               properties,_ := params.Logger.Edit()
+               collectionInfo := properties["collection_info"].(map[string]interface{})
+               collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize
+               params.Logger.Record()
+       }
+
+       log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
+       log.Printf("Read and processed %d collections",
+               len(results.UuidToCollection))
+
+       // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
+       // lots of behaviors can become warnings (and obviously we can't
+       // write anything).
+       // if !readCollections.ReadAllCollections {
+       //      log.Fatalf("Did not read all collections")
+       // }
+
+       return
+}
+
+func GetCollections(params GetCollectionsParams) (results ReadCollections) {
+       if &params.Client == nil {
+               log.Fatalf("params.Client passed to GetCollections() should " +
+                       "contain a valid ArvadosClient, but instead it is nil.")
+       }
 
        fieldsWanted := []string{"manifest_text",
                "owner_uuid",
@@ -92,29 +138,11 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                "select": fieldsWanted,
                "order": []string{"modified_at ASC"},
                "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
-               // MISHA UNDO THIS TEMPORARY HACK TO FIND BUG!
-               //"filters": [][]string{[]string{"modified_at", ">=", "2014-11-05T20:44:50Z"}}}
 
        if params.BatchSize > 0 {
                sdkParams["limit"] = params.BatchSize
        }
 
-       // MISHA UNDO THIS TEMPORARY HACK TO FIND BUG!
-       sdkParams["limit"] = 50
-
-       // {
-       //      var numReceived, numAvailable int
-       //      results.ReadAllCollections, numReceived, numAvailable =
-       //              util.ContainsAllAvailableItems(collections)
-
-       //      if (!results.ReadAllCollections) {
-       //              log.Printf("ERROR: Did not receive all collections.")
-       //      }
-       //      log.Printf("Received %d of %d available collections.",
-       //              numReceived,
-       //              numAvailable)
-       // }
-
        initialNumberOfCollectionsAvailable := NumberCollectionsAvailable(params.Client)
        // Include a 1% margin for collections added while we're reading so
        // that we don't have to grow the map in most cases.
@@ -122,72 +150,123 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                float64(initialNumberOfCollectionsAvailable) * 1.01)
        results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
 
+       if params.Logger != nil {
+               properties,_ := params.Logger.Edit()
+               collectionInfo := make(map[string]interface{})
+               collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
+               collectionInfo["batch_size"] = params.BatchSize
+               properties["collection_info"] = collectionInfo
+               params.Logger.Record()
+       }
+
+       // These values are just for getting the loop to run the first time,
+       // afterwards they'll be set to real values.
        previousTotalCollections := -1
-       for len(results.UuidToCollection) > previousTotalCollections {
+       totalCollections := 0
+       for totalCollections > previousTotalCollections {
                // We're still finding new collections
-               log.Printf("previous, current: %d %d", previousTotalCollections, len(results.UuidToCollection))
-
-               // update count
-               previousTotalCollections = len(results.UuidToCollection)
 
                // Write the heap profile for examining memory usage
-               if heap_profile != nil {
-                       err := pprof.WriteHeapProfile(heap_profile)
-                       if err != nil {
-                               log.Fatal(err)
-                       }
-               }
+               WriteHeapProfile()
 
                // Get next batch of collections.
                var collections SdkCollectionList
-               log.Printf("Running with SDK Params: %v", sdkParams)
                err := params.Client.List("collections", sdkParams, &collections)
                if err != nil {
-                       log.Fatalf("error querying collections: %+v", err)
+                       fatalWithMessage(params.Logger,
+                               fmt.Sprintf("Error querying collections: %v", err))
                }
 
                // Process collection and update our date filter.
-               sdkParams["filters"].([][]string)[0][2] = ProcessCollections(collections.Items, results.UuidToCollection)
-               log.Printf("Latest date seen %s", sdkParams["filters"].([][]string)[0][2])
+               sdkParams["filters"].([][]string)[0][2] =
+                       ProcessCollections(params.Logger,
+                       collections.Items,
+                       results.UuidToCollection).Format(time.RFC3339)
+
+               // update counts
+               previousTotalCollections = totalCollections
+               totalCollections = len(results.UuidToCollection)
+
+               log.Printf("%d collections read, %d new in last batch, " +
+                       "%s latest modified date, %.0f %d %d avg,max,total manifest size",
+                       totalCollections,
+                       totalCollections - previousTotalCollections,
+                       sdkParams["filters"].([][]string)[0][2],
+                       float32(totalManifestSize)/float32(totalCollections),
+                       maxManifestSize, totalManifestSize)
+
+               if params.Logger != nil {
+                       properties,_ := params.Logger.Edit()
+                       collectionInfo := properties["collection_info"].(map[string]interface{})
+                       collectionInfo["collections_read"] = totalCollections
+                       collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
+                       collectionInfo["total_manifest_size"] = totalManifestSize
+                       collectionInfo["max_manifest_size"] = maxManifestSize
+                       params.Logger.Record()
+               }
        }
-       log.Printf("previous, current: %d %d", previousTotalCollections, len(results.UuidToCollection))
+
+       // Just in case this lowers the numbers reported in the heap profile.
+       runtime.GC()
 
        // Write the heap profile for examining memory usage
-       if heap_profile != nil {
-               err := pprof.WriteHeapProfile(heap_profile)
-               if err != nil {
-                       log.Fatal(err)
-               }
-       }
+       WriteHeapProfile()
 
        return
 }
 
 
-func ProcessCollections(receivedCollections []SdkCollectionInfo,
-       uuidToCollection map[string]Collection) (latestModificationDate string) {
+// StrCopy returns a newly allocated string.
+// It is useful to copy slices so that the garbage collector can reuse
+// the memory of the longer strings they came from.
+func StrCopy(s string) string {
+       return string([]byte(s))
+}
+
+
+func ProcessCollections(arvLogger *logger.Logger,
+       receivedCollections []SdkCollectionInfo,
+       uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
        for _, sdkCollection := range receivedCollections {
-               collection := Collection{Uuid: sdkCollection.Uuid,
-                       OwnerUuid: sdkCollection.OwnerUuid,
+               collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
+                       OwnerUuid: StrCopy(sdkCollection.OwnerUuid),
                        ReplicationLevel: sdkCollection.Redundancy,
                        BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
-               // log.Printf("Seeing modification date, owner_uuid: %s %s",
-               //      sdkCollection.ModifiedAt,
-               //      sdkCollection.OwnerUuid)
-               if sdkCollection.ModifiedAt > latestModificationDate {
+
+               if sdkCollection.ModifiedAt.IsZero() {
+                       fatalWithMessage(arvLogger,
+                               fmt.Sprintf(
+                                       "Arvados SDK collection returned with unexpected zero " +
+                                               "modifcation date. This probably means that either we failed to " +
+                                               "parse the modification date or the API server has changed how " +
+                                               "it returns modification dates: %v",
+                                       collection))
+               }
+
+               if sdkCollection.ModifiedAt.After(latestModificationDate) {
                        latestModificationDate = sdkCollection.ModifiedAt
                }
                manifest := manifest.Manifest{sdkCollection.ManifestText}
+               manifestSize := uint64(len(sdkCollection.ManifestText))
+
+               if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen {
+                       totalManifestSize += manifestSize
+               }
+               if manifestSize > maxManifestSize {
+                       maxManifestSize = manifestSize
+               }
+               
                blockChannel := manifest.BlockIterWithDuplicates()
                for block := range blockChannel {
                        if stored_size, stored := collection.BlockDigestToSize[block.Digest];
                        stored && stored_size != block.Size {
-                               log.Fatalf(
+                               message := fmt.Sprintf(
                                        "Collection %s contains multiple sizes (%d and %d) for block %s",
                                        collection.Uuid,
                                        stored_size,
                                        block.Size,
                                        block.Digest)
+                               fatalWithMessage(arvLogger, message)
                        }
                        collection.BlockDigestToSize[block.Digest] = block.Size
                }
@@ -217,3 +296,29 @@ func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) (int) {
 
        return collections.ItemsAvailable
 }
+
+
+func ComputeSizeOfOwnedCollections(readCollections *ReadCollections) {
+       readCollections.OwnerToCollectionSize = make(map[string]int)
+       for _, coll := range readCollections.UuidToCollection {
+               readCollections.OwnerToCollectionSize[coll.OwnerUuid] =
+                       readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
+       }
+
+       return
+}
+
+
+// Assumes you haven't already called arvLogger.Edit()!
+// If you have called arvLogger.Edit() this method will hang waiting
+// for the lock you're already holding.
+func fatalWithMessage(arvLogger *logger.Logger, message string) {
+       if arvLogger != nil {
+               properties,_ := arvLogger.Edit()
+               properties["FATAL"] = message
+               properties["run_info"].(map[string]interface{})["end_time"] = time.Now()
+               arvLogger.ForceRecord()
+       }
+
+       log.Fatalf(message)
+}