Merge branch 'master' into 7490-datamanager-dont-die-return-error
[arvados.git] / services / datamanager / collection / collection.go
1 // Deals with parsing Collection responses from API Server.
2
3 package collection
4
5 import (
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
10         "git.curoverse.com/arvados.git/sdk/go/logger"
11         "git.curoverse.com/arvados.git/sdk/go/manifest"
12         "git.curoverse.com/arvados.git/sdk/go/util"
13         "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
14         "log"
15         "os"
16         "runtime/pprof"
17         "time"
18 )
19
// Package-level state shared by the functions in this file.
var (
	// heapProfileFilename is the file heap profiles are written to. It
	// is populated from the -heap-profile flag; an empty value disables
	// profiling.
	heapProfileFilename string

	// The two counters below exist for debugging only and are updated
	// by ProcessCollections.

	// totalManifestSize accumulates the manifest text length, in bytes,
	// of every collection the first time its UUID is seen.
	totalManifestSize uint64

	// maxManifestSize is the largest manifest text length, in bytes,
	// seen so far.
	maxManifestSize uint64
)
26
27 // Collection representation
28 type Collection struct {
29         UUID              string
30         OwnerUUID         string
31         ReplicationLevel  int
32         BlockDigestToSize map[blockdigest.BlockDigest]int
33         TotalSize         int
34 }
35
36 // ReadCollections holds information about collections from API server
37 type ReadCollections struct {
38         ReadAllCollections        bool
39         UUIDToCollection          map[string]Collection
40         OwnerToCollectionSize     map[string]int
41         BlockToDesiredReplication map[blockdigest.DigestWithSize]int
42         CollectionUUIDToIndex     map[string]int
43         CollectionIndexToUUID     []string
44         BlockToCollectionIndices  map[blockdigest.DigestWithSize][]int
45 }
46
47 // GetCollectionsParams params
48 type GetCollectionsParams struct {
49         Client    arvadosclient.ArvadosClient
50         Logger    *logger.Logger
51         BatchSize int
52 }
53
// SdkCollectionInfo is one collection record as decoded from the JSON
// returned by the API server.
type SdkCollectionInfo struct {
	// UUID of the collection.
	UUID string `json:"uuid"`

	// OwnerUUID is the UUID of the collection's owner.
	OwnerUUID string `json:"owner_uuid"`

	// Redundancy is the replication reported for the collection; zero
	// is replaced by the default level in ProcessCollections.
	Redundancy int `json:"redundancy"`

	// ModifiedAt is the modification time; GetCollections uses it to
	// page through the collection list.
	ModifiedAt time.Time `json:"modified_at"`

	// ManifestText is the raw manifest of the collection.
	ManifestText string `json:"manifest_text"`
}
62
63 // SdkCollectionList lists collections from api
64 type SdkCollectionList struct {
65         ItemsAvailable int                 `json:"items_available"`
66         Items          []SdkCollectionInfo `json:"items"`
67 }
68
69 func init() {
70         flag.StringVar(&heapProfileFilename,
71                 "heap-profile",
72                 "",
73                 "File to write the heap profiles to. Leave blank to skip profiling.")
74 }
75
76 // WriteHeapProfile writes the heap profile to a file for later review.
77 // Since a file is expected to only contain a single heap profile this
78 // function overwrites the previously written profile, so it is safe
79 // to call multiple times in a single run.
80 // Otherwise we would see cumulative numbers as explained here:
81 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
82 func WriteHeapProfile() error {
83         if heapProfileFilename != "" {
84                 heapProfile, err := os.Create(heapProfileFilename)
85                 if err != nil {
86                         return err
87                 }
88
89                 defer heapProfile.Close()
90
91                 err = pprof.WriteHeapProfile(heapProfile)
92                 return err
93         }
94
95         return nil
96 }
97
98 // GetCollectionsAndSummarize gets collections from api and summarizes
99 func GetCollectionsAndSummarize(arvLogger *logger.Logger, params GetCollectionsParams) (results ReadCollections) {
100         results, err := GetCollections(params)
101         if err != nil {
102                 loggerutil.LogErrorMessage(arvLogger, fmt.Sprintf("Error during GetCollections with params %v: %v", params, err))
103         }
104
105         results.Summarize(params.Logger)
106
107         log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
108         log.Printf("Read and processed %d collections",
109                 len(results.UUIDToCollection))
110
111         // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
112         // lots of behaviors can become warnings (and obviously we can't
113         // write anything).
114         // if !readCollections.ReadAllCollections {
115         //      log.Fatalf("Did not read all collections")
116         // }
117
118         return
119 }
120
121 // GetCollections gets collections from api
122 func GetCollections(params GetCollectionsParams) (results ReadCollections, err error) {
123         if &params.Client == nil {
124                 err = fmt.Errorf("params.Client passed to GetCollections() should " +
125                         "contain a valid ArvadosClient, but instead it is nil.")
126         }
127
128         fieldsWanted := []string{"manifest_text",
129                 "owner_uuid",
130                 "uuid",
131                 "redundancy",
132                 "modified_at"}
133
134         sdkParams := arvadosclient.Dict{
135                 "select":  fieldsWanted,
136                 "order":   []string{"modified_at ASC"},
137                 "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
138
139         if params.BatchSize > 0 {
140                 sdkParams["limit"] = params.BatchSize
141         }
142
143         var defaultReplicationLevel int
144         {
145                 var value interface{}
146                 value, err = params.Client.Discovery("defaultCollectionReplication")
147                 if err != nil {
148                         return
149                 }
150
151                 defaultReplicationLevel = int(value.(float64))
152                 if defaultReplicationLevel <= 0 {
153                         err = fmt.Errorf("Default collection replication returned by arvados SDK "+
154                                 "should be a positive integer but instead it was %d.",
155                                 defaultReplicationLevel)
156                         return
157                 }
158         }
159
160         initialNumberOfCollectionsAvailable, err :=
161                 util.NumberItemsAvailable(params.Client, "collections")
162         if err != nil {
163                 return
164         }
165         // Include a 1% margin for collections added while we're reading so
166         // that we don't have to grow the map in most cases.
167         maxExpectedCollections := int(
168                 float64(initialNumberOfCollectionsAvailable) * 1.01)
169         results.UUIDToCollection = make(map[string]Collection, maxExpectedCollections)
170
171         if params.Logger != nil {
172                 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
173                         collectionInfo := logger.GetOrCreateMap(p, "collection_info")
174                         collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
175                         collectionInfo["batch_size"] = params.BatchSize
176                         collectionInfo["default_replication_level"] = defaultReplicationLevel
177                 })
178         }
179
180         // These values are just for getting the loop to run the first time,
181         // afterwards they'll be set to real values.
182         previousTotalCollections := -1
183         totalCollections := 0
184         for totalCollections > previousTotalCollections {
185                 // We're still finding new collections
186
187                 // Write the heap profile for examining memory usage
188                 err = WriteHeapProfile()
189                 if err != nil {
190                         return
191                 }
192
193                 // Get next batch of collections.
194                 var collections SdkCollectionList
195                 err = params.Client.List("collections", sdkParams, &collections)
196                 if err != nil {
197                         return
198                 }
199
200                 // Process collection and update our date filter.
201                 var latestModificationDate time.Time
202                 latestModificationDate, err = ProcessCollections(params.Logger,
203                         collections.Items,
204                         defaultReplicationLevel,
205                         results.UUIDToCollection)
206                 if err != nil {
207                         return
208                 }
209                 sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
210
211                 // update counts
212                 previousTotalCollections = totalCollections
213                 totalCollections = len(results.UUIDToCollection)
214
215                 log.Printf("%d collections read, %d new in last batch, "+
216                         "%s latest modified date, %.0f %d %d avg,max,total manifest size",
217                         totalCollections,
218                         totalCollections-previousTotalCollections,
219                         sdkParams["filters"].([][]string)[0][2],
220                         float32(totalManifestSize)/float32(totalCollections),
221                         maxManifestSize, totalManifestSize)
222
223                 if params.Logger != nil {
224                         params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
225                                 collectionInfo := logger.GetOrCreateMap(p, "collection_info")
226                                 collectionInfo["collections_read"] = totalCollections
227                                 collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
228                                 collectionInfo["total_manifest_size"] = totalManifestSize
229                                 collectionInfo["max_manifest_size"] = maxManifestSize
230                         })
231                 }
232         }
233
234         // Write the heap profile for examining memory usage
235         WriteHeapProfile()
236
237         return
238 }
239
// StrCopy returns a copy of s built from a freshly allocated byte
// buffer.
// Copying a short slice of a long string this way lets the garbage
// collector reuse the memory of the longer string it came from.
func StrCopy(s string) string {
	buf := make([]byte, len(s))
	copy(buf, s)
	return string(buf)
}
246
247 // ProcessCollections read from api server
248 func ProcessCollections(arvLogger *logger.Logger,
249         receivedCollections []SdkCollectionInfo,
250         defaultReplicationLevel int,
251         UUIDToCollection map[string]Collection) (latestModificationDate time.Time, err error) {
252         for _, sdkCollection := range receivedCollections {
253                 collection := Collection{UUID: StrCopy(sdkCollection.UUID),
254                         OwnerUUID:         StrCopy(sdkCollection.OwnerUUID),
255                         ReplicationLevel:  sdkCollection.Redundancy,
256                         BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
257
258                 if sdkCollection.ModifiedAt.IsZero() {
259                         return latestModificationDate, fmt.Errorf(
260                                 "Arvados SDK collection returned with unexpected zero "+
261                                         "modification date. This probably means that either we failed to "+
262                                         "parse the modification date or the API server has changed how "+
263                                         "it returns modification dates: %+v",
264                                 collection)
265                 }
266
267                 if sdkCollection.ModifiedAt.After(latestModificationDate) {
268                         latestModificationDate = sdkCollection.ModifiedAt
269                 }
270
271                 if collection.ReplicationLevel == 0 {
272                         collection.ReplicationLevel = defaultReplicationLevel
273                 }
274
275                 manifest := manifest.Manifest{sdkCollection.ManifestText}
276                 manifestSize := uint64(len(sdkCollection.ManifestText))
277
278                 if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen {
279                         totalManifestSize += manifestSize
280                 }
281                 if manifestSize > maxManifestSize {
282                         maxManifestSize = manifestSize
283                 }
284
285                 blockChannel := manifest.BlockIterWithDuplicates()
286                 for block := range blockChannel {
287                         if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
288                                 err = fmt.Errorf(
289                                         "Collection %s contains multiple sizes (%d and %d) for block %s",
290                                         collection.UUID,
291                                         storedSize,
292                                         block.Size,
293                                         block.Digest)
294                                 return
295                         }
296                         collection.BlockDigestToSize[block.Digest] = block.Size
297                 }
298                 collection.TotalSize = 0
299                 for _, size := range collection.BlockDigestToSize {
300                         collection.TotalSize += size
301                 }
302                 UUIDToCollection[collection.UUID] = collection
303
304                 // Clear out all the manifest strings that we don't need anymore.
305                 // These hopefully form the bulk of our memory usage.
306                 manifest.Text = ""
307                 sdkCollection.ManifestText = ""
308         }
309
310         return
311 }
312
313 // Summarize the collections read
314 func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
315         readCollections.OwnerToCollectionSize = make(map[string]int)
316         readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int)
317         numCollections := len(readCollections.UUIDToCollection)
318         readCollections.CollectionUUIDToIndex = make(map[string]int, numCollections)
319         readCollections.CollectionIndexToUUID = make([]string, 0, numCollections)
320         readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int)
321
322         for _, coll := range readCollections.UUIDToCollection {
323                 collectionIndex := len(readCollections.CollectionIndexToUUID)
324                 readCollections.CollectionIndexToUUID =
325                         append(readCollections.CollectionIndexToUUID, coll.UUID)
326                 readCollections.CollectionUUIDToIndex[coll.UUID] = collectionIndex
327
328                 readCollections.OwnerToCollectionSize[coll.OwnerUUID] =
329                         readCollections.OwnerToCollectionSize[coll.OwnerUUID] + coll.TotalSize
330
331                 for block, size := range coll.BlockDigestToSize {
332                         locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)}
333                         readCollections.BlockToCollectionIndices[locator] =
334                                 append(readCollections.BlockToCollectionIndices[locator],
335                                         collectionIndex)
336                         storedReplication := readCollections.BlockToDesiredReplication[locator]
337                         if coll.ReplicationLevel > storedReplication {
338                                 readCollections.BlockToDesiredReplication[locator] =
339                                         coll.ReplicationLevel
340                         }
341                 }
342         }
343
344         if arvLogger != nil {
345                 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
346                         collectionInfo := logger.GetOrCreateMap(p, "collection_info")
347                         // Since maps are shallow copied, we run a risk of concurrent
348                         // updates here. By copying results.OwnerToCollectionSize into
349                         // the log, we're assuming that it won't be updated.
350                         collectionInfo["owner_to_collection_size"] =
351                                 readCollections.OwnerToCollectionSize
352                         collectionInfo["distinct_blocks_named"] =
353                                 len(readCollections.BlockToDesiredReplication)
354                 })
355         }
356
357         return
358 }