Merge branch '17395-container-output-storage-class' into main
authorPeter Amstutz <peter.amstutz@curii.com>
Thu, 1 Jul 2021 20:13:58 +0000 (16:13 -0400)
committerPeter Amstutz <peter.amstutz@curii.com>
Thu, 1 Jul 2021 20:13:58 +0000 (16:13 -0400)
refs #17395

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

1  2 
services/keepproxy/keepproxy.go

index c679e5b91cff8f9b86daa0b8ccda60aa20b36f96,740ba9b1cbf849462a20f251effea867fba48c29..dd67aff797281829cd21da95e0400448775b9539
@@@ -16,6 -16,7 +16,6 @@@ import 
        "os/signal"
        "regexp"
        "strings"
 -      "sync"
        "syscall"
        "time"
  
@@@ -28,7 -29,6 +28,7 @@@
        "github.com/coreos/go-systemd/daemon"
        "github.com/ghodss/yaml"
        "github.com/gorilla/mux"
 +      lru "github.com/hashicorp/golang-lru"
        log "github.com/sirupsen/logrus"
  )
  
@@@ -163,53 -163,45 +163,53 @@@ func run(logger log.FieldLogger, cluste
        signal.Notify(term, syscall.SIGINT)
  
        // Start serving requests.
 -      router = MakeRESTRouter(kc, time.Duration(keepclient.DefaultProxyRequestTimeout), cluster.ManagementToken)
 +      router, err = MakeRESTRouter(kc, time.Duration(keepclient.DefaultProxyRequestTimeout), cluster, logger)
 +      if err != nil {
 +              return err
 +      }
        return http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(router)))
  }
  
 +type TokenCacheEntry struct {
 +      expire int64
 +      user   *arvados.User
 +}
 +
  type APITokenCache struct {
 -      tokens     map[string]int64
 -      lock       sync.Mutex
 +      tokens     *lru.TwoQueueCache
        expireTime int64
  }
  
 -// RememberToken caches the token and set an expire time.  If we already have
 -// an expire time on the token, it is not updated.
 -func (cache *APITokenCache) RememberToken(token string) {
 -      cache.lock.Lock()
 -      defer cache.lock.Unlock()
 -
 +// RememberToken caches the token and set an expire time.  If the
 +// token is already in the cache, it is not updated.
 +func (cache *APITokenCache) RememberToken(token string, user *arvados.User) {
        now := time.Now().Unix()
 -      if cache.tokens[token] == 0 {
 -              cache.tokens[token] = now + cache.expireTime
 +      _, ok := cache.tokens.Get(token)
 +      if !ok {
 +              cache.tokens.Add(token, TokenCacheEntry{
 +                      expire: now + cache.expireTime,
 +                      user:   user,
 +              })
        }
  }
  
  // RecallToken checks if the cached token is known and still believed to be
  // valid.
 -func (cache *APITokenCache) RecallToken(token string) bool {
 -      cache.lock.Lock()
 -      defer cache.lock.Unlock()
 +func (cache *APITokenCache) RecallToken(token string) (bool, *arvados.User) {
 +      val, ok := cache.tokens.Get(token)
 +      if !ok {
 +              return false, nil
 +      }
  
 +      cacheEntry := val.(TokenCacheEntry)
        now := time.Now().Unix()
 -      if cache.tokens[token] == 0 {
 -              // Unknown token
 -              return false
 -      } else if now < cache.tokens[token] {
 +      if now < cacheEntry.expire {
                // Token is known and still valid
 -              return true
 +              return true, cacheEntry.user
        } else {
                // Token is expired
 -              cache.tokens[token] = 0
 -              return false
 +              cache.tokens.Remove(token)
 +              return false, nil
        }
  }
  
@@@ -224,10 -216,10 +224,10 @@@ func GetRemoteAddress(req *http.Request
        return req.RemoteAddr
  }
  
 -func CheckAuthorizationHeader(kc *keepclient.KeepClient, cache *APITokenCache, req *http.Request) (pass bool, tok string) {
 +func (h *proxyHandler) CheckAuthorizationHeader(req *http.Request) (pass bool, tok string, user *arvados.User) {
        parts := strings.SplitN(req.Header.Get("Authorization"), " ", 2)
        if len(parts) < 2 || !(parts[0] == "OAuth2" || parts[0] == "Bearer") || len(parts[1]) == 0 {
 -              return false, ""
 +              return false, "", nil
        }
        tok = parts[1]
  
                op = "write"
        }
  
 -      if cache.RecallToken(op + ":" + tok) {
 +      if ok, user := h.APITokenCache.RecallToken(op + ":" + tok); ok {
                // Valid in the cache, short circuit
 -              return true, tok
 +              return true, tok, user
        }
  
        var err error
 -      arv := *kc.Arvados
 +      arv := *h.KeepClient.Arvados
        arv.ApiToken = tok
        arv.RequestID = req.Header.Get("X-Request-Id")
 -      if op == "read" {
 -              err = arv.Call("HEAD", "keep_services", "", "accessible", nil, nil)
 -      } else {
 -              err = arv.Call("HEAD", "users", "", "current", nil, nil)
 +      user = &arvados.User{}
 +      userCurrentError := arv.Call("GET", "users", "", "current", nil, user)
 +      err = userCurrentError
 +      if err != nil && op == "read" {
 +              apiError, ok := err.(arvadosclient.APIServerError)
 +              if ok && apiError.HttpStatusCode == http.StatusForbidden {
 +                      // If it was a scoped "sharing" token it will
 +                      // return 403 instead of 401 for the current
 +                      // user check.  If it is a download operation
 +                      // and they have permission to read the
 +                      // keep_services table, we can allow it.
 +                      err = arv.Call("HEAD", "keep_services", "", "accessible", nil, nil)
 +              }
        }
        if err != nil {
                log.Printf("%s: CheckAuthorizationHeader error: %v", GetRemoteAddress(req), err)
 -              return false, ""
 +              return false, "", nil
 +      }
 +
 +      if userCurrentError == nil && user.IsAdmin {
 +              // checking userCurrentError is probably redundant,
 +              // IsAdmin would be false anyway. But can't hurt.
 +              if op == "read" && !h.cluster.Collections.KeepproxyPermission.Admin.Download {
 +                      return false, "", nil
 +              }
 +              if op == "write" && !h.cluster.Collections.KeepproxyPermission.Admin.Upload {
 +                      return false, "", nil
 +              }
 +      } else {
 +              if op == "read" && !h.cluster.Collections.KeepproxyPermission.User.Download {
 +                      return false, "", nil
 +              }
 +              if op == "write" && !h.cluster.Collections.KeepproxyPermission.User.Upload {
 +                      return false, "", nil
 +              }
        }
  
        // Success!  Update cache
 -      cache.RememberToken(op + ":" + tok)
 +      h.APITokenCache.RememberToken(op+":"+tok, user)
  
 -      return true, tok
 +      return true, tok, user
  }
  
  // We need to make a private copy of the default http transport early
@@@ -308,13 -273,11 +308,13 @@@ type proxyHandler struct 
        *APITokenCache
        timeout   time.Duration
        transport *http.Transport
 +      logger    log.FieldLogger
 +      cluster   *arvados.Cluster
  }
  
  // MakeRESTRouter returns an http.Handler that passes GET and PUT
  // requests to the appropriate handlers.
 -func MakeRESTRouter(kc *keepclient.KeepClient, timeout time.Duration, mgmtToken string) http.Handler {
 +func MakeRESTRouter(kc *keepclient.KeepClient, timeout time.Duration, cluster *arvados.Cluster, logger log.FieldLogger) (http.Handler, error) {
        rest := mux.NewRouter()
  
        transport := defaultTransport
        transport.TLSClientConfig = arvadosclient.MakeTLSConfig(kc.Arvados.ApiInsecure)
        transport.TLSHandshakeTimeout = keepclient.DefaultTLSHandshakeTimeout
  
 +      cacheQ, err := lru.New2Q(500)
 +      if err != nil {
 +              return nil, fmt.Errorf("Error from lru.New2Q: %v", err)
 +      }
 +
        h := &proxyHandler{
                Handler:    rest,
                KeepClient: kc,
                timeout:    timeout,
                transport:  &transport,
                APITokenCache: &APITokenCache{
 -                      tokens:     make(map[string]int64),
 +                      tokens:     cacheQ,
                        expireTime: 300,
                },
 +              logger:  logger,
 +              cluster: cluster,
        }
  
        rest.HandleFunc(`/{locator:[0-9a-f]{32}\+.*}`, h.Get).Methods("GET", "HEAD")
        rest.HandleFunc(`/`, h.Options).Methods("OPTIONS")
  
        rest.Handle("/_health/{check}", &health.Handler{
 -              Token:  mgmtToken,
 +              Token:  cluster.ManagementToken,
                Prefix: "/_health/",
        }).Methods("GET")
  
        rest.NotFoundHandler = InvalidPathHandler{}
 -      return h
 +      return h, nil
  }
  
  var errLoopDetected = errors.New("loop detected")
  
 -func (*proxyHandler) checkLoop(resp http.ResponseWriter, req *http.Request) error {
 +func (*proxyHandler) checkLoop(resp http.ResponseWriter, req *http.Request) error {
        if via := req.Header.Get("Via"); strings.Index(via, " "+viaAlias) >= 0 {
 -              log.Printf("proxy loop detected (request has Via: %q): perhaps keepproxy is misidentified by gateway config as an external client, or its keep_services record does not have service_type=proxy?", via)
 +              h.logger.Printf("proxy loop detected (request has Via: %q): perhaps keepproxy is misidentified by gateway config as an external client, or its keep_services record does not have service_type=proxy?", via)
                http.Error(resp, errLoopDetected.Error(), http.StatusInternalServerError)
                return errLoopDetected
        }
@@@ -398,7 -354,7 +398,7 @@@ func (h *proxyHandler) Options(resp htt
        SetCorsHeaders(resp)
  }
  
 -var errBadAuthorizationHeader = errors.New("Missing or invalid Authorization header")
 +var errBadAuthorizationHeader = errors.New("Missing or invalid Authorization header, or method not allowed")
  var errContentLengthMismatch = errors.New("Actual length != expected content length")
  var errMethodNotSupported = errors.New("Method not supported")
  
@@@ -428,8 -384,7 +428,8 @@@ func (h *proxyHandler) Get(resp http.Re
  
        var pass bool
        var tok string
 -      if pass, tok = CheckAuthorizationHeader(kc, h.APITokenCache, req); !pass {
 +      var user *arvados.User
 +      if pass, tok, user = h.CheckAuthorizationHeader(req); !pass {
                status, err = http.StatusForbidden, errBadAuthorizationHeader
                return
        }
  
        locator = removeHint.ReplaceAllString(locator, "$1")
  
 +      if locator != "" {
 +              parts := strings.SplitN(locator, "+", 3)
 +              if len(parts) >= 2 {
 +                      logger := h.logger
 +                      if user != nil {
 +                              logger = logger.WithField("user_uuid", user.UUID).
 +                                      WithField("user_full_name", user.FullName)
 +                      }
 +                      logger.WithField("locator", fmt.Sprintf("%s+%s", parts[0], parts[1])).Infof("Block download")
 +              }
 +      }
 +
        switch req.Method {
        case "HEAD":
                expectLength, proxiedURI, err = kc.Ask(locator)
@@@ -531,7 -474,7 +531,7 @@@ func (h *proxyHandler) Put(resp http.Re
                for _, sc := range strings.Split(req.Header.Get("X-Keep-Storage-Classes"), ",") {
                        scl = append(scl, strings.Trim(sc, " "))
                }
-               kc.StorageClasses = scl
+               kc.SetStorageClasses(scl)
        }
  
        _, err = fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &expectLength)
  
        var pass bool
        var tok string
 -      if pass, tok = CheckAuthorizationHeader(kc, h.APITokenCache, req); !pass {
 +      var user *arvados.User
 +      if pass, tok, user = h.CheckAuthorizationHeader(req); !pass {
                err = errBadAuthorizationHeader
                status = http.StatusForbidden
                return
                locatorOut, wroteReplicas, err = kc.PutHR(locatorIn, req.Body, expectLength)
        }
  
 +      if locatorOut != "" {
 +              parts := strings.SplitN(locatorOut, "+", 3)
 +              if len(parts) >= 2 {
 +                      logger := h.logger
 +                      if user != nil {
 +                              logger = logger.WithField("user_uuid", user.UUID).
 +                                      WithField("user_full_name", user.FullName)
 +                      }
 +                      logger.WithField("locator", fmt.Sprintf("%s+%s", parts[0], parts[1])).Infof("Block upload")
 +              }
 +      }
 +
        // Tell the client how many successful PUTs we accomplished
        resp.Header().Set(keepclient.XKeepReplicasStored, fmt.Sprintf("%d", wroteReplicas))
  
@@@ -655,7 -585,7 +655,7 @@@ func (h *proxyHandler) Index(resp http.
        }()
  
        kc := h.makeKeepClient(req)
 -      ok, token := CheckAuthorizationHeader(kc, h.APITokenCache, req)
 +      ok, token, _ := h.CheckAuthorizationHeader(req)
        if !ok {
                status, err = http.StatusForbidden, errBadAuthorizationHeader
                return