7286: Use ex_customdata to put the ping URL on the node instead of
[arvados.git] / services / datamanager / collection / collection.go
// Package collection deals with parsing Collection responses from the API server.
2
3 package collection
4
5 import (
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
10         "git.curoverse.com/arvados.git/sdk/go/logger"
11         "git.curoverse.com/arvados.git/sdk/go/manifest"
12         "git.curoverse.com/arvados.git/sdk/go/util"
13         "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
14         "log"
15         "os"
16         "runtime/pprof"
17         "time"
18 )
19
// heapProfileFilename is bound to the -heap-profile command-line flag by
// init. When it is empty, WriteHeapProfile does nothing.
var heapProfileFilename string

// Manifest-size statistics kept for debugging. ProcessCollections updates
// them, and GetCollections reports them in its progress output.
var (
	totalManifestSize uint64 // sum of manifest lengths of distinct UUIDs seen
	maxManifestSize   uint64 // longest single manifest seen
)
26
27 // Collection representation
28 type Collection struct {
29         UUID              string
30         OwnerUUID         string
31         ReplicationLevel  int
32         BlockDigestToSize map[blockdigest.BlockDigest]int
33         TotalSize         int
34 }
35
36 // ReadCollections holds information about collections from API server
37 type ReadCollections struct {
38         ReadAllCollections        bool
39         UUIDToCollection          map[string]Collection
40         OwnerToCollectionSize     map[string]int
41         BlockToDesiredReplication map[blockdigest.DigestWithSize]int
42         CollectionUUIDToIndex     map[string]int
43         CollectionIndexToUUID     []string
44         BlockToCollectionIndices  map[blockdigest.DigestWithSize][]int
45 }
46
47 // GetCollectionsParams params
48 type GetCollectionsParams struct {
49         Client    arvadosclient.ArvadosClient
50         Logger    *logger.Logger
51         BatchSize int
52 }
53
54 // SdkCollectionInfo holds collection info from api
55 type SdkCollectionInfo struct {
56         UUID         string    `json:"uuid"`
57         OwnerUUID    string    `json:"owner_uuid"`
58         Redundancy   int       `json:"redundancy"`
59         ModifiedAt   time.Time `json:"modified_at"`
60         ManifestText string    `json:"manifest_text"`
61 }
62
63 // SdkCollectionList lists collections from api
64 type SdkCollectionList struct {
65         ItemsAvailable int                 `json:"items_available"`
66         Items          []SdkCollectionInfo `json:"items"`
67 }
68
// init registers the -heap-profile flag, which sets heapProfileFilename.
// The flag's default is the empty string.
func init() {
	const usage = "File to write the heap profiles to. Leave blank to skip profiling."
	flag.StringVar(&heapProfileFilename, "heap-profile", "", usage)
}
75
76 // WriteHeapProfile writes the heap profile to a file for later review.
77 // Since a file is expected to only contain a single heap profile this
78 // function overwrites the previously written profile, so it is safe
79 // to call multiple times in a single run.
80 // Otherwise we would see cumulative numbers as explained here:
81 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
82 func WriteHeapProfile() {
83         if heapProfileFilename != "" {
84
85                 heapProfile, err := os.Create(heapProfileFilename)
86                 if err != nil {
87                         log.Fatal(err)
88                 }
89
90                 defer heapProfile.Close()
91
92                 err = pprof.WriteHeapProfile(heapProfile)
93                 if err != nil {
94                         log.Fatal(err)
95                 }
96         }
97 }
98
99 // GetCollectionsAndSummarize gets collections from api and summarizes
100 func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
101         results = GetCollections(params)
102         results.Summarize(params.Logger)
103
104         log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
105         log.Printf("Read and processed %d collections",
106                 len(results.UUIDToCollection))
107
108         // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
109         // lots of behaviors can become warnings (and obviously we can't
110         // write anything).
111         // if !readCollections.ReadAllCollections {
112         //      log.Fatalf("Did not read all collections")
113         // }
114
115         return
116 }
117
// GetCollections reads all collections from the API server through
// params.Client and stores them in results.UUIDToCollection, keyed by UUID.
// No other field of results is populated; call Summarize for the derived
// tables.
//
// Collections are requested in ascending modified_at order.
// params.BatchSize is sent as the request limit when it is positive. After
// each batch, the "modified_at >=" filter advances to the latest
// modification time seen in that batch. Reading stops as soon as a batch
// adds no UUID that had not already been read. Because the filter is
// inclusive, collections at the boundary time are fetched again and simply
// overwrite their earlier entries.
//
// When params.Logger is non-nil, progress is recorded under
// "collection_info". API errors and invalid server responses are reported
// through loggerutil.FatalWithMessage. WriteHeapProfile is called before
// each batch and once more at the end.
func GetCollections(params GetCollectionsParams) (results ReadCollections) {
	// params.Client is held by value inside params, so it cannot be nil and
	// there is nothing to nil-check here.

	fieldsWanted := []string{"manifest_text",
		"owner_uuid",
		"uuid",
		"redundancy",
		"modified_at"}

	// Keep a direct handle on the filter so the date bound (filters[0][2])
	// can be advanced after each batch. sdkParams refers to the same
	// backing arrays, so updates here are seen by the next List call.
	filters := [][]string{{"modified_at", ">=", "1900-01-01T00:00:00Z"}}
	sdkParams := arvadosclient.Dict{
		"select":  fieldsWanted,
		"order":   []string{"modified_at ASC"},
		"filters": filters}

	if params.BatchSize > 0 {
		sdkParams["limit"] = params.BatchSize
	}

	var defaultReplicationLevel int
	{
		value, err := params.Client.Discovery("defaultCollectionReplication")
		if err != nil {
			loggerutil.FatalWithMessage(params.Logger,
				fmt.Sprintf("Error querying default collection replication: %v", err))
		}

		// The discovery value is expected to be a float64. Use the checked
		// type assertion so an unexpected value yields a logged fatal error
		// rather than an unlogged runtime panic.
		replication, ok := value.(float64)
		if !ok {
			loggerutil.FatalWithMessage(params.Logger,
				fmt.Sprintf("Default collection replication returned by arvados SDK "+
					"should be a number but instead it was %v (%T).",
					value, value))
		}

		defaultReplicationLevel = int(replication)
		if defaultReplicationLevel <= 0 {
			loggerutil.FatalWithMessage(params.Logger,
				fmt.Sprintf("Default collection replication returned by arvados SDK "+
					"should be a positive integer but instead it was %d.",
					defaultReplicationLevel))
		}
	}

	initialNumberOfCollectionsAvailable, err :=
		util.NumberItemsAvailable(params.Client, "collections")
	if err != nil {
		loggerutil.FatalWithMessage(params.Logger,
			fmt.Sprintf("Error querying collection count: %v", err))
	}
	// Include a 1% margin for collections added while we're reading so
	// that we don't have to grow the map in most cases.
	maxExpectedCollections := int(
		float64(initialNumberOfCollectionsAvailable) * 1.01)
	results.UUIDToCollection = make(map[string]Collection, maxExpectedCollections)

	if params.Logger != nil {
		params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
			collectionInfo := logger.GetOrCreateMap(p, "collection_info")
			collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
			collectionInfo["batch_size"] = params.BatchSize
			collectionInfo["default_replication_level"] = defaultReplicationLevel
		})
	}

	// These values only make the loop run the first time; afterwards they
	// hold the collection counts before and after the latest batch.
	previousTotalCollections := -1
	totalCollections := 0
	for totalCollections > previousTotalCollections {
		// We're still finding new collections.

		// Write the heap profile for examining memory usage.
		WriteHeapProfile()

		// Get the next batch of collections.
		var collections SdkCollectionList
		if err := params.Client.List("collections", sdkParams, &collections); err != nil {
			loggerutil.FatalWithMessage(params.Logger,
				fmt.Sprintf("Error querying collections: %v", err))
		}

		latestModified := ProcessCollections(params.Logger,
			collections.Items,
			defaultReplicationLevel,
			results.UUIDToCollection)

		// An empty batch yields the zero time. Keep the previous bound
		// rather than resetting the filter (and the reported date) to year 1.
		// RFC3339 formatting drops sub-second precision, which only moves the
		// bound earlier; that is safe because the filter is inclusive.
		if len(collections.Items) > 0 {
			filters[0][2] = latestModified.Format(time.RFC3339)
		}

		// Update counts.
		previousTotalCollections = totalCollections
		totalCollections = len(results.UUIDToCollection)

		// Avoid dividing by zero (which would log NaN) before anything has
		// been read.
		var avgManifestSize float32
		if totalCollections > 0 {
			avgManifestSize = float32(totalManifestSize) / float32(totalCollections)
		}

		log.Printf("%d collections read, %d new in last batch, "+
			"%s latest modified date, %.0f %d %d avg,max,total manifest size",
			totalCollections,
			totalCollections-previousTotalCollections,
			filters[0][2],
			avgManifestSize,
			maxManifestSize, totalManifestSize)

		if params.Logger != nil {
			params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
				collectionInfo := logger.GetOrCreateMap(p, "collection_info")
				collectionInfo["collections_read"] = totalCollections
				collectionInfo["latest_modified_date_seen"] = filters[0][2]
				collectionInfo["total_manifest_size"] = totalManifestSize
				collectionInfo["max_manifest_size"] = maxManifestSize
			})
		}
	}

	// Write the heap profile for examining memory usage.
	WriteHeapProfile()

	return results
}
231
// StrCopy returns a copy of s that is backed by its own, newly allocated
// memory. A substring keeps the whole string it was sliced from alive.
// Storing the copy instead lets the garbage collector reclaim that longer
// string once nothing else refers to it.
func StrCopy(s string) string {
	buf := make([]byte, len(s))
	copy(buf, s)
	return string(buf)
}
238
239 // ProcessCollections read from api server
240 func ProcessCollections(arvLogger *logger.Logger,
241         receivedCollections []SdkCollectionInfo,
242         defaultReplicationLevel int,
243         UUIDToCollection map[string]Collection) (latestModificationDate time.Time) {
244         for _, sdkCollection := range receivedCollections {
245                 collection := Collection{UUID: StrCopy(sdkCollection.UUID),
246                         OwnerUUID:         StrCopy(sdkCollection.OwnerUUID),
247                         ReplicationLevel:  sdkCollection.Redundancy,
248                         BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
249
250                 if sdkCollection.ModifiedAt.IsZero() {
251                         loggerutil.FatalWithMessage(arvLogger,
252                                 fmt.Sprintf(
253                                         "Arvados SDK collection returned with unexpected zero "+
254                                                 "modification date. This probably means that either we failed to "+
255                                                 "parse the modification date or the API server has changed how "+
256                                                 "it returns modification dates: %+v",
257                                         collection))
258                 }
259
260                 if sdkCollection.ModifiedAt.After(latestModificationDate) {
261                         latestModificationDate = sdkCollection.ModifiedAt
262                 }
263
264                 if collection.ReplicationLevel == 0 {
265                         collection.ReplicationLevel = defaultReplicationLevel
266                 }
267
268                 manifest := manifest.Manifest{sdkCollection.ManifestText}
269                 manifestSize := uint64(len(sdkCollection.ManifestText))
270
271                 if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen {
272                         totalManifestSize += manifestSize
273                 }
274                 if manifestSize > maxManifestSize {
275                         maxManifestSize = manifestSize
276                 }
277
278                 blockChannel := manifest.BlockIterWithDuplicates()
279                 for block := range blockChannel {
280                         if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
281                                 message := fmt.Sprintf(
282                                         "Collection %s contains multiple sizes (%d and %d) for block %s",
283                                         collection.UUID,
284                                         storedSize,
285                                         block.Size,
286                                         block.Digest)
287                                 loggerutil.FatalWithMessage(arvLogger, message)
288                         }
289                         collection.BlockDigestToSize[block.Digest] = block.Size
290                 }
291                 collection.TotalSize = 0
292                 for _, size := range collection.BlockDigestToSize {
293                         collection.TotalSize += size
294                 }
295                 UUIDToCollection[collection.UUID] = collection
296
297                 // Clear out all the manifest strings that we don't need anymore.
298                 // These hopefully form the bulk of our memory usage.
299                 manifest.Text = ""
300                 sdkCollection.ManifestText = ""
301         }
302
303         return
304 }
305
306 // Summarize the collections read
307 func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
308         readCollections.OwnerToCollectionSize = make(map[string]int)
309         readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int)
310         numCollections := len(readCollections.UUIDToCollection)
311         readCollections.CollectionUUIDToIndex = make(map[string]int, numCollections)
312         readCollections.CollectionIndexToUUID = make([]string, 0, numCollections)
313         readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int)
314
315         for _, coll := range readCollections.UUIDToCollection {
316                 collectionIndex := len(readCollections.CollectionIndexToUUID)
317                 readCollections.CollectionIndexToUUID =
318                         append(readCollections.CollectionIndexToUUID, coll.UUID)
319                 readCollections.CollectionUUIDToIndex[coll.UUID] = collectionIndex
320
321                 readCollections.OwnerToCollectionSize[coll.OwnerUUID] =
322                         readCollections.OwnerToCollectionSize[coll.OwnerUUID] + coll.TotalSize
323
324                 for block, size := range coll.BlockDigestToSize {
325                         locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)}
326                         readCollections.BlockToCollectionIndices[locator] =
327                                 append(readCollections.BlockToCollectionIndices[locator],
328                                         collectionIndex)
329                         storedReplication := readCollections.BlockToDesiredReplication[locator]
330                         if coll.ReplicationLevel > storedReplication {
331                                 readCollections.BlockToDesiredReplication[locator] =
332                                         coll.ReplicationLevel
333                         }
334                 }
335         }
336
337         if arvLogger != nil {
338                 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
339                         collectionInfo := logger.GetOrCreateMap(p, "collection_info")
340                         // Since maps are shallow copied, we run a risk of concurrent
341                         // updates here. By copying results.OwnerToCollectionSize into
342                         // the log, we're assuming that it won't be updated.
343                         collectionInfo["owner_to_collection_size"] =
344                                 readCollections.OwnerToCollectionSize
345                         collectionInfo["distinct_blocks_named"] =
346                                 len(readCollections.BlockToDesiredReplication)
347                 })
348         }
349
350         return
351 }