Added BlockToReplication field to collection.ReadCollections.
[arvados.git] / services / datamanager / collection / collection.go
/* Package collection deals with parsing Collection responses from the API server. */
2
3 package collection
4
5 import (
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
10         "git.curoverse.com/arvados.git/sdk/go/logger"
11         "git.curoverse.com/arvados.git/sdk/go/manifest"
12         "git.curoverse.com/arvados.git/sdk/go/util"
13         "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
14         "log"
15         "os"
16         "runtime"
17         "runtime/pprof"
18         "time"
19 )
20
// heapProfileFilename names the file WriteHeapProfile writes to. It is set
// from the -heap-profile flag; an empty value disables heap profiling.
var heapProfileFilename string

// Globals for debugging, updated by ProcessCollections:
// totalManifestSize sums the manifest lengths (in bytes) of distinct
// collections read so far, and maxManifestSize is the largest single
// manifest length seen.
var (
	totalManifestSize uint64
	maxManifestSize   uint64
)
27
// DefaultReplicationLevel is the replication level given to a collection
// whose API record reports a redundancy of 0.
const DefaultReplicationLevel = 2
31
32 type Collection struct {
33         Uuid              string
34         OwnerUuid         string
35         ReplicationLevel  int
36         BlockDigestToSize map[blockdigest.BlockDigest]int
37         TotalSize         int
38 }
39
40 type ReadCollections struct {
41         ReadAllCollections    bool
42         UuidToCollection      map[string]Collection
43         OwnerToCollectionSize map[string]int
44         BlockToReplication    map[blockdigest.BlockDigest]int
45 }
46
47 type GetCollectionsParams struct {
48         Client    arvadosclient.ArvadosClient
49         Logger    *logger.Logger
50         BatchSize int
51 }
52
53 type SdkCollectionInfo struct {
54         Uuid         string    `json:"uuid"`
55         OwnerUuid    string    `json:"owner_uuid"`
56         Redundancy   int       `json:"redundancy"`
57         ModifiedAt   time.Time `json:"modified_at"`
58         ManifestText string    `json:"manifest_text"`
59 }
60
61 type SdkCollectionList struct {
62         ItemsAvailable int                 `json:"items_available"`
63         Items          []SdkCollectionInfo `json:"items"`
64 }
65
// init registers the -heap-profile command-line flag, which sets the file
// that WriteHeapProfile writes to.
func init() {
	const usage = "File to write the heap profiles to. Leave blank to skip profiling."
	flag.StringVar(&heapProfileFilename, "heap-profile", "", usage)
}
72
73 // Write the heap profile to a file for later review.
74 // Since a file is expected to only contain a single heap profile this
75 // function overwrites the previously written profile, so it is safe
76 // to call multiple times in a single run.
77 // Otherwise we would see cumulative numbers as explained here:
78 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
79 func WriteHeapProfile() {
80         if heapProfileFilename != "" {
81
82                 heap_profile, err := os.Create(heapProfileFilename)
83                 if err != nil {
84                         log.Fatal(err)
85                 }
86
87                 defer heap_profile.Close()
88
89                 err = pprof.WriteHeapProfile(heap_profile)
90                 if err != nil {
91                         log.Fatal(err)
92                 }
93         }
94 }
95
96 func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
97         results = GetCollections(params)
98         Summarize(&results)
99
100         if params.Logger != nil {
101                 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
102                         collectionInfo := p["collection_info"].(map[string]interface{})
103                         // Since maps are shallow copied, we run a risk of concurrent
104                         // updates here. By copying results.OwnerToCollectionSize into
105                         // the log, we're assuming that it won't be updated.
106                         collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize
107                 })
108         }
109
110         log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
111         log.Printf("Read and processed %d collections",
112                 len(results.UuidToCollection))
113
114         // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
115         // lots of behaviors can become warnings (and obviously we can't
116         // write anything).
117         // if !readCollections.ReadAllCollections {
118         //      log.Fatalf("Did not read all collections")
119         // }
120
121         return
122 }
123
// GetCollections reads the collections visible to params.Client from the
// API server and returns them in results.UuidToCollection, keyed by UUID.
//
// Collections are fetched in batches ordered by modified_at. Each request
// asks for records modified at or after the newest date seen so far. The
// loop stops as soon as a batch adds no UUID that was not already in the
// map. Any API error is fatal and is reported through params.Logger.
//
// When params.Logger is non-nil, progress is recorded under its
// "collection_info" property. results.ReadAllCollections is not set here.
//
// NOTE(review): if more than BatchSize collections share one modified_at
// value, the filter cannot advance past them and reading stops early.
// Confirm whether callers need to detect that case.
func GetCollections(params GetCollectionsParams) (results ReadCollections) {
	fieldsWanted := []string{"manifest_text",
		"owner_uuid",
		"uuid",
		"redundancy", // becomes Collection.ReplicationLevel
		"modified_at"}

	// modifiedAtFilter is shared with sdkParams["filters"]. Assigning to
	// its third element moves the query window for the next batch.
	modifiedAtFilter := []string{"modified_at", ">=", "1900-01-01T00:00:00Z"}
	sdkParams := arvadosclient.Dict{
		"select":  fieldsWanted,
		"order":   []string{"modified_at ASC"},
		"filters": [][]string{modifiedAtFilter}}

	if params.BatchSize > 0 {
		sdkParams["limit"] = params.BatchSize
	}

	initialNumberOfCollectionsAvailable, err :=
		util.NumberItemsAvailable(params.Client, "collections")
	if err != nil {
		loggerutil.FatalWithMessage(params.Logger,
			fmt.Sprintf("Error querying collection count: %v", err))
	}
	// Include a 1% margin for collections added while we're reading so
	// that we don't have to grow the map in most cases.
	maxExpectedCollections := int(
		float64(initialNumberOfCollectionsAvailable) * 1.01)
	results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)

	if params.Logger != nil {
		params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
			collectionInfo := make(map[string]interface{})
			collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
			collectionInfo["batch_size"] = params.BatchSize
			p["collection_info"] = collectionInfo
		})
	}

	// These starting values only make the loop body run at least once;
	// both are updated after every batch.
	previousTotalCollections := -1
	totalCollections := 0
	for totalCollections > previousTotalCollections {
		// Write the heap profile for examining memory usage.
		WriteHeapProfile()

		// Get the next batch of collections.
		var collections SdkCollectionList
		if err = params.Client.List("collections", sdkParams, &collections); err != nil {
			loggerutil.FatalWithMessage(params.Logger,
				fmt.Sprintf("Error querying collections: %v", err))
		}

		// Process the batch and advance the date filter to the newest
		// modification date it contained. An empty batch has no such date,
		// so the filter is left alone rather than reset to the zero time.
		latestModified := ProcessCollections(params.Logger,
			collections.Items,
			results.UuidToCollection)
		if len(collections.Items) > 0 {
			modifiedAtFilter[2] = latestModified.Format(time.RFC3339)
		}

		// Update counts.
		previousTotalCollections = totalCollections
		totalCollections = len(results.UuidToCollection)

		// Guard the average so that an empty result logs 0 instead of NaN.
		avgManifestSize := float32(0)
		if totalCollections > 0 {
			avgManifestSize = float32(totalManifestSize) / float32(totalCollections)
		}
		log.Printf("%d collections read, %d new in last batch, "+
			"%s latest modified date, %.0f %d %d avg,max,total manifest size",
			totalCollections,
			totalCollections-previousTotalCollections,
			modifiedAtFilter[2],
			avgManifestSize,
			maxManifestSize, totalManifestSize)

		if params.Logger != nil {
			params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
				collectionInfo := p["collection_info"].(map[string]interface{})
				collectionInfo["collections_read"] = totalCollections
				collectionInfo["latest_modified_date_seen"] = modifiedAtFilter[2]
				collectionInfo["total_manifest_size"] = totalManifestSize
				collectionInfo["max_manifest_size"] = maxManifestSize
			})
		}
	}

	// Just in case this lowers the numbers reported in the heap profile.
	runtime.GC()

	// Write the heap profile for examining memory usage.
	WriteHeapProfile()

	return results
}
222
// StrCopy returns a copy of s backed by newly allocated memory.
// Keeping the copy instead of s, which may be a slice of a much longer
// string, lets the garbage collector reclaim the longer string once nothing
// else refers to it.
func StrCopy(s string) string {
	buf := make([]byte, len(s))
	copy(buf, s)
	return string(buf)
}
229
230 func ProcessCollections(arvLogger *logger.Logger,
231         receivedCollections []SdkCollectionInfo,
232         uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
233         for _, sdkCollection := range receivedCollections {
234                 collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
235                         OwnerUuid:         StrCopy(sdkCollection.OwnerUuid),
236                         ReplicationLevel:  sdkCollection.Redundancy,
237                         BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
238
239                 if sdkCollection.ModifiedAt.IsZero() {
240                         loggerutil.FatalWithMessage(arvLogger,
241                                 fmt.Sprintf(
242                                         "Arvados SDK collection returned with unexpected zero "+
243                                                 "modifcation date. This probably means that either we failed to "+
244                                                 "parse the modification date or the API server has changed how "+
245                                                 "it returns modification dates: %+v",
246                                         collection))
247                 }
248
249                 if sdkCollection.ModifiedAt.After(latestModificationDate) {
250                         latestModificationDate = sdkCollection.ModifiedAt
251                 }
252
253                 if collection.ReplicationLevel == 0 {
254                         collection.ReplicationLevel = DefaultReplicationLevel
255                 }
256
257                 manifest := manifest.Manifest{sdkCollection.ManifestText}
258                 manifestSize := uint64(len(sdkCollection.ManifestText))
259
260                 if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen {
261                         totalManifestSize += manifestSize
262                 }
263                 if manifestSize > maxManifestSize {
264                         maxManifestSize = manifestSize
265                 }
266
267                 blockChannel := manifest.BlockIterWithDuplicates()
268                 for block := range blockChannel {
269                         if stored_size, stored := collection.BlockDigestToSize[block.Digest]; stored && stored_size != block.Size {
270                                 message := fmt.Sprintf(
271                                         "Collection %s contains multiple sizes (%d and %d) for block %s",
272                                         collection.Uuid,
273                                         stored_size,
274                                         block.Size,
275                                         block.Digest)
276                                 loggerutil.FatalWithMessage(arvLogger, message)
277                         }
278                         collection.BlockDigestToSize[block.Digest] = block.Size
279                 }
280                 collection.TotalSize = 0
281                 for _, size := range collection.BlockDigestToSize {
282                         collection.TotalSize += size
283                 }
284                 uuidToCollection[collection.Uuid] = collection
285
286                 // Clear out all the manifest strings that we don't need anymore.
287                 // These hopefully form the bulk of our memory usage.
288                 manifest.Text = ""
289                 sdkCollection.ManifestText = ""
290         }
291
292         return
293 }
294
295 func Summarize(readCollections *ReadCollections) {
296         readCollections.OwnerToCollectionSize = make(map[string]int)
297         readCollections.BlockToReplication = make(map[blockdigest.BlockDigest]int)
298
299         for _, coll := range readCollections.UuidToCollection {
300                 readCollections.OwnerToCollectionSize[coll.OwnerUuid] =
301                         readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
302
303                 for block, _ := range coll.BlockDigestToSize {
304                         storedReplication := readCollections.BlockToReplication[block]
305                         if coll.ReplicationLevel > storedReplication {
306                                 readCollections.BlockToReplication[block] = coll.ReplicationLevel
307                         }
308                 }
309         }
310
311         return
312 }