closes #5197
[arvados.git] / services / datamanager / collection / collection.go
/* Package collection deals with parsing Collection responses from the API server. */
2
3 package collection
4
5 import (
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
10         "git.curoverse.com/arvados.git/sdk/go/logger"
11         "git.curoverse.com/arvados.git/sdk/go/manifest"
12         "git.curoverse.com/arvados.git/sdk/go/util"
13         "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
14         "log"
15         "os"
16         "runtime"
17         "runtime/pprof"
18         "time"
19 )
20
var (
	// heap_profile_filename is set by the -heap-profile flag registered in
	// init. WriteHeapProfile does nothing while it is empty.
	heap_profile_filename string
	// globals for debugging
	// totalManifestSize accumulates the manifest text length (in bytes) of
	// each collection UUID the first time ProcessCollections sees it.
	// maxManifestSize is the longest manifest text length seen so far.
	// Neither is reset or protected by a lock anywhere in this file.
	totalManifestSize uint64
	maxManifestSize   uint64
)
27
// Collection is the in-memory summary of one collection, built by
// ProcessCollections from an SdkCollectionInfo record.
type Collection struct {
	Uuid      string
	OwnerUuid string
	// ReplicationLevel is copied from the API server's "redundancy" field.
	ReplicationLevel int
	// BlockDigestToSize maps each distinct block listed in the manifest to
	// the size the manifest reports for it.
	BlockDigestToSize map[blockdigest.BlockDigest]int
	// TotalSize is the sum of the sizes in BlockDigestToSize, so a block
	// referenced several times in the manifest is counted only once.
	TotalSize int
}
35
// ReadCollections holds the collections read from the API server together
// with summaries derived from them.
type ReadCollections struct {
	// ReadAllCollections is intended to report whether every collection was
	// read. NOTE(review): nothing in this file ever sets it, so as far as
	// this file is concerned it is always false — confirm against callers.
	ReadAllCollections bool
	// UuidToCollection maps a collection UUID to its summary.
	UuidToCollection map[string]Collection
	// OwnerToCollectionSize maps an owner UUID to the sum of TotalSize over
	// the collections it owns; filled in by ComputeSizeOfOwnedCollections.
	OwnerToCollectionSize map[string]int
}
41
// GetCollectionsParams configures GetCollections.
type GetCollectionsParams struct {
	// Client is used to query the API server.
	Client arvadosclient.ArvadosClient
	// Logger, if non-nil, receives progress information under the
	// "collection_info" key of its properties map.
	Logger *logger.Logger
	// BatchSize, if positive, is sent as the "limit" of every list request;
	// otherwise no limit parameter is sent.
	BatchSize int
}
47
// SdkCollectionInfo is one collection record decoded from a collections list
// response. Its fields correspond to the columns GetCollections requests via
// the "select" parameter.
type SdkCollectionInfo struct {
	Uuid         string    `json:"uuid"`
	OwnerUuid    string    `json:"owner_uuid"`
	Redundancy   int       `json:"redundancy"`
	ModifiedAt   time.Time `json:"modified_at"`
	ManifestText string    `json:"manifest_text"`
}
55
// SdkCollectionList is one page of a collections list response.
type SdkCollectionList struct {
	// ItemsAvailable is the server-reported "items_available" value; it is
	// not read anywhere in this file.
	ItemsAvailable int                 `json:"items_available"`
	Items          []SdkCollectionInfo `json:"items"`
}
60
61 func init() {
62         flag.StringVar(&heap_profile_filename,
63                 "heap-profile",
64                 "",
65                 "File to write the heap profiles to. Leave blank to skip profiling.")
66 }
67
68 // Write the heap profile to a file for later review.
69 // Since a file is expected to only contain a single heap profile this
70 // function overwrites the previously written profile, so it is safe
71 // to call multiple times in a single run.
72 // Otherwise we would see cumulative numbers as explained here:
73 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
74 func WriteHeapProfile() {
75         if heap_profile_filename != "" {
76
77                 heap_profile, err := os.Create(heap_profile_filename)
78                 if err != nil {
79                         log.Fatal(err)
80                 }
81
82                 defer heap_profile.Close()
83
84                 err = pprof.WriteHeapProfile(heap_profile)
85                 if err != nil {
86                         log.Fatal(err)
87                 }
88         }
89 }
90
91 func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
92         results = GetCollections(params)
93         ComputeSizeOfOwnedCollections(&results)
94
95         if params.Logger != nil {
96                 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
97                         collectionInfo := p["collection_info"].(map[string]interface{})
98                         // Since maps are shallow copied, we run a risk of concurrent
99                         // updates here. By copying results.OwnerToCollectionSize into
100                         // the log, we're assuming that it won't be updated.
101                         collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize
102                 })
103         }
104
105         log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
106         log.Printf("Read and processed %d collections",
107                 len(results.UuidToCollection))
108
109         // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
110         // lots of behaviors can become warnings (and obviously we can't
111         // write anything).
112         // if !readCollections.ReadAllCollections {
113         //      log.Fatalf("Did not read all collections")
114         // }
115
116         return
117 }
118
119 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
120         if &params.Client == nil {
121                 log.Fatalf("params.Client passed to GetCollections() should " +
122                         "contain a valid ArvadosClient, but instead it is nil.")
123         }
124
125         fieldsWanted := []string{"manifest_text",
126                 "owner_uuid",
127                 "uuid",
128                 // TODO(misha): Start using the redundancy field.
129                 "redundancy",
130                 "modified_at"}
131
132         sdkParams := arvadosclient.Dict{
133                 "select":  fieldsWanted,
134                 "order":   []string{"modified_at ASC"},
135                 "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
136
137         if params.BatchSize > 0 {
138                 sdkParams["limit"] = params.BatchSize
139         }
140
141         initialNumberOfCollectionsAvailable, err :=
142                 util.NumberItemsAvailable(params.Client, "collections")
143         if err != nil {
144                 loggerutil.FatalWithMessage(params.Logger,
145                         fmt.Sprintf("Error querying collection count: %v", err))
146         }
147         // Include a 1% margin for collections added while we're reading so
148         // that we don't have to grow the map in most cases.
149         maxExpectedCollections := int(
150                 float64(initialNumberOfCollectionsAvailable) * 1.01)
151         results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
152
153         if params.Logger != nil {
154                 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
155                         collectionInfo := make(map[string]interface{})
156                         collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
157                         collectionInfo["batch_size"] = params.BatchSize
158                         p["collection_info"] = collectionInfo
159                 })
160         }
161
162         // These values are just for getting the loop to run the first time,
163         // afterwards they'll be set to real values.
164         previousTotalCollections := -1
165         totalCollections := 0
166         for totalCollections > previousTotalCollections {
167                 // We're still finding new collections
168
169                 // Write the heap profile for examining memory usage
170                 WriteHeapProfile()
171
172                 // Get next batch of collections.
173                 var collections SdkCollectionList
174                 err := params.Client.List("collections", sdkParams, &collections)
175                 if err != nil {
176                         loggerutil.FatalWithMessage(params.Logger,
177                                 fmt.Sprintf("Error querying collections: %v", err))
178                 }
179
180                 // Process collection and update our date filter.
181                 sdkParams["filters"].([][]string)[0][2] =
182                         ProcessCollections(params.Logger,
183                                 collections.Items,
184                                 results.UuidToCollection).Format(time.RFC3339)
185
186                 // update counts
187                 previousTotalCollections = totalCollections
188                 totalCollections = len(results.UuidToCollection)
189
190                 log.Printf("%d collections read, %d new in last batch, "+
191                         "%s latest modified date, %.0f %d %d avg,max,total manifest size",
192                         totalCollections,
193                         totalCollections-previousTotalCollections,
194                         sdkParams["filters"].([][]string)[0][2],
195                         float32(totalManifestSize)/float32(totalCollections),
196                         maxManifestSize, totalManifestSize)
197
198                 if params.Logger != nil {
199                         params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
200                                 collectionInfo := p["collection_info"].(map[string]interface{})
201                                 collectionInfo["collections_read"] = totalCollections
202                                 collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
203                                 collectionInfo["total_manifest_size"] = totalManifestSize
204                                 collectionInfo["max_manifest_size"] = maxManifestSize
205                         })
206                 }
207         }
208
209         // Just in case this lowers the numbers reported in the heap profile.
210         runtime.GC()
211
212         // Write the heap profile for examining memory usage
213         WriteHeapProfile()
214
215         return
216 }
217
// StrCopy returns a copy of s that lives in freshly allocated memory.
// This matters when s is a slice of a much longer string: holding the
// copy instead lets the garbage collector reclaim the original.
func StrCopy(s string) string {
	buf := make([]byte, len(s))
	copy(buf, s)
	return string(buf)
}
224
225 func ProcessCollections(arvLogger *logger.Logger,
226         receivedCollections []SdkCollectionInfo,
227         uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
228         for _, sdkCollection := range receivedCollections {
229                 collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
230                         OwnerUuid:         StrCopy(sdkCollection.OwnerUuid),
231                         ReplicationLevel:  sdkCollection.Redundancy,
232                         BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
233
234                 if sdkCollection.ModifiedAt.IsZero() {
235                         loggerutil.FatalWithMessage(arvLogger,
236                                 fmt.Sprintf(
237                                         "Arvados SDK collection returned with unexpected zero "+
238                                                 "modifcation date. This probably means that either we failed to "+
239                                                 "parse the modification date or the API server has changed how "+
240                                                 "it returns modification dates: %v",
241                                         collection))
242                 }
243
244                 if sdkCollection.ModifiedAt.After(latestModificationDate) {
245                         latestModificationDate = sdkCollection.ModifiedAt
246                 }
247                 manifest := manifest.Manifest{sdkCollection.ManifestText}
248                 manifestSize := uint64(len(sdkCollection.ManifestText))
249
250                 if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen {
251                         totalManifestSize += manifestSize
252                 }
253                 if manifestSize > maxManifestSize {
254                         maxManifestSize = manifestSize
255                 }
256
257                 blockChannel := manifest.BlockIterWithDuplicates()
258                 for block := range blockChannel {
259                         if stored_size, stored := collection.BlockDigestToSize[block.Digest]; stored && stored_size != block.Size {
260                                 message := fmt.Sprintf(
261                                         "Collection %s contains multiple sizes (%d and %d) for block %s",
262                                         collection.Uuid,
263                                         stored_size,
264                                         block.Size,
265                                         block.Digest)
266                                 loggerutil.FatalWithMessage(arvLogger, message)
267                         }
268                         collection.BlockDigestToSize[block.Digest] = block.Size
269                 }
270                 collection.TotalSize = 0
271                 for _, size := range collection.BlockDigestToSize {
272                         collection.TotalSize += size
273                 }
274                 uuidToCollection[collection.Uuid] = collection
275
276                 // Clear out all the manifest strings that we don't need anymore.
277                 // These hopefully form the bulk of our memory usage.
278                 manifest.Text = ""
279                 sdkCollection.ManifestText = ""
280         }
281
282         return
283 }
284
285 func ComputeSizeOfOwnedCollections(readCollections *ReadCollections) {
286         readCollections.OwnerToCollectionSize = make(map[string]int)
287         for _, coll := range readCollections.UuidToCollection {
288                 readCollections.OwnerToCollectionSize[coll.OwnerUuid] =
289                         readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
290         }
291
292         return
293 }