1 /* Deals with parsing Collection responses from API Server. */
7 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8 "git.curoverse.com/arvados.git/sdk/go/blockdigest"
9 "git.curoverse.com/arvados.git/sdk/go/logger"
10 "git.curoverse.com/arvados.git/sdk/go/manifest"
// Package-level state shared by the functions in this file.
// heap_profile_filename is bound to a command-line flag with flag.StringVar;
// WriteHeapProfile does nothing while it is the empty string.
19 heap_profile_filename string
20 // globals for debugging
// Both counters are maintained by ProcessCollections and are measured in
// bytes of manifest text (len of the manifest string): the total grows for
// each collection UUID not seen before, the max tracks the largest manifest.
21 totalManifestSize uint64
22 maxManifestSize uint64
// Collection is the in-memory summary stored for each collection that
// ProcessCollections parses. Only part of the field list is visible in this
// excerpt; ProcessCollections also sets Uuid, OwnerUuid, ReplicationLevel
// and TotalSize on values of this type.
25 type Collection struct {
// BlockDigestToSize maps every block digest found in the collection's
// manifest to the size recorded for that block.
29 BlockDigestToSize map[blockdigest.BlockDigest]int
// ReadCollections is the result type returned by GetCollections.
33 type ReadCollections struct {
// NOTE(review): nothing in the visible code assigns ReadAllCollections —
// confirm where (or whether) it is populated.
34 ReadAllCollections bool
// UuidToCollection holds every collection read so far, keyed by its UUID.
35 UuidToCollection map[string]Collection
// GetCollectionsParams bundles the inputs to GetCollections.
// GetCollections also reads params.Logger and params.BatchSize; their
// declarations are not visible in this excerpt.
38 type GetCollectionsParams struct {
// Client is the API client used for every "collections" list request.
39 Client arvadosclient.ArvadosClient
// SdkCollectionInfo is the JSON shape of a single item in a "collections"
// list response (see SdkCollectionList.Items).
44 type SdkCollectionInfo struct {
45 Uuid string `json:"uuid"`
46 OwnerUuid string `json:"owner_uuid"`
// Redundancy is copied into Collection.ReplicationLevel by ProcessCollections.
47 Redundancy int `json:"redundancy"`
// ModifiedAt drives paging: GetCollections feeds the latest value seen back
// into its "modified_at" filter for the next request.
48 ModifiedAt time.Time `json:"modified_at"`
// ManifestText is the raw manifest that ProcessCollections parses for blocks.
49 ManifestText string `json:"manifest_text"`
// SdkCollectionList is the decoded response of a "collections" list request.
52 type SdkCollectionList struct {
// ItemsAvailable is the count returned by NumberCollectionsAvailable, which
// requests it with "limit": 0.
53 ItemsAvailable int `json:"items_available"`
// Items is the batch of collections contained in this response.
54 Items []SdkCollectionInfo `json:"items"`
// Registers the command-line flag that fills heap_profile_filename.
// NOTE(review): the enclosing function header and the flag's name and default
// value are on lines not visible in this excerpt — presumably this runs from
// an init function; confirm.
58 flag.StringVar(&heap_profile_filename,
61 "File to write the heap profiles to.")
64 // // Methods to implement util.SdkListResponse Interface
65 // func (s SdkCollectionList) NumItemsAvailable() (numAvailable int, err error) {
66 // return s.ItemsAvailable, nil
69 // func (s SdkCollectionList) NumItemsContained() (numContained int, err error) {
70 // return len(s.Items), nil
73 // Write the heap profile to a file for later review.
74 // Since a file is expected to only contain a single heap profile this
75 // function overwrites the previously written profile, so it is safe
76 // to call multiple times in a single run.
77 // Otherwise we would see cumulative numbers as explained here:
78 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
//
// The function is a no-op when heap_profile_filename is empty.
// NOTE(review): the checks that follow os.Create and pprof.WriteHeapProfile
// are on lines not visible in this excerpt — confirm both errors are handled.
79 func WriteHeapProfile() {
80 if heap_profile_filename != "" {
// os.Create truncates an existing file, which is what makes each call replace
// the previous profile instead of appending to it.
82 heap_profile, err := os.Create(heap_profile_filename)
87 defer heap_profile.Close()
89 err = pprof.WriteHeapProfile(heap_profile)
// GetCollections reads collections from the API server in batches and returns
// them in results.UuidToCollection, keyed by UUID.
//
// Paging is done by modification date rather than by offset: requests are
// ordered by "modified_at ASC" and filtered with "modified_at >= <date>",
// where <date> is replaced after every batch by the latest modification date
// that ProcessCollections reports. Because the filter is inclusive, items at
// the boundary can be returned again; the loop stops once a batch adds no new
// UUIDs to the map.
//
// params.BatchSize, when positive, is sent as the request "limit".
// Failures are reported with log.Fatalf, which terminates the process.
97 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
// NOTE(review): "¶ms" looks like a mis-encoded "&params" (the HTML entity
// "&para" swallowed the first characters). If so, the condition compares the
// address of a struct field with nil, which can never be true, so this guard
// cannot fire — confirm and replace it with a meaningful validity check.
98 if ¶ms.Client == nil {
99 log.Fatalf("params.Client passed to GetCollections() should " +
100 "contain a valid ArvadosClient, but instead it is nil.")
// Fields requested through "select"; the rest of the list is on lines not
// visible in this excerpt.
103 fieldsWanted := []string{"manifest_text",
106 // TODO(misha): Start using the redundancy field.
// The third element of the single filter is the paging cursor; it starts at a
// date early enough to match every collection and is overwritten in the loop.
110 sdkParams := arvadosclient.Dict{
111 "select": fieldsWanted,
112 "order": []string{"modified_at ASC"},
113 "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
115 if params.BatchSize > 0 {
116 sdkParams["limit"] = params.BatchSize
119 initialNumberOfCollectionsAvailable := NumberCollectionsAvailable(params.Client)
120 // Include a 1% margin for collections added while we're reading so
121 // that we don't have to grow the map in most cases.
122 maxExpectedCollections := int(
123 float64(initialNumberOfCollectionsAvailable) * 1.01)
124 results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
// NOTE(review): the second value returned by Logger.Edit is discarded here and
// again inside the loop — confirm that ignoring it is intentional.
127 properties,_ := params.Logger.Edit()
128 properties["num_collections_at_start"] = initialNumberOfCollectionsAvailable
130 params.Logger.Record()
132 // These values are just for getting the loop to run the first time,
133 // afterwards they'll be set to real values.
134 previousTotalCollections := -1
135 totalCollections := 0
136 for totalCollections > previousTotalCollections {
137 // We're still finding new collections
139 // Write the heap profile for examining memory usage
142 // Get next batch of collections.
143 var collections SdkCollectionList
144 err := params.Client.List("collections", sdkParams, &collections)
// Presumably guarded by an err != nil check on a line not visible here.
146 log.Fatalf("error querying collections: %+v", err)
149 // Process collection and update our date filter.
// time.RFC3339 has no fractional seconds, so the cursor is written with
// one-second resolution; together with ">=" this re-requests items from the
// boundary second rather than skipping them.
150 sdkParams["filters"].([][]string)[0][2] =
151 ProcessCollections(collections.Items, results.UuidToCollection).Format(time.RFC3339)
154 previousTotalCollections = totalCollections
155 totalCollections = len(results.UuidToCollection)
// NOTE(review): the average below divides by totalCollections as float32; if
// no collection has been read yet this is 0/0 and prints NaN — confirm that
// is acceptable for a debug log line.
157 log.Printf("%d collections read, %d new in last batch, " +
158 "%s latest modified date, %.0f %d %d avg,max,total manifest size",
160 totalCollections - previousTotalCollections,
161 sdkParams["filters"].([][]string)[0][2],
162 float32(totalManifestSize)/float32(totalCollections),
163 maxManifestSize, totalManifestSize)
// This ":=" can only compile inside a nested scope, so it declares a new
// "properties" that shadows the one declared before the loop.
166 properties,_ := params.Logger.Edit()
167 properties["collections_read"] = totalCollections
168 properties["latest_modified_date"] = sdkParams["filters"].([][]string)[0][2]
169 properties["total_manifest_size"] = totalManifestSize
170 properties["max_manifest_size"] = maxManifestSize
172 params.Logger.Record()
175 // Just in case this lowers the numbers reported in the heap profile.
178 // Write the heap profile for examining memory usage
// StrCopy returns a copy of s that is backed by freshly allocated memory.
// A substring shares storage with the string it was sliced from, so keeping
// a short slice alive keeps the whole original alive as well; copying it
// first lets the garbage collector reclaim the longer string.
func StrCopy(s string) string {
	buf := make([]byte, len(s))
	copy(buf, s)
	return string(buf)
}
// ProcessCollections converts one batch of API items into Collection values
// and stores them in uuidToCollection under their UUID, replacing any entry
// already stored for the same UUID.
//
// For every item it parses the manifest, records each block digest with its
// size, and sets TotalSize to the sum over distinct digests (a block that
// appears several times in the manifest is counted once). It also updates the
// package-level totalManifestSize and maxManifestSize counters.
//
// latestModificationDate is the greatest ModifiedAt seen in the batch; it
// starts from the zero time.Time, so that is presumably what an empty batch
// yields (the return statement is not visible in this excerpt).
193 func ProcessCollections(receivedCollections []SdkCollectionInfo,
194 uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
195 for _, sdkCollection := range receivedCollections {
// Uuid and OwnerUuid go through StrCopy so the stored Collection holds its
// own copies of these strings instead of the ones from the response.
196 collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
197 OwnerUuid: StrCopy(sdkCollection.OwnerUuid),
198 ReplicationLevel: sdkCollection.Redundancy,
199 BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
// A zero ModifiedAt would break date-based paging in GetCollections, so it is
// reported; the reporting call itself is on a line not visible here.
// NOTE(review): the message below misspells "modification" as "modifcation";
// correct the literal in a code change.
201 if sdkCollection.ModifiedAt.IsZero() {
203 "Arvados SDK collection returned with unexpected zero modifcation " +
204 "date. This probably means that either we failed to parse the " +
205 "modification date or the API server has changed how it returns " +
206 "modification dates: %v",
209 if sdkCollection.ModifiedAt.After(latestModificationDate) {
210 latestModificationDate = sdkCollection.ModifiedAt
// NOTE(review): this local shadows the imported "manifest" package for the
// rest of the loop body; a different name would be clearer.
212 manifest := manifest.Manifest{sdkCollection.ManifestText}
213 manifestSize := uint64(len(sdkCollection.ManifestText))
// Only collections not already in the map add to the running total, so an
// item fetched again by an overlapping batch is not double counted.
215 if _, alreadySeen := uuidToCollection[collection.Uuid]; !alreadySeen {
216 totalManifestSize += manifestSize
218 if manifestSize > maxManifestSize {
219 maxManifestSize = manifestSize
222 blockChannel := manifest.BlockIterWithDuplicates()
223 for block := range blockChannel {
// The same digest appearing with two different sizes is treated as an
// anomaly and reported (reporting call not visible in this excerpt).
224 if stored_size, stored := collection.BlockDigestToSize[block.Digest];
225 stored && stored_size != block.Size {
227 "Collection %s contains multiple sizes (%d and %d) for block %s",
233 collection.BlockDigestToSize[block.Digest] = block.Size
235 collection.TotalSize = 0
236 for _, size := range collection.BlockDigestToSize {
237 collection.TotalSize += size
239 uuidToCollection[collection.Uuid] = collection
241 // Clear out all the manifest strings that we don't need anymore.
242 // These hopefully form the bulk of our memory usage.
// NOTE(review): sdkCollection is the range loop's copy of the slice element,
// so this assignment does not clear receivedCollections[i].ManifestText; the
// caller's slice still references the manifest string. Confirm whether
// indexing into the slice was intended.
244 sdkCollection.ManifestText = ""
// NumberCollectionsAvailable asks the API server how many collections exist
// and returns the response's items_available count. It sends "limit": 0 so
// that only the count is needed from the response, not the items themselves.
// A failed request is reported with log.Fatalf, which terminates the process
// (the err != nil guard is presumably on a line not visible in this excerpt).
251 func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) (int) {
252 var collections SdkCollectionList
253 sdkParams := arvadosclient.Dict{"limit": 0}
254 err := client.List("collections", sdkParams, &collections)
256 log.Fatalf("error querying collections for items available: %v", err)
259 return collections.ItemsAvailable