Merge branch '16561-listen-url'
author    Tom Clegg <tom@curii.com>
Wed, 29 Jun 2022 17:37:15 +0000 (13:37 -0400)
committer Tom Clegg <tom@curii.com>
Wed, 29 Jun 2022 17:37:15 +0000 (13:37 -0400)
closes #16561

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

21 files changed:
doc/api/methods/containers.html.textile.liquid
lib/boot/supervisor.go
lib/controller/handler.go
lib/controller/handler_test.go
lib/crunchrun/crunchrun.go
lib/crunchrun/crunchrun_test.go
lib/dispatchcloud/worker/worker.go
lib/service/cmd.go
sdk/go/arvados/fs_base.go
sdk/go/arvados/fs_collection.go
sdk/go/arvados/fs_collection_test.go
sdk/go/arvados/fs_lookup.go
sdk/go/arvados/fs_site_test.go
sdk/go/httpserver/inspect.go [new file with mode: 0644]
sdk/go/httpserver/inspect_test.go [new file with mode: 0644]
sdk/go/httpserver/logger.go
sdk/go/httpserver/request_limiter_test.go
services/api/app/controllers/database_controller.rb
services/api/app/models/container.rb
services/api/test/unit/container_test.rb
services/keep-web/server_test.go

diff --git a/doc/api/methods/containers.html.textile.liquid b/doc/api/methods/containers.html.textile.liquid
index 76e5730c9f14605989cb9f179a3997e1990c06a3..43163c555053f1cf749c749fa073e306ceaccd12 100644
@@ -52,8 +52,8 @@ Generally this will contain additional keys that are not present in any correspo
 |output|string|Portable data hash of the output collection.|Null if the container is not yet finished.|
 |container_image|string|Portable data hash of a collection containing the docker image used to run the container.||
 |progress|number|A number between 0.0 and 1.0 describing the fraction of work done.||
-|priority|integer|Range 0-1000.  Indicate scheduling order preference.|Currently assigned by the system as the max() of the priorities of all associated ContainerRequests.  See "container request priority":container_requests.html#priority .|
-|exit_code|integer|Process exit code.|Null if state!="Complete"|
+|priority|integer|Range 0-1000.  Indicate scheduling order preference.|Currently assigned by the system as the max() of the priorities of all associated ContainerRequests.  See "container request priority":container_requests.html#priority.|
+|exit_code|integer|Process exit code.|Null if container process has not exited yet.|
 |auth_uuid|string|UUID of a token to be passed into the container itself, used to access Keep-backed mounts, etc.  Automatically assigned.|Null if state∉{"Locked","Running"} or if @runtime_token@ was provided.|
 |locked_by_uuid|string|UUID of a token, indicating which dispatch process changed state to Locked. If null, any token can be used to lock. If not null, only the indicated token can modify this container.|Null if state∉{"Locked","Running"}|
 |runtime_token|string|A v2 token to be passed into the container itself, used to access Keep-backed mounts, etc.|Not returned in API responses.  Reset to null when state is "Complete" or "Cancelled".|
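
Illustration only, not part of this commit: with exit_code now permitted before the final state change, a client can observe the exit code as soon as crunch-run reports it, while the container is still "Running". A minimal sketch using the Go SDK's arvadosclient package; the container UUID and polling interval are hypothetical.

package main

import (
	"fmt"
	"log"
	"time"

	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
)

// waitForExitCode polls a container record until crunch-run has reported an
// exit code, which may now happen while the container is still "Running".
func waitForExitCode(arv *arvadosclient.ArvadosClient, uuid string) (int, error) {
	for {
		var ctr arvadosclient.Dict
		if err := arv.Get("containers", uuid, nil, &ctr); err != nil {
			return 0, err
		}
		// exit_code decodes as a JSON number once it is non-null.
		if code, ok := ctr["exit_code"].(float64); ok {
			return int(code), nil
		}
		time.Sleep(5 * time.Second)
	}
}

func main() {
	arv, err := arvadosclient.MakeArvadosClient() // assumes the SDK helper and standard ARVADOS_* env vars
	if err != nil {
		log.Fatal(err)
	}
	code, err := waitForExitCode(arv, "zzzzz-dz642-xxxxxxxxxxxxxxx") // hypothetical UUID
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("exit code:", code)
}
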
diff --git a/lib/boot/supervisor.go b/lib/boot/supervisor.go
index 7e641c62dd4407a69f6fbf7b3eeb6ff2cd97e002..8eb375b874853821944cef1c87bbd650bd6547ee 100644
@@ -848,7 +848,8 @@ func (super *Supervisor) autofillConfig() error {
                        }
                }
                if super.NoWorkbench1 && svc == &super.cluster.Services.Workbench1 ||
-                       super.NoWorkbench2 && svc == &super.cluster.Services.Workbench2 {
+                       super.NoWorkbench2 && svc == &super.cluster.Services.Workbench2 ||
+                       !super.cluster.Containers.CloudVMs.Enable && svc == &super.cluster.Services.DispatchCloud {
                        // When workbench1 is disabled, it gets an
                        // ExternalURL (so we have a valid listening
                        // port to write in our Nginx config) but no
diff --git a/lib/controller/handler.go b/lib/controller/handler.go
index f5840b34ce72cd18da4d75c4d27dbb23920e53dd..665fd5c636372fc4a21bd7de68c5d886aafbcc7c 100644
@@ -13,7 +13,6 @@ import (
        "net/url"
        "strings"
        "sync"
-       "time"
 
        "git.arvados.org/arvados.git/lib/controller/api"
        "git.arvados.org/arvados.git/lib/controller/federation"
@@ -61,12 +60,6 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
                        req.URL.Path = strings.Replace(req.URL.Path, "//", "/", -1)
                }
        }
-       if h.Cluster.API.RequestTimeout > 0 {
-               ctx, cancel := context.WithDeadline(req.Context(), time.Now().Add(time.Duration(h.Cluster.API.RequestTimeout)))
-               req = req.WithContext(ctx)
-               defer cancel()
-       }
-
        h.handlerStack.ServeHTTP(w, req)
 }
 
diff --git a/lib/controller/handler_test.go b/lib/controller/handler_test.go
index 5e467cb0588607d3deaa06c1d92326ed18f8f09c..39c2b1c68e5c82921e10bc9125d54e17846a8fed 100644
@@ -204,17 +204,21 @@ func (s *HandlerSuite) TestProxyDiscoveryDoc(c *check.C) {
        c.Check(len(dd.Schemas), check.Not(check.Equals), 0)
 }
 
-func (s *HandlerSuite) TestRequestTimeout(c *check.C) {
-       s.cluster.API.RequestTimeout = arvados.Duration(time.Nanosecond)
-       req := httptest.NewRequest("GET", "/discovery/v1/apis/arvados/v1/rest", nil)
+// Handler should give up and exit early if request context is
+// cancelled due to client hangup, httpserver.HandlerWithDeadline,
+// etc.
+func (s *HandlerSuite) TestRequestCancel(c *check.C) {
+       ctx, cancel := context.WithCancel(context.Background())
+       req := httptest.NewRequest("GET", "/discovery/v1/apis/arvados/v1/rest", nil).WithContext(ctx)
        resp := httptest.NewRecorder()
+       cancel()
        s.handler.ServeHTTP(resp, req)
        c.Check(resp.Code, check.Equals, http.StatusBadGateway)
        var jresp httpserver.ErrorResponse
        err := json.Unmarshal(resp.Body.Bytes(), &jresp)
        c.Check(err, check.IsNil)
        c.Assert(len(jresp.Errors), check.Equals, 1)
-       c.Check(jresp.Errors[0], check.Matches, `.*context deadline exceeded.*`)
+       c.Check(jresp.Errors[0], check.Matches, `.*context canceled`)
 }
 
 func (s *HandlerSuite) TestProxyWithoutToken(c *check.C) {
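
Illustration only, not part of this commit: with the controller's own deadline code removed, giving up on dead requests is driven entirely by the request context, whether it is canceled by a client hangup or by an upstream wrapper such as httpserver.HandlerWithDeadline. A minimal sketch of the pattern this test exercises, using only net/http:

package main

import "net/http"

// cancelAware returns 502 immediately when the request context has already
// been canceled, instead of doing further work.
func cancelAware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if err := r.Context().Err(); err != nil {
			http.Error(w, err.Error(), http.StatusBadGateway)
			return
		}
		next.ServeHTTP(w, r)
	})
}

func main() {
	http.ListenAndServe(":8080", cancelAware(http.NotFoundHandler())) // port is a placeholder
}
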
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 30871e734911ea2e56fdd7172ef261b65c726ff2..0e86f604a7733b1a12296c0f389386d5d352526b 100644
@@ -1095,6 +1095,12 @@ func (runner *ContainerRunner) WaitFinish() error {
                }
        }
        runner.CrunchLog.Printf("Container exited with status code %d%s", exitcode, extra)
+       err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+               "container": arvadosclient.Dict{"exit_code": exitcode},
+       }, nil)
+       if err != nil {
+               runner.CrunchLog.Printf("ignoring error updating exit_code: %s", err)
+       }
 
        var returnErr error
        if err = runner.executorStdin.Close(); err != nil {
@@ -1162,10 +1168,9 @@ func (runner *ContainerRunner) updateLogs() {
                        continue
                }
 
-               var updated arvados.Container
                err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
                        "container": arvadosclient.Dict{"log": saved.PortableDataHash},
-               }, &updated)
+               }, nil)
                if err != nil {
                        runner.CrunchLog.Printf("error updating container log to %s: %s", saved.PortableDataHash, err)
                        continue
@@ -1443,13 +1448,13 @@ func (runner *ContainerRunner) UpdateContainerFinal() error {
        if runner.LogsPDH != nil {
                update["log"] = *runner.LogsPDH
        }
-       if runner.finalState == "Complete" {
-               if runner.ExitCode != nil {
-                       update["exit_code"] = *runner.ExitCode
-               }
-               if runner.OutputPDH != nil {
-                       update["output"] = *runner.OutputPDH
-               }
+       if runner.ExitCode != nil {
+               update["exit_code"] = *runner.ExitCode
+       } else {
+               update["exit_code"] = nil
+       }
+       if runner.finalState == "Complete" && runner.OutputPDH != nil {
+               update["output"] = *runner.OutputPDH
        }
        return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
 }
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index 99717578932793599430c576d75d2a5d962f352b..76289b951d6b18014327c5c049d30af188db78ae 100644
@@ -276,7 +276,7 @@ func (client *ArvTestClient) Update(resourceType string, uuid string, parameters
                if parameters["container"].(arvadosclient.Dict)["state"] == "Running" {
                        client.WasSetRunning = true
                }
-       } else if resourceType == "collections" {
+       } else if resourceType == "collections" && output != nil {
                mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string)
                output.(*arvados.Collection).UUID = uuid
                output.(*arvados.Collection).PortableDataHash = fmt.Sprintf("%x", md5.Sum([]byte(mt)))
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index 1c8d62c20ee40571e3f1789451e3b46d148abf4f..b01a820cd619b172538b725d689d0323897611d5 100644
@@ -418,6 +418,12 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
                        // empty string following final newline
                } else if s == "broken" {
                        reportsBroken = true
+               } else if !strings.HasPrefix(s, wkr.wp.cluster.ClusterID) {
+                       // Ignore crunch-run processes that belong to
+                       // a different cluster (e.g., a single host
+                       // running multiple clusters with the loopback
+                       // driver)
+                       continue
                } else if toks := strings.Split(s, " "); len(toks) == 1 {
                        running = append(running, s)
                } else if toks[1] == "stale" {
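
Illustration only, not part of this commit: the effect of the new prefix check on a host shared by multiple clusters, using hypothetical probe output lines (Arvados container UUIDs have the form <clusterID>-dz642-...):

package main

import (
	"fmt"
	"strings"
)

// classify mirrors, in simplified form, how probeRunning now treats each line
// of crunch-run probe output for a worker whose cluster ID is clusterID.
func classify(line, clusterID string) string {
	switch toks := strings.Split(line, " "); {
	case line == "":
		return "ignored (blank line after final newline)"
	case line == "broken":
		return "worker reports itself broken"
	case !strings.HasPrefix(line, clusterID):
		return "ignored (crunch-run process belongs to another cluster)"
	case len(toks) == 1:
		return "running container"
	case toks[1] == "stale":
		return "stale lockfile"
	default:
		return "unrecognized"
	}
}

func main() {
	for _, line := range []string{
		"zzzzz-dz642-000000000000000",       // this cluster: counted as running
		"zzzzz-dz642-000000000000001 stale", // this cluster: stale lockfile
		"yyyyy-dz642-000000000000000",       // another cluster on the same host: ignored
		"broken",
	} {
		fmt.Printf("%-38q -> %s\n", line, classify(line, "zzzzz"))
	}
}
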
diff --git a/lib/service/cmd.go b/lib/service/cmd.go
index 04c3e170ec6e093bac2e701e51e06894aa42009c..4b640c4e4773225ccb0e9312bc18a436552e9cfb 100644
@@ -147,9 +147,10 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
        instrumented := httpserver.Instrument(reg, log,
                httpserver.HandlerWithDeadline(cluster.API.RequestTimeout.Duration(),
                        httpserver.AddRequestIDs(
-                               httpserver.LogRequests(
-                                       interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
-                                               httpserver.NewRequestLimiter(cluster.API.MaxConcurrentRequests, handler, reg))))))
+                               httpserver.Inspect(reg, cluster.ManagementToken,
+                                       httpserver.LogRequests(
+                                               interceptHealthReqs(cluster.ManagementToken, handler.CheckHealth,
+                                                       httpserver.NewRequestLimiter(cluster.API.MaxConcurrentRequests, handler, reg)))))))
        srv := &httpserver.Server{
                Server: http.Server{
                        Handler:     ifCollectionInHost(instrumented, instrumented.ServeAPI(cluster.ManagementToken, instrumented)),
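
Illustration only, not part of this commit: a minimal sketch of where the new Inspect middleware sits in a handler stack. Because it runs inside AddRequestIDs, the report can include each request's ID, and because the request limiter runs inside it, requests still queued by the limiter also show up in the report. The port, token, and concurrency limit are placeholders.

package main

import (
	"log"
	"net/http"

	"git.arvados.org/arvados.git/sdk/go/httpserver"
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()
	mgmtToken := "example-management-token" // placeholder
	api := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("ok\n"))
	})
	stack := httpserver.AddRequestIDs(
		httpserver.Inspect(reg, mgmtToken,
			httpserver.LogRequests(
				httpserver.NewRequestLimiter(64, api, reg))))
	log.Fatal(http.ListenAndServe(":8080", stack))
}
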
diff --git a/sdk/go/arvados/fs_base.go b/sdk/go/arvados/fs_base.go
index bebb74261e4767dd917a919f170fd47312c7939d..ce9253ab3d4f5d5447273cfe02edca716afc52fd 100644
@@ -415,7 +415,7 @@ func (n *treenode) MemorySize() (size int64) {
        for _, inode := range n.inodes {
                size += inode.MemorySize()
        }
-       return
+       return 64 + size
 }
 
 type fileSystem struct {
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index f4dae746e2a72a1a6384857749cf7261f5465917..ccfbdc4da262c13ee3d319ad072f73a10b9b1d0a 100644
@@ -1159,15 +1159,17 @@ func (dn *dirnode) MemorySize() (size int64) {
                case *dirnode:
                        size += node.MemorySize()
                case *filenode:
+                       size += 64
                        for _, seg := range node.segments {
                                switch seg := seg.(type) {
                                case *memSegment:
                                        size += int64(seg.Len())
                                }
+                               size += 64
                        }
                }
        }
-       return
+       return 64 + size
 }
 
 // caller must have write lock.
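
Illustration only, not part of this commit: with the revised accounting, every inode and every file segment is charged a flat 64 bytes on top of any data held in memSegments. A worked example for a small collection filesystem -- one subdirectory containing two files, each backed by a single unflushed 1 MiB segment:

package main

import "fmt"

func main() {
	const mib = 1 << 20
	expected := int64(0 +
		64 + // root dirnode
		64 + // subdirectory dirnode
		2*64 + // one filenode per file
		2*64 + // one segment record per file
		2*mib) // unflushed memSegment data
	fmt.Println(expected) // 2097536
}
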
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index b221aaa083a12fd1f13fd3eaec3d9c0f85ae61b5..c2cac3c6ce2e963b36b7654729e56524ba9bc2db 100644
@@ -1221,7 +1221,8 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
                        c.Assert(err, check.IsNil)
                }
        }
-       c.Check(fs.MemorySize(), check.Equals, int64(nDirs*67<<20))
+       inodebytes := int64((nDirs*(67*2+1) + 1) * 64)
+       c.Check(fs.MemorySize(), check.Equals, int64(nDirs*67<<20)+inodebytes)
        c.Check(flushed, check.Equals, int64(0))
 
        waitForFlush := func(expectUnflushed, expectFlushed int64) {
@@ -1232,27 +1233,27 @@ func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
        }
 
        // Nothing flushed yet
-       waitForFlush((nDirs*67)<<20, 0)
+       waitForFlush((nDirs*67)<<20+inodebytes, 0)
 
        // Flushing a non-empty dir "/" is non-recursive and there are
        // no top-level files, so this has no effect
        fs.Flush("/", false)
-       waitForFlush((nDirs*67)<<20, 0)
+       waitForFlush((nDirs*67)<<20+inodebytes, 0)
 
        // Flush the full block in dir0
        fs.Flush("dir0", false)
-       waitForFlush((nDirs*67-64)<<20, 64<<20)
+       waitForFlush((nDirs*67-64)<<20+inodebytes, 64<<20)
 
        err = fs.Flush("dir-does-not-exist", false)
        c.Check(err, check.NotNil)
 
        // Flush full blocks in all dirs
        fs.Flush("", false)
-       waitForFlush(nDirs*3<<20, nDirs*64<<20)
+       waitForFlush(nDirs*3<<20+inodebytes, nDirs*64<<20)
 
        // Flush non-full blocks, too
        fs.Flush("", true)
-       waitForFlush(0, nDirs*67<<20)
+       waitForFlush(inodebytes, nDirs*67<<20)
 }
 
 // Even when writing lots of files/dirs from different goroutines, as
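
Illustration only, not part of this commit: the inodebytes term added to the expectations above follows directly from the per-inode overhead, assuming (as this test writes) 67 single-segment 1 MiB files in each directory, one dirnode per directory, and one root dirnode. nDirs below is a stand-in for the test's directory count.

package main

import "fmt"

func main() {
	nDirs := 8 // stand-in value
	perDir := 67*64 /* filenodes */ + 67*64 /* segment records */ + 64 /* dirnode */
	inodebytes := nDirs*perDir + 64 // plus the root dirnode
	fmt.Println(inodebytes == (nDirs*(67*2+1)+1)*64) // true
}
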
diff --git a/sdk/go/arvados/fs_lookup.go b/sdk/go/arvados/fs_lookup.go
index 471dc69c82f75318d290eaccfa7b8d4a542540f1..2bb09995e16e476b3889abbdc1c61b6fc1abbd15 100644
@@ -6,7 +6,6 @@ package arvados
 
 import (
        "os"
-       "sync"
        "time"
 )
 
@@ -21,9 +20,8 @@ type lookupnode struct {
        stale   func(time.Time) bool
 
        // internal fields
-       staleLock sync.Mutex
-       staleAll  time.Time
-       staleOne  map[string]time.Time
+       staleAll time.Time
+       staleOne map[string]time.Time
 }
 
 // Sync flushes pending writes for loaded children and, if successful,
@@ -33,29 +31,28 @@ func (ln *lookupnode) Sync() error {
        if err != nil {
                return err
        }
-       ln.staleLock.Lock()
+       ln.Lock()
        ln.staleAll = time.Time{}
        ln.staleOne = nil
-       ln.staleLock.Unlock()
+       ln.Unlock()
        return nil
 }
 
 func (ln *lookupnode) Readdir() ([]os.FileInfo, error) {
-       ln.staleLock.Lock()
-       defer ln.staleLock.Unlock()
+       ln.Lock()
        checkTime := time.Now()
        if ln.stale(ln.staleAll) {
                all, err := ln.loadAll(ln)
                if err != nil {
+                       ln.Unlock()
                        return nil, err
                }
                for _, child := range all {
-                       ln.treenode.Lock()
                        _, err = ln.treenode.Child(child.FileInfo().Name(), func(inode) (inode, error) {
                                return child, nil
                        })
-                       ln.treenode.Unlock()
                        if err != nil {
+                               ln.Unlock()
                                return nil, err
                        }
                }
@@ -65,6 +62,7 @@ func (ln *lookupnode) Readdir() ([]os.FileInfo, error) {
                // newer than ln.staleAll. Reclaim memory.
                ln.staleOne = nil
        }
+       ln.Unlock()
        return ln.treenode.Readdir()
 }
 
@@ -72,8 +70,6 @@ func (ln *lookupnode) Readdir() ([]os.FileInfo, error) {
 // children, instead calling loadOne when a non-existing child is
 // looked up.
 func (ln *lookupnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
-       ln.staleLock.Lock()
-       defer ln.staleLock.Unlock()
        checkTime := time.Now()
        var existing inode
        var err error
diff --git a/sdk/go/arvados/fs_site_test.go b/sdk/go/arvados/fs_site_test.go
index bf24efa7ed055f78fe2db76c56f43794cf0d0e0b..3abe2b457f702b510a46f97ccf528f35f337eb92 100644
@@ -11,6 +11,7 @@ import (
        "net/http"
        "os"
        "strings"
+       "sync"
        "syscall"
        "time"
 
@@ -372,3 +373,117 @@ func (s *SiteFSSuite) TestSnapshotSplice(c *check.C) {
                c.Check(string(buf), check.Equals, string(thisfile))
        }
 }
+
+func (s *SiteFSSuite) TestLocks(c *check.C) {
+       DebugLocksPanicMode = false
+       done := make(chan struct{})
+       defer close(done)
+       ticker := time.NewTicker(2 * time.Second)
+       go func() {
+               for {
+                       timeout := time.AfterFunc(5*time.Second, func() {
+                               // c.FailNow() doesn't break deadlock, but this sure does
+                               panic("timed out -- deadlock?")
+                       })
+                       select {
+                       case <-done:
+                               timeout.Stop()
+                               return
+                       case <-ticker.C:
+                               c.Logf("MemorySize == %d", s.fs.MemorySize())
+                       }
+                       timeout.Stop()
+               }
+       }()
+       ncolls := 5
+       ndirs := 3
+       nfiles := 5
+       projects := make([]Group, 5)
+       for pnum := range projects {
+               c.Logf("make project %d", pnum)
+               err := s.client.RequestAndDecode(&projects[pnum], "POST", "arvados/v1/groups", nil, map[string]interface{}{
+                       "group": map[string]string{
+                               "name":        fmt.Sprintf("TestLocks project %d", pnum),
+                               "owner_uuid":  fixtureAProjectUUID,
+                               "group_class": "project",
+                       },
+                       "ensure_unique_name": true,
+               })
+               c.Assert(err, check.IsNil)
+               for cnum := 0; cnum < ncolls; cnum++ {
+                       c.Logf("make project %d collection %d", pnum, cnum)
+                       var coll Collection
+                       err = s.client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
+                               "collection": map[string]string{
+                                       "name":       fmt.Sprintf("TestLocks collection %d", cnum),
+                                       "owner_uuid": projects[pnum].UUID,
+                               },
+                       })
+                       c.Assert(err, check.IsNil)
+                       for d1num := 0; d1num < ndirs; d1num++ {
+                               s.fs.Mkdir(fmt.Sprintf("/by_id/%s/dir1-%d", coll.UUID, d1num), 0777)
+                               for d2num := 0; d2num < ndirs; d2num++ {
+                                       s.fs.Mkdir(fmt.Sprintf("/by_id/%s/dir1-%d/dir2-%d", coll.UUID, d1num, d2num), 0777)
+                                       for fnum := 0; fnum < nfiles; fnum++ {
+                                               f, err := s.fs.OpenFile(fmt.Sprintf("/by_id/%s/dir1-%d/dir2-%d/file-%d", coll.UUID, d1num, d2num, fnum), os.O_CREATE|os.O_RDWR, 0755)
+                                               c.Assert(err, check.IsNil)
+                                               f.Close()
+                                               f, err = s.fs.OpenFile(fmt.Sprintf("/by_id/%s/dir1-%d/file-%d", coll.UUID, d1num, fnum), os.O_CREATE|os.O_RDWR, 0755)
+                                               c.Assert(err, check.IsNil)
+                                               f.Close()
+                                       }
+                               }
+                       }
+               }
+       }
+       c.Log("sync")
+       s.fs.Sync()
+       var wg sync.WaitGroup
+       for n := 0; n < 100; n++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       for pnum, project := range projects {
+                               c.Logf("read project %d", pnum)
+                               if pnum%2 == 0 {
+                                       f, err := s.fs.Open(fmt.Sprintf("/by_id/%s", project.UUID))
+                                       c.Assert(err, check.IsNil)
+                                       f.Readdir(-1)
+                                       f.Close()
+                               }
+                               for cnum := 0; cnum < ncolls; cnum++ {
+                                       c.Logf("read project %d collection %d", pnum, cnum)
+                                       if pnum%2 == 0 {
+                                               f, err := s.fs.Open(fmt.Sprintf("/by_id/%s/TestLocks collection %d", project.UUID, cnum))
+                                               c.Assert(err, check.IsNil)
+                                               _, err = f.Readdir(-1)
+                                               c.Assert(err, check.IsNil)
+                                               f.Close()
+                                       }
+                                       if pnum%3 == 0 {
+                                               for d1num := 0; d1num < ndirs; d1num++ {
+                                                       f, err := s.fs.Open(fmt.Sprintf("/by_id/%s/TestLocks collection %d/dir1-%d", project.UUID, cnum, d1num))
+                                                       c.Assert(err, check.IsNil)
+                                                       fis, err := f.Readdir(-1)
+                                                       c.Assert(err, check.IsNil)
+                                                       c.Assert(fis, check.HasLen, ndirs+nfiles)
+                                                       f.Close()
+                                               }
+                                       }
+                                       for d1num := 0; d1num < ndirs; d1num++ {
+                                               for d2num := 0; d2num < ndirs; d2num++ {
+                                                       f, err := s.fs.Open(fmt.Sprintf("/by_id/%s/TestLocks collection %d/dir1-%d/dir2-%d", project.UUID, cnum, d1num, d2num))
+                                                       c.Assert(err, check.IsNil)
+                                                       fis, err := f.Readdir(-1)
+                                                       c.Assert(err, check.IsNil)
+                                                       c.Assert(fis, check.HasLen, nfiles)
+                                                       f.Close()
+                                               }
+                                       }
+                               }
+                       }
+               }()
+       }
+       wg.Wait()
+       c.Logf("MemorySize == %d", s.fs.MemorySize())
+}
diff --git a/sdk/go/httpserver/inspect.go b/sdk/go/httpserver/inspect.go
new file mode 100644
index 0000000..cb08acf
--- /dev/null
+++ b/sdk/go/httpserver/inspect.go
@@ -0,0 +1,133 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package httpserver
+
+import (
+       "encoding/json"
+       "net/http"
+       "sort"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/prometheus/client_golang/prometheus"
+)
+
+// Inspect serves a report of current requests at "GET
+// /_inspect/requests", and passes other requests through to the next
+// handler.
+//
+// If registry is not nil, Inspect registers metrics about current
+// requests.
+func Inspect(registry *prometheus.Registry, authToken string, next http.Handler) http.Handler {
+       type ent struct {
+               startTime  time.Time
+               hangupTime atomic.Value
+       }
+       current := map[*http.Request]*ent{}
+       mtx := sync.Mutex{}
+       if registry != nil {
+               registry.MustRegister(prometheus.NewGaugeFunc(
+                       prometheus.GaugeOpts{
+                               Namespace: "arvados",
+                               Name:      "max_active_request_age_seconds",
+                               Help:      "Age of oldest active request",
+                       },
+                       func() float64 {
+                               mtx.Lock()
+                               defer mtx.Unlock()
+                               earliest := time.Time{}
+                               any := false
+                               for _, e := range current {
+                                       if _, ok := e.hangupTime.Load().(time.Time); ok {
+                                               // Don't count abandoned requests here
+                                               continue
+                                       }
+                                       if !any || e.startTime.Before(earliest) {
+                                               any = true
+                                               earliest = e.startTime
+                                       }
+                               }
+                               if !any {
+                                       return 0
+                               }
+                               return float64(time.Since(earliest).Seconds())
+                       },
+               ))
+               registry.MustRegister(prometheus.NewGaugeFunc(
+                       prometheus.GaugeOpts{
+                               Namespace: "arvados",
+                               Name:      "max_abandoned_request_age_seconds",
+                               Help:      "Maximum time since client hung up on a request whose processing thread is still running",
+                       },
+                       func() float64 {
+                               mtx.Lock()
+                               defer mtx.Unlock()
+                               earliest := time.Time{}
+                               any := false
+                               for _, e := range current {
+                                       if hangupTime, ok := e.hangupTime.Load().(time.Time); ok {
+                                               if !any || hangupTime.Before(earliest) {
+                                                       any = true
+                                                       earliest = hangupTime
+                                               }
+                                       }
+                               }
+                               if !any {
+                                       return 0
+                               }
+                               return float64(time.Since(earliest).Seconds())
+                       },
+               ))
+       }
+       return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+               if req.Method == "GET" && req.URL.Path == "/_inspect/requests" {
+                       if authToken == "" || req.Header.Get("Authorization") != "Bearer "+authToken {
+                               Error(w, "unauthorized", http.StatusUnauthorized)
+                               return
+                       }
+                       mtx.Lock()
+                       defer mtx.Unlock()
+                       type outrec struct {
+                               RequestID  string
+                               Method     string
+                               Host       string
+                               URL        string
+                               RemoteAddr string
+                               Elapsed    float64
+                       }
+                       now := time.Now()
+                       outrecs := []outrec{}
+                       for req, e := range current {
+                               outrecs = append(outrecs, outrec{
+                                       RequestID:  req.Header.Get(HeaderRequestID),
+                                       Method:     req.Method,
+                                       Host:       req.Host,
+                                       URL:        req.URL.String(),
+                                       RemoteAddr: req.RemoteAddr,
+                                       Elapsed:    now.Sub(e.startTime).Seconds(),
+                               })
+                       }
+                       sort.Slice(outrecs, func(i, j int) bool { return outrecs[i].Elapsed < outrecs[j].Elapsed })
+                       w.Header().Set("Content-Type", "application/json")
+                       json.NewEncoder(w).Encode(outrecs)
+               } else {
+                       e := ent{startTime: time.Now()}
+                       mtx.Lock()
+                       current[req] = &e
+                       mtx.Unlock()
+                       go func() {
+                               <-req.Context().Done()
+                               e.hangupTime.Store(time.Now())
+                       }()
+                       defer func() {
+                               mtx.Lock()
+                               defer mtx.Unlock()
+                               delete(current, req)
+                       }()
+                       next.ServeHTTP(w, req)
+               }
+       })
+}
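
Illustration only, not part of this commit: querying the new report endpoint from a client. The host and management token are placeholders; the response is a JSON array of objects with RequestID, Method, Host, URL, RemoteAddr, and Elapsed (seconds) fields, sorted by Elapsed.

package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
)

func main() {
	req, err := http.NewRequest("GET", "https://controller.example.com/_inspect/requests", nil)
	if err != nil {
		log.Fatal(err)
	}
	req.Header.Set("Authorization", "Bearer example-management-token")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(body))
}
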
diff --git a/sdk/go/httpserver/inspect_test.go b/sdk/go/httpserver/inspect_test.go
new file mode 100644
index 0000000..624cedb
--- /dev/null
+++ b/sdk/go/httpserver/inspect_test.go
@@ -0,0 +1,98 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package httpserver
+
+import (
+       "context"
+       "encoding/json"
+       "net/http"
+       "net/http/httptest"
+       "strings"
+       "time"
+
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promhttp"
+       check "gopkg.in/check.v1"
+)
+
+func (s *Suite) TestInspect(c *check.C) {
+       reg := prometheus.NewRegistry()
+       h := newTestHandler()
+       mh := Inspect(reg, "abcd", h)
+       handlerReturned := make(chan struct{})
+       reqctx, reqcancel := context.WithCancel(context.Background())
+       longreq := httptest.NewRequest("GET", "/test", nil).WithContext(reqctx)
+       go func() {
+               mh.ServeHTTP(httptest.NewRecorder(), longreq)
+               close(handlerReturned)
+       }()
+       <-h.inHandler
+
+       resp := httptest.NewRecorder()
+       req := httptest.NewRequest("GET", "/_inspect/requests", nil)
+       mh.ServeHTTP(resp, req)
+       c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
+       c.Check(resp.Body.String(), check.Equals, `{"errors":["unauthorized"]}`+"\n")
+
+       resp = httptest.NewRecorder()
+       req.Header.Set("Authorization", "Bearer abcde")
+       mh.ServeHTTP(resp, req)
+       c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
+
+       resp = httptest.NewRecorder()
+       req.Header.Set("Authorization", "Bearer abcd")
+       mh.ServeHTTP(resp, req)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       reqs := []map[string]interface{}{}
+       err := json.NewDecoder(resp.Body).Decode(&reqs)
+       c.Check(err, check.IsNil)
+       c.Check(reqs, check.HasLen, 1)
+       c.Check(reqs[0]["URL"], check.Equals, "/test")
+
+       // Request is active, so we should see active request age > 0
+       resp = httptest.NewRecorder()
+       mreq := httptest.NewRequest("GET", "/metrics", nil)
+       promhttp.HandlerFor(reg, promhttp.HandlerOpts{}).ServeHTTP(resp, mreq)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_active_request_age_seconds [0\.]*[1-9][-\d\.e]*\n.*`)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_abandoned_request_age_seconds 0\n.*`)
+
+       reqcancel()
+
+       // Request context is canceled but handler hasn't returned, so
+       // we should see max abandoned request age > 0 and active ==
+       // 0. We might need to wait a short time for the cancel to
+       // propagate.
+       for deadline := time.Now().Add(time.Second); time.Now().Before(deadline); time.Sleep(time.Second / 100) {
+               resp = httptest.NewRecorder()
+               promhttp.HandlerFor(reg, promhttp.HandlerOpts{}).ServeHTTP(resp, mreq)
+               c.Assert(resp.Code, check.Equals, http.StatusOK)
+               if strings.Contains(resp.Body.String(), "\narvados_max_active_request_age_seconds 0\n") {
+                       break
+               }
+       }
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_active_request_age_seconds 0\n.*`)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_abandoned_request_age_seconds [0\.]*[1-9][-\d\.e]*\n.*`)
+
+       h.okToProceed <- struct{}{}
+       <-handlerReturned
+
+       // Handler has returned, so we should see max abandoned
+       // request age == max active request age == 0
+       resp = httptest.NewRecorder()
+       promhttp.HandlerFor(reg, promhttp.HandlerOpts{}).ServeHTTP(resp, mreq)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_active_request_age_seconds 0\n.*`)
+       c.Check(resp.Body.String(), check.Matches, `(?ms).*\narvados_max_abandoned_request_age_seconds 0\n.*`)
+
+       // ...and no active requests at the /_monitor endpoint
+       resp = httptest.NewRecorder()
+       mh.ServeHTTP(resp, req)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       reqs = nil
+       err = json.NewDecoder(resp.Body).Decode(&reqs)
+       c.Check(err, check.IsNil)
+       c.Assert(reqs, check.HasLen, 0)
+}
diff --git a/sdk/go/httpserver/logger.go b/sdk/go/httpserver/logger.go
index 5a46635e9102365bbfd01c9c9c120bd8e23a7026..b71adf71181a9eb6b550093f73dbe4ab884038ce 100644
@@ -47,7 +47,13 @@ func (hn hijackNotifier) Hijack() (net.Conn, *bufio.ReadWriter, error) {
 // HandlerWithDeadline cancels the request context if the request
 // takes longer than the specified timeout without having its
 // connection hijacked.
+//
+// If timeout is 0, there is no deadline: HandlerWithDeadline is a
+// no-op.
 func HandlerWithDeadline(timeout time.Duration, next http.Handler) http.Handler {
+       if timeout == 0 {
+               return next
+       }
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                ctx, cancel := context.WithCancel(r.Context())
                defer cancel()
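
Illustration only, not part of this commit: HandlerWithDeadline now stands in for the per-request timeout that was removed from the controller handler. A sketch of wrapping a slow handler and reacting to the resulting context cancelation; the timeout and port are placeholders.

package main

import (
	"log"
	"net/http"
	"time"

	"git.arvados.org/arvados.git/sdk/go/httpserver"
)

func main() {
	slow := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		select {
		case <-time.After(10 * time.Second):
			w.Write([]byte("finished\n"))
		case <-r.Context().Done():
			// Deadline reached (or client hung up): give up early.
			httpserver.Error(w, r.Context().Err().Error(), http.StatusBadGateway)
		}
	})
	log.Fatal(http.ListenAndServe(":8080", httpserver.HandlerWithDeadline(5*time.Second, slow)))
}
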
diff --git a/sdk/go/httpserver/request_limiter_test.go b/sdk/go/httpserver/request_limiter_test.go
index 64d1f3d4cfb3fc47930ad1d655ae97366af6efb0..9258fbfa58f4b5a4867651fc15aba4e9b9616dcf 100644
@@ -22,7 +22,7 @@ func (h *testHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        <-h.okToProceed
 }
 
-func newTestHandler(maxReqs int) *testHandler {
+func newTestHandler() *testHandler {
        return &testHandler{
                inHandler:   make(chan struct{}),
                okToProceed: make(chan struct{}),
@@ -30,7 +30,7 @@ func newTestHandler(maxReqs int) *testHandler {
 }
 
 func TestRequestLimiter1(t *testing.T) {
-       h := newTestHandler(10)
+       h := newTestHandler()
        l := NewRequestLimiter(1, h, nil)
        var wg sync.WaitGroup
        resps := make([]*httptest.ResponseRecorder, 10)
@@ -90,7 +90,7 @@ func TestRequestLimiter1(t *testing.T) {
 }
 
 func TestRequestLimiter10(t *testing.T) {
-       h := newTestHandler(10)
+       h := newTestHandler()
        l := NewRequestLimiter(10, h, nil)
        var wg sync.WaitGroup
        for i := 0; i < 10; i++ {
diff --git a/services/api/app/controllers/database_controller.rb b/services/api/app/controllers/database_controller.rb
index 5c4cf7bc16c22ad8d8780714d9b0165cf2c4043b..fa1e1ca43c64dc0b98a0587e703f0a075e890dae 100644
@@ -6,6 +6,8 @@ class DatabaseController < ApplicationController
   skip_before_action :find_object_by_uuid
   skip_before_action :render_404_if_no_object
   before_action :admin_required
+  around_action :silence_logs, only: [:reset]
+
   def reset
     raise ArvadosModel::PermissionDeniedError unless Rails.env == 'test'
 
@@ -83,4 +85,17 @@ class DatabaseController < ApplicationController
     # Done.
     send_json success: true
   end
+
+  protected
+
+  def silence_logs
+    Rails.logger.info("(logging level temporarily raised to :error, see #{__FILE__})")
+    orig = ActiveRecord::Base.logger.level
+    ActiveRecord::Base.logger.level = :error
+    begin
+      yield
+    ensure
+      ActiveRecord::Base.logger.level = orig
+    end
+  end
 end
diff --git a/services/api/app/models/container.rb b/services/api/app/models/container.rb
index 3a04c56046416771a903714e99543e40c7d66f4e..08f87bbdb13b3a4ae21ce4d26b694ecc2dd57cef 100644
@@ -478,8 +478,8 @@ class Container < ArvadosModel
 
   def validate_change
     permitted = [:state]
-    progress_attrs = [:progress, :runtime_status, :log, :output, :output_properties]
-    final_attrs = [:exit_code, :finished_at]
+    progress_attrs = [:progress, :runtime_status, :log, :output, :output_properties, :exit_code]
+    final_attrs = [:finished_at]
 
     if self.new_record?
       permitted.push(:owner_uuid, :command, :container_image, :cwd,
diff --git a/services/api/test/unit/container_test.rb b/services/api/test/unit/container_test.rb
index ac3e6bea4280ae660042cfb4745c592dbe3c684e..bcf99da2e3442ba088d9effbe80a633e6467f857 100644
@@ -870,16 +870,14 @@ class ContainerTest < ActiveSupport::TestCase
     end
   end
 
-  test "Container only set exit code on complete" do
+  test "can only change exit code while running and at completion" do
     set_user_from_auth :active
     c, _ = minimal_new
     set_user_from_auth :dispatch1
     c.lock
+    check_illegal_updates c, [{exit_code: 1}]
     c.update_attributes! state: Container::Running
-
-    check_illegal_updates c, [{exit_code: 1},
-                              {exit_code: 1, state: Container::Cancelled}]
-
+    assert c.update_attributes(exit_code: 1)
     assert c.update_attributes(exit_code: 1, state: Container::Complete)
   end
 
@@ -933,7 +931,7 @@ class ContainerTest < ActiveSupport::TestCase
   end
 
   ["auth_uuid", "runtime_token"].each do |tok|
-    test "#{tok} can set output, progress, runtime_status, state on running container -- but not log" do
+    test "#{tok} can set output, progress, runtime_status, state, exit_code on running container -- but not log" do
       if tok == "runtime_token"
         set_user_from_auth :spectator
         c, _ = minimal_new(container_image: "9ae44d5792468c58bcf85ce7353c7027+124",
@@ -963,6 +961,7 @@ class ContainerTest < ActiveSupport::TestCase
       assert c.update_attributes(output: collections(:collection_owned_by_active).portable_data_hash)
       assert c.update_attributes(runtime_status: {'warning' => 'something happened'})
       assert c.update_attributes(progress: 0.5)
+      assert c.update_attributes(exit_code: 0)
       refute c.update_attributes(log: collections(:real_log_collection).portable_data_hash)
       c.reload
       assert c.update_attributes(state: Container::Complete, exit_code: 0)
diff --git a/services/keep-web/server_test.go b/services/keep-web/server_test.go
index dd8ce06172a01b3b90774f7985d8ab686f450af8..61c540808b640d6115a76e3efb70e10928b2dba3 100644
@@ -480,8 +480,10 @@ func (s *IntegrationSuite) TestMetrics(c *check.C) {
        c.Check(counters["arvados_keepweb_collectioncache_hits//"].Value, check.Equals, int64(1))
        c.Check(counters["arvados_keepweb_collectioncache_pdh_hits//"].Value, check.Equals, int64(1))
        c.Check(gauges["arvados_keepweb_collectioncache_cached_manifests//"].Value, check.Equals, float64(1))
-       // FooCollection's cached manifest size is 45 ("1f4b0....+45") plus one 51-byte blob signature
-       c.Check(gauges["arvados_keepweb_sessions_cached_collection_bytes//"].Value, check.Equals, float64(45+51))
+       // FooCollection's cached manifest size is 45 ("1f4b0....+45")
+       // plus one 51-byte blob signature; session fs counts 3 inodes
+       // * 64 bytes.
+       c.Check(gauges["arvados_keepweb_sessions_cached_collection_bytes//"].Value, check.Equals, float64(45+51+64*3))
 
        // If the Host header indicates a collection, /metrics.json
        // refers to a file in the collection -- the metrics handler