h3. Changing ulimits
Docker containers inherit ulimits from the Docker daemon. However, the ulimits for a single Unix daemon may not accommodate a long-running Crunch job. You may want to increase default limits for compute containers by passing @--default-ulimit@ options to the Docker daemon. For example, to allow containers to open 10,000 files, set @--default-ulimit nofile=10000:10000@.
+
+h2. Troubleshooting
+
+h3. Workflows fail with @ValidationException: Not found: '/var/lib/cwl/workflow.json#main'@
+
+A possible configuration error is having Docker installed as a @snap@ package rather than a @deb@ package. This is a problem because @snap@ packages are partially containerized and may have a different view of the filesystem than @crunch-run@. This produces confusing failures: for example, directories that @crunch-run@ bind-mounts into a container may appear empty instead of containing the intended files, resulting in unexpected "file not found" errors.
+
+To check for this situation, run @snap list@ and look for @docker@. If found, run @snap remove docker@ and follow the instructions above to "install Docker Engine":#install_docker .
Note that exclusion filters @!=@ and @not in@ will return records for which the property is not defined at all. To restrict filtering to records on which the subproperty is defined, combine with an @exists@ filter.
+h4(#filterexpression). Filtering using boolean expressions
+
+In addition to the three-element array form described above, a string containing a boolean expression is also accepted. The following restrictions apply:
+* The expression must contain exactly one operator.
+* The operator must be @=@, @<@, @<=@, @>@, or @>=@.
+* There must be exactly one pair of parentheses, surrounding the entire expression.
+* Each operand must be the name of a numeric attribute like @replication_desired@ (literal values like @3@ and non-numeric attributes like @uuid@ are not accepted).
+* The expression must not contain whitespace other than an ASCII space (newline and tab characters are not accepted).
+
+Examples:
+* @(replication_desired > replication_confirmed)@
+* @(replication_desired = replication_confirmed)@
+
+Both types of filter (boolean expressions and @[attribute, operator, operand]@ filters) can be combined in the same API call. Example:
+* @{"filters": ["(replication_desired > replication_confirmed)", ["replication_desired", "<", 2]]}@
+
h4. Federated listing
Federated listing forwards a request to multiple clusters and combines the results. Currently only a very restricted form of the "list" method is supported.
func (s *RouterSuite) TestOptions(c *check.C) {
token := arvadostest.ActiveToken
for _, trial := range []struct {
+ comment string // unparsed -- only used to help match test failures to trials
method string
path string
header http.Header
shouldCall: "CollectionList",
withOptions: arvados.ListOptions{Limit: 123, Offset: 456, IncludeTrash: true, IncludeOldVersions: true},
},
+ {
+ comment: "form-encoded expression filter in query string",
+ method: "GET",
+ path: "/arvados/v1/collections?filters=[%22(foo<bar)%22]",
+ header: http.Header{"Content-Type": {"application/x-www-form-urlencoded"}},
+ shouldCall: "CollectionList",
+ withOptions: arvados.ListOptions{Limit: -1, Filters: []arvados.Filter{{"(foo<bar)", "=", true}}},
+ },
+ {
+ comment: "form-encoded expression filter in POST body",
+ method: "POST",
+ path: "/arvados/v1/collections",
+ body: "filters=[\"(foo<bar)\"]&_method=GET",
+ header: http.Header{"Content-Type": {"application/x-www-form-urlencoded"}},
+ shouldCall: "CollectionList",
+ withOptions: arvados.ListOptions{Limit: -1, Filters: []arvados.Filter{{"(foo<bar)", "=", true}}},
+ },
+ {
+ comment: "json-encoded expression filter in POST body",
+ method: "POST",
+ path: "/arvados/v1/collections?_method=GET",
+ body: `{"filters":["(foo<bar)",["bar","=","baz"]],"limit":2}`,
+ header: http.Header{"Content-Type": {"application/json"}},
+ shouldCall: "CollectionList",
+ withOptions: arvados.ListOptions{Limit: 2, Filters: []arvados.Filter{{"(foo<bar)", "=", true}, {"bar", "=", "baz"}}},
+ },
{
method: "PATCH",
path: "/arvados/v1/collections",
// Reset calls captured in previous trial
s.stub = arvadostest.APIStub{}
- c.Logf("trial: %#v", trial)
+ c.Logf("trial: %+v", trial)
+ comment := check.Commentf("trial comment: %s", trial.comment)
+
_, rr, _ := doRequest(c, s.rtr, token, trial.method, trial.path, trial.header, bytes.NewBufferString(trial.body))
if trial.shouldStatus == 0 {
- c.Check(rr.Code, check.Equals, http.StatusOK)
+ c.Check(rr.Code, check.Equals, http.StatusOK, comment)
} else {
- c.Check(rr.Code, check.Equals, trial.shouldStatus)
+ c.Check(rr.Code, check.Equals, trial.shouldStatus, comment)
}
calls := s.stub.Calls(nil)
if trial.shouldCall == "" {
- c.Check(calls, check.HasLen, 0)
+ c.Check(calls, check.HasLen, 0, comment)
} else if len(calls) != 1 {
- c.Check(calls, check.HasLen, 1)
+ c.Check(calls, check.HasLen, 1, comment)
} else {
- c.Check(calls[0].Method, isMethodNamed, trial.shouldCall)
- c.Check(calls[0].Options, check.DeepEquals, trial.withOptions)
+ c.Check(calls[0].Method, isMethodNamed, trial.shouldCall, comment)
+ c.Check(calls[0].Options, check.DeepEquals, trial.withOptions, comment)
}
}
}
if umnterr != nil {
runner.CrunchLog.Printf("Error unmounting: %v", umnterr)
+ runner.ArvMount.Process.Kill()
} else {
// If arv-mount --unmount gets stuck for any reason, we
// don't want to wait for it forever. Do Wait() in a goroutine
// populated.
Container arvados.Container `json:"container"`
InstanceType arvados.InstanceType `json:"instance_type"`
+ FirstSeenAt time.Time `json:"first_seen_at"`
}
// String implements fmt.Stringer by returning the queued container's
delete(cq.current, uuid)
}
+// Caller must have lock.
func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
it, err := cq.chooseType(&ctr)
if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
"Priority": ctr.Priority,
"InstanceType": it.Name,
}).Info("adding container to queue")
- cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
+ cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it, FirstSeenAt: time.Now()}
}
// Lock acquires the dispatch lock for the given container.
sorted = append(sorted, ent)
}
sort.Slice(sorted, func(i, j int) bool {
- return sorted[i].Container.Priority > sorted[j].Container.Priority
+ if pi, pj := sorted[i].Container.Priority, sorted[j].Container.Priority; pi != pj {
+ return pi > pj
+ } else {
+ // When containers have identical priority,
+ // start them in the order we first noticed
+ // them. This avoids extra lock/unlock cycles
+ // when we unlock the containers that don't
+ // fit in the available pool.
+ return sorted[i].FirstSeenAt.Before(sorted[j].FirstSeenAt)
+ }
})
running := sch.pool.Running()
// starve this one by using keeping
// idle workers alive on different
// instance types.
- logger.Debug("unlocking: AtQuota and no unalloc workers")
- sch.queue.Unlock(ctr.UUID)
+ logger.Trace("overquota")
overquota = sorted[i:]
break tryrun
} else if logger.Info("creating new instance"); sch.pool.Create(it) {
// avoid getting starved here if
// instances of a specific type always
// fail.
+ logger.Trace("pool declined to create new instance")
continue
}
starts: []string{},
canCreate: 0,
}
- New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
+ sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+ sch.runQueue()
+ sch.sync()
+ sch.runQueue()
+ sch.sync()
c.Check(pool.creates, check.DeepEquals, shouldCreate)
if len(shouldCreate) == 0 {
c.Check(pool.starts, check.DeepEquals, []string{})
- c.Check(pool.shutdowns, check.Not(check.Equals), 0)
} else {
c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(2)})
- c.Check(pool.shutdowns, check.Equals, 0)
}
+ c.Check(pool.shutdowns, check.Equals, 3-quota)
+ c.Check(queue.StateChanges(), check.DeepEquals, []test.QueueStateChange{
+ {UUID: "zzzzz-dz642-000000000000003", From: "Locked", To: "Queued"},
+ {UUID: "zzzzz-dz642-000000000000002", From: "Locked", To: "Queued"},
+ })
+ }
+}
+
+// Don't flap lock/unlock when equal-priority containers compete for
+// limited workers.
+//
+// (Unless we use FirstSeenAt as a secondary sort key, each runQueue()
+// tends to choose a different one of the equal-priority containers as
+// the "first" one that should be locked, and unlock the one it chose
+// last time. This generates logging noise, and fails containers by
+// reaching MaxDispatchAttempts quickly.)
+func (*SchedulerSuite) TestEqualPriorityContainers(c *check.C) {
+ logger := ctxlog.TestLogger(c)
+ ctx := ctxlog.Context(context.Background(), logger)
+ queue := test.Queue{
+ ChooseType: chooseType,
+ Logger: logger,
+ }
+ // Queue 8 containers with identical priority and identical
+ // runtime constraints, so priority alone cannot order them.
+ for i := 0; i < 8; i++ {
+ queue.Containers = append(queue.Containers, arvados.Container{
+ UUID: test.ContainerUUID(i),
+ Priority: 333,
+ State: arvados.ContainerStateQueued,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 3,
+ RAM: 3 << 30,
+ },
+ })
+ }
+ queue.Update()
+ // The pool has far fewer instances available (quota 2, one idle
+ // instance of the needed type) than there are containers.
+ // NOTE(review): the exact semantics of quota/unalloc/idle/canCreate
+ // are defined by stubPool elsewhere; the assertions below expect
+ // exactly one container to start.
+ pool := stubPool{
+ quota: 2,
+ unalloc: map[arvados.InstanceType]int{
+ test.InstanceType(3): 1,
+ },
+ idle: map[arvados.InstanceType]int{
+ test.InstanceType(3): 1,
+ },
+ running: map[string]time.Time{},
+ creates: []arvados.InstanceType{},
+ starts: []string{},
+ canCreate: 1,
+ }
+ sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+ // Run many scheduling cycles: without a stable tie-breaker, each
+ // cycle could choose a different "first" container and unlock the
+ // one it chose in the previous cycle.
+ for i := 0; i < 30; i++ {
+ sch.runQueue()
+ sch.sync()
+ time.Sleep(time.Millisecond)
+ }
+ c.Check(pool.shutdowns, check.Equals, 0)
+ c.Check(pool.starts, check.HasLen, 1)
+ // Count the requested transitions to Queued (i.e. unlocks) per
+ // container; no container may have been unlocked more than once.
+ unlocked := map[string]int{}
+ for _, chg := range queue.StateChanges() {
+ if chg.To == arvados.ContainerStateQueued {
+ unlocked[chg.UUID]++
+ }
+ }
+ for uuid, count := range unlocked {
+ c.Check(count, check.Equals, 1, check.Commentf("%s", uuid))
}
}
"github.com/sirupsen/logrus"
)
+// reportedUnexpectedState is set once sync has logged a "BUG:
+// unexpected state" error, so that later occurrences are not logged
+// again. Because it is package-level, the suppression is shared by
+// every Scheduler in the process. NOTE(review): the visible reads and
+// writes in sync are not guarded by a lock; confirm sync is never
+// called concurrently.
+var reportedUnexpectedState bool
+
// sync resolves discrepancies between the queue and the pool:
//
// Lingering crunch-run processes for finalized and unlocked/requeued
// a network outage and is still
// preparing to run a container that
// has already been unlocked/requeued.
- go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
+ go sch.kill(uuid, fmt.Sprintf("pool says running, but queue says state=%s", ent.Container.State))
} else if ent.Container.Priority == 0 {
sch.logger.WithFields(logrus.Fields{
"ContainerUUID": uuid,
go sch.requeue(ent, "priority=0")
}
default:
- sch.logger.WithFields(logrus.Fields{
- "ContainerUUID": uuid,
- "State": ent.Container.State,
- }).Error("BUG: unexpected state")
+ if !reportedUnexpectedState {
+ sch.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "State": ent.Container.State,
+ }).Error("BUG: unexpected state")
+ reportedUnexpectedState = true
+ }
}
}
for uuid := range running {
return
}
defer sch.uuidUnlock(uuid)
+ sch.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "reason": reason,
+ }).Debug("kill")
sch.pool.KillContainer(uuid, reason)
sch.pool.ForgetContainer(uuid)
}
Logger logrus.FieldLogger
- entries map[string]container.QueueEnt
- updTime time.Time
- subscribers map[<-chan struct{}]chan struct{}
+ entries map[string]container.QueueEnt
+ updTime time.Time
+ subscribers map[<-chan struct{}]chan struct{}
+ stateChanges []QueueStateChange
mtx sync.Mutex
}
+// QueueStateChange records one state transition requested of a Queue
+// through changeState (i.e. by Lock, Unlock, or Cancel). It is recorded
+// whether or not the transition succeeded.
+type QueueStateChange struct {
+ // UUID of the container whose state change was requested.
+ UUID string
+ // From is the state the caller expected the container to be in.
+ From arvados.ContainerState
+ // To is the requested new state.
+ To arvados.ContainerState
+}
+
+// StateChanges returns every state transition requested via
+// Lock/Unlock/Cancel to date, in the order they were requested.
+// Transitions are recorded even when the queue rejected them because
+// the container was not in the expected "from" state.
+//
+// The result is a copy, so callers may modify or append to it without
+// corrupting the queue's own record.
+func (q *Queue) StateChanges() []QueueStateChange {
+ q.mtx.Lock()
+ defer q.mtx.Unlock()
+ return append([]QueueStateChange(nil), q.stateChanges...)
+}
+
// Entries returns the containers that were queued when Update was
// last called.
func (q *Queue) Entries() (map[string]container.QueueEnt, time.Time) {
// caller must have lock.
func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error {
ent := q.entries[uuid]
+ q.stateChanges = append(q.stateChanges, QueueStateChange{uuid, from, to})
if ent.Container.State != from {
return fmt.Errorf("changeState failed: state=%q", ent.Container.State)
}
upd[ctr.UUID] = container.QueueEnt{
Container: ctr,
InstanceType: it,
+ FirstSeenAt: time.Now(),
}
}
}
// UnmarshalJSON decodes a JSON array to a Filter.
func (f *Filter) UnmarshalJSON(data []byte) error {
- var elements []interface{}
- err := json.Unmarshal(data, &elements)
+ var decoded interface{}
+ err := json.Unmarshal(data, &decoded)
if err != nil {
return err
}
- if len(elements) != 3 {
- return fmt.Errorf("invalid filter %q: must have 3 elements", data)
- }
- attr, ok := elements[0].(string)
- if !ok {
- return fmt.Errorf("invalid filter attr %q", elements[0])
- }
- op, ok := elements[1].(string)
- if !ok {
- return fmt.Errorf("invalid filter operator %q", elements[1])
- }
- operand := elements[2]
- switch operand.(type) {
- case string, float64, []interface{}, nil, bool:
+ switch decoded := decoded.(type) {
+ case string:
+ // Accept "(foo < bar)" as a more obvious way to spell
+ // ["(foo < bar)","=",true]
+ *f = Filter{decoded, "=", true}
+ case []interface{}:
+ if len(decoded) != 3 {
+ return fmt.Errorf("invalid filter %q: must have 3 decoded", data)
+ }
+ attr, ok := decoded[0].(string)
+ if !ok {
+ return fmt.Errorf("invalid filter attr %q", decoded[0])
+ }
+ op, ok := decoded[1].(string)
+ if !ok {
+ return fmt.Errorf("invalid filter operator %q", decoded[1])
+ }
+ operand := decoded[2]
+ switch operand.(type) {
+ case string, float64, []interface{}, nil, bool:
+ default:
+ return fmt.Errorf("invalid filter operand %q", decoded[2])
+ }
+ *f = Filter{attr, op, operand}
default:
- return fmt.Errorf("invalid filter operand %q", elements[2])
+ return fmt.Errorf("invalid filter: json decoded as %T instead of array or string", decoded)
}
- *f = Filter{attr, op, operand}
return nil
}
package arvados
import (
- "bytes"
"encoding/json"
- "testing"
"time"
+
+ check "gopkg.in/check.v1"
)
-func TestMarshalFiltersWithNanoseconds(t *testing.T) {
+var _ = check.Suite(&filterEncodingSuite{})
+
+// filterEncodingSuite tests JSON encoding and decoding of Filter values.
+type filterEncodingSuite struct{}
+
+// TestMarshalNanoseconds checks that a time.Time operand is encoded as
+// a string with full RFC3339Nano precision.
+func (s *filterEncodingSuite) TestMarshalNanoseconds(c *check.C) {
t0 := time.Now()
t0str := t0.Format(time.RFC3339Nano)
buf, err := json.Marshal([]Filter{
{Attr: "modified_at", Operator: "=", Operand: t0}})
- if err != nil {
- t.Fatal(err)
- }
- if expect := []byte(`[["modified_at","=","` + t0str + `"]]`); 0 != bytes.Compare(buf, expect) {
- t.Errorf("Encoded as %q, expected %q", buf, expect)
- }
+ c.Assert(err, check.IsNil)
+ c.Check(string(buf), check.Equals, `[["modified_at","=","`+t0str+`"]]`)
}
-func TestMarshalFiltersWithNil(t *testing.T) {
+// TestMarshalNil checks that a nil operand is encoded as JSON null.
+func (s *filterEncodingSuite) TestMarshalNil(c *check.C) {
buf, err := json.Marshal([]Filter{
{Attr: "modified_at", Operator: "=", Operand: nil}})
- if err != nil {
- t.Fatal(err)
- }
- if expect := []byte(`[["modified_at","=",null]]`); 0 != bytes.Compare(buf, expect) {
- t.Errorf("Encoded as %q, expected %q", buf, expect)
- }
+ c.Assert(err, check.IsNil)
+ c.Check(string(buf), check.Equals, `[["modified_at","=",null]]`)
}
-func TestUnmarshalFiltersWithNil(t *testing.T) {
+// TestUnmarshalNil checks that a JSON null operand decodes to a nil
+// Operand.
+func (s *filterEncodingSuite) TestUnmarshalNil(c *check.C) {
buf := []byte(`["modified_at","=",null]`)
- f := &Filter{}
+ var f Filter
err := f.UnmarshalJSON(buf)
- if err != nil {
- t.Fatal(err)
- }
- expect := Filter{Attr: "modified_at", Operator: "=", Operand: nil}
- if f.Attr != expect.Attr || f.Operator != expect.Operator || f.Operand != expect.Operand {
- t.Errorf("Decoded as %q, expected %q", f, expect)
- }
+ c.Assert(err, check.IsNil)
+ c.Check(f, check.DeepEquals, Filter{Attr: "modified_at", Operator: "=", Operand: nil})
}
-func TestMarshalFiltersWithBoolean(t *testing.T) {
+// TestMarshalBoolean checks that a boolean operand is encoded as a
+// JSON boolean.
+func (s *filterEncodingSuite) TestMarshalBoolean(c *check.C) {
buf, err := json.Marshal([]Filter{
{Attr: "is_active", Operator: "=", Operand: true}})
- if err != nil {
- t.Fatal(err)
- }
- if expect := []byte(`[["is_active","=",true]]`); 0 != bytes.Compare(buf, expect) {
- t.Errorf("Encoded as %q, expected %q", buf, expect)
- }
+ c.Assert(err, check.IsNil)
+ c.Check(string(buf), check.Equals, `[["is_active","=",true]]`)
}
-func TestUnmarshalFiltersWithBoolean(t *testing.T) {
+// TestUnmarshalBoolean checks that a JSON boolean operand decodes to a
+// Go bool Operand.
+func (s *filterEncodingSuite) TestUnmarshalBoolean(c *check.C) {
buf := []byte(`["is_active","=",true]`)
- f := &Filter{}
+ var f Filter
+ err := f.UnmarshalJSON(buf)
+ c.Assert(err, check.IsNil)
+ c.Check(f, check.DeepEquals, Filter{Attr: "is_active", Operator: "=", Operand: true})
+}
+
+// TestUnmarshalBooleanExpression checks that a bare JSON string decodes
+// as an expression filter: the string becomes Attr, with Operator "="
+// and Operand true.
+func (s *filterEncodingSuite) TestUnmarshalBooleanExpression(c *check.C) {
+ buf := []byte(`"(foo < bar)"`)
+ var f Filter
err := f.UnmarshalJSON(buf)
- if err != nil {
- t.Fatal(err)
- }
- expect := Filter{Attr: "is_active", Operator: "=", Operand: true}
- if f.Attr != expect.Attr || f.Operator != expect.Operator || f.Operand != expect.Operand {
- t.Errorf("Decoded as %q, expected %q", f, expect)
- }
+ c.Assert(err, check.IsNil)
+ c.Check(f, check.DeepEquals, Filter{Attr: "(foo < bar)", Operator: "=", Operand: true})
}
cond_out << "jsonb_exists(#{attr_table_name}.#{attr}, ?)"
param_out << operand
+ elsif expr = /^ *\( *(\w+) *(<=?|>=?|=) *(\w+) *\) *$/.match(attr)
+ if operator != '=' || ![true,"true"].index(operand)
+ raise ArgumentError.new("Invalid expression filter '#{attr}': subsequent elements must be [\"=\", true]")
+ end
+ operator = expr[2]
+ attr1, attr2 = expr[1], expr[3]
+ allowed = attr_model_class.searchable_columns(operator)
+ [attr1, attr2].each do |tok|
+ if !allowed.index(tok)
+ raise ArgumentError.new("Invalid attribute in expression: '#{tok}'")
+ end
+ col = attr_model_class.columns.select { |c| c.name == tok }.first
+ if col.type != :integer
+ raise ArgumentError.new("Non-numeric attribute in expression: '#{tok}'")
+ end
+ end
+ cond_out << "#{attr1} #{operator} #{attr2}"
else
if !attr_model_class.searchable_columns(operator).index(attr) &&
!(col.andand.type == :jsonb && ['contains', '=', '<>', '!='].index(operator))
assert_equal col.version, json_response['version'], 'Trashing a collection should not create a new version'
end
+ [['<', :<],
+ ['<=', :<=],
+ ['>', :>],
+ ['>=', :>=],
+ ['=', :==]].each do |op, rubyop|
+ test "filter collections by replication_desired #{op} replication_confirmed" do
+ authorize_with(:active)
+ get :index, params: {
+ filters: [["(replication_desired #{op} replication_confirmed)", "=", true]],
+ }
+ assert_response :success
+ json_response["items"].each do |c|
+ assert_operator(c["replication_desired"], rubyop, c["replication_confirmed"])
+ end
+ end
+ end
+
+ ["(replication_desired < bogus)",
+ "replication_desired < replication_confirmed",
+ "(replication_desired < replication_confirmed",
+ "(replication_desired ! replication_confirmed)",
+ "(replication_desired <)",
+ "(replication_desired < manifest_text)",
+ "(manifest_text < manifest_text)", # currently only numeric attrs are supported
+ "(replication_desired < 2)", # currently only attrs are supported, not literals
+ "(1 < 2)",
+ ].each do |expr|
+ test "invalid filter expression #{expr}" do
+ authorize_with(:active)
+ get :index, params: {
+ filters: [[expr, "=", true]],
+ }
+ assert_response 422
+ end
+ end
+
+ test "invalid op/arg with filter expression" do
+ authorize_with(:active)
+ get :index, params: {
+ filters: [["replication_desired < replication_confirmed", "!=", false]],
+ }
+ assert_response 422
+ end
+
["storage_classes_desired", "storage_classes_confirmed"].each do |attr|
test "filter collections by #{attr}" do
authorize_with(:active)
if attempted:
# Report buffered stderr from previous call to fusermount,
# now that we know it didn't succeed.
- sys.stderr.write(fusermount_output)
+ sys.stderr.buffer.write(fusermount_output)
delay = 1
if deadline:
WORKBENCH2_BRANCH=main
fi
+# Update this to the docker tag for the version on releases.
+DEFAULT_TAG=
+
PG_DATA="$ARVBOX_DATA/postgres"
VAR_DATA="$ARVBOX_DATA/var"
PASSENGER="$ARVBOX_DATA/passenger"
fi
fi
+ if test -z "$TAG" -a -n "$DEFAULT_TAG"; then
+ TAG=":$DEFAULT_TAG"
+ fi
+
if [[ "$CONFIG" =~ ^public ]] ; then
if test -n "$ARVBOX_PUBLISH_IP" ; then
localip=$ARVBOX_PUBLISH_IP