// Deals with parsing collection responses from the API server.
8 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9 "git.curoverse.com/arvados.git/sdk/go/blockdigest"
10 "git.curoverse.com/arvados.git/sdk/go/logger"
11 "git.curoverse.com/arvados.git/sdk/go/manifest"
12 "git.curoverse.com/arvados.git/sdk/go/util"
13 "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
// heapProfileFilename names the file WriteHeapProfile writes to. It is
// registered as a command-line flag via flag.StringVar; leaving it empty
// disables heap profiling.
var heapProfileFilename string

// Globals for debugging: ProcessCollections accumulates the total size of
// the manifests it reads (counting each collection UUID once) and the
// largest single manifest size; GetCollections logs both.
var totalManifestSize, maxManifestSize uint64
// DefaultReplicationLevel is the replication level assumed for a
// collection whose redundancy is reported as 0.
//
// TODO(misha): Read this value from the SDK once support is added
// as suggested in https://arvados.org/issues/3408#note-31
const DefaultReplicationLevel = 2
// Collection is this package's summary of one collection read from the
// API server, as built by ProcessCollections.
type Collection struct {
	// Uuid and OwnerUuid are copies of the API server record's values.
	Uuid              string
	OwnerUuid         string
	// ReplicationLevel is the record's redundancy, or
	// DefaultReplicationLevel when that is reported as 0.
	ReplicationLevel  int
	// BlockDigestToSize maps each distinct block named in the
	// collection's manifest to its size.
	BlockDigestToSize map[blockdigest.BlockDigest]int
	// TotalSize is the sum of the sizes in BlockDigestToSize, so a block
	// named more than once in the manifest is counted once.
	TotalSize         int
}
// ReadCollections holds the collections read from the API server together
// with the indexes Summarize derives from them.
type ReadCollections struct {
	// ReadAllCollections appears intended to record whether every
	// collection was read. NOTE(review): nothing in this file visibly sets
	// it and the only check of it is commented out — confirm before use.
	ReadAllCollections        bool
	// UuidToCollection maps collection UUID to its summary; filled in by
	// GetCollections.
	UuidToCollection          map[string]Collection
	// OwnerToCollectionSize maps owner UUID to the sum of TotalSize over
	// that owner's collections.
	OwnerToCollectionSize     map[string]int
	// BlockToDesiredReplication maps each block (digest and size) to the
	// highest ReplicationLevel of any collection naming it.
	BlockToDesiredReplication map[blockdigest.DigestWithSize]int
	// CollectionUuidToIndex and CollectionIndexToUuid give each collection
	// a dense integer index and translate between the two. Indices follow
	// map iteration order, so they are not stable between runs.
	CollectionUuidToIndex     map[string]int
	CollectionIndexToUuid     []string
	// BlockToCollectionIndices maps each block to the indices of the
	// collections that name it.
	BlockToCollectionIndices  map[blockdigest.DigestWithSize][]int
}
51 type GetCollectionsParams struct {
52 Client arvadosclient.ArvadosClient
// SdkCollectionInfo is one collection record as returned by the API server
// and decoded from JSON.
type SdkCollectionInfo struct {
	Uuid         string    `json:"uuid"`
	OwnerUuid    string    `json:"owner_uuid"`
	// Redundancy is the record's replication setting; ProcessCollections
	// substitutes DefaultReplicationLevel when it is 0.
	Redundancy   int       `json:"redundancy"`
	// ModifiedAt drives paging in GetCollections; a zero value is treated
	// as a decoding or API error by ProcessCollections.
	ModifiedAt   time.Time `json:"modified_at"`
	ManifestText string    `json:"manifest_text"`
}
// SdkCollectionList is one page of results from a "collections" List call.
type SdkCollectionList struct {
	// ItemsAvailable is the server's items_available value.
	// NOTE(review): presumably the total number of matching collections,
	// not just this page — confirm against the API docs.
	ItemsAvailable int                 `json:"items_available"`
	Items          []SdkCollectionInfo `json:"items"`
}
71 flag.StringVar(&heapProfileFilename,
74 "File to write the heap profiles to. Leave blank to skip profiling.")
77 // Write the heap profile to a file for later review.
78 // Since a file is expected to only contain a single heap profile this
79 // function overwrites the previously written profile, so it is safe
80 // to call multiple times in a single run.
81 // Otherwise we would see cumulative numbers as explained here:
82 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
83 func WriteHeapProfile() {
84 if heapProfileFilename != "" {
86 heap_profile, err := os.Create(heapProfileFilename)
91 defer heap_profile.Close()
93 err = pprof.WriteHeapProfile(heap_profile)
100 func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
101 results = GetCollections(params)
102 results.Summarize(params.Logger)
104 log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
105 log.Printf("Read and processed %d collections",
106 len(results.UuidToCollection))
108 // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
109 // lots of behaviors can become warnings (and obviously we can't
111 // if !readCollections.ReadAllCollections {
112 // log.Fatalf("Did not read all collections")
118 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
119 if ¶ms.Client == nil {
120 log.Fatalf("params.Client passed to GetCollections() should " +
121 "contain a valid ArvadosClient, but instead it is nil.")
124 fieldsWanted := []string{"manifest_text",
130 sdkParams := arvadosclient.Dict{
131 "select": fieldsWanted,
132 "order": []string{"modified_at ASC"},
133 "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
135 if params.BatchSize > 0 {
136 sdkParams["limit"] = params.BatchSize
139 initialNumberOfCollectionsAvailable, err :=
140 util.NumberItemsAvailable(params.Client, "collections")
142 loggerutil.FatalWithMessage(params.Logger,
143 fmt.Sprintf("Error querying collection count: %v", err))
145 // Include a 1% margin for collections added while we're reading so
146 // that we don't have to grow the map in most cases.
147 maxExpectedCollections := int(
148 float64(initialNumberOfCollectionsAvailable) * 1.01)
149 results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
151 if params.Logger != nil {
152 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
153 collectionInfo := logger.GetOrCreateMap(p, "collection_info")
154 collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
155 collectionInfo["batch_size"] = params.BatchSize
159 // These values are just for getting the loop to run the first time,
160 // afterwards they'll be set to real values.
161 previousTotalCollections := -1
162 totalCollections := 0
163 for totalCollections > previousTotalCollections {
164 // We're still finding new collections
166 // Write the heap profile for examining memory usage
169 // Get next batch of collections.
170 var collections SdkCollectionList
171 err := params.Client.List("collections", sdkParams, &collections)
173 loggerutil.FatalWithMessage(params.Logger,
174 fmt.Sprintf("Error querying collections: %v", err))
177 // Process collection and update our date filter.
178 sdkParams["filters"].([][]string)[0][2] =
179 ProcessCollections(params.Logger,
181 results.UuidToCollection).Format(time.RFC3339)
184 previousTotalCollections = totalCollections
185 totalCollections = len(results.UuidToCollection)
187 log.Printf("%d collections read, %d new in last batch, "+
188 "%s latest modified date, %.0f %d %d avg,max,total manifest size",
190 totalCollections-previousTotalCollections,
191 sdkParams["filters"].([][]string)[0][2],
192 float32(totalManifestSize)/float32(totalCollections),
193 maxManifestSize, totalManifestSize)
195 if params.Logger != nil {
196 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
197 collectionInfo := logger.GetOrCreateMap(p, "collection_info")
198 collectionInfo["collections_read"] = totalCollections
199 collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
200 collectionInfo["total_manifest_size"] = totalManifestSize
201 collectionInfo["max_manifest_size"] = maxManifestSize
206 // Write the heap profile for examining memory usage
// StrCopy returns a copy of s in its own storage. For a non-empty s the
// result never shares memory with s. Copying a substring this way lets the
// garbage collector reclaim the (possibly much larger) string it was
// sliced from.
func StrCopy(s string) string {
	buf := make([]byte, len(s))
	copy(buf, s)
	return string(buf)
}
219 func ProcessCollections(arvLogger *logger.Logger,
220 receivedCollections []SdkCollectionInfo,
221 uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
222 for _, sdkCollection := range receivedCollections {
223 collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
224 OwnerUuid: StrCopy(sdkCollection.OwnerUuid),
225 ReplicationLevel: sdkCollection.Redundancy,
226 BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
228 if sdkCollection.ModifiedAt.IsZero() {
229 loggerutil.FatalWithMessage(arvLogger,
231 "Arvados SDK collection returned with unexpected zero "+
232 "modification date. This probably means that either we failed to "+
233 "parse the modification date or the API server has changed how "+
234 "it returns modification dates: %+v",
238 if sdkCollection.ModifiedAt.After(latestModificationDate) {
239 latestModificationDate = sdkCollection.ModifiedAt
242 if collection.ReplicationLevel == 0 {
243 collection.ReplicationLevel = DefaultReplicationLevel
246 manifest := manifest.Manifest{sdkCollection.ManifestText}
247 manifestSize := uint64(len(sdkCollection.ManifestText))
249 if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen {
250 totalManifestSize += manifestSize
252 if manifestSize > maxManifestSize {
253 maxManifestSize = manifestSize
256 blockChannel := manifest.BlockIterWithDuplicates()
257 for block := range blockChannel {
258 if stored_size, stored := collection.BlockDigestToSize[block.Digest]; stored && stored_size != block.Size {
259 message := fmt.Sprintf(
260 "Collection %s contains multiple sizes (%d and %d) for block %s",
265 loggerutil.FatalWithMessage(arvLogger, message)
267 collection.BlockDigestToSize[block.Digest] = block.Size
269 collection.TotalSize = 0
270 for _, size := range collection.BlockDigestToSize {
271 collection.TotalSize += size
273 uuidToCollection[collection.Uuid] = collection
275 // Clear out all the manifest strings that we don't need anymore.
276 // These hopefully form the bulk of our memory usage.
278 sdkCollection.ManifestText = ""
284 func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
285 readCollections.OwnerToCollectionSize = make(map[string]int)
286 readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int)
287 numCollections := len(readCollections.UuidToCollection)
288 readCollections.CollectionUuidToIndex = make(map[string]int, numCollections)
289 readCollections.CollectionIndexToUuid = make([]string, 0, numCollections)
290 readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int)
292 for _, coll := range readCollections.UuidToCollection {
293 collectionIndex := len(readCollections.CollectionIndexToUuid)
294 readCollections.CollectionIndexToUuid =
295 append(readCollections.CollectionIndexToUuid, coll.Uuid)
296 readCollections.CollectionUuidToIndex[coll.Uuid] = collectionIndex
298 readCollections.OwnerToCollectionSize[coll.OwnerUuid] =
299 readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
301 for block, size := range coll.BlockDigestToSize {
302 locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)}
303 readCollections.BlockToCollectionIndices[locator] =
304 append(readCollections.BlockToCollectionIndices[locator],
306 storedReplication := readCollections.BlockToDesiredReplication[locator]
307 if coll.ReplicationLevel > storedReplication {
308 readCollections.BlockToDesiredReplication[locator] =
309 coll.ReplicationLevel
314 if arvLogger != nil {
315 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
316 collectionInfo := logger.GetOrCreateMap(p, "collection_info")
317 // Since maps are shallow copied, we run a risk of concurrent
318 // updates here. By copying results.OwnerToCollectionSize into
319 // the log, we're assuming that it won't be updated.
320 collectionInfo["owner_to_collection_size"] =
321 readCollections.OwnerToCollectionSize
322 collectionInfo["distinct_blocks_named"] =
323 len(readCollections.BlockToDesiredReplication)