7490: Add Err to collection.ReadCollections and keep.ServerResponse so that the error...
[arvados.git] / services / datamanager / collection / collection.go
// Package collection reads Collection records from the API server and parses their responses.
3 package collection
4
5 import (
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
10         "git.curoverse.com/arvados.git/sdk/go/logger"
11         "git.curoverse.com/arvados.git/sdk/go/manifest"
12         "git.curoverse.com/arvados.git/sdk/go/util"
13         "log"
14         "os"
15         "runtime/pprof"
16         "time"
17 )
18
// HeapProfileFilename is the file that WriteHeapProfile overwrites with the
// current heap profile. It is set by the -heap-profile flag; an empty value
// disables profiling.
var HeapProfileFilename string

// Debugging statistics about the manifests processed so far. Nothing in this
// file resets them, so they accumulate for the life of the process.
var (
	totalManifestSize uint64 // summed manifest size of distinct collection UUIDs
	maxManifestSize   uint64 // largest single manifest size seen
)
25
// Collection is the per-collection information kept after a collection is
// read from the API server (see ProcessCollections).
type Collection struct {
	// UUID and OwnerUUID are copied from the API server's response.
	UUID              string
	OwnerUUID         string
	// ReplicationLevel is the collection's redundancy. When the reported
	// redundancy is 0, it is the API server's default collection replication.
	ReplicationLevel  int
	// BlockDigestToSize maps each distinct block named in the collection's
	// manifest to that block's size.
	BlockDigestToSize map[blockdigest.BlockDigest]int
	// TotalSize is the sum of the sizes in BlockDigestToSize.
	TotalSize         int
}
34
// ReadCollections holds the collections read from the API server, plus the
// indexes that Summarize derives from them.
type ReadCollections struct {
	// NOTE(review): nothing in this file sets ReadAllCollections, so from
	// this file's point of view it is always false. Confirm its intended use.
	ReadAllCollections        bool
	// UUIDToCollection maps collection UUID to its Collection. It is filled
	// by GetCollections.
	UUIDToCollection          map[string]Collection
	// OwnerToCollectionSize maps owner UUID to the sum of TotalSize over
	// that owner's collections. It is set by Summarize.
	OwnerToCollectionSize     map[string]int
	// BlockToDesiredReplication maps each block to the highest
	// ReplicationLevel among the collections that name it. Entries are only
	// created for positive levels. It is set by Summarize.
	BlockToDesiredReplication map[blockdigest.DigestWithSize]int
	// CollectionUUIDToIndex and CollectionIndexToUUID are inverse mappings
	// between collection UUIDs and the 0-based indices that Summarize
	// assigns.
	CollectionUUIDToIndex     map[string]int
	CollectionIndexToUUID     []string
	// BlockToCollectionIndices maps each block to the indices of the
	// collections that name it. It is set by Summarize.
	BlockToCollectionIndices  map[blockdigest.DigestWithSize][]int
	// Err is the error returned by GetCollections when
	// GetCollectionsAndSummarize fails, and nil otherwise.
	Err                       error
}
46
// GetCollectionsParams holds the arguments to GetCollections and
// GetCollectionsAndSummarize.
type GetCollectionsParams struct {
	// Client is used for the discovery, item-count and list requests.
	Client    arvadosclient.ArvadosClient
	// Logger, if non-nil, receives progress under "collection_info".
	Logger    *logger.Logger
	// BatchSize is sent as the per-request "limit" when it is positive.
	// Otherwise no limit is sent.
	BatchSize int
}
53
// SdkCollectionInfo is one collection record in an API server list response.
// GetCollections requests only the fields listed here.
type SdkCollectionInfo struct {
	UUID         string    `json:"uuid"`
	OwnerUUID    string    `json:"owner_uuid"`
	// Redundancy of 0 is replaced by the server's default replication level.
	Redundancy   int       `json:"redundancy"`
	// ModifiedAt must be non-zero, because GetCollections pages on it.
	ModifiedAt   time.Time `json:"modified_at"`
	ManifestText string    `json:"manifest_text"`
}
62
// SdkCollectionList is one page of a collections list response from the API
// server.
type SdkCollectionList struct {
	// ItemsAvailable is not read in this file. It is presumably the server's
	// total count of matching items; confirm.
	ItemsAvailable int                 `json:"items_available"`
	Items          []SdkCollectionInfo `json:"items"`
}
68
69 func init() {
70         flag.StringVar(&HeapProfileFilename,
71                 "heap-profile",
72                 "",
73                 "File to write the heap profiles to. Leave blank to skip profiling.")
74 }
75
76 // WriteHeapProfile writes the heap profile to a file for later review.
77 // Since a file is expected to only contain a single heap profile this
78 // function overwrites the previously written profile, so it is safe
79 // to call multiple times in a single run.
80 // Otherwise we would see cumulative numbers as explained here:
81 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
82 func WriteHeapProfile() error {
83         if HeapProfileFilename != "" {
84                 heapProfile, err := os.Create(HeapProfileFilename)
85                 if err != nil {
86                         return err
87                 }
88
89                 defer heapProfile.Close()
90
91                 err = pprof.WriteHeapProfile(heapProfile)
92                 return err
93         }
94
95         return nil
96 }
97
98 // GetCollectionsAndSummarize gets collections from api and summarizes
99 func GetCollectionsAndSummarize(arvLogger *logger.Logger, params GetCollectionsParams) (results ReadCollections) {
100         results, err := GetCollections(params)
101         if err != nil {
102                 results.Err = err
103                 return
104         }
105
106         results.Summarize(params.Logger)
107
108         log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
109         log.Printf("Read and processed %d collections",
110                 len(results.UUIDToCollection))
111
112         // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
113         // lots of behaviors can become warnings (and obviously we can't
114         // write anything).
115         // if !readCollections.ReadAllCollections {
116         //      log.Fatalf("Did not read all collections")
117         // }
118
119         return
120 }
121
122 // GetCollections gets collections from api
123 func GetCollections(params GetCollectionsParams) (results ReadCollections, err error) {
124         if &params.Client == nil {
125                 err = fmt.Errorf("params.Client passed to GetCollections() should " +
126                         "contain a valid ArvadosClient, but instead it is nil.")
127                 return
128         }
129
130         fieldsWanted := []string{"manifest_text",
131                 "owner_uuid",
132                 "uuid",
133                 "redundancy",
134                 "modified_at"}
135
136         sdkParams := arvadosclient.Dict{
137                 "select":  fieldsWanted,
138                 "order":   []string{"modified_at ASC"},
139                 "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
140
141         if params.BatchSize > 0 {
142                 sdkParams["limit"] = params.BatchSize
143         }
144
145         var defaultReplicationLevel int
146         {
147                 var value interface{}
148                 value, err = params.Client.Discovery("defaultCollectionReplication")
149                 if err != nil {
150                         return
151                 }
152
153                 defaultReplicationLevel = int(value.(float64))
154                 if defaultReplicationLevel <= 0 {
155                         err = fmt.Errorf("Default collection replication returned by arvados SDK "+
156                                 "should be a positive integer but instead it was %d.",
157                                 defaultReplicationLevel)
158                         return
159                 }
160         }
161
162         initialNumberOfCollectionsAvailable, err :=
163                 util.NumberItemsAvailable(params.Client, "collections")
164         if err != nil {
165                 return
166         }
167         // Include a 1% margin for collections added while we're reading so
168         // that we don't have to grow the map in most cases.
169         maxExpectedCollections := int(
170                 float64(initialNumberOfCollectionsAvailable) * 1.01)
171         results.UUIDToCollection = make(map[string]Collection, maxExpectedCollections)
172
173         if params.Logger != nil {
174                 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
175                         collectionInfo := logger.GetOrCreateMap(p, "collection_info")
176                         collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
177                         collectionInfo["batch_size"] = params.BatchSize
178                         collectionInfo["default_replication_level"] = defaultReplicationLevel
179                 })
180         }
181
182         // These values are just for getting the loop to run the first time,
183         // afterwards they'll be set to real values.
184         previousTotalCollections := -1
185         totalCollections := 0
186         for totalCollections > previousTotalCollections {
187                 // We're still finding new collections
188
189                 // Write the heap profile for examining memory usage
190                 err = WriteHeapProfile()
191                 if err != nil {
192                         return
193                 }
194
195                 // Get next batch of collections.
196                 var collections SdkCollectionList
197                 err = params.Client.List("collections", sdkParams, &collections)
198                 if err != nil {
199                         return
200                 }
201
202                 // Process collection and update our date filter.
203                 var latestModificationDate time.Time
204                 latestModificationDate, err = ProcessCollections(params.Logger,
205                         collections.Items,
206                         defaultReplicationLevel,
207                         results.UUIDToCollection)
208                 if err != nil {
209                         return
210                 }
211                 sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
212
213                 // update counts
214                 previousTotalCollections = totalCollections
215                 totalCollections = len(results.UUIDToCollection)
216
217                 log.Printf("%d collections read, %d new in last batch, "+
218                         "%s latest modified date, %.0f %d %d avg,max,total manifest size",
219                         totalCollections,
220                         totalCollections-previousTotalCollections,
221                         sdkParams["filters"].([][]string)[0][2],
222                         float32(totalManifestSize)/float32(totalCollections),
223                         maxManifestSize, totalManifestSize)
224
225                 if params.Logger != nil {
226                         params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
227                                 collectionInfo := logger.GetOrCreateMap(p, "collection_info")
228                                 collectionInfo["collections_read"] = totalCollections
229                                 collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
230                                 collectionInfo["total_manifest_size"] = totalManifestSize
231                                 collectionInfo["max_manifest_size"] = maxManifestSize
232                         })
233                 }
234         }
235
236         // Write the heap profile for examining memory usage
237         err = WriteHeapProfile()
238
239         return
240 }
241
// StrCopy returns a copy of s that is backed by its own, newly allocated
// memory. Copying a substring this way lets the garbage collector reclaim
// the larger string the substring was sliced from.
func StrCopy(s string) string {
	buf := make([]byte, len(s))
	copy(buf, s)
	return string(buf)
}
248
249 // ProcessCollections read from api server
250 func ProcessCollections(arvLogger *logger.Logger,
251         receivedCollections []SdkCollectionInfo,
252         defaultReplicationLevel int,
253         UUIDToCollection map[string]Collection) (latestModificationDate time.Time, err error) {
254         for _, sdkCollection := range receivedCollections {
255                 collection := Collection{UUID: StrCopy(sdkCollection.UUID),
256                         OwnerUUID:         StrCopy(sdkCollection.OwnerUUID),
257                         ReplicationLevel:  sdkCollection.Redundancy,
258                         BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
259
260                 if sdkCollection.ModifiedAt.IsZero() {
261                         return latestModificationDate, fmt.Errorf(
262                                 "Arvados SDK collection returned with unexpected zero "+
263                                         "modification date. This probably means that either we failed to "+
264                                         "parse the modification date or the API server has changed how "+
265                                         "it returns modification dates: %+v",
266                                 collection)
267                 }
268
269                 if sdkCollection.ModifiedAt.After(latestModificationDate) {
270                         latestModificationDate = sdkCollection.ModifiedAt
271                 }
272
273                 if collection.ReplicationLevel == 0 {
274                         collection.ReplicationLevel = defaultReplicationLevel
275                 }
276
277                 manifest := manifest.Manifest{sdkCollection.ManifestText}
278                 manifestSize := uint64(len(sdkCollection.ManifestText))
279
280                 if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen {
281                         totalManifestSize += manifestSize
282                 }
283                 if manifestSize > maxManifestSize {
284                         maxManifestSize = manifestSize
285                 }
286
287                 blockChannel := manifest.BlockIterWithDuplicates()
288                 for block := range blockChannel {
289                         if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
290                                 err = fmt.Errorf(
291                                         "Collection %s contains multiple sizes (%d and %d) for block %s",
292                                         collection.UUID,
293                                         storedSize,
294                                         block.Size,
295                                         block.Digest)
296                                 return
297                         }
298                         collection.BlockDigestToSize[block.Digest] = block.Size
299                 }
300                 collection.TotalSize = 0
301                 for _, size := range collection.BlockDigestToSize {
302                         collection.TotalSize += size
303                 }
304                 UUIDToCollection[collection.UUID] = collection
305
306                 // Clear out all the manifest strings that we don't need anymore.
307                 // These hopefully form the bulk of our memory usage.
308                 manifest.Text = ""
309                 sdkCollection.ManifestText = ""
310         }
311
312         return
313 }
314
315 // Summarize the collections read
316 func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
317         readCollections.OwnerToCollectionSize = make(map[string]int)
318         readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int)
319         numCollections := len(readCollections.UUIDToCollection)
320         readCollections.CollectionUUIDToIndex = make(map[string]int, numCollections)
321         readCollections.CollectionIndexToUUID = make([]string, 0, numCollections)
322         readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int)
323
324         for _, coll := range readCollections.UUIDToCollection {
325                 collectionIndex := len(readCollections.CollectionIndexToUUID)
326                 readCollections.CollectionIndexToUUID =
327                         append(readCollections.CollectionIndexToUUID, coll.UUID)
328                 readCollections.CollectionUUIDToIndex[coll.UUID] = collectionIndex
329
330                 readCollections.OwnerToCollectionSize[coll.OwnerUUID] =
331                         readCollections.OwnerToCollectionSize[coll.OwnerUUID] + coll.TotalSize
332
333                 for block, size := range coll.BlockDigestToSize {
334                         locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)}
335                         readCollections.BlockToCollectionIndices[locator] =
336                                 append(readCollections.BlockToCollectionIndices[locator],
337                                         collectionIndex)
338                         storedReplication := readCollections.BlockToDesiredReplication[locator]
339                         if coll.ReplicationLevel > storedReplication {
340                                 readCollections.BlockToDesiredReplication[locator] =
341                                         coll.ReplicationLevel
342                         }
343                 }
344         }
345
346         if arvLogger != nil {
347                 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
348                         collectionInfo := logger.GetOrCreateMap(p, "collection_info")
349                         // Since maps are shallow copied, we run a risk of concurrent
350                         // updates here. By copying results.OwnerToCollectionSize into
351                         // the log, we're assuming that it won't be updated.
352                         collectionInfo["owner_to_collection_size"] =
353                                 readCollections.OwnerToCollectionSize
354                         collectionInfo["distinct_blocks_named"] =
355                                 len(readCollections.BlockToDesiredReplication)
356                 })
357         }
358
359         return
360 }