Merge branch 'crunch-dispatch-docker-bin-env' of https://github.com/wtsi-hgi/arvados...
[arvados.git] / services / datamanager / collection / collection.go
1 // Deals with parsing Collection responses from API Server.
2
3 package collection
4
5 import (
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
10         "git.curoverse.com/arvados.git/sdk/go/logger"
11         "git.curoverse.com/arvados.git/sdk/go/manifest"
12         "git.curoverse.com/arvados.git/sdk/go/util"
13         "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
14         "log"
15         "os"
16         "runtime/pprof"
17         "time"
18 )
19
// Package-level state shared by the functions in this file.
var (
	// heapProfileFilename is set from the -heap-profile command line flag.
	// An empty value disables heap profiling.
	heapProfileFilename string

	// Globals for debugging, updated while collections are processed.

	// totalManifestSize is the summed manifest text length of the
	// collections whose uuid had not been seen before.
	totalManifestSize uint64
	// maxManifestSize is the length of the longest manifest text seen.
	maxManifestSize uint64
)
26
27 type Collection struct {
28         Uuid              string
29         OwnerUuid         string
30         ReplicationLevel  int
31         BlockDigestToSize map[blockdigest.BlockDigest]int
32         TotalSize         int
33 }
34
35 type ReadCollections struct {
36         ReadAllCollections        bool
37         UuidToCollection          map[string]Collection
38         OwnerToCollectionSize     map[string]int
39         BlockToDesiredReplication map[blockdigest.DigestWithSize]int
40         CollectionUuidToIndex     map[string]int
41         CollectionIndexToUuid     []string
42         BlockToCollectionIndices  map[blockdigest.DigestWithSize][]int
43 }
44
45 type GetCollectionsParams struct {
46         Client    arvadosclient.ArvadosClient
47         Logger    *logger.Logger
48         BatchSize int
49 }
50
// SdkCollectionInfo is the JSON shape of a single collection record as
// decoded from an API server list response. It carries only the fields
// this package asks the server for.
type SdkCollectionInfo struct {
	// Uuid is decoded from "uuid".
	Uuid string `json:"uuid"`
	// OwnerUuid is decoded from "owner_uuid".
	OwnerUuid string `json:"owner_uuid"`
	// Redundancy is decoded from "redundancy".
	Redundancy int `json:"redundancy"`
	// ModifiedAt is decoded from "modified_at".
	ModifiedAt time.Time `json:"modified_at"`
	// ManifestText is decoded from "manifest_text".
	ManifestText string `json:"manifest_text"`
}
58
59 type SdkCollectionList struct {
60         ItemsAvailable int                 `json:"items_available"`
61         Items          []SdkCollectionInfo `json:"items"`
62 }
63
64 func init() {
65         flag.StringVar(&heapProfileFilename,
66                 "heap-profile",
67                 "",
68                 "File to write the heap profiles to. Leave blank to skip profiling.")
69 }
70
71 // Write the heap profile to a file for later review.
72 // Since a file is expected to only contain a single heap profile this
73 // function overwrites the previously written profile, so it is safe
74 // to call multiple times in a single run.
75 // Otherwise we would see cumulative numbers as explained here:
76 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
77 func WriteHeapProfile() {
78         if heapProfileFilename != "" {
79
80                 heap_profile, err := os.Create(heapProfileFilename)
81                 if err != nil {
82                         log.Fatal(err)
83                 }
84
85                 defer heap_profile.Close()
86
87                 err = pprof.WriteHeapProfile(heap_profile)
88                 if err != nil {
89                         log.Fatal(err)
90                 }
91         }
92 }
93
94 func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
95         results = GetCollections(params)
96         results.Summarize(params.Logger)
97
98         log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
99         log.Printf("Read and processed %d collections",
100                 len(results.UuidToCollection))
101
102         // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
103         // lots of behaviors can become warnings (and obviously we can't
104         // write anything).
105         // if !readCollections.ReadAllCollections {
106         //      log.Fatalf("Did not read all collections")
107         // }
108
109         return
110 }
111
112 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
113         if &params.Client == nil {
114                 log.Fatalf("params.Client passed to GetCollections() should " +
115                         "contain a valid ArvadosClient, but instead it is nil.")
116         }
117
118         fieldsWanted := []string{"manifest_text",
119                 "owner_uuid",
120                 "uuid",
121                 "redundancy",
122                 "modified_at"}
123
124         sdkParams := arvadosclient.Dict{
125                 "select":  fieldsWanted,
126                 "order":   []string{"modified_at ASC"},
127                 "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
128
129         if params.BatchSize > 0 {
130                 sdkParams["limit"] = params.BatchSize
131         }
132
133         var defaultReplicationLevel int
134         {
135                 value, err := params.Client.Discovery("defaultCollectionReplication")
136                 if err != nil {
137                         loggerutil.FatalWithMessage(params.Logger,
138                                 fmt.Sprintf("Error querying default collection replication: %v", err))
139                 }
140
141                 defaultReplicationLevel = int(value.(float64))
142                 if defaultReplicationLevel <= 0 {
143                         loggerutil.FatalWithMessage(params.Logger,
144                                 fmt.Sprintf("Default collection replication returned by arvados SDK "+
145                                         "should be a positive integer but instead it was %d.",
146                                         defaultReplicationLevel))
147                 }
148         }
149
150         initialNumberOfCollectionsAvailable, err :=
151                 util.NumberItemsAvailable(params.Client, "collections")
152         if err != nil {
153                 loggerutil.FatalWithMessage(params.Logger,
154                         fmt.Sprintf("Error querying collection count: %v", err))
155         }
156         // Include a 1% margin for collections added while we're reading so
157         // that we don't have to grow the map in most cases.
158         maxExpectedCollections := int(
159                 float64(initialNumberOfCollectionsAvailable) * 1.01)
160         results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
161
162         if params.Logger != nil {
163                 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
164                         collectionInfo := logger.GetOrCreateMap(p, "collection_info")
165                         collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
166                         collectionInfo["batch_size"] = params.BatchSize
167                         collectionInfo["default_replication_level"] = defaultReplicationLevel
168                 })
169         }
170
171         // These values are just for getting the loop to run the first time,
172         // afterwards they'll be set to real values.
173         previousTotalCollections := -1
174         totalCollections := 0
175         for totalCollections > previousTotalCollections {
176                 // We're still finding new collections
177
178                 // Write the heap profile for examining memory usage
179                 WriteHeapProfile()
180
181                 // Get next batch of collections.
182                 var collections SdkCollectionList
183                 err := params.Client.List("collections", sdkParams, &collections)
184                 if err != nil {
185                         loggerutil.FatalWithMessage(params.Logger,
186                                 fmt.Sprintf("Error querying collections: %v", err))
187                 }
188
189                 // Process collection and update our date filter.
190                 sdkParams["filters"].([][]string)[0][2] =
191                         ProcessCollections(params.Logger,
192                                 collections.Items,
193                                 defaultReplicationLevel,
194                                 results.UuidToCollection).Format(time.RFC3339)
195
196                 // update counts
197                 previousTotalCollections = totalCollections
198                 totalCollections = len(results.UuidToCollection)
199
200                 log.Printf("%d collections read, %d new in last batch, "+
201                         "%s latest modified date, %.0f %d %d avg,max,total manifest size",
202                         totalCollections,
203                         totalCollections-previousTotalCollections,
204                         sdkParams["filters"].([][]string)[0][2],
205                         float32(totalManifestSize)/float32(totalCollections),
206                         maxManifestSize, totalManifestSize)
207
208                 if params.Logger != nil {
209                         params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
210                                 collectionInfo := logger.GetOrCreateMap(p, "collection_info")
211                                 collectionInfo["collections_read"] = totalCollections
212                                 collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
213                                 collectionInfo["total_manifest_size"] = totalManifestSize
214                                 collectionInfo["max_manifest_size"] = maxManifestSize
215                         })
216                 }
217         }
218
219         // Write the heap profile for examining memory usage
220         WriteHeapProfile()
221
222         return
223 }
224
// StrCopy returns a copy of s built from freshly allocated bytes.
// Copying a short slice of a much longer string this way lets the
// garbage collector reclaim the longer string it was cut from.
func StrCopy(s string) string {
	buf := make([]byte, len(s))
	copy(buf, s)
	return string(buf)
}
231
232 func ProcessCollections(arvLogger *logger.Logger,
233         receivedCollections []SdkCollectionInfo,
234         defaultReplicationLevel int,
235         uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
236         for _, sdkCollection := range receivedCollections {
237                 collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
238                         OwnerUuid:         StrCopy(sdkCollection.OwnerUuid),
239                         ReplicationLevel:  sdkCollection.Redundancy,
240                         BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
241
242                 if sdkCollection.ModifiedAt.IsZero() {
243                         loggerutil.FatalWithMessage(arvLogger,
244                                 fmt.Sprintf(
245                                         "Arvados SDK collection returned with unexpected zero "+
246                                                 "modification date. This probably means that either we failed to "+
247                                                 "parse the modification date or the API server has changed how "+
248                                                 "it returns modification dates: %+v",
249                                         collection))
250                 }
251
252                 if sdkCollection.ModifiedAt.After(latestModificationDate) {
253                         latestModificationDate = sdkCollection.ModifiedAt
254                 }
255
256                 if collection.ReplicationLevel == 0 {
257                         collection.ReplicationLevel = defaultReplicationLevel
258                 }
259
260                 manifest := manifest.Manifest{sdkCollection.ManifestText}
261                 manifestSize := uint64(len(sdkCollection.ManifestText))
262
263                 if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen {
264                         totalManifestSize += manifestSize
265                 }
266                 if manifestSize > maxManifestSize {
267                         maxManifestSize = manifestSize
268                 }
269
270                 blockChannel := manifest.BlockIterWithDuplicates()
271                 for block := range blockChannel {
272                         if stored_size, stored := collection.BlockDigestToSize[block.Digest]; stored && stored_size != block.Size {
273                                 message := fmt.Sprintf(
274                                         "Collection %s contains multiple sizes (%d and %d) for block %s",
275                                         collection.Uuid,
276                                         stored_size,
277                                         block.Size,
278                                         block.Digest)
279                                 loggerutil.FatalWithMessage(arvLogger, message)
280                         }
281                         collection.BlockDigestToSize[block.Digest] = block.Size
282                 }
283                 collection.TotalSize = 0
284                 for _, size := range collection.BlockDigestToSize {
285                         collection.TotalSize += size
286                 }
287                 uuidToCollection[collection.Uuid] = collection
288
289                 // Clear out all the manifest strings that we don't need anymore.
290                 // These hopefully form the bulk of our memory usage.
291                 manifest.Text = ""
292                 sdkCollection.ManifestText = ""
293         }
294
295         return
296 }
297
298 func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
299         readCollections.OwnerToCollectionSize = make(map[string]int)
300         readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int)
301         numCollections := len(readCollections.UuidToCollection)
302         readCollections.CollectionUuidToIndex = make(map[string]int, numCollections)
303         readCollections.CollectionIndexToUuid = make([]string, 0, numCollections)
304         readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int)
305
306         for _, coll := range readCollections.UuidToCollection {
307                 collectionIndex := len(readCollections.CollectionIndexToUuid)
308                 readCollections.CollectionIndexToUuid =
309                         append(readCollections.CollectionIndexToUuid, coll.Uuid)
310                 readCollections.CollectionUuidToIndex[coll.Uuid] = collectionIndex
311
312                 readCollections.OwnerToCollectionSize[coll.OwnerUuid] =
313                         readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
314
315                 for block, size := range coll.BlockDigestToSize {
316                         locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)}
317                         readCollections.BlockToCollectionIndices[locator] =
318                                 append(readCollections.BlockToCollectionIndices[locator],
319                                         collectionIndex)
320                         storedReplication := readCollections.BlockToDesiredReplication[locator]
321                         if coll.ReplicationLevel > storedReplication {
322                                 readCollections.BlockToDesiredReplication[locator] =
323                                         coll.ReplicationLevel
324                         }
325                 }
326         }
327
328         if arvLogger != nil {
329                 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
330                         collectionInfo := logger.GetOrCreateMap(p, "collection_info")
331                         // Since maps are shallow copied, we run a risk of concurrent
332                         // updates here. By copying results.OwnerToCollectionSize into
333                         // the log, we're assuming that it won't be updated.
334                         collectionInfo["owner_to_collection_size"] =
335                                 readCollections.OwnerToCollectionSize
336                         collectionInfo["distinct_blocks_named"] =
337                                 len(readCollections.BlockToDesiredReplication)
338                 })
339         }
340
341         return
342 }