gofmt cleanup.
[arvados.git] / services / datamanager / collection / collection.go
1 // Deals with parsing Collection responses from API Server.
2
3 package collection
4
5 import (
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
10         "git.curoverse.com/arvados.git/sdk/go/logger"
11         "git.curoverse.com/arvados.git/sdk/go/manifest"
12         "git.curoverse.com/arvados.git/sdk/go/util"
13         "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
14         "log"
15         "os"
16         "runtime"
17         "runtime/pprof"
18         "time"
19 )
20
var (
	// heapProfileFilename is the path WriteHeapProfile writes to. It is
	// bound to the "heap-profile" command line flag in init; an empty
	// value means no heap profile is written.
	heapProfileFilename string
	// globals for debugging
	// Both are updated by ProcessCollections: totalManifestSize is the
	// summed manifest text length (in bytes) of collections whose UUID was
	// not yet in the result map, and maxManifestSize is the largest
	// manifest text length seen.
	totalManifestSize uint64
	maxManifestSize   uint64
)
27
const (
	// DefaultReplicationLevel is the replication level ProcessCollections
	// assigns to a collection whose reported redundancy is 0.
	DefaultReplicationLevel = 2
)
31
// Collection is the per-collection summary built by ProcessCollections.
//
// Uuid and OwnerUuid are copied from the API record. ReplicationLevel is
// the record's redundancy, or DefaultReplicationLevel when that is 0.
// BlockDigestToSize maps the digest of each block listed in the manifest
// to that block's size, and TotalSize is the sum of the sizes stored in
// BlockDigestToSize (so a block listed several times is counted once).
type Collection struct {
	Uuid              string
	OwnerUuid         string
	ReplicationLevel  int
	BlockDigestToSize map[blockdigest.BlockDigest]int
	TotalSize         int
}
39
// ReadCollections holds the collections read by GetCollections together
// with the indexes that Summarize derives from them.
//
// UuidToCollection is filled in by GetCollections. Summarize rebuilds all
// of the following from it:
//   - OwnerToCollectionSize: sum of Collection.TotalSize per owner UUID.
//   - BlockToReplication: highest ReplicationLevel among the collections
//     that reference a block.
//   - CollectionUuidToIndex / CollectionIndexToUuid: a mapping between
//     collection UUIDs and consecutive integer indices, and its inverse.
//   - BlockToCollectionIndices: for each block, the indices of the
//     collections that reference it.
//
// Nothing in the code visible here sets ReadAllCollections.
// NOTE(review): presumably it is meant to report whether the read was
// complete — confirm before relying on it.
type ReadCollections struct {
	ReadAllCollections       bool
	UuidToCollection         map[string]Collection
	OwnerToCollectionSize    map[string]int
	BlockToReplication       map[blockdigest.BlockDigest]int
	CollectionUuidToIndex    map[string]int
	CollectionIndexToUuid    []string
	BlockToCollectionIndices map[blockdigest.BlockDigest][]int
}
49
// GetCollectionsParams bundles the inputs of GetCollections.
//
// Client is used to count and list collections. Logger, when non-nil,
// receives progress updates. BatchSize, when greater than zero, is sent
// as the "limit" of each list request; otherwise no limit is sent.
type GetCollectionsParams struct {
	Client    arvadosclient.ArvadosClient
	Logger    *logger.Logger
	BatchSize int
}
55
// SdkCollectionInfo is one collection record as decoded from the JSON
// returned by a "collections" list request. The struct tags give the JSON
// key each field is read from; these are the same keys GetCollections
// asks for in its "select" parameter.
type SdkCollectionInfo struct {
	Uuid         string    `json:"uuid"`
	OwnerUuid    string    `json:"owner_uuid"`
	Redundancy   int       `json:"redundancy"`
	ModifiedAt   time.Time `json:"modified_at"`
	ManifestText string    `json:"manifest_text"`
}
63
// SdkCollectionList is the envelope decoded from a "collections" list
// response: Items holds the records of the current batch, and
// ItemsAvailable is read from the "items_available" key.
// NOTE(review): ItemsAvailable is not used by the code visible here;
// presumably it is the server-side total — confirm against the API docs.
type SdkCollectionList struct {
	ItemsAvailable int                 `json:"items_available"`
	Items          []SdkCollectionInfo `json:"items"`
}
68
69 func init() {
70         flag.StringVar(&heapProfileFilename,
71                 "heap-profile",
72                 "",
73                 "File to write the heap profiles to. Leave blank to skip profiling.")
74 }
75
76 // Write the heap profile to a file for later review.
77 // Since a file is expected to only contain a single heap profile this
78 // function overwrites the previously written profile, so it is safe
79 // to call multiple times in a single run.
80 // Otherwise we would see cumulative numbers as explained here:
81 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
82 func WriteHeapProfile() {
83         if heapProfileFilename != "" {
84
85                 heap_profile, err := os.Create(heapProfileFilename)
86                 if err != nil {
87                         log.Fatal(err)
88                 }
89
90                 defer heap_profile.Close()
91
92                 err = pprof.WriteHeapProfile(heap_profile)
93                 if err != nil {
94                         log.Fatal(err)
95                 }
96         }
97 }
98
99 func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
100         results = GetCollections(params)
101         results.Summarize()
102
103         if params.Logger != nil {
104                 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
105                         collectionInfo := p["collection_info"].(map[string]interface{})
106                         // Since maps are shallow copied, we run a risk of concurrent
107                         // updates here. By copying results.OwnerToCollectionSize into
108                         // the log, we're assuming that it won't be updated.
109                         collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize
110                 })
111         }
112
113         log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
114         log.Printf("Read and processed %d collections",
115                 len(results.UuidToCollection))
116
117         // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
118         // lots of behaviors can become warnings (and obviously we can't
119         // write anything).
120         // if !readCollections.ReadAllCollections {
121         //      log.Fatalf("Did not read all collections")
122         // }
123
124         return
125 }
126
127 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
128         if &params.Client == nil {
129                 log.Fatalf("params.Client passed to GetCollections() should " +
130                         "contain a valid ArvadosClient, but instead it is nil.")
131         }
132
133         fieldsWanted := []string{"manifest_text",
134                 "owner_uuid",
135                 "uuid",
136                 // TODO(misha): Start using the redundancy field.
137                 "redundancy",
138                 "modified_at"}
139
140         sdkParams := arvadosclient.Dict{
141                 "select":  fieldsWanted,
142                 "order":   []string{"modified_at ASC"},
143                 "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
144
145         if params.BatchSize > 0 {
146                 sdkParams["limit"] = params.BatchSize
147         }
148
149         initialNumberOfCollectionsAvailable, err :=
150                 util.NumberItemsAvailable(params.Client, "collections")
151         if err != nil {
152                 loggerutil.FatalWithMessage(params.Logger,
153                         fmt.Sprintf("Error querying collection count: %v", err))
154         }
155         // Include a 1% margin for collections added while we're reading so
156         // that we don't have to grow the map in most cases.
157         maxExpectedCollections := int(
158                 float64(initialNumberOfCollectionsAvailable) * 1.01)
159         results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
160
161         if params.Logger != nil {
162                 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
163                         collectionInfo := make(map[string]interface{})
164                         collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
165                         collectionInfo["batch_size"] = params.BatchSize
166                         p["collection_info"] = collectionInfo
167                 })
168         }
169
170         // These values are just for getting the loop to run the first time,
171         // afterwards they'll be set to real values.
172         previousTotalCollections := -1
173         totalCollections := 0
174         for totalCollections > previousTotalCollections {
175                 // We're still finding new collections
176
177                 // Write the heap profile for examining memory usage
178                 WriteHeapProfile()
179
180                 // Get next batch of collections.
181                 var collections SdkCollectionList
182                 err := params.Client.List("collections", sdkParams, &collections)
183                 if err != nil {
184                         loggerutil.FatalWithMessage(params.Logger,
185                                 fmt.Sprintf("Error querying collections: %v", err))
186                 }
187
188                 // Process collection and update our date filter.
189                 sdkParams["filters"].([][]string)[0][2] =
190                         ProcessCollections(params.Logger,
191                                 collections.Items,
192                                 results.UuidToCollection).Format(time.RFC3339)
193
194                 // update counts
195                 previousTotalCollections = totalCollections
196                 totalCollections = len(results.UuidToCollection)
197
198                 log.Printf("%d collections read, %d new in last batch, "+
199                         "%s latest modified date, %.0f %d %d avg,max,total manifest size",
200                         totalCollections,
201                         totalCollections-previousTotalCollections,
202                         sdkParams["filters"].([][]string)[0][2],
203                         float32(totalManifestSize)/float32(totalCollections),
204                         maxManifestSize, totalManifestSize)
205
206                 if params.Logger != nil {
207                         params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
208                                 collectionInfo := p["collection_info"].(map[string]interface{})
209                                 collectionInfo["collections_read"] = totalCollections
210                                 collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
211                                 collectionInfo["total_manifest_size"] = totalManifestSize
212                                 collectionInfo["max_manifest_size"] = maxManifestSize
213                         })
214                 }
215         }
216
217         // Just in case this lowers the numbers reported in the heap profile.
218         runtime.GC()
219
220         // Write the heap profile for examining memory usage
221         WriteHeapProfile()
222
223         return
224 }
225
// StrCopy returns a newly allocated string with the same contents as s.
// Copying a short substring this way lets the garbage collector reclaim
// the longer string it was sliced from.
func StrCopy(s string) string {
	buf := make([]byte, len(s))
	copy(buf, s)
	return string(buf)
}
232
233 func ProcessCollections(arvLogger *logger.Logger,
234         receivedCollections []SdkCollectionInfo,
235         uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
236         for _, sdkCollection := range receivedCollections {
237                 collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
238                         OwnerUuid:         StrCopy(sdkCollection.OwnerUuid),
239                         ReplicationLevel:  sdkCollection.Redundancy,
240                         BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
241
242                 if sdkCollection.ModifiedAt.IsZero() {
243                         loggerutil.FatalWithMessage(arvLogger,
244                                 fmt.Sprintf(
245                                         "Arvados SDK collection returned with unexpected zero "+
246                                                 "modifcation date. This probably means that either we failed to "+
247                                                 "parse the modification date or the API server has changed how "+
248                                                 "it returns modification dates: %+v",
249                                         collection))
250                 }
251
252                 if sdkCollection.ModifiedAt.After(latestModificationDate) {
253                         latestModificationDate = sdkCollection.ModifiedAt
254                 }
255
256                 if collection.ReplicationLevel == 0 {
257                         collection.ReplicationLevel = DefaultReplicationLevel
258                 }
259
260                 manifest := manifest.Manifest{sdkCollection.ManifestText}
261                 manifestSize := uint64(len(sdkCollection.ManifestText))
262
263                 if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen {
264                         totalManifestSize += manifestSize
265                 }
266                 if manifestSize > maxManifestSize {
267                         maxManifestSize = manifestSize
268                 }
269
270                 blockChannel := manifest.BlockIterWithDuplicates()
271                 for block := range blockChannel {
272                         if stored_size, stored := collection.BlockDigestToSize[block.Digest]; stored && stored_size != block.Size {
273                                 message := fmt.Sprintf(
274                                         "Collection %s contains multiple sizes (%d and %d) for block %s",
275                                         collection.Uuid,
276                                         stored_size,
277                                         block.Size,
278                                         block.Digest)
279                                 loggerutil.FatalWithMessage(arvLogger, message)
280                         }
281                         collection.BlockDigestToSize[block.Digest] = block.Size
282                 }
283                 collection.TotalSize = 0
284                 for _, size := range collection.BlockDigestToSize {
285                         collection.TotalSize += size
286                 }
287                 uuidToCollection[collection.Uuid] = collection
288
289                 // Clear out all the manifest strings that we don't need anymore.
290                 // These hopefully form the bulk of our memory usage.
291                 manifest.Text = ""
292                 sdkCollection.ManifestText = ""
293         }
294
295         return
296 }
297
298 func (readCollections *ReadCollections) Summarize() {
299         readCollections.OwnerToCollectionSize = make(map[string]int)
300         readCollections.BlockToReplication = make(map[blockdigest.BlockDigest]int)
301         numCollections := len(readCollections.UuidToCollection)
302         readCollections.CollectionUuidToIndex = make(map[string]int, numCollections)
303         readCollections.CollectionIndexToUuid = make([]string, 0, numCollections)
304         readCollections.BlockToCollectionIndices = make(map[blockdigest.BlockDigest][]int)
305
306         for _, coll := range readCollections.UuidToCollection {
307                 collectionIndex := len(readCollections.CollectionIndexToUuid)
308                 readCollections.CollectionIndexToUuid =
309                         append(readCollections.CollectionIndexToUuid, coll.Uuid)
310                 readCollections.CollectionUuidToIndex[coll.Uuid] = collectionIndex
311
312                 readCollections.OwnerToCollectionSize[coll.OwnerUuid] =
313                         readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
314
315                 for block, _ := range coll.BlockDigestToSize {
316                         readCollections.BlockToCollectionIndices[block] =
317                                 append(readCollections.BlockToCollectionIndices[block], collectionIndex)
318                         storedReplication := readCollections.BlockToReplication[block]
319                         if coll.ReplicationLevel > storedReplication {
320                                 readCollections.BlockToReplication[block] = coll.ReplicationLevel
321                         }
322                 }
323         }
324
325         return
326 }