/* Deals with parsing Collection responses from the API server. */
import (
	"flag"
	"log"
	"os"
	"runtime/pprof"
	"time"

	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
	"git.curoverse.com/arvados.git/sdk/go/blockdigest"
	"git.curoverse.com/arvados.git/sdk/go/manifest"
	//"git.curoverse.com/arvados.git/sdk/go/util"
)
// heap_profile_filename, when non-empty, names the file that heap profiles
// are written to while collections are being read. It is set from a
// command-line flag registered in init.
var heap_profile_filename string
22 type Collection struct {
26 BlockDigestToSize map[blockdigest.BlockDigest]int
30 type ReadCollections struct {
31 ReadAllCollections bool
32 UuidToCollection map[string]Collection
35 type GetCollectionsParams struct {
36 Client arvadosclient.ArvadosClient
// SdkCollectionInfo is one collection record decoded from an API server
// collections list response, limited to the fields used here.
type SdkCollectionInfo struct {
	Uuid         string    `json:"uuid"`
	OwnerUuid    string    `json:"owner_uuid"`
	Redundancy   int       `json:"redundancy"`
	ModifiedAt   time.Time `json:"modified_at"`
	ManifestText string    `json:"manifest_text"`
}

// SdkCollectionList is the decoded body of a collections list response.
type SdkCollectionList struct {
	// ItemsAvailable is the server-reported number of matching collections;
	// it can exceed len(Items) when the request set a "limit".
	ItemsAvailable int                 `json:"items_available"`
	Items          []SdkCollectionInfo `json:"items"`
}
54 flag.StringVar(&heap_profile_filename,
57 "File to write the heap profiles to.")
// // Methods to implement util.SdkListResponse Interface
// func (s SdkCollectionList) NumItemsAvailable() (numAvailable int, err error) {
// 	return s.ItemsAvailable, nil
// }

// func (s SdkCollectionList) NumItemsContained() (numContained int, err error) {
// 	return len(s.Items), nil
// }
69 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
70 if ¶ms.Client == nil {
71 log.Fatalf("params.Client passed to GetCollections() should " +
72 "contain a valid ArvadosClient, but instead it is nil.")
75 // TODO(misha): move this code somewhere better and make sure it's
77 if heap_profile_filename != "" {
79 heap_profile, err = os.Create(heap_profile_filename)
85 fieldsWanted := []string{"manifest_text",
88 // TODO(misha): Start using the redundancy field.
92 sdkParams := arvadosclient.Dict{
93 "select": fieldsWanted,
94 "order": []string{"modified_at ASC"},
95 "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
97 if params.BatchSize > 0 {
98 sdkParams["limit"] = params.BatchSize
101 // MISHA UNDO THIS TEMPORARY HACK TO FIND BUG!
102 sdkParams["limit"] = 50
105 // var numReceived, numAvailable int
106 // results.ReadAllCollections, numReceived, numAvailable =
107 // util.ContainsAllAvailableItems(collections)
109 // if (!results.ReadAllCollections) {
110 // log.Printf("ERROR: Did not receive all collections.")
112 // log.Printf("Received %d of %d available collections.",
117 initialNumberOfCollectionsAvailable := NumberCollectionsAvailable(params.Client)
118 // Include a 1% margin for collections added while we're reading so
119 // that we don't have to grow the map in most cases.
120 maxExpectedCollections := int(
121 float64(initialNumberOfCollectionsAvailable) * 1.01)
122 results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
124 // These values are just for getting the loop to run the first time,
125 // afterwards they'll be set to real values.
126 previousTotalCollections := -1
127 totalCollections := 0
128 for totalCollections > previousTotalCollections {
129 // We're still finding new collections
131 // Write the heap profile for examining memory usage
132 if heap_profile != nil {
133 err := pprof.WriteHeapProfile(heap_profile)
139 // Get next batch of collections.
140 var collections SdkCollectionList
141 err := params.Client.List("collections", sdkParams, &collections)
143 log.Fatalf("error querying collections: %+v", err)
146 // Process collection and update our date filter.
147 sdkParams["filters"].([][]string)[0][2] =
148 ProcessCollections(collections.Items, results.UuidToCollection).Format(time.RFC3339)
151 previousTotalCollections = totalCollections
152 totalCollections = len(results.UuidToCollection)
154 log.Printf("%d collections read, %d new in last batch, " +
155 "%s latest modified date",
157 totalCollections - previousTotalCollections,
158 sdkParams["filters"].([][]string)[0][2])
161 // Write the heap profile for examining memory usage
162 if heap_profile != nil {
163 err := pprof.WriteHeapProfile(heap_profile)
// StrCopy returns a newly allocated copy of s.
//
// It is useful for substrings: a substring shares the memory of the string
// it was cut from, so keeping it alive keeps the whole original alive.
// Copying lets the garbage collector reclaim the longer string.
func StrCopy(s string) string {
	return string([]byte(s))
}
181 func ProcessCollections(receivedCollections []SdkCollectionInfo,
182 uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
183 for _, sdkCollection := range receivedCollections {
184 collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
185 OwnerUuid: StrCopy(sdkCollection.OwnerUuid),
186 ReplicationLevel: sdkCollection.Redundancy,
187 BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
189 if sdkCollection.ModifiedAt.IsZero() {
191 "Arvados SDK collection returned with unexpected zero modifcation " +
192 "date. This probably means that either we failed to parse the " +
193 "modification date or the API server has changed how it returns " +
194 "modification dates: %v",
197 if sdkCollection.ModifiedAt.After(latestModificationDate) {
198 latestModificationDate = sdkCollection.ModifiedAt
200 manifest := manifest.Manifest{sdkCollection.ManifestText}
201 blockChannel := manifest.BlockIterWithDuplicates()
202 for block := range blockChannel {
203 if stored_size, stored := collection.BlockDigestToSize[block.Digest];
204 stored && stored_size != block.Size {
206 "Collection %s contains multiple sizes (%d and %d) for block %s",
212 collection.BlockDigestToSize[block.Digest] = block.Size
214 collection.TotalSize = 0
215 for _, size := range collection.BlockDigestToSize {
216 collection.TotalSize += size
218 uuidToCollection[collection.Uuid] = collection
220 // Clear out all the manifest strings that we don't need anymore.
221 // These hopefully form the bulk of our memory usage.
223 sdkCollection.ManifestText = ""
230 func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) (int) {
231 var collections SdkCollectionList
232 sdkParams := arvadosclient.Dict{"limit": 0}
233 err := client.List("collections", sdkParams, &collections)
235 log.Fatalf("error querying collections for items available: %v", err)
238 return collections.ItemsAvailable