1 // Deals with parsing Collection responses from API Server.
8 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9 "git.curoverse.com/arvados.git/sdk/go/blockdigest"
10 "git.curoverse.com/arvados.git/sdk/go/logger"
11 "git.curoverse.com/arvados.git/sdk/go/manifest"
12 "git.curoverse.com/arvados.git/sdk/go/util"
13 "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
21 heapProfileFilename string
22 // globals for debugging
23 totalManifestSize uint64
24 maxManifestSize uint64
27 type Collection struct {
31 BlockDigestToSize map[blockdigest.BlockDigest]int
35 type ReadCollections struct {
36 ReadAllCollections bool
37 UuidToCollection map[string]Collection
38 OwnerToCollectionSize map[string]int
39 BlockToDesiredReplication map[blockdigest.DigestWithSize]int
40 CollectionUuidToIndex map[string]int
41 CollectionIndexToUuid []string
42 BlockToCollectionIndices map[blockdigest.DigestWithSize][]int
45 type GetCollectionsParams struct {
46 Client arvadosclient.ArvadosClient
51 type SdkCollectionInfo struct {
52 Uuid string `json:"uuid"`
53 OwnerUuid string `json:"owner_uuid"`
54 Redundancy int `json:"redundancy"`
55 ModifiedAt time.Time `json:"modified_at"`
56 ManifestText string `json:"manifest_text"`
59 type SdkCollectionList struct {
60 ItemsAvailable int `json:"items_available"`
61 Items []SdkCollectionInfo `json:"items"`
65 flag.StringVar(&heapProfileFilename,
68 "File to write the heap profiles to. Leave blank to skip profiling.")
71 // Write the heap profile to a file for later review.
72 // Since a file is expected to only contain a single heap profile this
73 // function overwrites the previously written profile, so it is safe
74 // to call multiple times in a single run.
75 // Otherwise we would see cumulative numbers as explained here:
76 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
77 func WriteHeapProfile() {
78 if heapProfileFilename != "" {
80 heap_profile, err := os.Create(heapProfileFilename)
85 defer heap_profile.Close()
87 err = pprof.WriteHeapProfile(heap_profile)
94 func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
95 results = GetCollections(params)
96 results.Summarize(params.Logger)
98 log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
99 log.Printf("Read and processed %d collections",
100 len(results.UuidToCollection))
102 // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
103 // lots of behaviors can become warnings (and obviously we can't
105 // if !readCollections.ReadAllCollections {
106 // log.Fatalf("Did not read all collections")
112 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
113 if &params.Client == nil {
114 log.Fatalf("params.Client passed to GetCollections() should " +
115 "contain a valid ArvadosClient, but instead it is nil.")
118 fieldsWanted := []string{"manifest_text",
124 sdkParams := arvadosclient.Dict{
125 "select": fieldsWanted,
126 "order": []string{"modified_at ASC"},
127 "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
129 if params.BatchSize > 0 {
130 sdkParams["limit"] = params.BatchSize
133 var defaultReplicationLevel int
135 value, err := params.Client.Discovery("defaultCollectionReplication")
137 loggerutil.FatalWithMessage(params.Logger,
138 fmt.Sprintf("Error querying default collection replication: %v", err))
141 defaultReplicationLevel = int(value.(float64))
142 if defaultReplicationLevel <= 0 {
143 loggerutil.FatalWithMessage(params.Logger,
144 fmt.Sprintf("Default collection replication returned by arvados SDK "+
145 "should be a positive integer but instead it was %d.",
146 defaultReplicationLevel))
150 initialNumberOfCollectionsAvailable, err :=
151 util.NumberItemsAvailable(params.Client, "collections")
153 loggerutil.FatalWithMessage(params.Logger,
154 fmt.Sprintf("Error querying collection count: %v", err))
156 // Include a 1% margin for collections added while we're reading so
157 // that we don't have to grow the map in most cases.
158 maxExpectedCollections := int(
159 float64(initialNumberOfCollectionsAvailable) * 1.01)
160 results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
162 if params.Logger != nil {
163 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
164 collectionInfo := logger.GetOrCreateMap(p, "collection_info")
165 collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
166 collectionInfo["batch_size"] = params.BatchSize
167 collectionInfo["default_replication_level"] = defaultReplicationLevel
171 // These values are just for getting the loop to run the first time,
172 // afterwards they'll be set to real values.
173 previousTotalCollections := -1
174 totalCollections := 0
175 for totalCollections > previousTotalCollections {
176 // We're still finding new collections
178 // Write the heap profile for examining memory usage
181 // Get next batch of collections.
182 var collections SdkCollectionList
183 err := params.Client.List("collections", sdkParams, &collections)
185 loggerutil.FatalWithMessage(params.Logger,
186 fmt.Sprintf("Error querying collections: %v", err))
189 // Process collection and update our date filter.
190 sdkParams["filters"].([][]string)[0][2] =
191 ProcessCollections(params.Logger,
193 defaultReplicationLevel,
194 results.UuidToCollection).Format(time.RFC3339)
197 previousTotalCollections = totalCollections
198 totalCollections = len(results.UuidToCollection)
200 log.Printf("%d collections read, %d new in last batch, "+
201 "%s latest modified date, %.0f %d %d avg,max,total manifest size",
203 totalCollections-previousTotalCollections,
204 sdkParams["filters"].([][]string)[0][2],
205 float32(totalManifestSize)/float32(totalCollections),
206 maxManifestSize, totalManifestSize)
208 if params.Logger != nil {
209 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
210 collectionInfo := logger.GetOrCreateMap(p, "collection_info")
211 collectionInfo["collections_read"] = totalCollections
212 collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
213 collectionInfo["total_manifest_size"] = totalManifestSize
214 collectionInfo["max_manifest_size"] = maxManifestSize
219 // Write the heap profile for examining memory usage
225 // StrCopy returns a newly allocated string.
226 // It is useful to copy slices so that the garbage collector can reuse
227 // the memory of the longer strings they came from.
228 func StrCopy(s string) string {
229 return string([]byte(s))
232 func ProcessCollections(arvLogger *logger.Logger,
233 receivedCollections []SdkCollectionInfo,
234 defaultReplicationLevel int,
235 uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
236 for _, sdkCollection := range receivedCollections {
237 collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
238 OwnerUuid: StrCopy(sdkCollection.OwnerUuid),
239 ReplicationLevel: sdkCollection.Redundancy,
240 BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
242 if sdkCollection.ModifiedAt.IsZero() {
243 loggerutil.FatalWithMessage(arvLogger,
245 "Arvados SDK collection returned with unexpected zero "+
246 "modification date. This probably means that either we failed to "+
247 "parse the modification date or the API server has changed how "+
248 "it returns modification dates: %+v",
252 if sdkCollection.ModifiedAt.After(latestModificationDate) {
253 latestModificationDate = sdkCollection.ModifiedAt
256 if collection.ReplicationLevel == 0 {
257 collection.ReplicationLevel = defaultReplicationLevel
260 manifest := manifest.Manifest{sdkCollection.ManifestText}
261 manifestSize := uint64(len(sdkCollection.ManifestText))
263 if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen {
264 totalManifestSize += manifestSize
266 if manifestSize > maxManifestSize {
267 maxManifestSize = manifestSize
270 blockChannel := manifest.BlockIterWithDuplicates()
271 for block := range blockChannel {
272 if stored_size, stored := collection.BlockDigestToSize[block.Digest]; stored && stored_size != block.Size {
273 message := fmt.Sprintf(
274 "Collection %s contains multiple sizes (%d and %d) for block %s",
279 loggerutil.FatalWithMessage(arvLogger, message)
281 collection.BlockDigestToSize[block.Digest] = block.Size
283 collection.TotalSize = 0
284 for _, size := range collection.BlockDigestToSize {
285 collection.TotalSize += size
287 uuidToCollection[collection.Uuid] = collection
289 // Clear out all the manifest strings that we don't need anymore.
290 // These hopefully form the bulk of our memory usage.
292 sdkCollection.ManifestText = ""
298 func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
299 readCollections.OwnerToCollectionSize = make(map[string]int)
300 readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int)
301 numCollections := len(readCollections.UuidToCollection)
302 readCollections.CollectionUuidToIndex = make(map[string]int, numCollections)
303 readCollections.CollectionIndexToUuid = make([]string, 0, numCollections)
304 readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int)
306 for _, coll := range readCollections.UuidToCollection {
307 collectionIndex := len(readCollections.CollectionIndexToUuid)
308 readCollections.CollectionIndexToUuid =
309 append(readCollections.CollectionIndexToUuid, coll.Uuid)
310 readCollections.CollectionUuidToIndex[coll.Uuid] = collectionIndex
312 readCollections.OwnerToCollectionSize[coll.OwnerUuid] =
313 readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
315 for block, size := range coll.BlockDigestToSize {
316 locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)}
317 readCollections.BlockToCollectionIndices[locator] =
318 append(readCollections.BlockToCollectionIndices[locator],
320 storedReplication := readCollections.BlockToDesiredReplication[locator]
321 if coll.ReplicationLevel > storedReplication {
322 readCollections.BlockToDesiredReplication[locator] =
323 coll.ReplicationLevel
328 if arvLogger != nil {
329 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
330 collectionInfo := logger.GetOrCreateMap(p, "collection_info")
331 // Since maps are shallow copied, we run a risk of concurrent
332 // updates here. By copying results.OwnerToCollectionSize into
333 // the log, we're assuming that it won't be updated.
334 collectionInfo["owner_to_collection_size"] =
335 readCollections.OwnerToCollectionSize
336 collectionInfo["distinct_blocks_named"] =
337 len(readCollections.BlockToDesiredReplication)