1 /* Deals with parsing Collection responses from API Server. */
8 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9 "git.curoverse.com/arvados.git/sdk/go/blockdigest"
10 "git.curoverse.com/arvados.git/sdk/go/logger"
11 "git.curoverse.com/arvados.git/sdk/go/manifest"
12 "git.curoverse.com/arvados.git/sdk/go/util"
13 "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
// heapProfileFilename is set from a command-line flag; when it is empty,
// WriteHeapProfile writes nothing.
22 heapProfileFilename string
23 // globals for debugging
// totalManifestSize sums manifest text lengths (counting each collection UUID
// only the first time it is seen) and maxManifestSize tracks the largest
// single manifest text seen. Both are updated by ProcessCollections and
// reported by GetCollections.
// NOTE(review): these are plain package variables; confirm ProcessCollections
// is never called concurrently.
24 totalManifestSize uint64
25 maxManifestSize uint64
// DefaultReplicationLevel is the replication level ProcessCollections assigns
// to a collection whose reported redundancy is 0.
29 DefaultReplicationLevel = 2
// Collection is the summary ProcessCollections builds for one collection:
// copies of its UUID and owner UUID, its replication level, the size of each
// distinct block its manifest references, and the sum of those sizes.
32 type Collection struct {
// BlockDigestToSize maps each distinct block digest in the manifest to its
// size; ProcessCollections treats one digest listed with two different sizes
// as fatal.
36 BlockDigestToSize map[blockdigest.BlockDigest]int
// ReadCollections holds the collections read by GetCollections together with
// the aggregates computed from them by Summarize.
40 type ReadCollections struct {
// ReadAllCollections is meant to report whether every collection was read.
// NOTE(review): the only check that uses it is commented out in
// GetCollectionsAndSummarize; confirm where (or whether) it is set.
41 ReadAllCollections bool
// UuidToCollection maps a collection UUID to its summary; GetCollections
// fills it.
42 UuidToCollection map[string]Collection
// OwnerToCollectionSize maps an owner UUID to the summed TotalSize of that
// owner's collections; Summarize fills it.
43 OwnerToCollectionSize map[string]int
// BlockToReplication maps a block digest to the highest ReplicationLevel of
// any collection that references it; Summarize fills it.
44 BlockToReplication map[blockdigest.BlockDigest]int
// GetCollectionsParams configures GetCollections. Besides Client,
// GetCollections reads params.Logger (progress reporting and fatal errors)
// and params.BatchSize (sent as the request "limit" when positive).
47 type GetCollectionsParams struct {
// Client is used for the initial collection count and for every List request.
48 Client arvadosclient.ArvadosClient
// SdkCollectionInfo is one collection record as decoded from the API server;
// each member's json tag names the API field it is read from.
53 type SdkCollectionInfo struct {
54 Uuid string `json:"uuid"`
55 OwnerUuid string `json:"owner_uuid"`
// Redundancy becomes Collection.ReplicationLevel; ProcessCollections
// substitutes DefaultReplicationLevel when it is 0.
56 Redundancy int `json:"redundancy"`
// ModifiedAt must be non-zero: ProcessCollections treats a zero value as
// fatal, and the latest value seen advances GetCollections' paging filter.
57 ModifiedAt time.Time `json:"modified_at"`
// ManifestText is parsed by ProcessCollections for block digests and sizes.
58 ManifestText string `json:"manifest_text"`
// SdkCollectionList is the decoded response of a "collections" List request
// issued by GetCollections.
61 type SdkCollectionList struct {
// ItemsAvailable is decoded from the response's items_available field.
62 ItemsAvailable int `json:"items_available"`
// Items holds the collection records returned by this request.
63 Items []SdkCollectionInfo `json:"items"`
// Register the command-line flag that sets heapProfileFilename; leaving it
// empty disables WriteHeapProfile.
67 flag.StringVar(&heapProfileFilename,
70 "File to write the heap profiles to. Leave blank to skip profiling.")
73 // WriteHeapProfile writes the heap profile to a file for later review.
74 // Since a file is expected to only contain a single heap profile this
75 // function overwrites the previously written profile, so it is safe
76 // to call multiple times in a single run.
77 // Otherwise we would see cumulative numbers as explained here:
78 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
//
// The target file is heapProfileFilename; when that is empty nothing is
// written.
79 func WriteHeapProfile() {
80 if heapProfileFilename != "" {
// os.Create truncates an existing file, which is what makes each call
// replace the previous profile rather than append to it.
82 heap_profile, err := os.Create(heapProfileFilename)
// NOTE(review): the error returned by Close is discarded by this defer, so a
// failure while closing the profile file goes unreported.
87 defer heap_profile.Close()
89 err = pprof.WriteHeapProfile(heap_profile)
// GetCollectionsAndSummarize calls GetCollections and returns its result.
// When params.Logger is set, it also stores results.OwnerToCollectionSize
// under p["collection_info"]["owner_to_collection_size"]. It then logs the
// owner size map and the number of collections read.
96 func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
97 results = GetCollections(params)
// NOTE(review): OwnerToCollectionSize is filled in by Summarize, not by
// GetCollections; confirm Summarize(&results) has run before this point,
// otherwise a nil map is logged below.
100 if params.Logger != nil {
101 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
// This type assertion relies on GetCollections having stored a
// map[string]interface{} under p["collection_info"]; it panics otherwise.
102 collectionInfo := p["collection_info"].(map[string]interface{})
103 // Maps are reference values: this stores a reference to
104 // results.OwnerToCollectionSize rather than a copy, so it
105 // assumes the map is not modified after this point.
106 collectionInfo["owner_to_collection_size"] = results.OwnerToCollectionSize
// NOTE(review): the message says "Uuid to Size" but the map is keyed by
// owner UUID.
110 log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
111 log.Printf("Read and processed %d collections",
112 len(results.UuidToCollection))
114 // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
115 // lots of behaviors can become warnings (and obviously we can't
117 // if !readCollections.ReadAllCollections {
118 // log.Fatalf("Did not read all collections")
// GetCollections pages through the API server's collections in ascending
// modified_at order. Each request filters on modified_at >= the latest
// modification date seen so far, starting from 1900-01-01T00:00:00Z.
// ProcessCollections stores each batch in results.UuidToCollection, keyed by
// UUID. The loop stops once a batch adds no UUID that was not already present.
// Errors from the count and list queries are passed to
// loggerutil.FatalWithMessage. When params.Logger is set, progress is recorded
// under p["collection_info"].
124 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
// NOTE(review): "¶ms" appears to be "&params" mangled by an HTML-entity
// decode (&para; -> ¶) and does not compile as written. Even restored, the
// address of a struct field is never nil, so this guard can never fire; a
// real check would have to inspect the client's own fields.
125 if ¶ms.Client == nil {
126 log.Fatalf("params.Client passed to GetCollections() should " +
127 "contain a valid ArvadosClient, but instead it is nil.")
130 fieldsWanted := []string{"manifest_text",
133 // TODO(misha): Start using the redundancy field.
// filters[0][2] is the lower bound on modified_at; it is overwritten after
// every batch below.
137 sdkParams := arvadosclient.Dict{
138 "select": fieldsWanted,
139 "order": []string{"modified_at ASC"},
140 "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
// A non-positive BatchSize leaves the page size to the server.
142 if params.BatchSize > 0 {
143 sdkParams["limit"] = params.BatchSize
146 initialNumberOfCollectionsAvailable, err :=
147 util.NumberItemsAvailable(params.Client, "collections")
149 loggerutil.FatalWithMessage(params.Logger,
150 fmt.Sprintf("Error querying collection count: %v", err))
152 // Include a 1% margin for collections added while we're reading so
153 // that we don't have to grow the map in most cases.
154 maxExpectedCollections := int(
155 float64(initialNumberOfCollectionsAvailable) * 1.01)
156 results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
158 if params.Logger != nil {
159 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
160 collectionInfo := make(map[string]interface{})
161 collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
162 collectionInfo["batch_size"] = params.BatchSize
163 p["collection_info"] = collectionInfo
167 // These values are just for getting the loop to run the first time,
168 // afterwards they'll be set to real values.
169 previousTotalCollections := -1
170 totalCollections := 0
// NOTE(review): because the filter is ">=" on modified_at, a full batch in
// which every collection shares one modified_at value makes the next request
// return the same, already-seen collections. The count then stops growing
// and the loop exits before the remaining collections are read.
171 for totalCollections > previousTotalCollections {
172 // We're still finding new collections
174 // Write the heap profile for examining memory usage
177 // Get next batch of collections.
178 var collections SdkCollectionList
179 err := params.Client.List("collections", sdkParams, &collections)
181 loggerutil.FatalWithMessage(params.Logger,
182 fmt.Sprintf("Error querying collections: %v", err))
185 // Process collection and update our date filter.
186 sdkParams["filters"].([][]string)[0][2] =
187 ProcessCollections(params.Logger,
189 results.UuidToCollection).Format(time.RFC3339)
192 previousTotalCollections = totalCollections
193 totalCollections = len(results.UuidToCollection)
// NOTE(review): when totalCollections is 0 the float32 average below is
// 0/0, which logs as NaN.
195 log.Printf("%d collections read, %d new in last batch, "+
196 "%s latest modified date, %.0f %d %d avg,max,total manifest size",
198 totalCollections-previousTotalCollections,
199 sdkParams["filters"].([][]string)[0][2],
200 float32(totalManifestSize)/float32(totalCollections),
201 maxManifestSize, totalManifestSize)
203 if params.Logger != nil {
204 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
205 collectionInfo := p["collection_info"].(map[string]interface{})
206 collectionInfo["collections_read"] = totalCollections
207 collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
208 collectionInfo["total_manifest_size"] = totalManifestSize
209 collectionInfo["max_manifest_size"] = maxManifestSize
214 // Just in case this lowers the numbers reported in the heap profile.
217 // Write the heap profile for examining memory usage
// StrCopy returns a copy of s backed by freshly allocated memory, so the
// result never shares storage with s. Copying a short substring out of a much
// larger string this way lets the garbage collector reclaim the larger string
// once nothing else refers to it.
func StrCopy(s string) string {
	buf := make([]byte, len(s))
	copy(buf, s)
	return string(buf)
}
// ProcessCollections turns one batch of SDK collection records into
// Collection summaries. It stores each summary in uuidToCollection keyed by
// UUID; a record whose UUID is already present replaces the earlier entry.
// It also updates the package-level totalManifestSize and maxManifestSize
// statistics. The named result latestModificationDate tracks the latest
// ModifiedAt seen in the batch, and GetCollections uses it as the next lower
// bound of its modified_at filter.
//
// Two conditions are reported through loggerutil.FatalWithMessage: a record
// with a zero ModifiedAt, and a block digest listed with two different sizes
// within one manifest.
230 func ProcessCollections(arvLogger *logger.Logger,
231 receivedCollections []SdkCollectionInfo,
232 uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
233 for _, sdkCollection := range receivedCollections {
// StrCopy detaches the UUID strings from the decoded record's memory.
234 collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
235 OwnerUuid: StrCopy(sdkCollection.OwnerUuid),
236 ReplicationLevel: sdkCollection.Redundancy,
237 BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
// NOTE(review): the message below misspells "modification" as
// "modifcation"; fixing it changes the logged text.
239 if sdkCollection.ModifiedAt.IsZero() {
240 loggerutil.FatalWithMessage(arvLogger,
242 "Arvados SDK collection returned with unexpected zero "+
243 "modifcation date. This probably means that either we failed to "+
244 "parse the modification date or the API server has changed how "+
245 "it returns modification dates: %+v",
249 if sdkCollection.ModifiedAt.After(latestModificationDate) {
250 latestModificationDate = sdkCollection.ModifiedAt
// A redundancy of 0 falls back to DefaultReplicationLevel.
253 if collection.ReplicationLevel == 0 {
254 collection.ReplicationLevel = DefaultReplicationLevel
// NOTE(review): this local shadows the imported manifest package for the rest
// of the loop body (manifest.BlockIterWithDuplicates below is a method on
// this value), and the unkeyed composite literal is flagged by go vet's
// composites check.
257 manifest := manifest.Manifest{sdkCollection.ManifestText}
258 manifestSize := uint64(len(sdkCollection.ManifestText))
// totalManifestSize counts each UUID only the first time it is seen;
// maxManifestSize considers every record.
260 if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen {
261 totalManifestSize += manifestSize
263 if manifestSize > maxManifestSize {
264 maxManifestSize = manifestSize
267 blockChannel := manifest.BlockIterWithDuplicates()
268 for block := range blockChannel {
// The same digest may appear more than once in a manifest; that is fine
// only if every occurrence reports the same size.
269 if stored_size, stored := collection.BlockDigestToSize[block.Digest]; stored && stored_size != block.Size {
270 message := fmt.Sprintf(
271 "Collection %s contains multiple sizes (%d and %d) for block %s",
276 loggerutil.FatalWithMessage(arvLogger, message)
278 collection.BlockDigestToSize[block.Digest] = block.Size
// TotalSize counts each distinct block once, regardless of how often the
// manifest references it.
280 collection.TotalSize = 0
281 for _, size := range collection.BlockDigestToSize {
282 collection.TotalSize += size
284 uuidToCollection[collection.Uuid] = collection
// NOTE(review): sdkCollection is the range variable, a copy of
// receivedCollections[i]. The assignment below therefore does not clear the
// manifest text held by the caller's slice, and that memory stays reachable
// for as long as the slice does. Iterating by index and clearing
// receivedCollections[i].ManifestText would release it.
286 // Clear out all the manifest strings that we don't need anymore.
287 // These hopefully form the bulk of our memory usage.
289 sdkCollection.ManifestText = ""
295 func Summarize(readCollections *ReadCollections) {
296 readCollections.OwnerToCollectionSize = make(map[string]int)
297 readCollections.BlockToReplication = make(map[blockdigest.BlockDigest]int)
299 for _, coll := range readCollections.UuidToCollection {
300 readCollections.OwnerToCollectionSize[coll.OwnerUuid] =
301 readCollections.OwnerToCollectionSize[coll.OwnerUuid] + coll.TotalSize
303 for block, _ := range coll.BlockDigestToSize {
304 storedReplication := readCollections.BlockToReplication[block]
305 if coll.ReplicationLevel > storedReplication {
306 readCollections.BlockToReplication[block] = coll.ReplicationLevel