Started using Logger in data manager and collections.
[arvados.git] / services / datamanager / collection / collection.go
1 /* Deals with parsing Collection responses from API Server. */
2
3 package collection
4
5 import (
6         "flag"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
9         "git.curoverse.com/arvados.git/sdk/go/logger"
10         "git.curoverse.com/arvados.git/sdk/go/manifest"
11         "log"
12         "os"
13         "runtime"
14         "runtime/pprof"
15         "time"
16 )
17
var (
	// heap_profile_filename is the value of the -heap-profile flag
	// registered in init. WriteHeapProfile does nothing while it is empty.
	heap_profile_filename string
	// globals for debugging
	// totalManifestSize is the summed manifest text length, in bytes,
	// added by ProcessCollections the first time it sees each uuid.
	totalManifestSize uint64
	// maxManifestSize is the largest manifest text length, in bytes,
	// that ProcessCollections has seen so far.
	maxManifestSize uint64
)
24
// Collection is the summary of one collection that ProcessCollections
// builds from an SdkCollectionInfo.
type Collection struct {
	// Uuid is a copy of the collection's uuid.
	Uuid string
	// OwnerUuid is a copy of the collection's owner_uuid.
	OwnerUuid string
	// ReplicationLevel is taken from the collection's redundancy field.
	ReplicationLevel int
	// BlockDigestToSize maps the digest of each block listed in the
	// manifest to that block's size.
	BlockDigestToSize map[blockdigest.BlockDigest]int
	// TotalSize is the sum of the sizes in BlockDigestToSize, so a block
	// that appears several times in the manifest is counted once.
	TotalSize int
}
32
// ReadCollections is the result type of GetCollections.
type ReadCollections struct {
	// NOTE(review): nothing in this file assigns ReadAllCollections;
	// presumably it reports whether every collection was read - confirm
	// against the callers.
	ReadAllCollections bool
	// UuidToCollection maps a collection uuid to its summary.
	UuidToCollection map[string]Collection
}
37
// GetCollectionsParams bundles the arguments of GetCollections.
type GetCollectionsParams struct {
	// Client is the API client used to list collections.
	Client arvadosclient.ArvadosClient
	// Logger receives progress properties (via Edit and Record) while
	// collections are being read.
	Logger *logger.Logger
	// BatchSize, when greater than zero, is sent as the "limit" of each
	// list request; otherwise no limit is sent.
	BatchSize int
}
43
// SdkCollectionInfo is the JSON shape of a single collection item as
// decoded from a "collections" list response. Only the fields requested
// by GetCollections are declared.
type SdkCollectionInfo struct {
	Uuid           string     `json:"uuid"`
	OwnerUuid      string     `json:"owner_uuid"`
	Redundancy     int        `json:"redundancy"`
	// ModifiedAt must be non-zero; ProcessCollections treats a zero
	// value as a fatal parsing problem.
	ModifiedAt     time.Time  `json:"modified_at"`
	ManifestText   string     `json:"manifest_text"`
}
51
// SdkCollectionList is the JSON shape of a "collections" list response.
type SdkCollectionList struct {
	// ItemsAvailable is decoded from the response's items_available field.
	ItemsAvailable   int                   `json:"items_available"`
	// Items holds the collections returned in this response.
	Items            []SdkCollectionInfo   `json:"items"`
}
56
57 func init() {
58         flag.StringVar(&heap_profile_filename, 
59                 "heap-profile",
60                 "",
61                 "File to write the heap profiles to.")
62 }
63
64 // // Methods to implement util.SdkListResponse Interface
65 // func (s SdkCollectionList) NumItemsAvailable() (numAvailable int, err error) {
66 //      return s.ItemsAvailable, nil
67 // }
68
69 // func (s SdkCollectionList) NumItemsContained() (numContained int, err error) {
70 //      return len(s.Items), nil
71 // }
72
73 // Write the heap profile to a file for later review.
74 // Since a file is expected to only contain a single heap profile this
75 // function overwrites the previously written profile, so it is safe
76 // to call multiple times in a single run.
77 // Otherwise we would see cumulative numbers as explained here:
78 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
79 func WriteHeapProfile() {
80         if heap_profile_filename != "" {
81
82                 heap_profile, err := os.Create(heap_profile_filename)
83                 if err != nil {
84                         log.Fatal(err)
85                 }
86
87                 defer heap_profile.Close()
88
89                 err = pprof.WriteHeapProfile(heap_profile)
90                 if err != nil {
91                         log.Fatal(err)
92                 }
93         }
94 }
95
96
97 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
98         if &params.Client == nil {
99                 log.Fatalf("params.Client passed to GetCollections() should " +
100                         "contain a valid ArvadosClient, but instead it is nil.")
101         }
102
103         fieldsWanted := []string{"manifest_text",
104                 "owner_uuid",
105                 "uuid",
106                 // TODO(misha): Start using the redundancy field.
107                 "redundancy",
108                 "modified_at"}
109
110         sdkParams := arvadosclient.Dict{
111                 "select": fieldsWanted,
112                 "order": []string{"modified_at ASC"},
113                 "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
114
115         if params.BatchSize > 0 {
116                 sdkParams["limit"] = params.BatchSize
117         }
118
119         initialNumberOfCollectionsAvailable := NumberCollectionsAvailable(params.Client)
120         // Include a 1% margin for collections added while we're reading so
121         // that we don't have to grow the map in most cases.
122         maxExpectedCollections := int(
123                 float64(initialNumberOfCollectionsAvailable) * 1.01)
124         results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
125
126         {
127                 properties,_ := params.Logger.Edit()
128                 properties["num_collections_at_start"] = initialNumberOfCollectionsAvailable
129         }
130         params.Logger.Record()
131
132         // These values are just for getting the loop to run the first time,
133         // afterwards they'll be set to real values.
134         previousTotalCollections := -1
135         totalCollections := 0
136         for totalCollections > previousTotalCollections {
137                 // We're still finding new collections
138
139                 // Write the heap profile for examining memory usage
140                 WriteHeapProfile()
141
142                 // Get next batch of collections.
143                 var collections SdkCollectionList
144                 err := params.Client.List("collections", sdkParams, &collections)
145                 if err != nil {
146                         log.Fatalf("error querying collections: %+v", err)
147                 }
148
149                 // Process collection and update our date filter.
150                 sdkParams["filters"].([][]string)[0][2] =
151                         ProcessCollections(collections.Items, results.UuidToCollection).Format(time.RFC3339)
152
153                 // update counts
154                 previousTotalCollections = totalCollections
155                 totalCollections = len(results.UuidToCollection)
156
157                 log.Printf("%d collections read, %d new in last batch, " +
158                         "%s latest modified date, %.0f %d %d avg,max,total manifest size",
159                         totalCollections,
160                         totalCollections - previousTotalCollections,
161                         sdkParams["filters"].([][]string)[0][2],
162                         float32(totalManifestSize)/float32(totalCollections),
163                         maxManifestSize, totalManifestSize)
164
165                 {
166                         properties,_ := params.Logger.Edit()
167                         properties["collections_read"] = totalCollections
168                         properties["latest_modified_date"] = sdkParams["filters"].([][]string)[0][2]
169                         properties["total_manifest_size"] = totalManifestSize
170                         properties["max_manifest_size"] = maxManifestSize
171                 }
172                 params.Logger.Record()
173         }
174
175         // Just in case this lowers the numbers reported in the heap profile.
176         runtime.GC()
177
178         // Write the heap profile for examining memory usage
179         WriteHeapProfile()
180
181         return
182 }
183
184
// StrCopy returns a string with the same contents as s, built from a
// freshly allocated byte buffer.
// Copying a short slice of a long string this way lets the garbage
// collector reclaim the longer string it was cut from.
func StrCopy(s string) string {
	buf := make([]byte, len(s))
	copy(buf, s)
	return string(buf)
}
191
192
193 func ProcessCollections(receivedCollections []SdkCollectionInfo,
194         uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
195         for _, sdkCollection := range receivedCollections {
196                 collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
197                         OwnerUuid: StrCopy(sdkCollection.OwnerUuid),
198                         ReplicationLevel: sdkCollection.Redundancy,
199                         BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
200
201                 if sdkCollection.ModifiedAt.IsZero() {
202                         log.Fatalf(
203                                 "Arvados SDK collection returned with unexpected zero modifcation " +
204                                         "date. This probably means that either we failed to parse the " +
205                                         "modification date or the API server has changed how it returns " +
206                                         "modification dates: %v",
207                                 collection)
208                 }
209                 if sdkCollection.ModifiedAt.After(latestModificationDate) {
210                         latestModificationDate = sdkCollection.ModifiedAt
211                 }
212                 manifest := manifest.Manifest{sdkCollection.ManifestText}
213                 manifestSize := uint64(len(sdkCollection.ManifestText))
214
215                 if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen {
216                         totalManifestSize += manifestSize
217                 }
218                 if manifestSize > maxManifestSize {
219                         maxManifestSize = manifestSize
220                 }
221                 
222                 blockChannel := manifest.BlockIterWithDuplicates()
223                 for block := range blockChannel {
224                         if stored_size, stored := collection.BlockDigestToSize[block.Digest];
225                         stored && stored_size != block.Size {
226                                 log.Fatalf(
227                                         "Collection %s contains multiple sizes (%d and %d) for block %s",
228                                         collection.Uuid,
229                                         stored_size,
230                                         block.Size,
231                                         block.Digest)
232                         }
233                         collection.BlockDigestToSize[block.Digest] = block.Size
234                 }
235                 collection.TotalSize = 0
236                 for _, size := range collection.BlockDigestToSize {
237                         collection.TotalSize += size
238                 }
239                 uuidToCollection[collection.Uuid] = collection
240
241                 // Clear out all the manifest strings that we don't need anymore.
242                 // These hopefully form the bulk of our memory usage.
243                 manifest.Text = ""
244                 sdkCollection.ManifestText = ""
245         }
246
247         return
248 }
249
250
251 func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) (int) {
252         var collections SdkCollectionList
253         sdkParams := arvadosclient.Dict{"limit": 0}
254         err := client.List("collections", sdkParams, &collections)
255         if err != nil {
256                 log.Fatalf("error querying collections for items available: %v", err)
257         }
258
259         return collections.ItemsAvailable
260 }