1 // Deals with parsing Collection responses from API Server.
8 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9 "git.curoverse.com/arvados.git/sdk/go/blockdigest"
10 "git.curoverse.com/arvados.git/sdk/go/logger"
11 "git.curoverse.com/arvados.git/sdk/go/manifest"
12 "git.curoverse.com/arvados.git/sdk/go/util"
13 "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
// heapProfileFilename is set by the command-line flag registered in init;
// when it is empty, WriteHeapProfile does nothing.
21 heapProfileFilename string
22 // Manifest-size statistics kept for debugging memory use. ProcessCollections
// updates them, and GetCollections reports them in its per-batch progress log
// and in the "collection_info" log entry.
// NOTE(review): these are unsynchronized package-level variables, so this
// assumes ProcessCollections is never called concurrently — confirm.
23 totalManifestSize uint64 // sum of manifest lengths of first-seen collections
24 maxManifestSize uint64   // largest single manifest length seen
27 // Collection is the in-memory summary of one collection built by
// ProcessCollections: its UUID, owner UUID, replication level, the size of
// each distinct block its manifest references, and the sum of those sizes.
28 type Collection struct {
// BlockDigestToSize maps each distinct block digest in the manifest to that
// block's size. ProcessCollections sets TotalSize to the sum of these values.
32 BlockDigestToSize map[blockdigest.BlockDigest]int
36 // ReadCollections holds information about collections from API server
// together with the indexes that Summarize derives from them.
37 type ReadCollections struct {
// NOTE(review): not assigned anywhere in the code shown. The TODO in
// GetCollectionsAndSummarize suggests it should record whether every
// collection was read — confirm.
38 ReadAllCollections bool
// Collections keyed by UUID, filled in by ProcessCollections.
39 UUIDToCollection map[string]Collection
// Sum of Collection.TotalSize for each owner UUID (computed by Summarize).
40 OwnerToCollectionSize map[string]int
// Highest ReplicationLevel among the collections that reference each block.
41 BlockToDesiredReplication map[blockdigest.DigestWithSize]int
// CollectionUUIDToIndex and CollectionIndexToUUID give each collection a
// dense integer index and map between the two. Summarize assigns indices
// while ranging over UUIDToCollection, so the numbering follows Go's
// randomized map iteration order and is not stable across runs.
42 CollectionUUIDToIndex map[string]int
43 CollectionIndexToUUID []string
// Per block, the list of collection indices that Summarize appends while
// walking each collection's blocks.
44 BlockToCollectionIndices map[blockdigest.DigestWithSize][]int
47 // GetCollectionsParams holds the inputs to GetCollections and
// GetCollectionsAndSummarize.
48 type GetCollectionsParams struct {
// Client is used for the discovery lookup of the default replication
// level, the initial collection count, and the batched collection listing.
49 Client arvadosclient.ArvadosClient
54 // SdkCollectionInfo is a single collection record as decoded from the
// API server's collections list response.
55 type SdkCollectionInfo struct {
56 UUID string `json:"uuid"`
57 OwnerUUID string `json:"owner_uuid"`
// Redundancy is the collection's replication level. ProcessCollections
// substitutes the site default when it is 0.
58 Redundancy int `json:"redundancy"`
// ModifiedAt drives GetCollections' date-filtered paging. ProcessCollections
// passes a zero value to loggerutil.FatalWithMessage.
59 ModifiedAt time.Time `json:"modified_at"`
60 ManifestText string `json:"manifest_text"`
63 // SdkCollectionList is one page of results from the API server's
// collections list call.
64 type SdkCollectionList struct {
// NOTE(review): presumably the server-side count of all matching
// collections rather than the length of this page — confirm. It is not read
// in the code shown; GetCollections uses util.NumberItemsAvailable instead.
65 ItemsAvailable int `json:"items_available"`
66 Items []SdkCollectionInfo `json:"items"`
// Register the flag that sets heapProfileFilename. An empty value disables
// WriteHeapProfile.
70 flag.StringVar(&heapProfileFilename,
73 "File to write the heap profiles to. Leave blank to skip profiling.")
76 // WriteHeapProfile writes the current heap profile to heapProfileFilename
77 // for later review, and does nothing if that name is empty. A profile file
78 // is expected to hold a single heap profile, so each call replaces the
79 // previously written one; this makes it safe to call repeatedly in one run.
80 // Otherwise we would see cumulative numbers as explained here:
81 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
82 func WriteHeapProfile() {
83 if heapProfileFilename != "" {
// os.Create truncates an existing file, which is what makes each call
// replace the previous profile instead of appending to it.
85 heapProfile, err := os.Create(heapProfileFilename)
// NOTE(review): the error returned by Close is discarded, so a failure
// while closing the profile file goes unreported.
90 defer heapProfile.Close()
92 err = pprof.WriteHeapProfile(heapProfile)
99 // GetCollectionsAndSummarize reads all collections with GetCollections,
// builds the derived indexes with Summarize, and logs the per-owner size map
// and the number of collections read.
100 func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
101 results = GetCollections(params)
102 results.Summarize(params.Logger)
// The map printed here is keyed by owner UUID (see Summarize).
104 log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
105 log.Printf("Read and processed %d collections",
106 len(results.UUIDToCollection))
108 // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
109 // lots of behaviors can become warnings (and obviously we can't
111 // if !readCollections.ReadAllCollections {
112 // log.Fatalf("Did not read all collections")
118 // GetCollections reads every collection from the API server in batches
// ordered by modified_at and returns them in results.UUIDToCollection. It
// stops once a batch adds no UUID that was not already seen. API errors and
// a non-positive default replication level are passed to
// loggerutil.FatalWithMessage.
119 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
// NOTE(review): `¶ms` on the next line looks like a mis-decoded `&params`
// (HTML entity "&para") and will not compile as written. Even as
// `&params.Client == nil`, the check can never fire, because the address of a
// struct field is never nil. Static analyzers such as staticcheck flag this.
// A real validity check would have to inspect the client's own fields — TODO.
120 if ¶ms.Client == nil {
121 log.Fatalf("params.Client passed to GetCollections() should " +
122 "contain a valid ArvadosClient, but instead it is nil.")
// Request only the collection fields this package uses (presumably exactly
// the fields SdkCollectionInfo decodes).
125 fieldsWanted := []string{"manifest_text",
// Page through collections by ascending modified_at. filters[0][2] holds the
// lower bound, and each batch advances it to that batch's newest ModifiedAt.
// Because the comparison is >=, each batch re-fetches collections at the
// boundary timestamp. Progress is therefore measured by growth in distinct
// UUIDs rather than by batch size.
// NOTE(review): if more than BatchSize collections share one modified_at,
// the bound cannot advance past them. The server presumably returns the same
// page again, no new UUIDs appear, and the loop ends with the remaining
// collections unread.
131 sdkParams := arvadosclient.Dict{
132 "select": fieldsWanted,
133 "order": []string{"modified_at ASC"},
134 "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
// With no positive BatchSize, no limit is sent and the server's own page
// size applies (presumably its default — confirm).
136 if params.BatchSize > 0 {
137 sdkParams["limit"] = params.BatchSize
140 var defaultReplicationLevel int
142 value, err := params.Client.Discovery("defaultCollectionReplication")
144 loggerutil.FatalWithMessage(params.Logger,
145 fmt.Sprintf("Error querying default collection replication: %v", err))
// The discovery value is expected to be a JSON number (float64). This
// unchecked type assertion panics if it is anything else.
148 defaultReplicationLevel = int(value.(float64))
149 if defaultReplicationLevel <= 0 {
150 loggerutil.FatalWithMessage(params.Logger,
151 fmt.Sprintf("Default collection replication returned by arvados SDK "+
152 "should be a positive integer but instead it was %d.",
153 defaultReplicationLevel))
157 initialNumberOfCollectionsAvailable, err :=
158 util.NumberItemsAvailable(params.Client, "collections")
160 loggerutil.FatalWithMessage(params.Logger,
161 fmt.Sprintf("Error querying collection count: %v", err))
163 // Include a 1% margin for collections added while we're reading so
164 // that we don't have to grow the map in most cases.
165 maxExpectedCollections := int(
166 float64(initialNumberOfCollectionsAvailable) * 1.01)
167 results.UUIDToCollection = make(map[string]Collection, maxExpectedCollections)
169 if params.Logger != nil {
170 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
171 collectionInfo := logger.GetOrCreateMap(p, "collection_info")
172 collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
173 collectionInfo["batch_size"] = params.BatchSize
174 collectionInfo["default_replication_level"] = defaultReplicationLevel
178 // These values are just for getting the loop to run the first time,
179 // afterwards they'll be set to real values.
180 previousTotalCollections := -1
181 totalCollections := 0
// Keep fetching while the previous batch added at least one new UUID.
182 for totalCollections > previousTotalCollections {
183 // We're still finding new collections
185 // Write the heap profile for examining memory usage
188 // Get next batch of collections.
189 var collections SdkCollectionList
190 err := params.Client.List("collections", sdkParams, &collections)
192 loggerutil.FatalWithMessage(params.Logger,
193 fmt.Sprintf("Error querying collections: %v", err))
196 // Process collection and update our date filter.
197 sdkParams["filters"].([][]string)[0][2] =
198 ProcessCollections(params.Logger,
200 defaultReplicationLevel,
201 results.UUIDToCollection).Format(time.RFC3339)
204 previousTotalCollections = totalCollections
205 totalCollections = len(results.UUIDToCollection)
207 log.Printf("%d collections read, %d new in last batch, "+
208 "%s latest modified date, %.0f %d %d avg,max,total manifest size",
210 totalCollections-previousTotalCollections,
211 sdkParams["filters"].([][]string)[0][2],
// Floating-point division, so this logs NaN rather than panicking when no
// collections have been read.
212 float32(totalManifestSize)/float32(totalCollections),
213 maxManifestSize, totalManifestSize)
215 if params.Logger != nil {
216 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
217 collectionInfo := logger.GetOrCreateMap(p, "collection_info")
218 collectionInfo["collections_read"] = totalCollections
219 collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
220 collectionInfo["total_manifest_size"] = totalManifestSize
221 collectionInfo["max_manifest_size"] = maxManifestSize
226 // Write the heap profile for examining memory usage
232 // StrCopy returns a copy of s held in newly allocated memory.
233 // A substring shares the backing storage of the string it was sliced from,
234 // so copying a small piece lets the garbage collector reclaim the larger
// string it came from.
235 func StrCopy(s string) string {
// The []byte round trip copies the bytes into fresh storage.
// NOTE(review): on Go 1.20+ strings.Clone(s) expresses the same intent.
236 return string([]byte(s))
239 // ProcessCollections converts each received SDK collection into a
// Collection and stores it in UUIDToCollection, replacing any earlier entry
// for the same UUID. It also updates the package-level manifest-size
// statistics. It returns the latest ModifiedAt in the batch, or the zero time
// when receivedCollections is empty.
// A zero ModifiedAt, or a manifest that lists one block digest with two
// different sizes, is passed to loggerutil.FatalWithMessage.
240 func ProcessCollections(arvLogger *logger.Logger,
241 receivedCollections []SdkCollectionInfo,
242 defaultReplicationLevel int,
243 UUIDToCollection map[string]Collection) (latestModificationDate time.Time) {
244 for _, sdkCollection := range receivedCollections {
// UUID strings are copied via StrCopy, presumably so the stored Collection
// does not share storage with the decoded response.
245 collection := Collection{UUID: StrCopy(sdkCollection.UUID),
246 OwnerUUID: StrCopy(sdkCollection.OwnerUUID),
247 ReplicationLevel: sdkCollection.Redundancy,
248 BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
250 if sdkCollection.ModifiedAt.IsZero() {
251 loggerutil.FatalWithMessage(arvLogger,
253 "Arvados SDK collection returned with unexpected zero "+
254 "modification date. This probably means that either we failed to "+
255 "parse the modification date or the API server has changed how "+
256 "it returns modification dates: %+v",
260 if sdkCollection.ModifiedAt.After(latestModificationDate) {
261 latestModificationDate = sdkCollection.ModifiedAt
// Fall back to the site-wide default when the collection's Redundancy is 0.
264 if collection.ReplicationLevel == 0 {
265 collection.ReplicationLevel = defaultReplicationLevel
// NOTE(review): this local `manifest` shadows the imported manifest package
// for the rest of the loop body. The unkeyed composite literal of an
// imported struct type is also flagged by go vet's composites check.
// Renaming the variable (e.g. `m`) and using a keyed field would fix both.
268 manifest := manifest.Manifest{sdkCollection.ManifestText}
269 manifestSize := uint64(len(sdkCollection.ManifestText))
// Count a manifest toward the total only the first time its UUID is seen.
// GetCollections' >= date filter re-fetches collections at batch boundaries.
271 if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen {
272 totalManifestSize += manifestSize
274 if manifestSize > maxManifestSize {
275 maxManifestSize = manifestSize
278 blockChannel := manifest.BlockIterWithDuplicates()
279 for block := range blockChannel {
// The same digest listed with two different sizes in one manifest is fatal.
280 if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
281 message := fmt.Sprintf(
282 "Collection %s contains multiple sizes (%d and %d) for block %s",
287 loggerutil.FatalWithMessage(arvLogger, message)
289 collection.BlockDigestToSize[block.Digest] = block.Size
// Blocks are keyed by digest, so a block referenced more than once in the
// manifest contributes its size to TotalSize only once.
291 collection.TotalSize = 0
292 for _, size := range collection.BlockDigestToSize {
293 collection.TotalSize += size
295 UUIDToCollection[collection.UUID] = collection
297 // Clear out all the manifest strings that we don't need anymore.
298 // These hopefully form the bulk of our memory usage.
// NOTE(review): sdkCollection is the range loop's per-iteration copy of the
// element, so this clears only the copy. receivedCollections (the caller's
// collections.Items) still references every manifest string until that
// slice is dropped. To release them early, assign through the index
// instead: receivedCollections[i].ManifestText = "".
300 sdkCollection.ManifestText = ""
306 // Summarize the collections read
307 func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
308 readCollections.OwnerToCollectionSize = make(map[string]int)
309 readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int)
310 numCollections := len(readCollections.UUIDToCollection)
311 readCollections.CollectionUUIDToIndex = make(map[string]int, numCollections)
312 readCollections.CollectionIndexToUUID = make([]string, 0, numCollections)
313 readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int)
315 for _, coll := range readCollections.UUIDToCollection {
316 collectionIndex := len(readCollections.CollectionIndexToUUID)
317 readCollections.CollectionIndexToUUID =
318 append(readCollections.CollectionIndexToUUID, coll.UUID)
319 readCollections.CollectionUUIDToIndex[coll.UUID] = collectionIndex
321 readCollections.OwnerToCollectionSize[coll.OwnerUUID] =
322 readCollections.OwnerToCollectionSize[coll.OwnerUUID] + coll.TotalSize
324 for block, size := range coll.BlockDigestToSize {
325 locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)}
326 readCollections.BlockToCollectionIndices[locator] =
327 append(readCollections.BlockToCollectionIndices[locator],
329 storedReplication := readCollections.BlockToDesiredReplication[locator]
330 if coll.ReplicationLevel > storedReplication {
331 readCollections.BlockToDesiredReplication[locator] =
332 coll.ReplicationLevel
337 if arvLogger != nil {
338 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
339 collectionInfo := logger.GetOrCreateMap(p, "collection_info")
340 // Since maps are shallow copied, we run a risk of concurrent
341 // updates here. By copying results.OwnerToCollectionSize into
342 // the log, we're assuming that it won't be updated.
343 collectionInfo["owner_to_collection_size"] =
344 readCollections.OwnerToCollectionSize
345 collectionInfo["distinct_blocks_named"] =
346 len(readCollections.BlockToDesiredReplication)