Merge branch '8484-sanity-check-collection-count' closes #8484
[arvados.git] / services / datamanager / collection / collection.go
1 // Deals with parsing Collection responses from API Server.
2
3 package collection
4
5 import (
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
10         "git.curoverse.com/arvados.git/sdk/go/logger"
11         "git.curoverse.com/arvados.git/sdk/go/manifest"
12         "git.curoverse.com/arvados.git/sdk/go/util"
13         "log"
14         "os"
15         "runtime/pprof"
16         "time"
17 )
18
// HeapProfileFilename is the path WriteHeapProfile writes the heap profile
// to. It is bound to the "heap-profile" command line flag; an empty value
// disables heap profiling.
var HeapProfileFilename string
22
23 // Collection representation
24 type Collection struct {
25         UUID              string
26         OwnerUUID         string
27         ReplicationLevel  int
28         BlockDigestToSize map[blockdigest.BlockDigest]int
29         TotalSize         int
30 }
31
32 // ReadCollections holds information about collections from API server
33 type ReadCollections struct {
34         ReadAllCollections        bool
35         UUIDToCollection          map[string]Collection
36         OwnerToCollectionSize     map[string]int
37         BlockToDesiredReplication map[blockdigest.DigestWithSize]int
38         CollectionUUIDToIndex     map[string]int
39         CollectionIndexToUUID     []string
40         BlockToCollectionIndices  map[blockdigest.DigestWithSize][]int
41 }
42
43 // GetCollectionsParams params
44 type GetCollectionsParams struct {
45         Client    arvadosclient.ArvadosClient
46         Logger    *logger.Logger
47         BatchSize int
48 }
49
// SdkCollectionInfo is one collection record as decoded from the JSON
// returned by the API server. The JSON key for each field is given by its
// struct tag.
type SdkCollectionInfo struct {
	// UUID identifies the collection.
	UUID string `json:"uuid"`
	// OwnerUUID is the UUID of the collection's owner.
	OwnerUUID string `json:"owner_uuid"`
	// Redundancy is the collection's "redundancy" value; ProcessCollections
	// substitutes the default replication level when it is 0.
	Redundancy int `json:"redundancy"`
	// ModifiedAt is the collection's modification time.
	ModifiedAt time.Time `json:"modified_at"`
	// ManifestText is the collection's manifest.
	ManifestText string `json:"manifest_text"`
}
58
59 // SdkCollectionList lists collections from api
60 type SdkCollectionList struct {
61         ItemsAvailable int                 `json:"items_available"`
62         Items          []SdkCollectionInfo `json:"items"`
63 }
64
65 func init() {
66         flag.StringVar(&HeapProfileFilename,
67                 "heap-profile",
68                 "",
69                 "File to write the heap profiles to. Leave blank to skip profiling.")
70 }
71
72 // WriteHeapProfile writes the heap profile to a file for later review.
73 // Since a file is expected to only contain a single heap profile this
74 // function overwrites the previously written profile, so it is safe
75 // to call multiple times in a single run.
76 // Otherwise we would see cumulative numbers as explained here:
77 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
78 func WriteHeapProfile() error {
79         if HeapProfileFilename != "" {
80                 heapProfile, err := os.Create(HeapProfileFilename)
81                 if err != nil {
82                         return err
83                 }
84
85                 defer heapProfile.Close()
86
87                 err = pprof.WriteHeapProfile(heapProfile)
88                 return err
89         }
90
91         return nil
92 }
93
94 // GetCollectionsAndSummarize gets collections from api and summarizes
95 func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections, err error) {
96         results, err = GetCollections(params)
97         if err != nil {
98                 return
99         }
100
101         results.Summarize(params.Logger)
102
103         log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
104         log.Printf("Read and processed %d collections",
105                 len(results.UUIDToCollection))
106
107         // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
108         // lots of behaviors can become warnings (and obviously we can't
109         // write anything).
110         // if !readCollections.ReadAllCollections {
111         //      log.Fatalf("Did not read all collections")
112         // }
113
114         return
115 }
116
117 // GetCollections gets collections from api
118 func GetCollections(params GetCollectionsParams) (results ReadCollections, err error) {
119         if &params.Client == nil {
120                 err = fmt.Errorf("params.Client passed to GetCollections() should " +
121                         "contain a valid ArvadosClient, but instead it is nil.")
122                 return
123         }
124
125         fieldsWanted := []string{"manifest_text",
126                 "owner_uuid",
127                 "uuid",
128                 "redundancy",
129                 "modified_at"}
130
131         sdkParams := arvadosclient.Dict{
132                 "select":  fieldsWanted,
133                 "order":   []string{"modified_at ASC", "uuid ASC"},
134                 "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}},
135                 "offset": 0}
136
137         if params.BatchSize > 0 {
138                 sdkParams["limit"] = params.BatchSize
139         }
140
141         var defaultReplicationLevel int
142         {
143                 var value interface{}
144                 value, err = params.Client.Discovery("defaultCollectionReplication")
145                 if err != nil {
146                         return
147                 }
148
149                 defaultReplicationLevel = int(value.(float64))
150                 if defaultReplicationLevel <= 0 {
151                         err = fmt.Errorf("Default collection replication returned by arvados SDK "+
152                                 "should be a positive integer but instead it was %d.",
153                                 defaultReplicationLevel)
154                         return
155                 }
156         }
157
158         initialNumberOfCollectionsAvailable, err :=
159                 util.NumberItemsAvailable(params.Client, "collections")
160         if err != nil {
161                 return
162         }
163         // Include a 1% margin for collections added while we're reading so
164         // that we don't have to grow the map in most cases.
165         maxExpectedCollections := int(
166                 float64(initialNumberOfCollectionsAvailable) * 1.01)
167         results.UUIDToCollection = make(map[string]Collection, maxExpectedCollections)
168
169         if params.Logger != nil {
170                 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
171                         collectionInfo := logger.GetOrCreateMap(p, "collection_info")
172                         collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
173                         collectionInfo["batch_size"] = params.BatchSize
174                         collectionInfo["default_replication_level"] = defaultReplicationLevel
175                 })
176         }
177
178         // These values are just for getting the loop to run the first time,
179         // afterwards they'll be set to real values.
180         remainingCollections := 1
181         var totalCollections int
182         var previousTotalCollections int
183         for remainingCollections > 0 {
184                 // We're still finding new collections
185
186                 // Write the heap profile for examining memory usage
187                 err = WriteHeapProfile()
188                 if err != nil {
189                         return
190                 }
191
192                 // Get next batch of collections.
193                 var collections SdkCollectionList
194                 err = params.Client.List("collections", sdkParams, &collections)
195                 if err != nil {
196                         return
197                 }
198                 batchCollections := len(collections.Items)
199
200                 // We must always have at least one collection in the batch
201                 if batchCollections < 1 {
202                         err = fmt.Errorf("API query returned no collections for %+v", sdkParams)
203                         return
204                 }
205
206                 // Update count of remaining collections
207                 remainingCollections = collections.ItemsAvailable - sdkParams["offset"].(int) - batchCollections
208
209                 // Process collection and update our date filter.
210                 latestModificationDate, maxManifestSize, totalManifestSize, err := ProcessCollections(params.Logger,
211                         collections.Items,
212                         defaultReplicationLevel,
213                         results.UUIDToCollection)
214                 if err != nil {
215                         return results, err
216                 }
217                 if sdkParams["filters"].([][]string)[0][2] != latestModificationDate.Format(time.RFC3339) {
218                         sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
219                         sdkParams["offset"] = 0
220                 } else {
221                         sdkParams["offset"] = sdkParams["offset"].(int) + batchCollections
222                 }
223
224                 // update counts
225                 previousTotalCollections = totalCollections
226                 totalCollections = len(results.UUIDToCollection)
227
228                 log.Printf("%d collections read, %d (%d new) in last batch, "+
229                         "%d remaining, "+
230                         "%s latest modified date, %.0f %d %d avg,max,total manifest size",
231                         totalCollections,
232                         batchCollections,
233                         totalCollections-previousTotalCollections,
234                         remainingCollections,
235                         sdkParams["filters"].([][]string)[0][2],
236                         float32(totalManifestSize)/float32(totalCollections),
237                         maxManifestSize, totalManifestSize)
238
239                 if params.Logger != nil {
240                         params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
241                                 collectionInfo := logger.GetOrCreateMap(p, "collection_info")
242                                 collectionInfo["collections_read"] = totalCollections
243                                 collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
244                                 collectionInfo["total_manifest_size"] = totalManifestSize
245                                 collectionInfo["max_manifest_size"] = maxManifestSize
246                         })
247                 }
248         }
249
250         // Make one final API request to verify that we have processed all collections available up to the latest modification date
251         var collections SdkCollectionList
252         sdkParams["filters"].([][]string)[0][1] = "<="
253         sdkParams["limit"] = 0
254         err = params.Client.List("collections", sdkParams, &collections)
255         if err != nil {
256                 return
257         }
258         finalNumberOfCollectionsAvailable, err :=
259                 util.NumberItemsAvailable(params.Client, "collections")
260         if err != nil {
261                 return
262         }
263         if totalCollections < finalNumberOfCollectionsAvailable {
264                 err = fmt.Errorf("API server indicates a total of %d collections "+
265                                 "available up to %v, but we only retrieved %d. "+
266                                 "Refusing to continue as this could indicate an "+
267                                 "otherwise undetected failure.",
268                                 finalNumberOfCollectionsAvailable, 
269                                 sdkParams["filters"].([][]string)[0][2],
270                                 totalCollections)
271                 return
272         }
273
274         // Write the heap profile for examining memory usage
275         err = WriteHeapProfile()
276
277         return
278 }
279
// StrCopy returns a string with the same contents as s, built from a
// freshly allocated byte buffer.
// It is useful for copying substrings so that the garbage collector can
// reclaim the longer strings they were sliced from.
func StrCopy(s string) string {
	buf := make([]byte, len(s))
	copy(buf, s)
	return string(buf)
}
286
287 // ProcessCollections read from api server
288 func ProcessCollections(arvLogger *logger.Logger,
289         receivedCollections []SdkCollectionInfo,
290         defaultReplicationLevel int,
291         UUIDToCollection map[string]Collection,
292 ) (
293         latestModificationDate time.Time,
294         maxManifestSize, totalManifestSize uint64,
295         err error,
296 ) {
297         for _, sdkCollection := range receivedCollections {
298                 collection := Collection{UUID: StrCopy(sdkCollection.UUID),
299                         OwnerUUID:         StrCopy(sdkCollection.OwnerUUID),
300                         ReplicationLevel:  sdkCollection.Redundancy,
301                         BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
302
303                 if sdkCollection.ModifiedAt.IsZero() {
304                         err = fmt.Errorf(
305                                 "Arvados SDK collection returned with unexpected zero "+
306                                         "modification date. This probably means that either we failed to "+
307                                         "parse the modification date or the API server has changed how "+
308                                         "it returns modification dates: %+v",
309                                 collection)
310                         return
311                 }
312
313                 if sdkCollection.ModifiedAt.After(latestModificationDate) {
314                         latestModificationDate = sdkCollection.ModifiedAt
315                 }
316
317                 if collection.ReplicationLevel == 0 {
318                         collection.ReplicationLevel = defaultReplicationLevel
319                 }
320
321                 manifest := manifest.Manifest{Text: sdkCollection.ManifestText}
322                 manifestSize := uint64(len(sdkCollection.ManifestText))
323
324                 if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen {
325                         totalManifestSize += manifestSize
326                 }
327                 if manifestSize > maxManifestSize {
328                         maxManifestSize = manifestSize
329                 }
330
331                 blockChannel := manifest.BlockIterWithDuplicates()
332                 for block := range blockChannel {
333                         if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
334                                 log.Printf(
335                                         "Collection %s contains multiple sizes (%d and %d) for block %s",
336                                         collection.UUID,
337                                         storedSize,
338                                         block.Size,
339                                         block.Digest)
340                         }
341                         collection.BlockDigestToSize[block.Digest] = block.Size
342                 }
343                 if manifest.Err != nil {
344                         err = manifest.Err
345                         return
346                 }
347
348                 collection.TotalSize = 0
349                 for _, size := range collection.BlockDigestToSize {
350                         collection.TotalSize += size
351                 }
352                 UUIDToCollection[collection.UUID] = collection
353
354                 // Clear out all the manifest strings that we don't need anymore.
355                 // These hopefully form the bulk of our memory usage.
356                 manifest.Text = ""
357                 sdkCollection.ManifestText = ""
358         }
359
360         return
361 }
362
363 // Summarize the collections read
364 func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
365         readCollections.OwnerToCollectionSize = make(map[string]int)
366         readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int)
367         numCollections := len(readCollections.UUIDToCollection)
368         readCollections.CollectionUUIDToIndex = make(map[string]int, numCollections)
369         readCollections.CollectionIndexToUUID = make([]string, 0, numCollections)
370         readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int)
371
372         for _, coll := range readCollections.UUIDToCollection {
373                 collectionIndex := len(readCollections.CollectionIndexToUUID)
374                 readCollections.CollectionIndexToUUID =
375                         append(readCollections.CollectionIndexToUUID, coll.UUID)
376                 readCollections.CollectionUUIDToIndex[coll.UUID] = collectionIndex
377
378                 readCollections.OwnerToCollectionSize[coll.OwnerUUID] =
379                         readCollections.OwnerToCollectionSize[coll.OwnerUUID] + coll.TotalSize
380
381                 for block, size := range coll.BlockDigestToSize {
382                         locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)}
383                         readCollections.BlockToCollectionIndices[locator] =
384                                 append(readCollections.BlockToCollectionIndices[locator],
385                                         collectionIndex)
386                         storedReplication := readCollections.BlockToDesiredReplication[locator]
387                         if coll.ReplicationLevel > storedReplication {
388                                 readCollections.BlockToDesiredReplication[locator] =
389                                         coll.ReplicationLevel
390                         }
391                 }
392         }
393
394         if arvLogger != nil {
395                 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
396                         collectionInfo := logger.GetOrCreateMap(p, "collection_info")
397                         // Since maps are shallow copied, we run a risk of concurrent
398                         // updates here. By copying results.OwnerToCollectionSize into
399                         // the log, we're assuming that it won't be updated.
400                         collectionInfo["owner_to_collection_size"] =
401                                 readCollections.OwnerToCollectionSize
402                         collectionInfo["distinct_blocks_named"] =
403                                 len(readCollections.BlockToDesiredReplication)
404                 })
405         }
406
407         return
408 }