Started reading collections and keep data in parallel. Moved some logic from datamana...
[arvados.git] / services / datamanager / collection / collection.go
1 /* Deals with parsing Collection responses from API Server. */
2
3 package collection
4
5 import (
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
10         "git.curoverse.com/arvados.git/sdk/go/logger"
11         "git.curoverse.com/arvados.git/sdk/go/manifest"
12         "log"
13         "os"
14         "runtime"
15         "runtime/pprof"
16         "time"
17 )
18
var (
	// heap_profile_filename is the file heap profiles are written to.
	// It is bound to the -heap-profile command line flag; the empty
	// string disables profiling.
	heap_profile_filename string

	// Globals for debugging: running manifest size statistics, in bytes
	// of manifest text, updated while collections are processed.
	totalManifestSize, maxManifestSize uint64
)
25
26 type Collection struct {
27         Uuid string
28         OwnerUuid string
29         ReplicationLevel int
30         BlockDigestToSize map[blockdigest.BlockDigest]int
31         TotalSize int
32 }
33
34 type ReadCollections struct {
35         ReadAllCollections bool
36         UuidToCollection map[string]Collection
37         OwnerToCollectionSize map[string]int
38 }
39
40 type GetCollectionsParams struct {
41         Client arvadosclient.ArvadosClient
42         Logger *logger.Logger
43         BatchSize int
44 }
45
// SdkCollectionInfo is one collection record as decoded from the JSON
// returned by the API server. Only the fields listed here are decoded.
type SdkCollectionInfo struct {
	Uuid         string    `json:"uuid"`
	OwnerUuid    string    `json:"owner_uuid"`
	Redundancy   int       `json:"redundancy"`
	ModifiedAt   time.Time `json:"modified_at"`
	ManifestText string    `json:"manifest_text"`
}
53
54 type SdkCollectionList struct {
55         ItemsAvailable   int                   `json:"items_available"`
56         Items            []SdkCollectionInfo   `json:"items"`
57 }
58
59 func init() {
60         flag.StringVar(&heap_profile_filename, 
61                 "heap-profile",
62                 "",
63                 "File to write the heap profiles to. Leave blank to skip profiling.")
64 }
65
66 // // Methods to implement util.SdkListResponse Interface
67 // func (s SdkCollectionList) NumItemsAvailable() (numAvailable int, err error) {
68 //      return s.ItemsAvailable, nil
69 // }
70
71 // func (s SdkCollectionList) NumItemsContained() (numContained int, err error) {
72 //      return len(s.Items), nil
73 // }
74
75 // Write the heap profile to a file for later review.
76 // Since a file is expected to only contain a single heap profile this
77 // function overwrites the previously written profile, so it is safe
78 // to call multiple times in a single run.
79 // Otherwise we would see cumulative numbers as explained here:
80 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
81 func WriteHeapProfile() {
82         if heap_profile_filename != "" {
83
84                 heap_profile, err := os.Create(heap_profile_filename)
85                 if err != nil {
86                         log.Fatal(err)
87                 }
88
89                 defer heap_profile.Close()
90
91                 err = pprof.WriteHeapProfile(heap_profile)
92                 if err != nil {
93                         log.Fatal(err)
94                 }
95         }
96 }
97
98
99 func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
100         results = GetCollections(params)
101         ComputeSizeOfOwnedCollections(&results)
102
103         if params.Logger != nil {
104                 properties,_ := params.Logger.Edit()
105                 collectionInfo := properties["collection_info"].(map[string]interface{})
106                 collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize
107                 params.Logger.Record()
108         }
109
110         log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
111         log.Printf("Read and processed %d collections",
112                 len(results.UuidToCollection))
113
114         // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
115         // lots of behaviors can become warnings (and obviously we can't
116         // write anything).
117         // if !readCollections.ReadAllCollections {
118         //      log.Fatalf("Did not read all collections")
119         // }
120
121         return
122 }
123
124 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
125         if &params.Client == nil {
126                 log.Fatalf("params.Client passed to GetCollections() should " +
127                         "contain a valid ArvadosClient, but instead it is nil.")
128         }
129
130         fieldsWanted := []string{"manifest_text",
131                 "owner_uuid",
132                 "uuid",
133                 // TODO(misha): Start using the redundancy field.
134                 "redundancy",
135                 "modified_at"}
136
137         sdkParams := arvadosclient.Dict{
138                 "select": fieldsWanted,
139                 "order": []string{"modified_at ASC"},
140                 "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
141
142         if params.BatchSize > 0 {
143                 sdkParams["limit"] = params.BatchSize
144         }
145
146         initialNumberOfCollectionsAvailable := NumberCollectionsAvailable(params.Client)
147         // Include a 1% margin for collections added while we're reading so
148         // that we don't have to grow the map in most cases.
149         maxExpectedCollections := int(
150                 float64(initialNumberOfCollectionsAvailable) * 1.01)
151         results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
152
153         if params.Logger != nil {
154                 properties,_ := params.Logger.Edit()
155                 collectionInfo := make(map[string]interface{})
156                 collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
157                 collectionInfo["batch_size"] = params.BatchSize
158                 properties["collection_info"] = collectionInfo
159                 params.Logger.Record()
160         }
161
162         // These values are just for getting the loop to run the first time,
163         // afterwards they'll be set to real values.
164         previousTotalCollections := -1
165         totalCollections := 0
166         for totalCollections > previousTotalCollections {
167                 // We're still finding new collections
168
169                 // Write the heap profile for examining memory usage
170                 WriteHeapProfile()
171
172                 // Get next batch of collections.
173                 var collections SdkCollectionList
174                 err := params.Client.List("collections", sdkParams, &collections)
175                 if err != nil {
176                         fatalWithMessage(params.Logger,
177                                 fmt.Sprintf("Error querying collections: %v", err))
178                 }
179
180                 // Process collection and update our date filter.
181                 sdkParams["filters"].([][]string)[0][2] =
182                         ProcessCollections(params.Logger,
183                         collections.Items,
184                         results.UuidToCollection).Format(time.RFC3339)
185
186                 // update counts
187                 previousTotalCollections = totalCollections
188                 totalCollections = len(results.UuidToCollection)
189
190                 log.Printf("%d collections read, %d new in last batch, " +
191                         "%s latest modified date, %.0f %d %d avg,max,total manifest size",
192                         totalCollections,
193                         totalCollections - previousTotalCollections,
194                         sdkParams["filters"].([][]string)[0][2],
195                         float32(totalManifestSize)/float32(totalCollections),
196                         maxManifestSize, totalManifestSize)
197
198                 if params.Logger != nil {
199                         properties,_ := params.Logger.Edit()
200                         collectionInfo := properties["collection_info"].(map[string]interface{})
201                         collectionInfo["collections_read"] = totalCollections
202                         collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
203                         collectionInfo["total_manifest_size"] = totalManifestSize
204                         collectionInfo["max_manifest_size"] = maxManifestSize
205                         params.Logger.Record()
206                 }
207         }
208
209         // Just in case this lowers the numbers reported in the heap profile.
210         runtime.GC()
211
212         // Write the heap profile for examining memory usage
213         WriteHeapProfile()
214
215         return
216 }
217
218
219 // StrCopy returns a newly allocated string.
220 // It is useful to copy slices so that the garbage collector can reuse
221 // the memory of the longer strings they came from.
222 func StrCopy(s string) string {
223         return string([]byte(s))
224 }
225
226
227 func ProcessCollections(arvLogger *logger.Logger,
228         receivedCollections []SdkCollectionInfo,
229         uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
230         for _, sdkCollection := range receivedCollections {
231                 collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
232                         OwnerUuid: StrCopy(sdkCollection.OwnerUuid),
233                         ReplicationLevel: sdkCollection.Redundancy,
234                         BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
235
236                 if sdkCollection.ModifiedAt.IsZero() {
237                         fatalWithMessage(arvLogger,
238                                 fmt.Sprintf(
239                                         "Arvados SDK collection returned with unexpected zero " +
240                                                 "modifcation date. This probably means that either we failed to " +
241                                                 "parse the modification date or the API server has changed how " +
242                                                 "it returns modification dates: %v",
243                                         collection))
244                 }
245
246                 if sdkCollection.ModifiedAt.After(latestModificationDate) {
247                         latestModificationDate = sdkCollection.ModifiedAt
248                 }
249                 manifest := manifest.Manifest{sdkCollection.ManifestText}
250                 manifestSize := uint64(len(sdkCollection.ManifestText))
251
252                 if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen {
253                         totalManifestSize += manifestSize
254                 }
255                 if manifestSize > maxManifestSize {
256                         maxManifestSize = manifestSize
257                 }
258                 
259                 blockChannel := manifest.BlockIterWithDuplicates()
260                 for block := range blockChannel {
261                         if stored_size, stored := collection.BlockDigestToSize[block.Digest];
262                         stored && stored_size != block.Size {
263                                 message := fmt.Sprintf(
264                                         "Collection %s contains multiple sizes (%d and %d) for block %s",
265                                         collection.Uuid,
266                                         stored_size,
267                                         block.Size,
268                                         block.Digest)
269                                 fatalWithMessage(arvLogger, message)
270                         }
271                         collection.BlockDigestToSize[block.Digest] = block.Size
272                 }
273                 collection.TotalSize = 0
274                 for _, size := range collection.BlockDigestToSize {
275                         collection.TotalSize += size
276                 }
277                 uuidToCollection[collection.Uuid] = collection
278
279                 // Clear out all the manifest strings that we don't need anymore.
280                 // These hopefully form the bulk of our memory usage.
281                 manifest.Text = ""
282                 sdkCollection.ManifestText = ""
283         }
284
285         return
286 }
287
288
289 func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) (int) {
290         var collections SdkCollectionList
291         sdkParams := arvadosclient.Dict{"limit": 0}
292         err := client.List("collections", sdkParams, &collections)
293         if err != nil {
294                 log.Fatalf("error querying collections for items available: %v", err)
295         }
296
297         return collections.ItemsAvailable
298 }
299
300
301 func ComputeSizeOfOwnedCollections(readCollections *ReadCollections) {
302         readCollections.OwnerToCollectionSize = make(map[string]int)
303         for _, coll := range readCollections.UuidToCollection {
304                 readCollections.OwnerToCollectionSize[coll.OwnerUuid] =
305                         readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
306         }
307
308         return
309 }
310
311
312 // Assumes you haven't already called arvLogger.Edit()!
313 // If you have called arvLogger.Edit() this method will hang waiting
314 // for the lock you're already holding.
315 func fatalWithMessage(arvLogger *logger.Logger, message string) {
316         if arvLogger != nil {
317                 properties,_ := arvLogger.Edit()
318                 properties["FATAL"] = message
319                 properties["run_info"].(map[string]interface{})["end_time"] = time.Now()
320                 arvLogger.ForceRecord()
321         }
322
323         log.Fatalf(message)
324 }