"context"
"encoding/json"
"fmt"
+ "net"
"net/http"
"os"
- "strings"
+ "sync"
"time"
"git.arvados.org/arvados.git/lib/controller/railsproxy"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/httpserver"
+ "github.com/hashicorp/yamux"
+ "github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
)
type Conn struct {
cluster *arvados.Cluster
*railsProxy // handles API methods that aren't defined on Conn itself
+ getdb func(context.Context) (*sqlx.DB, error)
vocabularyCache *arvados.Vocabulary
vocabularyFileModTime time.Time
lastVocabularyRefreshCheck time.Time
lastVocabularyError error
loginController
+ gwTunnels map[string]*yamux.Session
+ gwTunnelsLock sync.Mutex
+ activeUsers map[string]bool
+ activeUsersLock sync.Mutex
+ activeUsersReset time.Time
+
+ wantContainerPriorityUpdate chan struct{}
}
-func NewConn(cluster *arvados.Cluster) *Conn {
+func NewConn(bgCtx context.Context, cluster *arvados.Cluster, getdb func(context.Context) (*sqlx.DB, error)) *Conn {
railsProxy := railsproxy.NewConn(cluster)
railsProxy.RedactHostInErrors = true
conn := Conn{
- cluster: cluster,
- railsProxy: railsProxy,
+ cluster: cluster,
+ railsProxy: railsProxy,
+ getdb: getdb,
+ wantContainerPriorityUpdate: make(chan struct{}, 1),
}
conn.loginController = chooseLoginController(cluster, &conn)
+ go conn.runContainerPriorityUpdateThread(bgCtx)
return &conn
}
func (conn *Conn) loadVocabularyFile() error {
vf, err := os.ReadFile(conn.cluster.API.VocabularyPath)
if err != nil {
- return fmt.Errorf("couldn't reading the vocabulary file: %v", err)
+ return fmt.Errorf("while reading the vocabulary file: %v", err)
}
mk := make([]string, 0, len(conn.cluster.Collections.ManagedProperties))
for k := range conn.cluster.Collections.ManagedProperties {
return conn.loginController.UserAuthenticate(ctx, opts)
}
-func (conn *Conn) GroupContents(ctx context.Context, options arvados.GroupContentsOptions) (arvados.ObjectList, error) {
- // The requested UUID can be a user (virtual home project), which we just pass on to
- // the API server.
- if strings.Index(options.UUID, "-j7d0g-") != 5 {
- return conn.railsProxy.GroupContents(ctx, options)
- }
-
- var resp arvados.ObjectList
-
- // Get the group object
- respGroup, err := conn.GroupGet(ctx, arvados.GetOptions{UUID: options.UUID})
- if err != nil {
- return resp, err
- }
-
- // If the group has groupClass 'filter', apply the filters before getting the contents.
- if respGroup.GroupClass == "filter" {
- if filters, ok := respGroup.Properties["filters"].([]interface{}); ok {
- for _, f := range filters {
- // f is supposed to be a []string
- tmp, ok2 := f.([]interface{})
- if !ok2 || len(tmp) < 3 {
- return resp, fmt.Errorf("filter unparsable: %T, %+v, original field: %T, %+v\n", tmp, tmp, f, f)
- }
- var filter arvados.Filter
- if attr, ok2 := tmp[0].(string); ok2 {
- filter.Attr = attr
- } else {
- return resp, fmt.Errorf("filter unparsable: attribute must be string: %T, %+v, filter: %T, %+v\n", tmp[0], tmp[0], f, f)
- }
- if operator, ok2 := tmp[1].(string); ok2 {
- filter.Operator = operator
- } else {
- return resp, fmt.Errorf("filter unparsable: operator must be string: %T, %+v, filter: %T, %+v\n", tmp[1], tmp[1], f, f)
- }
- filter.Operand = tmp[2]
- options.Filters = append(options.Filters, filter)
- }
- } else {
- return resp, fmt.Errorf("filter unparsable: not an array\n")
+var privateNetworks = func() (nets []*net.IPNet) {
+ for _, s := range []string{
+ "127.0.0.0/8",
+ "10.0.0.0/8",
+ "172.16.0.0/12",
+ "192.168.0.0/16",
+ "169.254.0.0/16",
+ "::1/128",
+ "fe80::/10",
+ "fc00::/7",
+ } {
+ _, n, err := net.ParseCIDR(s)
+ if err != nil {
+ panic(fmt.Sprintf("privateNetworks: %q: %s", s, err))
}
- // Use the generic /groups/contents endpoint for filter groups
- options.UUID = ""
+ nets = append(nets, n)
}
-
- return conn.railsProxy.GroupContents(ctx, options)
-}
+ return
+}()
func httpErrorf(code int, format string, args ...interface{}) error {
return httpserver.ErrorWithStatus(fmt.Errorf(format, args...), code)