Added string copying to try to reduce memory usage, didn't seem to work. Cleaned...
[arvados.git] / services / datamanager / collection / collection.go
// Package collection deals with parsing Collection responses from the API server.
2
3 package collection
4
5 import (
6         "flag"
7         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
8         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
9         "git.curoverse.com/arvados.git/sdk/go/manifest"
10         //"git.curoverse.com/arvados.git/sdk/go/util"
11         "log"
12         "os"
13         "runtime/pprof"
14         "time"
15 )
16
// heap_profile_filename holds the value of the -heap-profile flag: the path
// that heap profiles are written to, or "" when heap profiling is disabled.
var heap_profile_filename string

// heap_profile is the file heap profiles are written to; it is nil when no
// profile file has been opened.
var heap_profile *os.File
21
22 type Collection struct {
23         Uuid string
24         OwnerUuid string
25         ReplicationLevel int
26         BlockDigestToSize map[blockdigest.BlockDigest]int
27         TotalSize int
28 }
29
30 type ReadCollections struct {
31         ReadAllCollections bool
32         UuidToCollection map[string]Collection
33 }
34
35 type GetCollectionsParams struct {
36         Client arvadosclient.ArvadosClient
37         BatchSize int
38 }
39
// SdkCollectionInfo is the subset of a collection record that GetCollections
// selects from the API server, decoded from the JSON response.
type SdkCollectionInfo struct {
	// Uuid is the collection's UUID.
	Uuid string `json:"uuid"`
	// OwnerUuid is the UUID of the collection's owner.
	OwnerUuid string `json:"owner_uuid"`
	// Redundancy is the server's "redundancy" value for the collection.
	Redundancy int `json:"redundancy"`
	// ModifiedAt is the collection's last modification time.
	ModifiedAt time.Time `json:"modified_at"`
	// ManifestText is the collection's manifest, listing its blocks.
	ManifestText string `json:"manifest_text"`
}
47
48 type SdkCollectionList struct {
49         ItemsAvailable   int                   `json:"items_available"`
50         Items            []SdkCollectionInfo   `json:"items"`
51 }
52
53 func init() {
54         flag.StringVar(&heap_profile_filename, 
55                 "heap-profile",
56                 "",
57                 "File to write the heap profiles to.")
58 }
59
60 // // Methods to implement util.SdkListResponse Interface
61 // func (s SdkCollectionList) NumItemsAvailable() (numAvailable int, err error) {
62 //      return s.ItemsAvailable, nil
63 // }
64
65 // func (s SdkCollectionList) NumItemsContained() (numContained int, err error) {
66 //      return len(s.Items), nil
67 // }
68
69 func GetCollections(params GetCollectionsParams) (results ReadCollections) {
70         if &params.Client == nil {
71                 log.Fatalf("params.Client passed to GetCollections() should " +
72                         "contain a valid ArvadosClient, but instead it is nil.")
73         }
74
75         // TODO(misha): move this code somewhere better and make sure it's
76         // only run once
77         if heap_profile_filename != "" {
78                 var err error
79                 heap_profile, err = os.Create(heap_profile_filename)
80                 if err != nil {
81                         log.Fatal(err)
82                 }
83         }
84
85         fieldsWanted := []string{"manifest_text",
86                 "owner_uuid",
87                 "uuid",
88                 // TODO(misha): Start using the redundancy field.
89                 "redundancy",
90                 "modified_at"}
91
92         sdkParams := arvadosclient.Dict{
93                 "select": fieldsWanted,
94                 "order": []string{"modified_at ASC"},
95                 "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}}}
96
97         if params.BatchSize > 0 {
98                 sdkParams["limit"] = params.BatchSize
99         }
100
101         // MISHA UNDO THIS TEMPORARY HACK TO FIND BUG!
102         sdkParams["limit"] = 50
103
104         // {
105         //      var numReceived, numAvailable int
106         //      results.ReadAllCollections, numReceived, numAvailable =
107         //              util.ContainsAllAvailableItems(collections)
108
109         //      if (!results.ReadAllCollections) {
110         //              log.Printf("ERROR: Did not receive all collections.")
111         //      }
112         //      log.Printf("Received %d of %d available collections.",
113         //              numReceived,
114         //              numAvailable)
115         // }
116
117         initialNumberOfCollectionsAvailable := NumberCollectionsAvailable(params.Client)
118         // Include a 1% margin for collections added while we're reading so
119         // that we don't have to grow the map in most cases.
120         maxExpectedCollections := int(
121                 float64(initialNumberOfCollectionsAvailable) * 1.01)
122         results.UuidToCollection = make(map[string]Collection, maxExpectedCollections)
123
124         // These values are just for getting the loop to run the first time,
125         // afterwards they'll be set to real values.
126         previousTotalCollections := -1
127         totalCollections := 0
128         for totalCollections > previousTotalCollections {
129                 // We're still finding new collections
130
131                 // Write the heap profile for examining memory usage
132                 if heap_profile != nil {
133                         err := pprof.WriteHeapProfile(heap_profile)
134                         if err != nil {
135                                 log.Fatal(err)
136                         }
137                 }
138
139                 // Get next batch of collections.
140                 var collections SdkCollectionList
141                 err := params.Client.List("collections", sdkParams, &collections)
142                 if err != nil {
143                         log.Fatalf("error querying collections: %+v", err)
144                 }
145
146                 // Process collection and update our date filter.
147                 sdkParams["filters"].([][]string)[0][2] =
148                         ProcessCollections(collections.Items, results.UuidToCollection).Format(time.RFC3339)
149
150                 // update counts
151                 previousTotalCollections = totalCollections
152                 totalCollections = len(results.UuidToCollection)
153
154                 log.Printf("%d collections read, %d new in last batch, " +
155                         "%s latest modified date",
156                         totalCollections,
157                         totalCollections - previousTotalCollections,
158                         sdkParams["filters"].([][]string)[0][2])
159         }
160
161         // Write the heap profile for examining memory usage
162         if heap_profile != nil {
163                 err := pprof.WriteHeapProfile(heap_profile)
164                 if err != nil {
165                         log.Fatal(err)
166                 }
167         }
168
169         return
170 }
171
172
// StrCopy returns a copy of s backed by newly allocated memory.
// Copying a substring means the result no longer references the (possibly
// much larger) string it was sliced from. That lets the garbage collector
// reclaim the original once nothing else refers to it.
func StrCopy(s string) string {
	buf := make([]byte, len(s))
	copy(buf, s)
	return string(buf)
}
179
180
181 func ProcessCollections(receivedCollections []SdkCollectionInfo,
182         uuidToCollection map[string]Collection) (latestModificationDate time.Time) {
183         for _, sdkCollection := range receivedCollections {
184                 collection := Collection{Uuid: StrCopy(sdkCollection.Uuid),
185                         OwnerUuid: StrCopy(sdkCollection.OwnerUuid),
186                         ReplicationLevel: sdkCollection.Redundancy,
187                         BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
188
189                 if sdkCollection.ModifiedAt.IsZero() {
190                         log.Fatalf(
191                                 "Arvados SDK collection returned with unexpected zero modifcation " +
192                                         "date. This probably means that either we failed to parse the " +
193                                         "modification date or the API server has changed how it returns " +
194                                         "modification dates: %v",
195                                 collection)
196                 }
197                 if sdkCollection.ModifiedAt.After(latestModificationDate) {
198                         latestModificationDate = sdkCollection.ModifiedAt
199                 }
200                 manifest := manifest.Manifest{sdkCollection.ManifestText}
201                 blockChannel := manifest.BlockIterWithDuplicates()
202                 for block := range blockChannel {
203                         if stored_size, stored := collection.BlockDigestToSize[block.Digest];
204                         stored && stored_size != block.Size {
205                                 log.Fatalf(
206                                         "Collection %s contains multiple sizes (%d and %d) for block %s",
207                                         collection.Uuid,
208                                         stored_size,
209                                         block.Size,
210                                         block.Digest)
211                         }
212                         collection.BlockDigestToSize[block.Digest] = block.Size
213                 }
214                 collection.TotalSize = 0
215                 for _, size := range collection.BlockDigestToSize {
216                         collection.TotalSize += size
217                 }
218                 uuidToCollection[collection.Uuid] = collection
219
220                 // Clear out all the manifest strings that we don't need anymore.
221                 // These hopefully form the bulk of our memory usage.
222                 manifest.Text = ""
223                 sdkCollection.ManifestText = ""
224         }
225
226         return
227 }
228
229
230 func NumberCollectionsAvailable(client arvadosclient.ArvadosClient) (int) {
231         var collections SdkCollectionList
232         sdkParams := arvadosclient.Dict{"limit": 0}
233         err := client.List("collections", sdkParams, &collections)
234         if err != nil {
235                 log.Fatalf("error querying collections for items available: %v", err)
236         }
237
238         return collections.ItemsAvailable
239 }