Removed a comment that was accidentally copied in.
[arvados.git] / services / datamanager / collection / collection.go
// Package collection deals with parsing Collection responses from the API server.
2
3 package collection
4
5 import (
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
10         "git.curoverse.com/arvados.git/sdk/go/logger"
11         "git.curoverse.com/arvados.git/sdk/go/manifest"
12         "git.curoverse.com/arvados.git/sdk/go/util"
13         "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
14         "log"
15         "os"
16         "runtime/pprof"
17         "time"
18 )
19
// heapProfileFilename is set by the -heap-profile flag. When it is
// non-empty, WriteHeapProfile writes heap profiles to this file.
var heapProfileFilename string

// Debugging counters maintained by ProcessCollections: the summed size of
// the manifest text of every distinct collection read so far, and the size
// of the largest single manifest seen.
var (
	totalManifestSize uint64
	maxManifestSize   uint64
)
26
// DefaultReplicationLevel is the replication level assumed for a
// collection whose redundancy is reported by the API server as 0.
const DefaultReplicationLevel = 2
30
31 type Collection struct {
32         Uuid              string
33         OwnerUuid         string
34         ReplicationLevel  int
35         BlockDigestToSize map[blockdigest.BlockDigest]int
36         TotalSize         int
37 }
38
39 type ReadCollections struct {
40         ReadAllCollections       bool
41         UuidToCollection         map[string]Collection
42         OwnerToCollectionSize    map[string]int
43         BlockToReplication       map[blockdigest.BlockDigest]int
44         CollectionUuidToIndex    map[string]int
45         CollectionIndexToUuid    []string
46         BlockToCollectionIndices map[blockdigest.BlockDigest][]int
47 }
48
49 type GetCollectionsParams struct {
50         Client    arvadosclient.ArvadosClient
51         Logger    *logger.Logger
52         BatchSize int
53 }
54
// SdkCollectionInfo is one collection as decoded from the API server's
// collections list response. Only the fields requested by GetCollections
// are expected to be populated. A Redundancy of 0 is replaced by
// DefaultReplicationLevel in ProcessCollections.
type SdkCollectionInfo struct {
	Uuid         string    `json:"uuid"`
	OwnerUuid    string    `json:"owner_uuid"`
	Redundancy   int       `json:"redundancy"`
	ModifiedAt   time.Time `json:"modified_at"`
	ManifestText string    `json:"manifest_text"`
}

// SdkCollectionList is one page of the API server's collections list
// response: the reported number of items available and the items returned
// in this page.
type SdkCollectionList struct {
	ItemsAvailable int                 `json:"items_available"`
	Items          []SdkCollectionInfo `json:"items"`
}
67
68 func init() {
69         flag.StringVar(&heapProfileFilename,
70                 "heap-profile",
71                 "",
72                 "File to write the heap profiles to. Leave blank to skip profiling.")
73 }
74
75 // Write the heap profile to a file for later review.
76 // Since a file is expected to only contain a single heap profile this
77 // function overwrites the previously written profile, so it is safe
78 // to call multiple times in a single run.
79 // Otherwise we would see cumulative numbers as explained here:
80 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
81 func WriteHeapProfile() {
82         if heapProfileFilename != "" {
83
84                 heap_profile, err := os.Create(heapProfileFilename)
85                 if err != nil {
86                         log.Fatal(err)
87                 }
88
89                 defer heap_profile.Close()
90
91                 err = pprof.WriteHeapProfile(heap_profile)
92                 if err != nil {
93                         log.Fatal(err)
94                 }
95         }
96 }
97
98 func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
99         results = GetCollections(params)
100         results.Summarize(params.Logger)
101
102         log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
103         log.Printf("Read and processed %d collections",
104                 len(results.UuidToCollection))
105
106         // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
107         // lots of behaviors can become warnings (and obviously we can't
108         // write anything).
109         // if !readCollections.ReadAllCollections {
110         //      log.Fatalf("Did not read all collections")
111         // }
112
113         return
114 }
115
116 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
117         if &params.Client == nil {
118                 log.Fatalf("params.Client passed to GetCollections() should " +
119                         "contain a valid ArvadosClient, but instead it is nil.")
120         }
121
122         fieldsWanted := []string{"manifest_text",
123                 "owner_uuid",
124                 "uuid",
125                 "redundancy",
126                 "modified_at"}
127
128         sdkParams := arvadosclient.Dict{
129                 "select":  fieldsWanted,
130                 "order":   []string{"modified_at ASC"},
131                 "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
132
133         if params.BatchSize > 0 {
134                 sdkParams["limit"] = params.BatchSize
135         }
136
137         initialNumberOfCollectionsAvailable, err :=
138                 util.NumberItemsAvailable(params.Client, "collections")
139         if err != nil {
140                 loggerutil.FatalWithMessage(params.Logger,
141                         fmt.Sprintf("Error querying collection count: %v", err))
142         }
143         // Include a 1% margin for collections added while we're reading so
144         // that we don't have to grow the map in most cases.
145         maxExpectedCollections := int(
146                 float64(initialNumberOfCollectionsAvailable) * 1.01)
147         results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
148
149         if params.Logger != nil {
150                 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
151                         collectionInfo := logger.GetOrCreateMap(p, "collection_info")
152                         collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
153                         collectionInfo["batch_size"] = params.BatchSize
154                 })
155         }
156
157         // These values are just for getting the loop to run the first time,
158         // afterwards they'll be set to real values.
159         previousTotalCollections := -1
160         totalCollections := 0
161         for totalCollections > previousTotalCollections {
162                 // We're still finding new collections
163
164                 // Write the heap profile for examining memory usage
165                 WriteHeapProfile()
166
167                 // Get next batch of collections.
168                 var collections SdkCollectionList
169                 err := params.Client.List("collections", sdkParams, &collections)
170                 if err != nil {
171                         loggerutil.FatalWithMessage(params.Logger,
172                                 fmt.Sprintf("Error querying collections: %v", err))
173                 }
174
175                 // Process collection and update our date filter.
176                 sdkParams["filters"].([][]string)[0][2] =
177                         ProcessCollections(params.Logger,
178                                 collections.Items,
179                                 results.UuidToCollection).Format(time.RFC3339)
180
181                 // update counts
182                 previousTotalCollections = totalCollections
183                 totalCollections = len(results.UuidToCollection)
184
185                 log.Printf("%d collections read, %d new in last batch, "+
186                         "%s latest modified date, %.0f %d %d avg,max,total manifest size",
187                         totalCollections,
188                         totalCollections-previousTotalCollections,
189                         sdkParams["filters"].([][]string)[0][2],
190                         float32(totalManifestSize)/float32(totalCollections),
191                         maxManifestSize, totalManifestSize)
192
193                 if params.Logger != nil {
194                         params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
195                                 collectionInfo := logger.GetOrCreateMap(p, "collection_info")
196                                 collectionInfo["collections_read"] = totalCollections
197                                 collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
198                                 collectionInfo["total_manifest_size"] = totalManifestSize
199                                 collectionInfo["max_manifest_size"] = maxManifestSize
200                         })
201                 }
202         }
203
204         // Write the heap profile for examining memory usage
205         WriteHeapProfile()
206
207         return
208 }
209
// StrCopy returns a copy of s backed by newly allocated memory.
//
// A string produced by slicing shares the backing memory of the string it
// was cut from, keeping all of it alive. Copying the short piece out lets
// the garbage collector reclaim the longer original.
func StrCopy(s string) string {
	buf := make([]byte, len(s))
	copy(buf, s)
	return string(buf)
}
216
217 func ProcessCollections(arvLogger *logger.Logger,
218         receivedCollections []SdkCollectionInfo,
219         uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
220         for _, sdkCollection := range receivedCollections {
221                 collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
222                         OwnerUuid:         StrCopy(sdkCollection.OwnerUuid),
223                         ReplicationLevel:  sdkCollection.Redundancy,
224                         BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
225
226                 if sdkCollection.ModifiedAt.IsZero() {
227                         loggerutil.FatalWithMessage(arvLogger,
228                                 fmt.Sprintf(
229                                         "Arvados SDK collection returned with unexpected zero "+
230                                                 "modification date. This probably means that either we failed to "+
231                                                 "parse the modification date or the API server has changed how "+
232                                                 "it returns modification dates: %+v",
233                                         collection))
234                 }
235
236                 if sdkCollection.ModifiedAt.After(latestModificationDate) {
237                         latestModificationDate = sdkCollection.ModifiedAt
238                 }
239
240                 if collection.ReplicationLevel == 0 {
241                         collection.ReplicationLevel = DefaultReplicationLevel
242                 }
243
244                 manifest := manifest.Manifest{sdkCollection.ManifestText}
245                 manifestSize := uint64(len(sdkCollection.ManifestText))
246
247                 if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen {
248                         totalManifestSize += manifestSize
249                 }
250                 if manifestSize > maxManifestSize {
251                         maxManifestSize = manifestSize
252                 }
253
254                 blockChannel := manifest.BlockIterWithDuplicates()
255                 for block := range blockChannel {
256                         if stored_size, stored := collection.BlockDigestToSize[block.Digest]; stored && stored_size != block.Size {
257                                 message := fmt.Sprintf(
258                                         "Collection %s contains multiple sizes (%d and %d) for block %s",
259                                         collection.Uuid,
260                                         stored_size,
261                                         block.Size,
262                                         block.Digest)
263                                 loggerutil.FatalWithMessage(arvLogger, message)
264                         }
265                         collection.BlockDigestToSize[block.Digest] = block.Size
266                 }
267                 collection.TotalSize = 0
268                 for _, size := range collection.BlockDigestToSize {
269                         collection.TotalSize += size
270                 }
271                 uuidToCollection[collection.Uuid] = collection
272
273                 // Clear out all the manifest strings that we don't need anymore.
274                 // These hopefully form the bulk of our memory usage.
275                 manifest.Text = ""
276                 sdkCollection.ManifestText = ""
277         }
278
279         return
280 }
281
282 func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
283         readCollections.OwnerToCollectionSize = make(map[string]int)
284         readCollections.BlockToReplication = make(map[blockdigest.BlockDigest]int)
285         numCollections := len(readCollections.UuidToCollection)
286         readCollections.CollectionUuidToIndex = make(map[string]int, numCollections)
287         readCollections.CollectionIndexToUuid = make([]string, 0, numCollections)
288         readCollections.BlockToCollectionIndices = make(map[blockdigest.BlockDigest][]int)
289
290         for _, coll := range readCollections.UuidToCollection {
291                 collectionIndex := len(readCollections.CollectionIndexToUuid)
292                 readCollections.CollectionIndexToUuid =
293                         append(readCollections.CollectionIndexToUuid, coll.Uuid)
294                 readCollections.CollectionUuidToIndex[coll.Uuid] = collectionIndex
295
296                 readCollections.OwnerToCollectionSize[coll.OwnerUuid] =
297                         readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
298
299                 for block, _ := range coll.BlockDigestToSize {
300                         readCollections.BlockToCollectionIndices[block] =
301                                 append(readCollections.BlockToCollectionIndices[block], collectionIndex)
302                         storedReplication := readCollections.BlockToReplication[block]
303                         if coll.ReplicationLevel > storedReplication {
304                                 readCollections.BlockToReplication[block] = coll.ReplicationLevel
305                         }
306                 }
307         }
308
309         if arvLogger != nil {
310                 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
311                         collectionInfo := logger.GetOrCreateMap(p, "collection_info")
312                         // Since maps are shallow copied, we run a risk of concurrent
313                         // updates here. By copying results.OwnerToCollectionSize into
314                         // the log, we're assuming that it won't be updated.
315                         collectionInfo["owner_to_collection_size"] =
316                                 readCollections.OwnerToCollectionSize
317                         collectionInfo["distinct_blocks_named"] =
318                                 len(readCollections.BlockToReplication)
319                 })
320         }
321
322         return
323 }