Renamed BlockToReplication to BlockToDesiredReplication.
[arvados.git] / services / datamanager / collection / collection.go
// Package collection parses Collection responses from the API server.
2
3 package collection
4
5 import (
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
10         "git.curoverse.com/arvados.git/sdk/go/logger"
11         "git.curoverse.com/arvados.git/sdk/go/manifest"
12         "git.curoverse.com/arvados.git/sdk/go/util"
13         "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
14         "log"
15         "os"
16         "runtime/pprof"
17         "time"
18 )
19
var (
	// heapProfileFilename is set from the -heap-profile flag (registered in
	// init). When non-empty, WriteHeapProfile writes heap profiles to it.
	heapProfileFilename string

	// Globals for debugging. ProcessCollections updates them and
	// GetCollections reports them in its progress logs. Nothing in this file
	// resets them, so they accumulate across calls.
	//
	// totalManifestSize is the sum of manifest_text lengths (len in bytes),
	// counted only the first time each collection UUID is seen.
	totalManifestSize uint64
	// maxManifestSize is the largest manifest_text length seen.
	maxManifestSize uint64
)
26
const (
	// DefaultReplicationLevel is the replication level ProcessCollections
	// assigns to a collection whose reported redundancy is 0.
	// TODO(misha): Read this value from the SDK once support is added
	// as suggested in https://arvados.org/issues/3408#note-31
	DefaultReplicationLevel = 2
)
32
// Collection is the summary kept for one collection after its API server
// record has been processed by ProcessCollections.
type Collection struct {
	Uuid      string
	OwnerUuid string
	// ReplicationLevel is the collection's reported redundancy, or
	// DefaultReplicationLevel if the redundancy was reported as 0.
	ReplicationLevel int
	// BlockDigestToSize maps each distinct block digest named in the
	// collection's manifest to that block's size.
	BlockDigestToSize map[blockdigest.BlockDigest]int
	// TotalSize is the sum of the sizes in BlockDigestToSize, so a block
	// that the manifest names several times is counted once.
	TotalSize int
}
40
// ReadCollections holds the collections read from the API server
// (UuidToCollection, filled by GetCollections) together with the indexes
// and totals derived from them by Summarize.
type ReadCollections struct {
	// NOTE(review): ReadAllCollections is not assigned anywhere in this file;
	// confirm whether other code sets it.
	ReadAllCollections bool
	// UuidToCollection maps collection UUID to its summary.
	UuidToCollection map[string]Collection
	// OwnerToCollectionSize maps owner UUID to the summed TotalSize of the
	// collections it owns.
	OwnerToCollectionSize map[string]int
	// BlockToDesiredReplication maps each block (digest and size) to the
	// highest ReplicationLevel of any collection naming it.
	BlockToDesiredReplication map[blockdigest.DigestWithSize]int
	// CollectionUuidToIndex and CollectionIndexToUuid give each collection a
	// dense integer index and translate between index and UUID.
	CollectionUuidToIndex map[string]int
	CollectionIndexToUuid []string
	// BlockToCollectionIndices maps each block to the indices of the
	// collections that name it.
	BlockToCollectionIndices map[blockdigest.DigestWithSize][]int
}
50
// GetCollectionsParams configures GetCollections.
type GetCollectionsParams struct {
	// Client is used to query the API server.
	Client arvadosclient.ArvadosClient
	// Logger, if non-nil, receives progress information under
	// "collection_info".
	Logger *logger.Logger
	// BatchSize, when positive, is sent as the "limit" of each list
	// request; otherwise no limit is sent.
	BatchSize int
}
56
// SdkCollectionInfo is one collection as decoded from the API server's
// collections list response. Its fields match the ones GetCollections
// requests via "select".
type SdkCollectionInfo struct {
	Uuid      string `json:"uuid"`
	OwnerUuid string `json:"owner_uuid"`
	// Redundancy of 0 is replaced with DefaultReplicationLevel by
	// ProcessCollections.
	Redundancy   int       `json:"redundancy"`
	ModifiedAt   time.Time `json:"modified_at"`
	ManifestText string    `json:"manifest_text"`
}
64
// SdkCollectionList is one page of results from the collections list call.
type SdkCollectionList struct {
	// ItemsAvailable is decoded from "items_available"; it is not read in
	// this file (presumably the server-side total; confirm against API docs).
	ItemsAvailable int                 `json:"items_available"`
	Items          []SdkCollectionInfo `json:"items"`
}
69
70 func init() {
71         flag.StringVar(&heapProfileFilename,
72                 "heap-profile",
73                 "",
74                 "File to write the heap profiles to. Leave blank to skip profiling.")
75 }
76
77 // Write the heap profile to a file for later review.
78 // Since a file is expected to only contain a single heap profile this
79 // function overwrites the previously written profile, so it is safe
80 // to call multiple times in a single run.
81 // Otherwise we would see cumulative numbers as explained here:
82 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
83 func WriteHeapProfile() {
84         if heapProfileFilename != "" {
85
86                 heap_profile, err := os.Create(heapProfileFilename)
87                 if err != nil {
88                         log.Fatal(err)
89                 }
90
91                 defer heap_profile.Close()
92
93                 err = pprof.WriteHeapProfile(heap_profile)
94                 if err != nil {
95                         log.Fatal(err)
96                 }
97         }
98 }
99
100 func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
101         results = GetCollections(params)
102         results.Summarize(params.Logger)
103
104         log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
105         log.Printf("Read and processed %d collections",
106                 len(results.UuidToCollection))
107
108         // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
109         // lots of behaviors can become warnings (and obviously we can't
110         // write anything).
111         // if !readCollections.ReadAllCollections {
112         //      log.Fatalf("Did not read all collections")
113         // }
114
115         return
116 }
117
118 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
119         if &params.Client == nil {
120                 log.Fatalf("params.Client passed to GetCollections() should " +
121                         "contain a valid ArvadosClient, but instead it is nil.")
122         }
123
124         fieldsWanted := []string{"manifest_text",
125                 "owner_uuid",
126                 "uuid",
127                 "redundancy",
128                 "modified_at"}
129
130         sdkParams := arvadosclient.Dict{
131                 "select":  fieldsWanted,
132                 "order":   []string{"modified_at ASC"},
133                 "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
134
135         if params.BatchSize > 0 {
136                 sdkParams["limit"] = params.BatchSize
137         }
138
139         initialNumberOfCollectionsAvailable, err :=
140                 util.NumberItemsAvailable(params.Client, "collections")
141         if err != nil {
142                 loggerutil.FatalWithMessage(params.Logger,
143                         fmt.Sprintf("Error querying collection count: %v", err))
144         }
145         // Include a 1% margin for collections added while we're reading so
146         // that we don't have to grow the map in most cases.
147         maxExpectedCollections := int(
148                 float64(initialNumberOfCollectionsAvailable) * 1.01)
149         results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
150
151         if params.Logger != nil {
152                 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
153                         collectionInfo := logger.GetOrCreateMap(p, "collection_info")
154                         collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
155                         collectionInfo["batch_size"] = params.BatchSize
156                 })
157         }
158
159         // These values are just for getting the loop to run the first time,
160         // afterwards they'll be set to real values.
161         previousTotalCollections := -1
162         totalCollections := 0
163         for totalCollections > previousTotalCollections {
164                 // We're still finding new collections
165
166                 // Write the heap profile for examining memory usage
167                 WriteHeapProfile()
168
169                 // Get next batch of collections.
170                 var collections SdkCollectionList
171                 err := params.Client.List("collections", sdkParams, &collections)
172                 if err != nil {
173                         loggerutil.FatalWithMessage(params.Logger,
174                                 fmt.Sprintf("Error querying collections: %v", err))
175                 }
176
177                 // Process collection and update our date filter.
178                 sdkParams["filters"].([][]string)[0][2] =
179                         ProcessCollections(params.Logger,
180                                 collections.Items,
181                                 results.UuidToCollection).Format(time.RFC3339)
182
183                 // update counts
184                 previousTotalCollections = totalCollections
185                 totalCollections = len(results.UuidToCollection)
186
187                 log.Printf("%d collections read, %d new in last batch, "+
188                         "%s latest modified date, %.0f %d %d avg,max,total manifest size",
189                         totalCollections,
190                         totalCollections-previousTotalCollections,
191                         sdkParams["filters"].([][]string)[0][2],
192                         float32(totalManifestSize)/float32(totalCollections),
193                         maxManifestSize, totalManifestSize)
194
195                 if params.Logger != nil {
196                         params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
197                                 collectionInfo := logger.GetOrCreateMap(p, "collection_info")
198                                 collectionInfo["collections_read"] = totalCollections
199                                 collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
200                                 collectionInfo["total_manifest_size"] = totalManifestSize
201                                 collectionInfo["max_manifest_size"] = maxManifestSize
202                         })
203                 }
204         }
205
206         // Write the heap profile for examining memory usage
207         WriteHeapProfile()
208
209         return
210 }
211
// StrCopy returns a copy of s backed by newly allocated memory.
// Keeping the copy instead of s (which may be a slice of a much longer
// string) lets the garbage collector reclaim the longer string once nothing
// else refers to it.
func StrCopy(s string) string {
	buf := make([]byte, len(s))
	copy(buf, s)
	return string(buf)
}
218
219 func ProcessCollections(arvLogger *logger.Logger,
220         receivedCollections []SdkCollectionInfo,
221         uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
222         for _, sdkCollection := range receivedCollections {
223                 collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
224                         OwnerUuid:         StrCopy(sdkCollection.OwnerUuid),
225                         ReplicationLevel:  sdkCollection.Redundancy,
226                         BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
227
228                 if sdkCollection.ModifiedAt.IsZero() {
229                         loggerutil.FatalWithMessage(arvLogger,
230                                 fmt.Sprintf(
231                                         "Arvados SDK collection returned with unexpected zero "+
232                                                 "modification date. This probably means that either we failed to "+
233                                                 "parse the modification date or the API server has changed how "+
234                                                 "it returns modification dates: %+v",
235                                         collection))
236                 }
237
238                 if sdkCollection.ModifiedAt.After(latestModificationDate) {
239                         latestModificationDate = sdkCollection.ModifiedAt
240                 }
241
242                 if collection.ReplicationLevel == 0 {
243                         collection.ReplicationLevel = DefaultReplicationLevel
244                 }
245
246                 manifest := manifest.Manifest{sdkCollection.ManifestText}
247                 manifestSize := uint64(len(sdkCollection.ManifestText))
248
249                 if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen {
250                         totalManifestSize += manifestSize
251                 }
252                 if manifestSize > maxManifestSize {
253                         maxManifestSize = manifestSize
254                 }
255
256                 blockChannel := manifest.BlockIterWithDuplicates()
257                 for block := range blockChannel {
258                         if stored_size, stored := collection.BlockDigestToSize[block.Digest]; stored && stored_size != block.Size {
259                                 message := fmt.Sprintf(
260                                         "Collection %s contains multiple sizes (%d and %d) for block %s",
261                                         collection.Uuid,
262                                         stored_size,
263                                         block.Size,
264                                         block.Digest)
265                                 loggerutil.FatalWithMessage(arvLogger, message)
266                         }
267                         collection.BlockDigestToSize[block.Digest] = block.Size
268                 }
269                 collection.TotalSize = 0
270                 for _, size := range collection.BlockDigestToSize {
271                         collection.TotalSize += size
272                 }
273                 uuidToCollection[collection.Uuid] = collection
274
275                 // Clear out all the manifest strings that we don't need anymore.
276                 // These hopefully form the bulk of our memory usage.
277                 manifest.Text = ""
278                 sdkCollection.ManifestText = ""
279         }
280
281         return
282 }
283
284 func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
285         readCollections.OwnerToCollectionSize = make(map[string]int)
286         readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int)
287         numCollections := len(readCollections.UuidToCollection)
288         readCollections.CollectionUuidToIndex = make(map[string]int, numCollections)
289         readCollections.CollectionIndexToUuid = make([]string, 0, numCollections)
290         readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int)
291
292         for _, coll := range readCollections.UuidToCollection {
293                 collectionIndex := len(readCollections.CollectionIndexToUuid)
294                 readCollections.CollectionIndexToUuid =
295                         append(readCollections.CollectionIndexToUuid, coll.Uuid)
296                 readCollections.CollectionUuidToIndex[coll.Uuid] = collectionIndex
297
298                 readCollections.OwnerToCollectionSize[coll.OwnerUuid] =
299                         readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
300
301                 for block, size := range coll.BlockDigestToSize {
302                         locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)}
303                         readCollections.BlockToCollectionIndices[locator] =
304                                 append(readCollections.BlockToCollectionIndices[locator],
305                                         collectionIndex)
306                         storedReplication := readCollections.BlockToDesiredReplication[locator]
307                         if coll.ReplicationLevel > storedReplication {
308                                 readCollections.BlockToDesiredReplication[locator] =
309                                         coll.ReplicationLevel
310                         }
311                 }
312         }
313
314         if arvLogger != nil {
315                 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
316                         collectionInfo := logger.GetOrCreateMap(p, "collection_info")
317                         // Since maps are shallow copied, we run a risk of concurrent
318                         // updates here. By copying results.OwnerToCollectionSize into
319                         // the log, we're assuming that it won't be updated.
320                         collectionInfo["owner_to_collection_size"] =
321                                 readCollections.OwnerToCollectionSize
322                         collectionInfo["distinct_blocks_named"] =
323                                 len(readCollections.BlockToDesiredReplication)
324                 })
325         }
326
327         return
328 }