Use new Arvados logs API.
author Tom Clegg <tom@curii.com>
Thu, 16 Nov 2023 16:12:59 +0000 (11:12 -0500)
committer Tom Clegg <tom@curii.com>
Thu, 16 Nov 2023 21:49:00 +0000 (16:49 -0500)
No issue #

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

arvados.go

index 44d893caf471be81c9fcb04b847ac60361bc74bb..5d23286fa0763f894fe084b0251792301fc58378 100644 (file)
@@ -6,12 +6,14 @@ package lightning
 
 import (
        "bufio"
+       "bytes"
        "context"
        "encoding/json"
        "errors"
        "fmt"
        "io"
        "io/ioutil"
+       "net/http"
        "net/url"
        "os"
        "regexp"
@@ -297,6 +299,7 @@ func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, e
        }()
 
        neednewline := ""
+       logTell := map[string]int64{}
 
        lastState := cr.State
        refreshCR := func() {
@@ -325,9 +328,14 @@ func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, e
                        log.Printf("subscribe container UUID: %s", cr.ContainerUUID)
                        client.Subscribe(logch, cr.ContainerUUID)
                        subscribedUUID = cr.ContainerUUID
+                       logTell = map[string]int64{}
                }
        }
 
+       var logWaitMax = time.Second * 10
+       var logWaitMin = time.Second
+       var logWait = logWaitMin
+       var logWaitDone = time.After(logWait)
        var reCrunchstat = regexp.MustCompile(`mem .* (\d+) rss`)
 waitctr:
        for cr.State != arvados.ContainerRequestStateFinal {
@@ -345,27 +353,72 @@ waitctr:
                case <-refreshTicker.C:
                        refreshCR()
                case msg := <-logch:
-                       switch msg.EventType {
-                       case "update":
+                       if msg.EventType == "update" {
                                refreshCR()
-                       case "stderr":
-                               for _, line := range strings.Split(msg.Properties.Text, "\n") {
-                                       if line != "" {
+                       }
+               case <-logWaitDone:
+                       any := false
+                       for _, fnm := range []string{"stderr.txt", "crunchstat.txt"} {
+                               req, err := http.NewRequest("GET", "https://"+runner.Client.APIHost+"/arvados/v1/container_requests/"+cr.UUID+"/log/"+cr.ContainerUUID+"/"+fnm, nil)
+                               if err != nil {
+                                       log.Errorf("error preparing log request: %s", err)
+                                       continue
+                               }
+                               req.Header.Set("Range", fmt.Sprintf("bytes=%d-", logTell[fnm]))
+                               resp, err := runner.Client.Do(req)
+                               if err != nil {
+                                       log.Errorf("error getting log data: %s", err)
+                                       continue
+                               } else if (resp.StatusCode == http.StatusNotFound && logTell[fnm] == 0) ||
+                                       (resp.StatusCode == http.StatusRequestedRangeNotSatisfiable && logTell[fnm] > 0) {
+                                       continue
+                               } else if resp.StatusCode >= 300 {
+                                       log.Errorf("error getting log data: %s", resp.Status)
+                                       continue
+                               }
+                               logdata, err := io.ReadAll(resp.Body)
+                               if err != nil {
+                                       log.Errorf("error reading log data: %s", err)
+                                       continue
+                               }
+                               if len(logdata) == 0 {
+                                       continue
+                               }
+                               for {
+                                       eol := bytes.Index(logdata, []byte{'\n'})
+                                       if eol < 0 {
+                                               break
+                                       }
+                                       line := string(logdata[:eol])
+                                       logdata = logdata[eol+1:]
+                                       logTell[fnm] += int64(eol + 1)
+                                       if len(line) == 0 {
+                                               continue
+                                       }
+                                       any = true
+                                       if fnm == "stderr.txt" {
                                                fmt.Fprint(os.Stderr, neednewline)
                                                neednewline = ""
                                                log.Print(line)
+                                       } else if fnm == "crunchstat.txt" {
+                                               m := reCrunchstat.FindStringSubmatch(line)
+                                               if m != nil {
+                                                       rss, _ := strconv.ParseInt(m[1], 10, 64)
+                                                       fmt.Fprintf(os.Stderr, "%s rss %.3f GB           \r", cr.UUID, float64(rss)/1e9)
+                                                       neednewline = "\n"
+                                               }
                                        }
                                }
-                       case "crunchstat":
-                               for _, line := range strings.Split(msg.Properties.Text, "\n") {
-                                       m := reCrunchstat.FindStringSubmatch(line)
-                                       if m != nil {
-                                               rss, _ := strconv.ParseInt(m[1], 10, 64)
-                                               fmt.Fprintf(os.Stderr, "%s rss %.3f GB           \r", cr.UUID, float64(rss)/1e9)
-                                               neednewline = "\n"
-                                       }
+                       }
+                       if any {
+                               logWait = logWaitMin
+                       } else {
+                               logWait = logWait * 2
+                               if logWait > logWaitMax {
+                                       logWait = logWaitMax
                                }
                        }
+                       logWaitDone = time.After(logWait)
                }
        }
        fmt.Fprint(os.Stderr, neednewline)