Changes GetCollection loop to more reliably fetch all collections
[arvados.git] / services / datamanager / collection / collection.go
index a800ce8aff3c4e379536b5e8a9bff790ffdd5f3f..1d41a7087a3fc1e73d0668c6c01533624c045d9a 100644 (file)
-/* Deals with parsing Collection responses from API Server. */
+// Deals with parsing Collection responses from API Server.
 
 package collection
 
 import (
-       "errors"
+       "flag"
+       "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
+       "git.curoverse.com/arvados.git/sdk/go/logger"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
+       "git.curoverse.com/arvados.git/sdk/go/util"
        "log"
+       "os"
+       "runtime/pprof"
+       "time"
 )
 
+var (
+       HeapProfileFilename string
+)
+
+// Collection representation
 type Collection struct {
-       BlockDigestToSize map[string]int
-       ReplicationLevel int
-       Uuid string
-       OwnerUuid string
+       UUID              string
+       OwnerUUID         string
+       ReplicationLevel  int
+       BlockDigestToSize map[blockdigest.BlockDigest]int
+       TotalSize         int
 }
 
+// ReadCollections holds information about collections from API server
 type ReadCollections struct {
-       ReadAllCollections bool
-       UuidToCollection map[string]Collection
+       ReadAllCollections        bool
+       UUIDToCollection          map[string]Collection
+       OwnerToCollectionSize     map[string]int
+       BlockToDesiredReplication map[blockdigest.DigestWithSize]int
+       CollectionUUIDToIndex     map[string]int
+       CollectionIndexToUUID     []string
+       BlockToCollectionIndices  map[blockdigest.DigestWithSize][]int
 }
 
+// GetCollectionsParams params
 type GetCollectionsParams struct {
-       Client arvadosclient.ArvadosClient
-       Limit int
-       LogEveryNthCollectionProcessed int  // 0 means don't report any
+       Client    arvadosclient.ArvadosClient
+       Logger    *logger.Logger
+       BatchSize int
 }
 
-// TODO(misha): Move this method somewhere more central
-func SdkListResponseContainsAllAvailableItems(response map[string]interface{}) (containsAll bool, numContained int, numAvailable int) {
-       if value, ok := response["items"]; ok {
-               items := value.([]interface{})
-               {
-                       var itemsAvailable interface{}
-                       if itemsAvailable, ok = response["items_available"]; !ok {
-                               // TODO(misha): Consider returning an error here (and above if
-                               // we can't find items) so that callers can recover.
-                               log.Fatalf("API server did not return the number of items available")
-                       }
-                       numContained = len(items)
-                       numAvailable = int(itemsAvailable.(float64))
-                       // If we never entered this block, allAvailable would be false by
-                       // default, which is what we want
-                       containsAll = numContained == numAvailable
+// SdkCollectionInfo holds collection info from api
+type SdkCollectionInfo struct {
+       UUID         string    `json:"uuid"`
+       OwnerUUID    string    `json:"owner_uuid"`
+       Redundancy   int       `json:"redundancy"`
+       ModifiedAt   time.Time `json:"modified_at"`
+       ManifestText string    `json:"manifest_text"`
+}
+
+// SdkCollectionList lists collections from api
+type SdkCollectionList struct {
+       ItemsAvailable int                 `json:"items_available"`
+       Items          []SdkCollectionInfo `json:"items"`
+}
+
+func init() {
+       flag.StringVar(&HeapProfileFilename,
+               "heap-profile",
+               "",
+               "File to write the heap profiles to. Leave blank to skip profiling.")
+}
+
+// WriteHeapProfile writes the heap profile to a file for later review.
+// Since a file is expected to only contain a single heap profile this
+// function overwrites the previously written profile, so it is safe
+// to call multiple times in a single run.
+// Otherwise we would see cumulative numbers as explained here:
+// https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
+func WriteHeapProfile() error {
+       if HeapProfileFilename != "" {
+               heapProfile, err := os.Create(HeapProfileFilename)
+               if err != nil {
+                       return err
                }
+
+               defer heapProfile.Close()
+
+               err = pprof.WriteHeapProfile(heapProfile)
+               return err
        }
-       return
+
+       return nil
 }
 
-func IterateSdkListItems(response map[string]interface{}) (c <-chan map[string]interface{}, err error) {
-       if value, ok := response["items"]; ok {
-               ch := make(chan map[string]interface{})
-               c = ch
-               items := value.([]interface{})
-               go func() {
-                       for _, item := range items {
-                               ch <- item.(map[string]interface{})
-                       }
-                       close(ch)
-               }()
-       } else {
-               err = errors.New("Could not find \"items\" field in response " +
-                       "passed to IterateSdkListItems()")
+// GetCollectionsAndSummarize gets collections from api and summarizes
+func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections, err error) {
+       results, err = GetCollections(params)
+       if err != nil {
+               return
        }
-       return
-}
 
+       results.Summarize(params.Logger)
+
+       log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
+       log.Printf("Read and processed %d collections",
+               len(results.UUIDToCollection))
+
+       // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
+       // lots of behaviors can become warnings (and obviously we can't
+       // write anything).
+       // if !readCollections.ReadAllCollections {
+       //      log.Fatalf("Did not read all collections")
+       // }
 
+       return
+}
 
-func GetCollections(params GetCollectionsParams) (results ReadCollections) {
+// GetCollections gets collections from api
+func GetCollections(params GetCollectionsParams) (results ReadCollections, err error) {
        if &params.Client == nil {
-               log.Fatalf("Received params.Client passed to GetCollections() should " +
+               err = fmt.Errorf("params.Client passed to GetCollections() should " +
                        "contain a valid ArvadosClient, but instead it is nil.")
+               return
        }
 
        fieldsWanted := []string{"manifest_text",
                "owner_uuid",
                "uuid",
-               // TODO(misha): Start using the redundancy field.
-               "redundancy"}
+               "redundancy",
+               "modified_at"}
+
+       sdkParams := arvadosclient.Dict{
+               "select":  fieldsWanted,
+               "order":   []string{"modified_at ASC"},
+               "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}},
+               "offset": 0}
+
+       if params.BatchSize > 0 {
+               sdkParams["limit"] = params.BatchSize
+       }
 
-       sdkParams := arvadosclient.Dict{"select": fieldsWanted}
-       if params.Limit > 0 {
-               sdkParams["limit"] = params.Limit
+       var defaultReplicationLevel int
+       {
+               var value interface{}
+               value, err = params.Client.Discovery("defaultCollectionReplication")
+               if err != nil {
+                       return
+               }
+
+               defaultReplicationLevel = int(value.(float64))
+               if defaultReplicationLevel <= 0 {
+                       err = fmt.Errorf("Default collection replication returned by arvados SDK "+
+                               "should be a positive integer but instead it was %d.",
+                               defaultReplicationLevel)
+                       return
+               }
        }
 
-       var collections map[string]interface{}
-       err := params.Client.List("collections", sdkParams, &collections)
+       initialNumberOfCollectionsAvailable, err :=
+               util.NumberItemsAvailable(params.Client, "collections")
        if err != nil {
-               log.Fatalf("error querying collections: %v", err)
+               return
        }
+       // Include a 1% margin for collections added while we're reading so
+       // that we don't have to grow the map in most cases.
+       maxExpectedCollections := int(
+               float64(initialNumberOfCollectionsAvailable) * 1.01)
+       results.UUIDToCollection = make(map[string]Collection, maxExpectedCollections)
 
-       {
-               var numReceived, numAvailable int
-               results.ReadAllCollections, numReceived, numAvailable =
-                       SdkListResponseContainsAllAvailableItems(collections)
+       if params.Logger != nil {
+               params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+                       collectionInfo := logger.GetOrCreateMap(p, "collection_info")
+                       collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
+                       collectionInfo["batch_size"] = params.BatchSize
+                       collectionInfo["default_replication_level"] = defaultReplicationLevel
+               })
+       }
+
+       // These values are just for getting the loop to run the first time,
+       // afterwards they'll be set to real values.
+       remainingCollections := 1
+       var totalCollections int
+       var previousTotalCollections int
+       for remainingCollections > 0 {
+               // We're still finding new collections
 
-               if (!results.ReadAllCollections) {
-                       log.Printf("ERROR: Did not receive all collections.")
+               // Write the heap profile for examining memory usage
+               err = WriteHeapProfile()
+               if err != nil {
+                       return
+               }
+
+               // Get next batch of collections.
+               var collections SdkCollectionList
+               err = params.Client.List("collections", sdkParams, &collections)
+               if err != nil {
+                       return
+               }
+
+               // Update count of remaining collections
+               remainingCollections = collections.ItemsAvailable - params.BatchSize - sdkParams["offset"].(int)
+               if remainingCollections < 0 {
+                       remainingCollections = 0
+               }
+
+               // Process collection and update our date filter.
+               latestModificationDate, maxManifestSize, totalManifestSize, err := ProcessCollections(params.Logger,
+                       collections.Items,
+                       defaultReplicationLevel,
+                       results.UUIDToCollection)
+               if err != nil {
+                       return results, err
+               }
+               if sdkParams["filters"].([][]string)[0][2] != latestModificationDate.Format(time.RFC3339) {
+                       sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
+                       sdkParams["offset"] = 0
+               } else {
+                       sdkParams["offset"] = sdkParams["offset"].(int) + params.BatchSize
+               }
+
+               // update counts
+               previousTotalCollections = totalCollections
+               totalCollections = len(results.UUIDToCollection)
+
+               log.Printf("%d collections read, %d new in last batch, "+
+                       "%d remaining, "+
+                       "%s latest modified date, %.0f %d %d avg,max,total manifest size",
+                       totalCollections,
+                       totalCollections-previousTotalCollections,
+                       remainingCollections,
+                       sdkParams["filters"].([][]string)[0][2],
+                       float32(totalManifestSize)/float32(totalCollections),
+                       maxManifestSize, totalManifestSize)
+
+               if params.Logger != nil {
+                       params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+                               collectionInfo := logger.GetOrCreateMap(p, "collection_info")
+                               collectionInfo["collections_read"] = totalCollections
+                               collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
+                               collectionInfo["total_manifest_size"] = totalManifestSize
+                               collectionInfo["max_manifest_size"] = maxManifestSize
+                       })
                }
-               log.Printf("Received %d of %d available collections.",
-                       numReceived,
-                       numAvailable)
        }
 
-       if collectionChannel, err := IterateSdkListItems(collections); err != nil {
-               log.Fatalf("Error trying to iterate collections returned by SDK: %v", err)
-       } else {
-               index := 0
-               results.UuidToCollection = make(map[string]Collection)
-               for item_map := range collectionChannel {
-                       index += 1
-                       if m := params.LogEveryNthCollectionProcessed; m >0 && (index % m) == 0 {
-                               log.Printf("Processing collection #%d", index)
+       // Write the heap profile for examining memory usage
+       err = WriteHeapProfile()
+
+       return
+}
+
+// StrCopy returns a newly allocated string.
+// It is useful to copy slices so that the garbage collector can reuse
+// the memory of the longer strings they came from.
+func StrCopy(s string) string {
+       return string([]byte(s))
+}
+
+// ProcessCollections read from api server
+func ProcessCollections(arvLogger *logger.Logger,
+       receivedCollections []SdkCollectionInfo,
+       defaultReplicationLevel int,
+       UUIDToCollection map[string]Collection,
+) (
+       latestModificationDate time.Time,
+       maxManifestSize, totalManifestSize uint64,
+       err error,
+) {
+       for _, sdkCollection := range receivedCollections {
+               collection := Collection{UUID: StrCopy(sdkCollection.UUID),
+                       OwnerUUID:         StrCopy(sdkCollection.OwnerUUID),
+                       ReplicationLevel:  sdkCollection.Redundancy,
+                       BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
+
+               if sdkCollection.ModifiedAt.IsZero() {
+                       err = fmt.Errorf(
+                               "Arvados SDK collection returned with unexpected zero "+
+                                       "modification date. This probably means that either we failed to "+
+                                       "parse the modification date or the API server has changed how "+
+                                       "it returns modification dates: %+v",
+                               collection)
+                       return
+               }
+
+               if sdkCollection.ModifiedAt.After(latestModificationDate) {
+                       latestModificationDate = sdkCollection.ModifiedAt
+               }
+
+               if collection.ReplicationLevel == 0 {
+                       collection.ReplicationLevel = defaultReplicationLevel
+               }
+
+               manifest := manifest.Manifest{Text: sdkCollection.ManifestText}
+               manifestSize := uint64(len(sdkCollection.ManifestText))
+
+               if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen {
+                       totalManifestSize += manifestSize
+               }
+               if manifestSize > maxManifestSize {
+                       maxManifestSize = manifestSize
+               }
+
+               blockChannel := manifest.BlockIterWithDuplicates()
+               for block := range blockChannel {
+                       if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
+                               log.Printf(
+                                       "Collection %s contains multiple sizes (%d and %d) for block %s",
+                                       collection.UUID,
+                                       storedSize,
+                                       block.Size,
+                                       block.Digest)
                        }
-                       collection := Collection{Uuid: item_map["uuid"].(string),
-                               OwnerUuid: item_map["owner_uuid"].(string),
-                               BlockDigestToSize: make(map[string]int)}
-                       manifest := manifest.Manifest{item_map["manifest_text"].(string)}
-                       blockChannel := manifest.BlockIterWithDuplicates()
-                       for block := range blockChannel {
-                               if stored_size, stored := collection.BlockDigestToSize[block.Digest];
-                               stored && stored_size != block.Size {
-                                       log.Fatalf(
-                                               "Collection %s contains multiple sizes (%d and %d) for block %s",
-                                               collection.Uuid,
-                                               stored_size,
-                                               block.Size,
-                                               block.Digest)
-                               }
-                               collection.BlockDigestToSize[block.Digest] = block.Size
+                       collection.BlockDigestToSize[block.Digest] = block.Size
+               }
+               if manifest.Err != nil {
+                       err = manifest.Err
+                       return
+               }
+
+               collection.TotalSize = 0
+               for _, size := range collection.BlockDigestToSize {
+                       collection.TotalSize += size
+               }
+               UUIDToCollection[collection.UUID] = collection
+
+               // Clear out all the manifest strings that we don't need anymore.
+               // These hopefully form the bulk of our memory usage.
+               manifest.Text = ""
+               sdkCollection.ManifestText = ""
+       }
+
+       return
+}
+
+// Summarize the collections read
+func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
+       readCollections.OwnerToCollectionSize = make(map[string]int)
+       readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int)
+       numCollections := len(readCollections.UUIDToCollection)
+       readCollections.CollectionUUIDToIndex = make(map[string]int, numCollections)
+       readCollections.CollectionIndexToUUID = make([]string, 0, numCollections)
+       readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int)
+
+       for _, coll := range readCollections.UUIDToCollection {
+               collectionIndex := len(readCollections.CollectionIndexToUUID)
+               readCollections.CollectionIndexToUUID =
+                       append(readCollections.CollectionIndexToUUID, coll.UUID)
+               readCollections.CollectionUUIDToIndex[coll.UUID] = collectionIndex
+
+               readCollections.OwnerToCollectionSize[coll.OwnerUUID] =
+                       readCollections.OwnerToCollectionSize[coll.OwnerUUID] + coll.TotalSize
+
+               for block, size := range coll.BlockDigestToSize {
+                       locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)}
+                       readCollections.BlockToCollectionIndices[locator] =
+                               append(readCollections.BlockToCollectionIndices[locator],
+                                       collectionIndex)
+                       storedReplication := readCollections.BlockToDesiredReplication[locator]
+                       if coll.ReplicationLevel > storedReplication {
+                               readCollections.BlockToDesiredReplication[locator] =
+                                       coll.ReplicationLevel
                        }
-                       results.UuidToCollection[collection.Uuid] = collection
                }
        }
+
+       if arvLogger != nil {
+               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+                       collectionInfo := logger.GetOrCreateMap(p, "collection_info")
+                       // Since maps are shallow copied, we run a risk of concurrent
+                       // updates here. By copying results.OwnerToCollectionSize into
+                       // the log, we're assuming that it won't be updated.
+                       collectionInfo["owner_to_collection_size"] =
+                               readCollections.OwnerToCollectionSize
+                       collectionInfo["distinct_blocks_named"] =
+                               len(readCollections.BlockToDesiredReplication)
+               })
+       }
+
        return
 }