1 /* Deals with parsing Collection responses from API Server. */
8 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9 "git.curoverse.com/arvados.git/sdk/go/blockdigest"
10 "git.curoverse.com/arvados.git/sdk/go/logger"
11 "git.curoverse.com/arvados.git/sdk/go/manifest"
// heap_profile_filename is set through the flag.StringVar registration in this
// file; when it is empty, WriteHeapProfile does nothing.
20 heap_profile_filename string
21 // globals for debugging
// totalManifestSize accumulates len(ManifestText) for each collection UUID not
// already in the result map; maxManifestSize tracks the largest manifest size
// seen. ProcessCollections updates both, and GetCollections logs them.
// NOTE(review): no reset of these counters is visible in this excerpt, so they
// presumably accumulate across GetCollections calls — confirm.
22 totalManifestSize uint64
23 maxManifestSize uint64
// Collection is the per-collection summary built by ProcessCollections.
// BlockDigestToSize maps each block digest listed in the collection's manifest
// to that block's size; a digest appears once even if the manifest lists it
// several times.
26 type Collection struct {
30 BlockDigestToSize map[blockdigest.BlockDigest]int
// ReadCollections is the result returned by GetCollections.
// UuidToCollection maps each collection UUID to its Collection summary.
// NOTE(review): ReadAllCollections is not assigned anywhere in the visible
// code; presumably it signals that every collection was read — confirm.
34 type ReadCollections struct {
35 ReadAllCollections bool
36 UuidToCollection map[string]Collection
// GetCollectionsParams configures GetCollections. Besides Client (used for all
// API list calls), GetCollections also reads params.Logger, which is optional
// (nil skips structured logging), and params.BatchSize, which is sent as the
// list "limit" only when it is > 0.
39 type GetCollectionsParams struct {
40 Client arvadosclient.ArvadosClient
// SdkCollectionInfo holds the collection fields decoded (via the json tags
// below) from each item of a "collections" list response.
// ProcessCollections copies Redundancy into Collection.ReplicationLevel and
// parses ManifestText for block digests and sizes.
45 type SdkCollectionInfo struct {
46 Uuid string `json:"uuid"`
47 OwnerUuid string `json:"owner_uuid"`
48 Redundancy int `json:"redundancy"`
49 ModifiedAt time.Time `json:"modified_at"`
50 ManifestText string `json:"manifest_text"`
// SdkCollectionList is one decoded "collections" list response.
// ItemsAvailable is the count that NumberCollectionsAvailable reports as the
// number of collections available; Items is the batch returned by this call.
53 type SdkCollectionList struct {
54 ItemsAvailable int `json:"items_available"`
55 Items []SdkCollectionInfo `json:"items"`
// Register the command-line flag that fills heap_profile_filename; leaving it
// blank makes WriteHeapProfile a no-op.
59 flag.StringVar(&heap_profile_filename,
62 "File to write the heap profiles to. Leave blank to skip profiling.")
65 // // Methods to implement util.SdkListResponse Interface
66 // func (s SdkCollectionList) NumItemsAvailable() (numAvailable int, err error) {
67 // return s.ItemsAvailable, nil
70 // func (s SdkCollectionList) NumItemsContained() (numContained int, err error) {
71 // return len(s.Items), nil
// WriteHeapProfile is a no-op unless heap_profile_filename is set.
74 // Write the heap profile to a file for later review.
75 // Since a file is expected to only contain a single heap profile this
76 // function overwrites the previously written profile, so it is safe
77 // to call multiple times in a single run.
78 // Otherwise we would see cumulative numbers as explained here:
79 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
80 func WriteHeapProfile() {
81 if heap_profile_filename != "" {
// os.Create truncates any existing file, which is what makes repeated calls
// overwrite the previous profile.
83 heap_profile, err := os.Create(heap_profile_filename)
// NOTE(review): the error returned by the deferred Close is discarded, so a
// failure while closing the profile file would go unnoticed.
88 defer heap_profile.Close()
90 err = pprof.WriteHeapProfile(heap_profile)
// GetCollections reads collections from the API server in batches ordered by
// modified_at ascending and returns them in results.UuidToCollection, keyed by
// UUID. Each query after the first filters on "modified_at >= <latest
// modified_at seen so far>", so collections at that boundary are fetched again.
// The loop stops once a batch adds no new UUIDs to the map.
// Progress is printed with log.Printf. When params.Logger is non-nil, progress
// is also recorded under the "collection_info" property.
98 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
// NOTE(review): "¶ms" looks like a mis-encoded "&params". Even written as
// &params.Client, the address of a struct field is never nil, so this check
// can never fire — confirm what validation was intended.
99 if ¶ms.Client == nil {
100 log.Fatalf("params.Client passed to GetCollections() should " +
101 "contain a valid ArvadosClient, but instead it is nil.")
104 fieldsWanted := []string{"manifest_text",
107 // TODO(misha): Start using the redundancy field.
// The third element of the first filter (the date) is overwritten after each
// batch with the newest modified_at seen; it starts at an early date so the
// first query is not restricted by date.
111 sdkParams := arvadosclient.Dict{
112 "select": fieldsWanted,
113 "order": []string{"modified_at ASC"},
114 "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
116 if params.BatchSize > 0 {
117 sdkParams["limit"] = params.BatchSize
// The initial count is only used to pre-size the result map.
120 initialNumberOfCollectionsAvailable := NumberCollectionsAvailable(params.Client)
121 // Include a 1% margin for collections added while we're reading so
122 // that we don't have to grow the map in most cases.
123 maxExpectedCollections := int(
124 float64(initialNumberOfCollectionsAvailable) * 1.01)
125 results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
127 if params.Logger != nil {
128 properties,_ := params.Logger.Edit()
129 collectionInfo := make(map[string]interface{})
130 collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
131 collectionInfo["batch_size"] = params.BatchSize
132 properties["collection_info"] = collectionInfo
133 params.Logger.Record()
136 // These values are just for getting the loop to run the first time,
137 // afterwards they'll be set to real values.
138 previousTotalCollections := -1
139 totalCollections := 0
140 for totalCollections > previousTotalCollections {
141 // We're still finding new collections
143 // Write the heap profile for examining memory usage
146 // Get next batch of collections.
147 var collections SdkCollectionList
148 err := params.Client.List("collections", sdkParams, &collections)
150 fatalWithMessage(params.Logger,
151 fmt.Sprintf("Error querying collections: %v", err))
// NOTE(review): because the filter is ">=", if more than one batch worth of
// collections share the same modified_at, a later batch can return only UUIDs
// already seen. The loop would then stop before reading the rest — confirm
// this cannot happen with the configured BatchSize.
// time.RFC3339 drops fractional seconds. With ">=" this only widens the
// overlap between batches; it never skips collections.
154 // Process collection and update our date filter.
155 sdkParams["filters"].([][]string)[0][2] =
156 ProcessCollections(params.Logger,
158 results.UuidToCollection).Format(time.RFC3339)
161 previousTotalCollections = totalCollections
162 totalCollections = len(results.UuidToCollection)
// NOTE(review): the average below is a float32 division, so it prints NaN
// (it does not panic) if totalCollections is 0.
164 log.Printf("%d collections read, %d new in last batch, " +
165 "%s latest modified date, %.0f %d %d avg,max,total manifest size",
167 totalCollections - previousTotalCollections,
168 sdkParams["filters"].([][]string)[0][2],
169 float32(totalManifestSize)/float32(totalCollections),
170 maxManifestSize, totalManifestSize)
172 if params.Logger != nil {
173 properties,_ := params.Logger.Edit()
// Relies on "collection_info" having been stored as a map[string]interface{}
// before the loop; the type assertion would panic otherwise.
174 collectionInfo := properties["collection_info"].(map[string]interface{})
175 collectionInfo["collections_read"] = totalCollections
176 collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
177 collectionInfo["total_manifest_size"] = totalManifestSize
178 collectionInfo["max_manifest_size"] = maxManifestSize
179 params.Logger.Record()
183 // Just in case this lowers the numbers reported in the heap profile.
186 // Write the heap profile for examining memory usage
193 // StrCopy returns a newly allocated string.
194 // It is useful to copy slices so that the garbage collector can reuse
195 // the memory of the longer strings they came from.
// Here "slices" means substrings of a larger string. ProcessCollections applies
// StrCopy to the Uuid and OwnerUuid of each SdkCollectionInfo it stores.
196 func StrCopy(s string) string {
// NOTE(review): strings.Clone (Go 1.18+) states this intent explicitly. The
// []byte round-trip relies on the compiler not eliding the copy.
197 return string([]byte(s))
// ProcessCollections converts each received SdkCollectionInfo into a
// Collection and stores it in uuidToCollection under its UUID, replacing any
// earlier entry for that UUID. For each collection it:
//   - copies Uuid and OwnerUuid with StrCopy;
//   - builds BlockDigestToSize from the manifest's blocks;
//   - sets TotalSize to the sum of the sizes of the distinct blocks.
// It adds the manifest length to totalManifestSize only for UUIDs not already
// in the map, and also updates maxManifestSize.
// It returns the latest ModifiedAt among receivedCollections, or the zero
// time.Time when the slice is empty.
// It calls fatalWithMessage when a collection has a zero ModifiedAt, or when a
// manifest lists the same block digest with two different sizes.
201 func ProcessCollections(arvLogger *logger.Logger,
202 receivedCollections []SdkCollectionInfo,
203 uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
204 for _, sdkCollection := range receivedCollections {
205 collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
206 OwnerUuid: StrCopy(sdkCollection.OwnerUuid),
207 ReplicationLevel: sdkCollection.Redundancy,
208 BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
// A zero ModifiedAt would break the date filter GetCollections builds from
// this function's return value. (The message below misspells "modification".)
210 if sdkCollection.ModifiedAt.IsZero() {
211 fatalWithMessage(arvLogger,
213 "Arvados SDK collection returned with unexpected zero " +
214 "modifcation date. This probably means that either we failed to " +
215 "parse the modification date or the API server has changed how " +
216 "it returns modification dates: %v",
220 if sdkCollection.ModifiedAt.After(latestModificationDate) {
221 latestModificationDate = sdkCollection.ModifiedAt
// NOTE(review): this local shadows the imported manifest package for the rest
// of the loop body.
223 manifest := manifest.Manifest{sdkCollection.ManifestText}
224 manifestSize := uint64(len(sdkCollection.ManifestText))
226 if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen {
227 totalManifestSize += manifestSize
229 if manifestSize > maxManifestSize {
230 maxManifestSize = manifestSize
233 blockChannel := manifest.BlockIterWithDuplicates()
234 for block := range blockChannel {
235 if stored_size, stored := collection.BlockDigestToSize[block.Digest];
236 stored && stored_size != block.Size {
237 message := fmt.Sprintf(
238 "Collection %s contains multiple sizes (%d and %d) for block %s",
243 fatalWithMessage(arvLogger, message)
245 collection.BlockDigestToSize[block.Digest] = block.Size
// TotalSize is summed over the map, so a block listed several times in the
// manifest is counted once.
247 collection.TotalSize = 0
248 for _, size := range collection.BlockDigestToSize {
249 collection.TotalSize += size
251 uuidToCollection[collection.Uuid] = collection
253 // Clear out all the manifest strings that we don't need anymore.
254 // These hopefully form the bulk of our memory usage.
// NOTE(review): sdkCollection is the per-iteration copy produced by range, so
// this assignment clears only the copy. The ManifestText strings in
// receivedCollections stay referenced and are not released. Iterating by index
// and clearing receivedCollections[i].ManifestText would be needed for the
// stated goal.
256 sdkCollection.ManifestText = ""
// NumberCollectionsAvailable returns the server's count of collections
// (items_available). It issues a "collections" list call with "limit": 0, so
// the count is fetched without requesting any items. It exits the process via
// log.Fatalf when the list call returns an error.
263 func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) (int) {
264 var collections SdkCollectionList
265 sdkParams := arvadosclient.Dict{"limit": 0}
266 err := client.List("collections", sdkParams, &collections)
268 log.Fatalf("error querying collections for items available: %v", err)
271 return collections.ItemsAvailable
275 // Assumes you haven't already called arvLogger.Edit()!
276 // If you have called arvLogger.Edit() this method will hang waiting
277 // for the lock you're already holding.
278 func fatalWithMessage(arvLogger *logger.Logger, message string) {
279 if arvLogger != nil {
280 properties,_ := arvLogger.Edit()
281 properties["FATAL"] = message
282 properties["run_info"].(map[string]interface{})["end_time"] = time.Now()
283 arvLogger.ForceRecord()