Adds worker pool to speed up processing lots of issues. Refs #19920 19920-move-unassigned-open-tickets
author Lucas Di Pentima <lucas@di-pentima.com.ar>
Wed, 25 Jan 2023 21:29:47 +0000 (18:29 -0300)
committer Lucas Di Pentima <lucas@di-pentima.com.ar>
Wed, 25 Jan 2023 21:29:47 +0000 (18:29 -0300)
The single-threaded version was processing issues at about 1 per second,
too slow to move lots of issues in one go.

Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <lucas.dipentima@curii.com>

cmd/art/redmine.go

index abeded38674116b70e6a37fc01f1d9cdd83c115b..1ad293b52b70a87044d2e06a554c9494204d9bad 100644 (file)
@@ -13,6 +13,7 @@ import (
        "regexp"
        "sort"
        "strconv"
+       "sync"
        "time"
 
        "git.arvados.org/arvados-dev.git/lib/redmine"
@@ -182,18 +183,84 @@ var associateOrphans = &cobra.Command{
                        fmt.Printf("Error requesting unassigned open issues from project %d: %s", p.ID, err)
                }
                fmt.Printf("Found %d issues from project '%s' to assign to release '%s'...\n", len(issues), p.Name, r.Name)
+
+               type job struct {
+                       issue  redmine.Issue
+                       rID    int
+                       dryRun bool
+               }
+               type result struct {
+                       msg     string
+                       success bool
+               }
+               var wg sync.WaitGroup
+               jobs := make(chan job, len(issues))
+               results := make(chan result, len(issues))
+
+               worker := func(id int, jobs <-chan job, results chan<- result) {
+                       for j := range jobs {
+                               msg := fmt.Sprintf("#%d - %s ", j.issue.ID, j.issue.Subject)
+                               success := true
+                               if !j.dryRun {
+                                       err = rm.SetRelease(j.issue, j.rID)
+                                       if err != nil {
+                                               success = false
+                                               msg = fmt.Sprintf("%s [error] (%s)\n", msg, err)
+                                       } else {
+                                               msg = fmt.Sprintf("%s [changed]\n", msg)
+                                       }
+                               } else {
+                                       msg = fmt.Sprintf("%s [skipped]\n", msg)
+                               }
+                               results <- result{
+                                       msg:     msg,
+                                       success: success,
+                               }
+                       }
+               }
+
+               wn := 8
+               if len(issues) < wn {
+                       wn = len(issues)
+               }
+               for w := 1; w <= wn; w++ {
+                       wg.Add(1)
+                       w := w
+                       go func() {
+                               defer wg.Done()
+                               worker(w, jobs, results)
+                       }()
+               }
+
                for _, issue := range issues {
-                       fmt.Printf("#%d - %s ", issue.ID, issue.Subject)
-                       if !dryRun {
-                               err = rm.SetRelease(issue, rID)
-                               if err != nil {
-                                       fmt.Printf("[error]\n")
-                                       log.Fatalf("Error trying to assign issue %d to release %d: %s", issue.ID, rID, err)
+                       jobs <- job{
+                               issue:  issue,
+                               rID:    rID,
+                               dryRun: dryRun,
+                       }
+               }
+               close(jobs)
+
+               succeded := true
+               errCount := 0
+               var wg2 sync.WaitGroup
+               wg2.Add(1)
+               go func() {
+                       defer wg2.Done()
+                       for r := range results {
+                               fmt.Printf(r.msg)
+                               if !r.success {
+                                       succeded = false
+                                       errCount += 1
                                }
-                               fmt.Printf("[changed]\n")
-                       } else {
-                               fmt.Printf("[skipped]\n")
                        }
+               }()
+
+               wg.Wait()
+               close(results)
+               wg2.Wait()
+               if !succeded {
+                       log.Fatalf("Warning: %d error(s) found.", errCount)
                }
        },
 }