+
+ if arvLogger != nil {
+ arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+ keepInfo := logger.GetOrCreateMap(p, "keep_info")
+ keepInfo["distinct_blocks_stored"] = len(readServers.BlockToServers)
+ })
+ }
+}
+
+// TrashRequest struct
+type TrashRequest struct {
+ Locator string `json:"locator"`
+ BlockMtime int64 `json:"block_mtime"`
+}
+
+// TrashList is an array of TrashRequest objects
+type TrashList []TrashRequest
+
+// SendTrashLists to trash queue
+func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map[string]TrashList, dryRun bool) (errs []error) {
+ count := 0
+ barrier := make(chan error)
+
+ client := kc.Client
+
+ for url, v := range spl {
+ if arvLogger != nil {
+ // We need a local variable because Update doesn't call our mutator func until later,
+ // when our list variable might have been reused by the next loop iteration.
+ url := url
+ trashLen := len(v)
+ arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+ trashListInfo := logger.GetOrCreateMap(p, "trash_list_len")
+ trashListInfo[url] = trashLen
+ })
+ }
+
+ if dryRun {
+ log.Printf("dry run, not sending trash list to service %s with %d blocks", url, len(v))
+ continue
+ }
+
+ count++
+ log.Printf("Sending trash list to %v", url)
+
+ go (func(url string, v TrashList) {
+ pipeReader, pipeWriter := io.Pipe()
+ go (func() {
+ enc := json.NewEncoder(pipeWriter)
+ enc.Encode(v)
+ pipeWriter.Close()
+ })()
+
+ req, err := http.NewRequest("PUT", fmt.Sprintf("%s/trash", url), pipeReader)
+ if err != nil {
+ log.Printf("Error creating trash list request for %v error: %v", url, err.Error())
+ barrier <- err
+ return
+ }
+
+ req.Header.Add("Authorization", "OAuth2 "+kc.Arvados.ApiToken)
+
+ // Make the request
+ var resp *http.Response
+ if resp, err = client.Do(req); err != nil {
+ log.Printf("Error sending trash list to %v error: %v", url, err.Error())
+ barrier <- err
+ return
+ }
+
+ log.Printf("Sent trash list to %v: response was HTTP %v", url, resp.Status)
+
+ io.Copy(ioutil.Discard, resp.Body)
+ resp.Body.Close()
+
+ if resp.StatusCode != 200 {
+ barrier <- errors.New(fmt.Sprintf("Got HTTP code %v", resp.StatusCode))
+ } else {
+ barrier <- nil
+ }
+ })(url, v)
+ }
+
+ for i := 0; i < count; i++ {
+ b := <-barrier
+ if b != nil {
+ errs = append(errs, b)
+ }
+ }
+
+ return errs