Added recording of fatal errors to logger.
[arvados.git] / services / datamanager / collection / collection.go
1 /* Deals with parsing Collection responses from API Server. */
2
3 package collection
4
5 import (
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
10         "git.curoverse.com/arvados.git/sdk/go/logger"
11         "git.curoverse.com/arvados.git/sdk/go/manifest"
12         "log"
13         "os"
14         "runtime"
15         "runtime/pprof"
16         "time"
17 )
18
// heap_profile_filename is bound to the -heap-profile flag in init; when it
// is non-empty, WriteHeapProfile writes heap profiles to that path.
var heap_profile_filename string

// Globals for debugging, updated by ProcessCollections and reported by
// GetCollections:
//   - totalManifestSize: summed manifest text length, counted once per
//     collection UUID (the first time that UUID is seen).
//   - maxManifestSize: largest manifest text length seen so far.
var (
	totalManifestSize uint64
	maxManifestSize   uint64
)
25
26 type Collection struct {
27         Uuid string
28         OwnerUuid string
29         ReplicationLevel int
30         BlockDigestToSize map[blockdigest.BlockDigest]int
31         TotalSize int
32 }
33
34 type ReadCollections struct {
35         ReadAllCollections bool
36         UuidToCollection map[string]Collection
37 }
38
39 type GetCollectionsParams struct {
40         Client arvadosclient.ArvadosClient
41         Logger *logger.Logger
42         BatchSize int
43 }
44
// SdkCollectionInfo is the subset of a collection record that the data
// manager asks the API server for, decoded from the JSON response.
type SdkCollectionInfo struct {
	Uuid         string    `json:"uuid"`
	OwnerUuid    string    `json:"owner_uuid"`
	Redundancy   int       `json:"redundancy"`
	ModifiedAt   time.Time `json:"modified_at"`
	ManifestText string    `json:"manifest_text"`
}
52
53 type SdkCollectionList struct {
54         ItemsAvailable   int                   `json:"items_available"`
55         Items            []SdkCollectionInfo   `json:"items"`
56 }
57
58 func init() {
59         flag.StringVar(&heap_profile_filename, 
60                 "heap-profile",
61                 "",
62                 "File to write the heap profiles to. Leave blank to skip profiling.")
63 }
64
65 // // Methods to implement util.SdkListResponse Interface
66 // func (s SdkCollectionList) NumItemsAvailable() (numAvailable int, err error) {
67 //      return s.ItemsAvailable, nil
68 // }
69
70 // func (s SdkCollectionList) NumItemsContained() (numContained int, err error) {
71 //      return len(s.Items), nil
72 // }
73
74 // Write the heap profile to a file for later review.
75 // Since a file is expected to only contain a single heap profile this
76 // function overwrites the previously written profile, so it is safe
77 // to call multiple times in a single run.
78 // Otherwise we would see cumulative numbers as explained here:
79 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
80 func WriteHeapProfile() {
81         if heap_profile_filename != "" {
82
83                 heap_profile, err := os.Create(heap_profile_filename)
84                 if err != nil {
85                         log.Fatal(err)
86                 }
87
88                 defer heap_profile.Close()
89
90                 err = pprof.WriteHeapProfile(heap_profile)
91                 if err != nil {
92                         log.Fatal(err)
93                 }
94         }
95 }
96
97
98 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
99         if &params.Client == nil {
100                 log.Fatalf("params.Client passed to GetCollections() should " +
101                         "contain a valid ArvadosClient, but instead it is nil.")
102         }
103
104         fieldsWanted := []string{"manifest_text",
105                 "owner_uuid",
106                 "uuid",
107                 // TODO(misha): Start using the redundancy field.
108                 "redundancy",
109                 "modified_at"}
110
111         sdkParams := arvadosclient.Dict{
112                 "select": fieldsWanted,
113                 "order": []string{"modified_at ASC"},
114                 "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
115
116         if params.BatchSize > 0 {
117                 sdkParams["limit"] = params.BatchSize
118         }
119
120         initialNumberOfCollectionsAvailable := NumberCollectionsAvailable(params.Client)
121         // Include a 1% margin for collections added while we're reading so
122         // that we don't have to grow the map in most cases.
123         maxExpectedCollections := int(
124                 float64(initialNumberOfCollectionsAvailable) * 1.01)
125         results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
126
127         if params.Logger != nil {
128                 properties,_ := params.Logger.Edit()
129                 collectionInfo := make(map[string]interface{})
130                 collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
131                 collectionInfo["batch_size"] = params.BatchSize
132                 properties["collection_info"] = collectionInfo
133                 params.Logger.Record()
134         }
135
136         // These values are just for getting the loop to run the first time,
137         // afterwards they'll be set to real values.
138         previousTotalCollections := -1
139         totalCollections := 0
140         for totalCollections > previousTotalCollections {
141                 // We're still finding new collections
142
143                 // Write the heap profile for examining memory usage
144                 WriteHeapProfile()
145
146                 // Get next batch of collections.
147                 var collections SdkCollectionList
148                 err := params.Client.List("collections", sdkParams, &collections)
149                 if err != nil {
150                         fatalWithMessage(params.Logger,
151                                 fmt.Sprintf("Error querying collections: %v", err))
152                 }
153
154                 // Process collection and update our date filter.
155                 sdkParams["filters"].([][]string)[0][2] =
156                         ProcessCollections(params.Logger,
157                         collections.Items,
158                         results.UuidToCollection).Format(time.RFC3339)
159
160                 // update counts
161                 previousTotalCollections = totalCollections
162                 totalCollections = len(results.UuidToCollection)
163
164                 log.Printf("%d collections read, %d new in last batch, " +
165                         "%s latest modified date, %.0f %d %d avg,max,total manifest size",
166                         totalCollections,
167                         totalCollections - previousTotalCollections,
168                         sdkParams["filters"].([][]string)[0][2],
169                         float32(totalManifestSize)/float32(totalCollections),
170                         maxManifestSize, totalManifestSize)
171
172                 if params.Logger != nil {
173                         properties,_ := params.Logger.Edit()
174                         collectionInfo := properties["collection_info"].(map[string]interface{})
175                         collectionInfo["collections_read"] = totalCollections
176                         collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
177                         collectionInfo["total_manifest_size"] = totalManifestSize
178                         collectionInfo["max_manifest_size"] = maxManifestSize
179                         params.Logger.Record()
180                 }
181         }
182
183         // Just in case this lowers the numbers reported in the heap profile.
184         runtime.GC()
185
186         // Write the heap profile for examining memory usage
187         WriteHeapProfile()
188
189         return
190 }
191
192
// StrCopy returns a copy of s backed by newly allocated memory.
// It is useful for copying substrings, so that the garbage collector can
// reclaim the longer strings they were sliced from.
func StrCopy(s string) string {
	buf := make([]byte, len(s))
	copy(buf, s)
	return string(buf)
}
199
200
201 func ProcessCollections(arvLogger *logger.Logger,
202         receivedCollections []SdkCollectionInfo,
203         uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
204         for _, sdkCollection := range receivedCollections {
205                 collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
206                         OwnerUuid: StrCopy(sdkCollection.OwnerUuid),
207                         ReplicationLevel: sdkCollection.Redundancy,
208                         BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
209
210                 if sdkCollection.ModifiedAt.IsZero() {
211                         fatalWithMessage(arvLogger,
212                                 fmt.Sprintf(
213                                         "Arvados SDK collection returned with unexpected zero " +
214                                                 "modifcation date. This probably means that either we failed to " +
215                                                 "parse the modification date or the API server has changed how " +
216                                                 "it returns modification dates: %v",
217                                         collection))
218                 }
219
220                 if sdkCollection.ModifiedAt.After(latestModificationDate) {
221                         latestModificationDate = sdkCollection.ModifiedAt
222                 }
223                 manifest := manifest.Manifest{sdkCollection.ManifestText}
224                 manifestSize := uint64(len(sdkCollection.ManifestText))
225
226                 if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen {
227                         totalManifestSize += manifestSize
228                 }
229                 if manifestSize > maxManifestSize {
230                         maxManifestSize = manifestSize
231                 }
232                 
233                 blockChannel := manifest.BlockIterWithDuplicates()
234                 for block := range blockChannel {
235                         if stored_size, stored := collection.BlockDigestToSize[block.Digest];
236                         stored && stored_size != block.Size {
237                                 message := fmt.Sprintf(
238                                         "Collection %s contains multiple sizes (%d and %d) for block %s",
239                                         collection.Uuid,
240                                         stored_size,
241                                         block.Size,
242                                         block.Digest)
243                                 fatalWithMessage(arvLogger, message)
244                         }
245                         collection.BlockDigestToSize[block.Digest] = block.Size
246                 }
247                 collection.TotalSize = 0
248                 for _, size := range collection.BlockDigestToSize {
249                         collection.TotalSize += size
250                 }
251                 uuidToCollection[collection.Uuid] = collection
252
253                 // Clear out all the manifest strings that we don't need anymore.
254                 // These hopefully form the bulk of our memory usage.
255                 manifest.Text = ""
256                 sdkCollection.ManifestText = ""
257         }
258
259         return
260 }
261
262
263 func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) (int) {
264         var collections SdkCollectionList
265         sdkParams := arvadosclient.Dict{"limit": 0}
266         err := client.List("collections", sdkParams, &collections)
267         if err != nil {
268                 log.Fatalf("error querying collections for items available: %v", err)
269         }
270
271         return collections.ItemsAvailable
272 }
273
274
275 // Assumes you haven't already called arvLogger.Edit()!
276 // If you have called arvLogger.Edit() this method will hang waiting
277 // for the lock you're already holding.
278 func fatalWithMessage(arvLogger *logger.Logger, message string) {
279         if arvLogger != nil {
280                 properties,_ := arvLogger.Edit()
281                 properties["FATAL"] = message
282                 properties["run_info"].(map[string]interface{})["end_time"] = time.Now()
283                 arvLogger.ForceRecord()
284         }
285
286         log.Fatalf(message)
287 }