1 // Deals with parsing Collection responses from API Server.
8 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9 "git.curoverse.com/arvados.git/sdk/go/blockdigest"
10 "git.curoverse.com/arvados.git/sdk/go/logger"
11 "git.curoverse.com/arvados.git/sdk/go/manifest"
12 "git.curoverse.com/arvados.git/sdk/go/util"
// HeapProfileFilename is the path WriteHeapProfile writes the heap profile
// to. When it is empty, WriteHeapProfile skips profiling. Its value is bound
// to a command-line flag via flag.StringVar.
20 HeapProfileFilename string
23 // Collection is the in-memory summary of a single collection.
// ProcessCollections builds one per SdkCollectionInfo and stores it in a map
// keyed by UUID. ProcessCollections also sets UUID, OwnerUUID,
// ReplicationLevel and TotalSize on it; those fields are declared on lines
// not shown in this excerpt.
24 type Collection struct {
// BlockDigestToSize maps each block digest found in the collection's
// manifest to that block's size.
28 BlockDigestToSize map[blockdigest.BlockDigest]int
32 // ReadCollections holds information about collections from API server
// GetCollections allocates and fills UUIDToCollection; Summarize rebuilds
// every other map and slice from it.
33 type ReadCollections struct {
// ReadAllCollections is not assigned anywhere in the visible code; it is
// referenced only by a commented-out check in GetCollectionsAndSummarize.
34 ReadAllCollections bool
// UUIDToCollection maps a collection UUID to its Collection summary.
35 UUIDToCollection map[string]Collection
// OwnerToCollectionSize maps an owner UUID to the sum of TotalSize over the
// collections with that owner.
36 OwnerToCollectionSize map[string]int
// BlockToDesiredReplication maps a block (digest plus size) to the highest
// ReplicationLevel of any collection that references it.
37 BlockToDesiredReplication map[blockdigest.DigestWithSize]int
// CollectionUUIDToIndex maps a collection UUID to the integer index that
// Summarize assigned to it.
38 CollectionUUIDToIndex map[string]int
// CollectionIndexToUUID is the inverse of CollectionUUIDToIndex: element i
// is the UUID of the collection with index i.
39 CollectionIndexToUUID []string
// BlockToCollectionIndices maps a block to a list of ints appended by
// Summarize -- presumably the indices of the collections containing the
// block (the appended value is on a line not shown; confirm).
40 BlockToCollectionIndices map[blockdigest.DigestWithSize][]int
43 // GetCollectionsParams bundles the inputs of GetCollections and
// GetCollectionsAndSummarize. Those functions also read params.Logger and
// params.BatchSize; the declarations of those fields are on lines not shown
// in this excerpt.
44 type GetCollectionsParams struct {
// Client is used for the discovery-document lookup, the paged "collections"
// List calls and the util.NumberItemsAvailable counts.
45 Client arvadosclient.ArvadosClient
50 // SdkCollectionInfo holds collection info from api
// It is the JSON decoding target for one element of the "items" array of a
// collections list response (see SdkCollectionList).
51 type SdkCollectionInfo struct {
// UUID and OwnerUUID are copied (via StrCopy) into the Collection built by
// ProcessCollections.
52 UUID string `json:"uuid"`
53 OwnerUUID string `json:"owner_uuid"`
// ReplicationDesired of 0 makes ProcessCollections substitute the default
// replication level.
54 ReplicationDesired int `json:"replication_desired"`
// ModifiedAt drives the paging filter in GetCollections; ProcessCollections
// treats a zero value as a problem (see the message it formats).
55 ModifiedAt time.Time `json:"modified_at"`
// ManifestText is the raw manifest; ProcessCollections parses it with the
// manifest package to enumerate the collection's blocks.
56 ManifestText string `json:"manifest_text"`
59 // SdkCollectionList lists collections from api
// GetCollections passes a pointer to one of these to Client.List as the
// destination for each "collections" response.
60 type SdkCollectionList struct {
// ItemsAvailable is used by GetCollections, together with the current offset
// and batch length, to compute how many collections remain to be fetched.
61 ItemsAvailable int `json:"items_available"`
// Items is the batch of collections returned by one List call.
62 Items []SdkCollectionInfo `json:"items"`
// Registers a string command-line flag whose parsed value is stored in
// HeapProfileFilename. The flag's name and default value are arguments on
// lines not shown in this excerpt, as is the enclosing function header.
66 flag.StringVar(&HeapProfileFilename,
69 "File to write the heap profiles to. Leave blank to skip profiling.")
72 // WriteHeapProfile writes the heap profile to a file for later review.
73 // Since a file is expected to only contain a single heap profile this
74 // function overwrites the previously written profile, so it is safe
75 // to call multiple times in a single run.
76 // Otherwise we would see cumulative numbers as explained here:
77 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
78 func WriteHeapProfile() error {
79 if HeapProfileFilename != "" {
80 heapProfile, err := os.Create(HeapProfileFilename)
85 defer heapProfile.Close()
87 err = pprof.WriteHeapProfile(heapProfile)
94 // GetCollectionsAndSummarize gets collections from api and summarizes
// them: it calls GetCollections, then ReadCollections.Summarize to build the
// derived lookup tables, and logs the per-owner sizes and the number of
// collections read.
// NOTE(review): the handling of err between GetCollections and Summarize, and
// the function's return statement, are on lines not shown in this excerpt.
95 func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections, err error) {
96 results, err = GetCollections(params)
// Summarize receives the same logger so it can record its own statistics.
101 results.Summarize(params.Logger)
103 log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
104 log.Printf("Read and processed %d collections",
105 len(results.UUIDToCollection))
107 // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
108 // lots of behaviors can become warnings (and obviously we can't
110 // if !readCollections.ReadAllCollections {
111 // log.Fatalf("Did not read all collections")
117 // GetCollections gets collections from api
// It reads the "collections" list from the API server in batches and records
// a summary of each collection in results.UUIDToCollection.
//
// Paging scheme, as far as the visible lines show: results are ordered by
// modified_at then uuid, and the query carries a "modified_at >= T" filter
// plus an offset. After each batch, if the newest modification date seen
// differs from T, T is moved forward to it and the offset is reset to 0;
// the other visible assignment advances the offset by the batch length.
// Collections sharing the boundary timestamp may therefore be fetched again;
// they overwrite their own entry in the UUID-keyed map.
//
// Errors are reported through the named result err. The return statements
// themselves are on lines not shown in this excerpt.
118 func GetCollections(params GetCollectionsParams) (results ReadCollections, err error) {
// NOTE(review): params.Client is a struct value, so &params.Client is the
// address of a field of a local parameter and can never be nil; this guard
// cannot fire as written. Validating the client would need a check on one of
// its fields instead -- confirm against arvadosclient.ArvadosClient.
119 if &params.Client == nil {
120 err = fmt.Errorf("params.Client passed to GetCollections() should " +
121 "contain a valid ArvadosClient, but instead it is nil.")
125 fieldsWanted := []string{"manifest_text",
128 "replication_desired",
// The filter's third element is the paging cursor T; it starts far enough in
// the past to match every collection.
131 sdkParams := arvadosclient.Dict{
132 "select": fieldsWanted,
133 "order": []string{"modified_at ASC", "uuid ASC"},
134 "filters": [][]string{{"modified_at", ">=", "1900-01-01T00:00:00Z"}},
// A non-positive BatchSize leaves "limit" unset, deferring to the server's
// default page size.
137 if params.BatchSize > 0 {
138 sdkParams["limit"] = params.BatchSize
141 var defaultReplicationLevel int
143 var value interface{}
144 value, err = params.Client.Discovery("defaultCollectionReplication")
// The discovery value is asserted to be a float64 (the type encoding/json
// uses for numbers decoded into interface{}); any other dynamic type panics.
149 defaultReplicationLevel = int(value.(float64))
150 if defaultReplicationLevel <= 0 {
151 err = fmt.Errorf("Default collection replication returned by arvados SDK "+
152 "should be a positive integer but instead it was %d.",
153 defaultReplicationLevel)
158 initialNumberOfCollectionsAvailable, err :=
159 util.NumberItemsAvailable(params.Client, "collections")
163 // Include a 1% margin for collections added while we're reading so
164 // that we don't have to grow the map in most cases.
165 maxExpectedCollections := int(
166 float64(initialNumberOfCollectionsAvailable) * 1.01)
167 results.UUIDToCollection = make(map[string]Collection, maxExpectedCollections)
169 if params.Logger != nil {
170 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
171 collectionInfo := logger.GetOrCreateMap(p, "collection_info")
172 collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
173 collectionInfo["batch_size"] = params.BatchSize
174 collectionInfo["default_replication_level"] = defaultReplicationLevel
178 // These values are just for getting the loop to run the first time,
179 // afterwards they'll be set to real values.
180 remainingCollections := 1
181 var totalCollections int
182 var previousTotalCollections int
183 for remainingCollections > 0 {
184 // We're still finding new collections
186 // Write the heap profile for examining memory usage
187 err = WriteHeapProfile()
192 // Get next batch of collections.
193 var collections SdkCollectionList
194 err = params.Client.List("collections", sdkParams, &collections)
198 batchCollections := len(collections.Items)
200 // We must always have at least one collection in the batch
201 if batchCollections < 1 {
202 err = fmt.Errorf("API query returned no collections for %+v", sdkParams)
206 // Update count of remaining collections
// The type assertion requires sdkParams["offset"] to already hold an int;
// its initial value is set on a line not shown in this excerpt.
207 remainingCollections = collections.ItemsAvailable - sdkParams["offset"].(int) - batchCollections
209 // Process collection and update our date filter.
// NOTE(review): if this := sits inside the loop body (the second declaration
// of `collections` further down suggests it does), it declares a new err that
// shadows the named result; confirm the elided error check returns it
// explicitly.
210 latestModificationDate, maxManifestSize, totalManifestSize, err := ProcessCollections(params.Logger,
212 defaultReplicationLevel,
213 results.UUIDToCollection)
217 if sdkParams["filters"].([][]string)[0][2] != latestModificationDate.Format(time.RFC3339) {
218 sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
219 sdkParams["offset"] = 0
221 sdkParams["offset"] = sdkParams["offset"].(int) + batchCollections
225 previousTotalCollections = totalCollections
226 totalCollections = len(results.UUIDToCollection)
228 log.Printf("%d collections read, %d (%d new) in last batch, "+
230 "%s latest modified date, %.0f %d %d avg,max,total manifest size",
233 totalCollections-previousTotalCollections,
234 remainingCollections,
235 sdkParams["filters"].([][]string)[0][2],
236 float32(totalManifestSize)/float32(totalCollections),
237 maxManifestSize, totalManifestSize)
239 if params.Logger != nil {
240 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
241 collectionInfo := logger.GetOrCreateMap(p, "collection_info")
242 collectionInfo["collections_read"] = totalCollections
243 collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
244 collectionInfo["total_manifest_size"] = totalManifestSize
245 collectionInfo["max_manifest_size"] = maxManifestSize
250 // Make one final API request to verify that we have processed all collections available up to the latest modification date
// NOTE(review): in the visible lines the response of this List call is not
// read afterwards; the comparison below uses util.NumberItemsAvailable
// instead. Confirm that is intended.
251 var collections SdkCollectionList
252 sdkParams["filters"].([][]string)[0][1] = "<="
253 sdkParams["limit"] = 0
254 err = params.Client.List("collections", sdkParams, &collections)
258 finalNumberOfCollectionsAvailable, err :=
259 util.NumberItemsAvailable(params.Client, "collections")
263 if totalCollections < finalNumberOfCollectionsAvailable {
264 err = fmt.Errorf("API server indicates a total of %d collections "+
265 "available up to %v, but we only retrieved %d. "+
266 "Refusing to continue as this could indicate an "+
267 "otherwise undetected failure.",
268 finalNumberOfCollectionsAvailable,
269 sdkParams["filters"].([][]string)[0][2],
274 // Write the heap profile for examining memory usage
275 err = WriteHeapProfile()
280 // StrCopy returns a newly allocated string.
281 // It is useful to copy slices so that the garbage collector can reuse
282 // the memory of the longer strings they came from.
// ProcessCollections uses it for the UUID and OwnerUUID it keeps from each
// API response item.
283 func StrCopy(s string) string {
// Converting to []byte and back yields a string with the same contents.
284 return string([]byte(s))
287 // ProcessCollections read from api server
// It converts each received SdkCollectionInfo into a Collection and stores it
// in UUIDToCollection under the collection's UUID, replacing any entry that
// is already there.
//
// Visible results:
//   - latestModificationDate: the newest ModifiedAt among receivedCollections
//     (zero time if none is later than the zero time).
//   - maxManifestSize: the largest manifest text length, in bytes, in this call.
//   - totalManifestSize: the summed manifest text length, in bytes, of the
//     collections whose UUID was not yet in UUIDToCollection.
//
// NOTE(review): the end of the result list, the error construction and the
// return statements are on lines not shown in this excerpt.
288 func ProcessCollections(arvLogger *logger.Logger,
289 receivedCollections []SdkCollectionInfo,
290 defaultReplicationLevel int,
291 UUIDToCollection map[string]Collection,
293 latestModificationDate time.Time,
294 maxManifestSize, totalManifestSize uint64,
297 for _, sdkCollection := range receivedCollections {
// UUID and OwnerUUID are copied with StrCopy; see StrCopy for the rationale.
298 collection := Collection{UUID: StrCopy(sdkCollection.UUID),
299 OwnerUUID: StrCopy(sdkCollection.OwnerUUID),
300 ReplicationLevel: sdkCollection.ReplicationDesired,
301 BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
// A zero ModifiedAt is treated as a problem; the call that consumes the
// message below is on a line not shown.
303 if sdkCollection.ModifiedAt.IsZero() {
305 "Arvados SDK collection returned with unexpected zero "+
306 "modification date. This probably means that either we failed to "+
307 "parse the modification date or the API server has changed how "+
308 "it returns modification dates: %+v",
313 if sdkCollection.ModifiedAt.After(latestModificationDate) {
314 latestModificationDate = sdkCollection.ModifiedAt
// A desired replication of 0 falls back to the caller-supplied default.
317 if collection.ReplicationLevel == 0 {
318 collection.ReplicationLevel = defaultReplicationLevel
// From here on the local variable `manifest` shadows the imported manifest
// package for the rest of the loop body.
321 manifest := manifest.Manifest{Text: sdkCollection.ManifestText}
322 manifestSize := uint64(len(sdkCollection.ManifestText))
// Only count a manifest toward the total the first time its UUID is seen.
324 if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen {
325 totalManifestSize += manifestSize
327 if manifestSize > maxManifestSize {
328 maxManifestSize = manifestSize
331 blockChannel := manifest.BlockIterWithDuplicates()
332 for block := range blockChannel {
// The same digest appearing with two different sizes is detected here; how it
// is reported is on lines not shown. The most recently seen size is stored.
333 if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
335 "Collection %s contains multiple sizes (%d and %d) for block %s",
341 collection.BlockDigestToSize[block.Digest] = block.Size
343 if manifest.Err != nil {
// TotalSize counts each distinct block digest once.
348 collection.TotalSize = 0
349 for _, size := range collection.BlockDigestToSize {
350 collection.TotalSize += size
352 UUIDToCollection[collection.UUID] = collection
354 // Clear out all the manifest strings that we don't need anymore.
355 // These hopefully form the bulk of our memory usage.
// NOTE(review): sdkCollection is the range loop's copy of the slice element,
// so this assignment does not clear receivedCollections[i].ManifestText; the
// slice keeps referencing the manifest text until the slice itself becomes
// unreachable.
357 sdkCollection.ManifestText = ""
363 // Summarize the collections read
// It rebuilds every derived field of readCollections from UUIDToCollection:
// OwnerToCollectionSize, BlockToDesiredReplication, CollectionUUIDToIndex,
// CollectionIndexToUUID and BlockToCollectionIndices. Previous contents of
// those fields are discarded. When arvLogger is non-nil, summary statistics
// are also recorded under "collection_info".
364 func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
365 readCollections.OwnerToCollectionSize = make(map[string]int)
366 readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int)
367 numCollections := len(readCollections.UUIDToCollection)
368 readCollections.CollectionUUIDToIndex = make(map[string]int, numCollections)
369 readCollections.CollectionIndexToUUID = make([]string, 0, numCollections)
370 readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int)
// Indices are handed out in map iteration order, which Go does not keep
// stable, so a collection's index can differ from one call to the next.
372 for _, coll := range readCollections.UUIDToCollection {
// The next free index is the current length of the index-to-UUID slice.
373 collectionIndex := len(readCollections.CollectionIndexToUUID)
374 readCollections.CollectionIndexToUUID =
375 append(readCollections.CollectionIndexToUUID, coll.UUID)
376 readCollections.CollectionUUIDToIndex[coll.UUID] = collectionIndex
378 readCollections.OwnerToCollectionSize[coll.OwnerUUID] =
379 readCollections.OwnerToCollectionSize[coll.OwnerUUID] + coll.TotalSize
381 for block, size := range coll.BlockDigestToSize {
// The int size is narrowed to uint32 to form the map key.
382 locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)}
// NOTE(review): the value appended here is on a line not shown in this
// excerpt; presumably collectionIndex -- confirm.
383 readCollections.BlockToCollectionIndices[locator] =
384 append(readCollections.BlockToCollectionIndices[locator],
// Keep the highest replication level requested by any collection that
// references this block.
386 storedReplication := readCollections.BlockToDesiredReplication[locator]
387 if coll.ReplicationLevel > storedReplication {
388 readCollections.BlockToDesiredReplication[locator] =
389 coll.ReplicationLevel
394 if arvLogger != nil {
395 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
396 collectionInfo := logger.GetOrCreateMap(p, "collection_info")
397 // Since maps are shallow copied, we run a risk of concurrent
398 // updates here. By storing readCollections.OwnerToCollectionSize in
399 // the log (the map itself, not a copy), we're assuming that it won't be updated.
400 collectionInfo["owner_to_collection_size"] =
401 readCollections.OwnerToCollectionSize
402 collectionInfo["distinct_blocks_named"] =
403 len(readCollections.BlockToDesiredReplication)