COMMAND_ARR+=(".")
+ debug_echo -e "\n${COMMAND_ARR[@]}\n"
+
FPM_RESULTS=$("${COMMAND_ARR[@]}")
FPM_EXIT_CODE=$?
COMMAND_ARR+=('--exclude' "$i")
done
+ COMMAND_ARR+=("${fpm_args[@]}")
+
# Append remaining function arguments directly to fpm's command line.
for i; do
COMMAND_ARR+=("$i")
done
- COMMAND_ARR+=("${fpm_args[@]}")
-
COMMAND_ARR+=("$PACKAGE")
debug_echo -e "\n${COMMAND_ARR[@]}\n"
flags := flag.NewFlagSet("", flag.ContinueOnError)
flags.SetOutput(stderr)
loader.SetupFlags(flags)
+ strict := flags.Bool("strict", true, "Strict validation of configuration file (warnings result in non-zero exit code)")
err = flags.Parse(args)
if err == flag.ErrHelp {
fmt.Fprintln(stdout, "Your configuration is relying on deprecated entries. Suggest making the following changes.")
stdout.Write(diff)
err = nil
- return 1
+ if *strict {
+ return 1
+ }
} else if len(diff) > 0 {
fmt.Fprintf(stderr, "Unexpected diff output:\n%s", diff)
- return 1
+ if *strict {
+ return 1
+ }
} else if err != nil {
return 1
}
if logbuf.Len() > 0 {
- return 1
+ if *strict {
+ return 1
+ }
}
if problems {
# Time before repeating SIGTERM when killing a container.
TimeoutSignal: 5s
+ # Time to give up on a process (most likely arv-mount) that
+ # still holds a container lockfile after its main supervisor
+ # process has exited, and declare the instance broken.
+ TimeoutStaleRunLock: 5s
+
# Time to give up on SIGTERM and write off the worker.
TimeoutTERM: 2m
# Time before repeating SIGTERM when killing a container.
TimeoutSignal: 5s
+ # Time to give up on a process (most likely arv-mount) that
+ # still holds a container lockfile after its main supervisor
+ # process has exited, and declare the instance broken.
+ TimeoutStaleRunLock: 5s
+
# Time to give up on SIGTERM and write off the worker.
TimeoutTERM: 2m
return nil
}
+ proc, err := os.FindProcess(pi.PID)
+ if err != nil {
+ // FindProcess should have succeeded, even if the
+ // process does not exist.
+ fmt.Fprintf(stderr, "%s: find process %d: %s", path, pi.PID, err)
+ return nil
+ }
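+ // Sending signal 0 delivers no signal, but still performs the
+ // usual permission/existence checks, so it tells us whether the
+ // process is alive.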
+ err = proc.Signal(syscall.Signal(0))
+ if err != nil {
+ // Process is dead, even though lockfile was
+ // still locked. Most likely a stuck arv-mount
+ // process that inherited the lock from
+ // crunch-run. Report container UUID as
+ // "stale".
+ fmt.Fprintln(stdout, pi.UUID, "stale")
+ return nil
+ }
+
fmt.Fprintln(stdout, pi.UUID)
return nil
}))
if pollInterval <= 0 {
pollInterval = defaultPollInterval
}
- sched := scheduler.New(disp.Context, disp.queue, disp.pool, staleLockTimeout, pollInterval)
+ sched := scheduler.New(disp.Context, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval)
sched.Start()
defer sched.Stop()
ProbeInterval: arvados.Duration(5 * time.Millisecond),
MaxProbesPerSecond: 1000,
TimeoutSignal: arvados.Duration(3 * time.Millisecond),
+ TimeoutStaleRunLock: arvados.Duration(3 * time.Millisecond),
TimeoutTERM: arvados.Duration(20 * time.Millisecond),
ResourceTags: map[string]string{"testtag": "test value"},
TagKeyPrefix: "test:",
stubvm.ReportBroken = time.Now().Add(time.Duration(rand.Int63n(200)) * time.Millisecond)
default:
stubvm.CrunchRunCrashRate = 0.1
+ stubvm.ArvMountDeadlockRate = 0.1
}
}
s.stubDriver.Bugf = c.Errorf
dontstart := map[arvados.InstanceType]bool{}
var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
+ var containerAllocatedWorkerBootingCount int
tryrun:
for i, ctr := range sorted {
} else if sch.pool.StartContainer(it, ctr) {
// Success.
} else {
+ containerAllocatedWorkerBootingCount++
dontstart[it] = true
}
}
}
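+ // Publish the totals gathered during this pass. These are gauges,
+ // so each scheduling pass overwrites the previous value.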
+ sch.mContainersAllocatedNotStarted.Set(float64(containerAllocatedWorkerBootingCount))
+ sch.mContainersNotAllocatedOverQuota.Set(float64(len(overquota)))
+
if len(overquota) > 0 {
// Unlock any containers that are unmappable while
// we're at quota.
"git.arvados.org/arvados.git/lib/dispatchcloud/worker"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
+
+ "github.com/prometheus/client_golang/prometheus/testutil"
+
check "gopkg.in/check.v1"
)
running: map[string]time.Time{},
canCreate: 0,
}
- New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+ New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1), test.InstanceType(1), test.InstanceType(1)})
c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
c.Check(pool.running, check.HasLen, 1)
starts: []string{},
canCreate: 0,
}
- New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+ New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, shouldCreate)
if len(shouldCreate) == 0 {
c.Check(pool.starts, check.DeepEquals, []string{})
},
}
queue.Update()
- New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+ New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
running := map[string]bool{}
},
}
queue.Update()
- sch := New(ctx, &queue, &pool, time.Millisecond, time.Millisecond)
+ sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
c.Check(pool.running, check.HasLen, 1)
sch.sync()
for deadline := time.Now().Add(time.Second); len(pool.Running()) > 0 && time.Now().Before(deadline); time.Sleep(time.Millisecond) {
}
c.Check(pool.Running(), check.HasLen, 0)
}
+
+func (*SchedulerSuite) TestContainersAllocatedNotStartedMetric(c *check.C) {
+ ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
+ queue := test.Queue{
+ ChooseType: chooseType,
+ Containers: []arvados.Container{
+ {
+ UUID: test.ContainerUUID(1),
+ Priority: 1,
+ State: arvados.ContainerStateLocked,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 1,
+ RAM: 1 << 30,
+ },
+ },
+ },
+ }
+ queue.Update()
+
+ // Create a pool with one unallocated (idle/booting/unknown) worker.
+ // The `idle` and `unknown` maps are left empty, so the worker is in
+ // the booting state: the container will be allocated to it, but not
+ // started yet.
+ pool := stubPool{
+ unalloc: map[arvados.InstanceType]int{test.InstanceType(1): 1},
+ }
+ sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+ sch.runQueue()
+
+ c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 1)
+ c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 0)
+
+ // Create a pool without workers. The queued container will not be started, and the
+ // 'over quota' metric will be 1 because no workers are available and canCreate defaults
+ // to zero.
+ pool = stubPool{}
+ sch = New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
+ sch.runQueue()
+
+ c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 0)
+ c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 1)
+}
"time"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
logger logrus.FieldLogger
queue ContainerQueue
pool WorkerPool
+ reg *prometheus.Registry
staleLockTimeout time.Duration
queueUpdateInterval time.Duration
runOnce sync.Once
stop chan struct{}
stopped chan struct{}
+
+ mContainersAllocatedNotStarted prometheus.Gauge
+ mContainersNotAllocatedOverQuota prometheus.Gauge
}
// New returns a new unstarted Scheduler.
//
// Any given queue and pool should not be used by more than one
// scheduler at a time.
-func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
- return &Scheduler{
+func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, reg *prometheus.Registry, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+ sch := &Scheduler{
logger: ctxlog.FromContext(ctx),
queue: queue,
pool: pool,
+ reg: reg,
staleLockTimeout: staleLockTimeout,
queueUpdateInterval: queueUpdateInterval,
wakeup: time.NewTimer(time.Second),
stopped: make(chan struct{}),
uuidOp: map[string]string{},
}
+ sch.registerMetrics(reg)
+ return sch
+}
+
+func (sch *Scheduler) registerMetrics(reg *prometheus.Registry) {
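+ // A nil registry (as passed by some tests) gets a private throwaway
+ // registry, so the gauges are always registered and safe to use.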
+ if reg == nil {
+ reg = prometheus.NewRegistry()
+ }
+ sch.mContainersAllocatedNotStarted = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "containers_allocated_not_started",
+ Help: "Number of containers allocated to a worker but not started yet (worker is booting).",
+ })
+ reg.MustRegister(sch.mContainersAllocatedNotStarted)
+ sch.mContainersNotAllocatedOverQuota = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "containers_not_allocated_over_quota",
+ Help: "Number of containers not allocated to a worker because the system has hit a quota.",
+ })
+ reg.MustRegister(sch.mContainersNotAllocatedOverQuota)
}
// Start starts the scheduler.
ents, _ := queue.Entries()
c.Check(ents, check.HasLen, 1)
- sch := New(ctx, &queue, &pool, time.Millisecond, time.Millisecond)
+ sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
sch.sync()
ents, _ = queue.Entries()
ents, _ := queue.Entries()
c.Check(ents, check.HasLen, 1)
- sch := New(ctx, &queue, &pool, time.Millisecond, time.Millisecond)
+ sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
// Sync shouldn't cancel the container because it might be
// running on the VM with state=="unknown".
tags: copyTags(tags),
providerType: it.ProviderType,
initCommand: cmd,
- running: map[string]int64{},
+ running: map[string]stubProcess{},
killing: map[string]bool{},
}
svm.SSHService = SSHService{
CrunchRunMissing bool
CrunchRunCrashRate float64
CrunchRunDetachDelay time.Duration
+ ArvMountMaxExitLag time.Duration
+ ArvMountDeadlockRate float64
ExecuteContainer func(arvados.Container) int
CrashRunningContainer func(arvados.Container)
initCommand cloud.InitCommand
providerType string
SSHService SSHService
- running map[string]int64
+ running map[string]stubProcess
killing map[string]bool
lastPID int64
+ deadlocked string
sync.Mutex
}
+type stubProcess struct {
+ pid int64
+
+ // crunch-run has exited, but an arv-mount process (or something
+ // else) still holds the container's lock file in /var/run/
+ exited bool
+}
+
func (svm *StubVM) Instance() stubInstance {
svm.Lock()
defer svm.Unlock()
svm.Lock()
svm.lastPID++
pid := svm.lastPID
- svm.running[uuid] = pid
+ svm.running[uuid] = stubProcess{pid: pid}
svm.Unlock()
time.Sleep(svm.CrunchRunDetachDelay)
fmt.Fprintf(stderr, "starting %s\n", uuid)
logger.Print("[test] exiting crunch-run stub")
svm.Lock()
defer svm.Unlock()
- if svm.running[uuid] != pid {
+ if svm.running[uuid].pid != pid {
bugf := svm.sis.driver.Bugf
if bugf == nil {
bugf = logger.Warnf
}
- bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s]==%d", pid, uuid, svm.running[uuid])
- } else {
- delete(svm.running, uuid)
+ bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s].pid==%d", pid, uuid, svm.running[uuid].pid)
+ return
}
if !completed {
logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
svm.CrashRunningContainer(ctr)
}
}
+ sproc := svm.running[uuid]
+ sproc.exited = true
+ svm.running[uuid] = sproc
+ svm.Unlock()
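+ // Simulate an arv-mount process lingering after crunch-run exits;
+ // with probability ArvMountDeadlockRate it never releases the lock.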
+ time.Sleep(time.Duration(float64(svm.ArvMountMaxExitLag) * math_rand.Float64()))
+ svm.Lock()
+ if math_rand.Float64() >= svm.ArvMountDeadlockRate {
+ delete(svm.running, uuid)
+ }
}()
crashluck := math_rand.Float64()
if command == "crunch-run --list" {
svm.Lock()
defer svm.Unlock()
- for uuid := range svm.running {
- fmt.Fprintf(stdout, "%s\n", uuid)
+ for uuid, sproc := range svm.running {
+ if sproc.exited {
+ fmt.Fprintf(stdout, "%s stale\n", uuid)
+ } else {
+ fmt.Fprintf(stdout, "%s\n", uuid)
+ }
}
if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
fmt.Fprintln(stdout, "broken")
}
+ fmt.Fprintln(stdout, svm.deadlocked)
return 0
}
if strings.HasPrefix(command, "crunch-run --kill ") {
svm.Lock()
- _, running := svm.running[uuid]
- if running {
+ sproc, running := svm.running[uuid]
+ if running && !sproc.exited {
svm.killing[uuid] = true
svm.Unlock()
time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
svm.Lock()
- _, running = svm.running[uuid]
+ sproc, running = svm.running[uuid]
}
svm.Unlock()
- if running {
+ if running && !sproc.exited {
fmt.Fprintf(stderr, "%s: container is running\n", uuid)
return 1
}
}
const (
- defaultSyncInterval = time.Minute
- defaultProbeInterval = time.Second * 10
- defaultMaxProbesPerSecond = 10
- defaultTimeoutIdle = time.Minute
- defaultTimeoutBooting = time.Minute * 10
- defaultTimeoutProbe = time.Minute * 10
- defaultTimeoutShutdown = time.Second * 10
- defaultTimeoutTERM = time.Minute * 2
- defaultTimeoutSignal = time.Second * 5
+ defaultSyncInterval = time.Minute
+ defaultProbeInterval = time.Second * 10
+ defaultMaxProbesPerSecond = 10
+ defaultTimeoutIdle = time.Minute
+ defaultTimeoutBooting = time.Minute * 10
+ defaultTimeoutProbe = time.Minute * 10
+ defaultTimeoutShutdown = time.Second * 10
+ defaultTimeoutTERM = time.Minute * 2
+ defaultTimeoutSignal = time.Second * 5
+ defaultTimeoutStaleRunLock = time.Second * 5
// Time after a quota error to try again anyway, even if no
// instances have been shutdown.
timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
+ timeoutStaleRunLock: duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
installPublicKey: installPublicKey,
tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
stop: make(chan bool),
timeoutShutdown time.Duration
timeoutTERM time.Duration
timeoutSignal time.Duration
+ timeoutStaleRunLock time.Duration
installPublicKey ssh.PublicKey
tagKeyPrefix string
probing chan struct{}
bootOutcomeReported bool
timeToReadyReported bool
+ staleRunLockSince time.Time
}
func (wkr *worker) onUnkillable(uuid string) {
return
}
ok = true
+
+ staleRunLock := false
for _, s := range strings.Split(string(stdout), "\n") {
- if s == "broken" {
+ // Each line of the "crunch-run --list" output is one
+ // of the following:
+ //
+ // * a container UUID, indicating that processes
+ // related to that container are currently running.
+ // Optionally followed by " stale", indicating that
+ // the crunch-run process itself has exited (the
+ // remaining process is probably arv-mount).
+ //
+ // * the string "broken", indicating that the instance
+ // appears incapable of starting containers.
+ //
+ // See ListProcesses() in lib/crunchrun/background.go.
+ if s == "" {
+ // empty string following final newline
+ } else if s == "broken" {
reportsBroken = true
- } else if s != "" {
+ } else if toks := strings.Split(s, " "); len(toks) == 1 {
running = append(running, s)
+ } else if toks[1] == "stale" {
+ wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")
+ staleRunLock = true
}
}
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
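+ // Track how long the stale lock has been reported; only declare the
+ // instance broken once it persists longer than timeoutStaleRunLock.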
+ if !staleRunLock {
+ wkr.staleRunLockSince = time.Time{}
+ } else if wkr.staleRunLockSince.IsZero() {
+ wkr.staleRunLockSince = time.Now()
+ } else if dur := time.Now().Sub(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
+ wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")
+ reportsBroken = true
+ }
return
}
"ca-certificates",
"cadaver",
"curl",
- "cython",
+ "cython3",
"daemontools", // lib/boot uses setuidgid to drop privileges when running as root
"default-jdk-headless",
"default-jre-headless",
"libpam-dev",
"libpcre3-dev",
"libpq-dev",
- "libpython2.7-dev",
"libreadline-dev",
"libssl-dev",
"libwww-perl",
"pkg-config",
"postgresql",
"postgresql-contrib",
- "python",
"python3-dev",
"python-epydoc",
"r-base",
"r-cran-testthat",
"sudo",
- "virtualenv",
+ "python3-virtualenv",
+ "python3-venv",
"wget",
"xvfb",
"zlib1g-dev",
TimeoutProbe Duration
TimeoutShutdown Duration
TimeoutSignal Duration
+ TimeoutStaleRunLock Duration
TimeoutTERM Duration
ResourceTags map[string]string
TagKeyPrefix string
#
# SPDX-License-Identifier: Apache-2.0
-# arv-copy [--recursive] [--no-recursive] object-uuid src dst
+# arv-copy [--recursive] [--no-recursive] object-uuid
#
# Copies an object from Arvados instance src to instance dst.
#
import logging
import tempfile
import urllib.parse
+import io
import arvados
import arvados.config
'-f', '--force', dest='force', action='store_true',
help='Perform copy even if the object appears to exist at the remote destination.')
copy_opts.add_argument(
- '--src', dest='source_arvados', required=True,
- help='The name of the source Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
+ '--src', dest='source_arvados',
+ help='The name of the source Arvados instance (optional) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf. If not provided, the source cluster is inferred from the UUID of the object being copied.')
copy_opts.add_argument(
- '--dst', dest='destination_arvados', required=True,
- help='The name of the destination Arvados instance (required) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf.')
+ '--dst', dest='destination_arvados',
+ help='The name of the destination Arvados instance (optional) - points at an Arvados config file. May be either a pathname to a config file, or (for example) "foo" as shorthand for $HOME/.config/arvados/foo.conf. If not provided, the destination API client is configured from the ARVADOS_API_HOST/ARVADOS_API_TOKEN environment.')
copy_opts.add_argument(
'--recursive', dest='recursive', action='store_true',
- help='Recursively copy any dependencies for this object. (default)')
+ help='Recursively copy any dependencies for this object, including subprojects. (default)')
copy_opts.add_argument(
'--no-recursive', dest='recursive', action='store_false',
- help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
+ help='Do not copy any dependencies or subprojects.')
copy_opts.add_argument(
'--project-uuid', dest='project_uuid',
help='The UUID of the project at the destination to which the collection or workflow should be copied.')
else:
logger.setLevel(logging.INFO)
+ if not args.source_arvados:
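+ # The first five characters of an Arvados UUID are the cluster id,
+ # so the source cluster can be inferred from the object UUID itself.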
+ args.source_arvados = args.object_uuid[:5]
+
# Create API clients for the source and destination instances
src_arv = api_for_instance(args.source_arvados)
dst_arv = api_for_instance(args.destination_arvados)
elif t == 'Workflow':
set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
+ elif t == 'Group':
+ set_src_owner_uuid(src_arv.groups(), args.object_uuid, args)
+ result = copy_project(args.object_uuid, src_arv, dst_arv, args.project_uuid, args)
else:
abort("cannot copy object {} of type {}".format(args.object_uuid, t))
# $HOME/.config/arvados/instance_name.conf
#
def api_for_instance(instance_name):
+ if not instance_name:
+ # No instance name given: use the default API client, configured
+ # from the ARVADOS_API_HOST/ARVADOS_API_TOKEN environment.
+ return arvados.api('v1', model=OrderedJsonModel())
+
if '/' in instance_name:
config_file = instance_name
else:
# copy the workflow itself
del wf['uuid']
wf['owner_uuid'] = args.project_uuid
- return dst.workflows().create(body=wf).execute(num_retries=args.retries)
+
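+ # If a workflow with the same name already exists in the destination
+ # project, update it instead of creating a duplicate.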
+ existing = dst.workflows().list(filters=[["owner_uuid", "=", args.project_uuid],
+ ["name", "=", wf["name"]]]).execute()
+ if len(existing["items"]) == 0:
+ return dst.workflows().create(body=wf).execute(num_retries=args.retries)
+ else:
+ return dst.workflows().update(uuid=existing["items"][0]["uuid"], body=wf).execute(num_retries=args.retries)
+
def workflow_collections(obj, locations, docker_images):
if isinstance(obj, dict):
if loc.startswith("keep:"):
locations.append(loc[5:])
- docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None)
+ docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None) or obj.get('acrContainerImage', None)
if docker_image is not None:
ds = docker_image.split(":", 1)
tag = ds[1] if len(ds)==2 else 'latest'
# a new manifest as we go.
src_keep = arvados.keep.KeepClient(api_client=src, num_retries=args.retries)
dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=args.retries)
- dst_manifest = ""
+ dst_manifest = io.StringIO()
dst_locators = {}
bytes_written = 0
bytes_expected = total_collection_size(manifest)
for line in manifest.splitlines():
words = line.split()
- dst_manifest += words[0]
+ dst_manifest.write(words[0])
for word in words[1:]:
try:
loc = arvados.KeepLocator(word)
except ValueError:
# If 'word' can't be parsed as a locator,
# presume it's a filename.
- dst_manifest += ' ' + word
+ dst_manifest.write(' ')
+ dst_manifest.write(word)
continue
blockhash = loc.md5sum
# copy this block if we haven't seen it before
dst_locator = dst_keep.put(data)
dst_locators[blockhash] = dst_locator
bytes_written += loc.size
- dst_manifest += ' ' + dst_locators[blockhash]
- dst_manifest += "\n"
+ dst_manifest.write(' ')
+ dst_manifest.write(dst_locators[blockhash])
+ dst_manifest.write("\n")
if progress_writer:
progress_writer.report(obj_uuid, bytes_written, bytes_expected)
progress_writer.finish()
# Copy the manifest and save the collection.
- logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest)
+ logger.debug('saving %s with manifest: <%s>', obj_uuid, dst_manifest.getvalue())
- c['manifest_text'] = dst_manifest
+ c['manifest_text'] = dst_manifest.getvalue()
return create_collection_from(c, src, dst, args)
def select_git_url(api, repo_name, retries, allow_insecure_http, allow_insecure_http_opt):
else:
logger.warning('Could not find docker image {}:{}'.format(docker_image, docker_image_tag))
+def copy_project(obj_uuid, src, dst, owner_uuid, args):
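+ # Copy a project (group): create or reuse a same-named project under
+ # owner_uuid, copy its collections and workflows into it, and recurse
+ # into subprojects when --recursive is given.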
+
+ src_project_record = src.groups().get(uuid=obj_uuid).execute(num_retries=args.retries)
+
+ # Create/update the destination project
+ existing = dst.groups().list(filters=[["owner_uuid", "=", owner_uuid],
+ ["name", "=", src_project_record["name"]]]).execute(num_retries=args.retries)
+ if len(existing["items"]) == 0:
+ project_record = dst.groups().create(body={"group": {"group_class": "project",
+ "owner_uuid": owner_uuid,
+ "name": src_project_record["name"]}}).execute(num_retries=args.retries)
+ else:
+ project_record = existing["items"][0]
+
+ dst.groups().update(uuid=project_record["uuid"],
+ body={"group": {
+ "description": src_project_record["description"]}}).execute(num_retries=args.retries)
+
+ args.project_uuid = project_record["uuid"]
+
+ logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
+
+ # Copy collections
+ copy_collections([col["uuid"] for col in arvados.util.list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
+ src, dst, args)
+
+ # Copy workflows
+ for w in arvados.util.list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
+ copy_workflow(w["uuid"], src, dst, args)
+
+ if args.recursive:
+ for g in arvados.util.list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
+ copy_project(g["uuid"], src, dst, project_record["uuid"], args)
+
+ return project_record
+
# git_rev_parse(rev, repo)
#
# Returns the 40-character commit hash corresponding to 'rev' in
# Special case: if handed a Keep locator hash, return 'Collection'.
#
def uuid_type(api, object_uuid):
- if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
+ if re.match(arvados.util.keep_locator_pattern, object_uuid):
return 'Collection'
p = object_uuid.split('-')
if len(p) == 3:
ARVADOS_DIR = os.path.realpath(os.path.join(MY_DIRNAME, '../../..'))
SERVICES_SRC_DIR = os.path.join(ARVADOS_DIR, 'services')
+
+# Work around https://bugs.python.org/issue27805; this should no longer
+# be necessary as of Python 3.8.x.
+if not os.environ.get('ARVADOS_DEBUG', ''):
+ WRITE_MODE = 'a'
+else:
+ WRITE_MODE = 'w'
+
if 'GOPATH' in os.environ:
# Add all GOPATH bin dirs to PATH -- but insert them after the
# ruby gems bin dir, to ensure "bundle" runs the Ruby bundler
env.pop('ARVADOS_API_HOST', None)
env.pop('ARVADOS_API_HOST_INSECURE', None)
env.pop('ARVADOS_API_TOKEN', None)
- logf = open(_logfilename('railsapi'), 'a')
+ logf = open(_logfilename('railsapi'), WRITE_MODE)
railsapi = subprocess.Popen(
['bundle', 'exec',
'passenger', 'start', '-p{}'.format(port),
if 'ARVADOS_TEST_PROXY_SERVICES' in os.environ:
return
stop_controller()
- logf = open(_logfilename('controller'), 'a')
+ logf = open(_logfilename('controller'), WRITE_MODE)
port = internal_port_from_config("Controller")
controller = subprocess.Popen(
["arvados-server", "controller"],
return
stop_ws()
port = internal_port_from_config("Websocket")
- logf = open(_logfilename('ws'), 'a')
+ logf = open(_logfilename('ws'), WRITE_MODE)
ws = subprocess.Popen(
["arvados-server", "ws"],
stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
yaml.safe_dump(confdata, f)
keep_cmd = ["keepstore", "-config", conf]
- with open(_logfilename('keep{}'.format(n)), 'a') as logf:
+ with open(_logfilename('keep{}'.format(n)), WRITE_MODE) as logf:
with open('/dev/null') as _stdin:
child = subprocess.Popen(
keep_cmd, stdin=_stdin, stdout=logf, stderr=logf, close_fds=True)
port = internal_port_from_config("Keepproxy")
env = os.environ.copy()
env['ARVADOS_API_TOKEN'] = auth_token('anonymous')
- logf = open(_logfilename('keepproxy'), 'a')
+ logf = open(_logfilename('keepproxy'), WRITE_MODE)
kp = subprocess.Popen(
['keepproxy'], env=env, stdin=open('/dev/null'), stdout=logf, stderr=logf, close_fds=True)
gitport = internal_port_from_config("GitHTTP")
env = os.environ.copy()
env.pop('ARVADOS_API_TOKEN', None)
- logf = open(_logfilename('arv-git-httpd'), 'a')
+ logf = open(_logfilename('arv-git-httpd'), WRITE_MODE)
agh = subprocess.Popen(['arv-git-httpd'],
env=env, stdin=open('/dev/null'), stdout=logf, stderr=logf)
with open(_pidfile('arv-git-httpd'), 'w') as f:
keepwebport = internal_port_from_config("WebDAV")
env = os.environ.copy()
- logf = open(_logfilename('keep-web'), 'a')
+ logf = open(_logfilename('keep-web'), WRITE_MODE)
keepweb = subprocess.Popen(
['keep-web'],
env=env, stdin=open('/dev/null'), stdout=logf, stderr=logf)
import sys
import tempfile
import unittest
+import shutil
+import arvados.api
+from arvados.collection import Collection, CollectionReader
import arvados.commands.arv_copy as arv_copy
from . import arvados_testutil as tutil
+from . import run_test_server
+
+class ArvCopyVersionTestCase(run_test_server.TestCaseWithServers, tutil.VersionChecker):
+ MAIN_SERVER = {}
+ KEEP_SERVER = {}
-class ArvCopyTestCase(unittest.TestCase, tutil.VersionChecker):
def run_copy(self, args):
sys.argv = ['arv-copy'] + args
return arv_copy.main()
with self.assertRaises(SystemExit):
self.run_copy(['--version'])
self.assertVersionOutput(out, err)
+
+ def test_copy_project(self):
+ api = arvados.api()
+ src_proj = api.groups().create(body={"group": {"name": "arv-copy project", "group_class": "project"}}).execute()["uuid"]
+
+ c = Collection()
+ with c.open('foo', 'wt') as f:
+ f.write('foo')
+ c.save_new("arv-copy foo collection", owner_uuid=src_proj)
+
+ dest_proj = api.groups().create(body={"group": {"name": "arv-copy dest project", "group_class": "project"}}).execute()["uuid"]
+
+ tmphome = tempfile.mkdtemp()
+ home_was = os.environ['HOME']
+ os.environ['HOME'] = tmphome
+ try:
+ cfgdir = os.path.join(tmphome, ".config", "arvados")
+ os.makedirs(cfgdir)
+ with open(os.path.join(cfgdir, "zzzzz.conf"), "wt") as f:
+ f.write("ARVADOS_API_HOST=%s\n" % os.environ["ARVADOS_API_HOST"])
+ f.write("ARVADOS_API_TOKEN=%s\n" % os.environ["ARVADOS_API_TOKEN"])
+ f.write("ARVADOS_API_HOST_INSECURE=1\n")
+
+ contents = api.groups().list(filters=[["owner_uuid", "=", dest_proj]]).execute()
+ assert len(contents["items"]) == 0
+
+ try:
+ self.run_copy(["--project-uuid", dest_proj, src_proj])
+ except SystemExit as e:
+ assert e.code == 0
+
+ contents = api.groups().list(filters=[["owner_uuid", "=", dest_proj]]).execute()
+ assert len(contents["items"]) == 1
+
+ assert contents["items"][0]["name"] == "arv-copy project"
+ copied_project = contents["items"][0]["uuid"]
+
+ contents = api.collections().list(filters=[["owner_uuid", "=", copied_project]]).execute()
+ assert len(contents["items"]) == 1
+
+ assert contents["items"][0]["uuid"] != c.manifest_locator()
+ assert contents["items"][0]["name"] == "arv-copy foo collection"
+ assert contents["items"][0]["portable_data_hash"] == c.portable_data_hash()
+
+ finally:
+ os.environ['HOME'] = home_was
+ shutil.rmtree(tmphome)
rescue ActiveRecord::RecordNotUnique
retry
end
- u.update_attributes!(nullify_attrs(attrs))
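+ # Skip attributes that aren't actually changing, so a no-op batch
+ # update doesn't touch the record or write another audit log entry.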
+ needupdate = {}
+ nullify_attrs(attrs).each do |k,v|
+ if !v.nil? && u.send(k) != v
+ needupdate[k] = v
+ end
+ end
+ if needupdate.length > 0
+ u.update_attributes!(needupdate)
+ end
@objects << u
end
@offset = 0
test "batch update" do
existinguuid = 'remot-tpzed-foobarbazwazqux'
newuuid = 'remot-tpzed-newnarnazwazqux'
+ unchanginguuid = 'remot-tpzed-nochangingattrs'
act_as_system_user do
User.create!(uuid: existinguuid, email: 'root@existing.example.com')
+ User.create!(uuid: unchanginguuid, email: 'root@unchanging.example.com', prefs: {'foo' => {'bar' => 'baz'}})
end
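+ # Creating the user above wrote exactly one audit log entry; the
+ # no-op batch update below must not add another.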
+ assert_equal(1, Log.where(object_uuid: unchanginguuid).count)
authorize_with(:admin)
patch(:batch_update,
'email' => 'root@remot.example.com',
'username' => '',
},
+ unchanginguuid => {
+ 'email' => 'root@unchanging.example.com',
+ 'prefs' => {'foo' => {'bar' => 'baz'}},
+ },
}})
assert_response(:success)
assert_equal('noot', User.find_by_uuid(newuuid).first_name)
assert_equal('root@remot.example.com', User.find_by_uuid(newuuid).email)
+
+ assert_equal(1, Log.where(object_uuid: unchanginguuid).count)
end
NON_ADMIN_USER_DATA = ["uuid", "kind", "is_active", "email", "first_name",
apt-get -yq --no-install-recommends -o Acquire::Retries=6 install \
postgresql-9.6 postgresql-contrib-9.6 git build-essential runit curl libpq-dev \
libcurl4-openssl-dev libssl1.0-dev zlib1g-dev libpcre3-dev libpam-dev \
- openssh-server python-setuptools netcat-traditional \
- python-epydoc graphviz bzip2 less sudo virtualenv \
- libpython-dev fuse libfuse-dev python-pip python-yaml \
- pkg-config libattr1-dev python-pycurl \
+ openssh-server netcat-traditional \
+ graphviz bzip2 less sudo virtualenv \
+ libpython-dev fuse libfuse-dev \
+ pkg-config libattr1-dev \
libwww-perl libio-socket-ssl-perl libcrypt-ssleay-perl \
libjson-perl nginx gitolite3 lsof libreadline-dev \
- apt-transport-https ca-certificates \
- linkchecker python3-virtualenv python-virtualenv xvfb iceweasel \
+ apt-transport-https ca-certificates python3-yaml \
+ linkchecker python3-virtualenv python3-venv xvfb iceweasel \
libgnutls28-dev python3-dev vim cadaver cython gnupg dirmngr \
libsecret-1-dev r-base r-cran-testthat libxml2-dev pandoc \
python3-setuptools python3-pip openjdk-8-jdk bsdmainutils net-tools \
- ruby2.3 ruby-dev bundler shellinabox && \
+ ruby2.3 ruby-dev bundler shellinabox && \
+ apt-get remove -yq libpython-dev libpython-stdlib libpython2.7 libpython2.7-dev \
+ libpython2.7-minimal libpython2.7-stdlib python2.7-minimal python2.7 && \
apt-get clean
ENV RUBYVERSION_MINOR 2.3
ENV GDURL=https://github.com/mozilla/geckodriver/releases/download/$GDVERSION/geckodriver-$GDVERSION-linux64.tar.gz
RUN set -e && curl -L -f ${GDURL} | tar -C /usr/local/bin -xzf - geckodriver
-RUN pip install -U setuptools
-
ENV NODEVERSION v8.15.1
# Install nodejs binary
pip_install wheel
cd /usr/src/arvados/sdk/python
-python setup.py sdist
+$PYCMD setup.py sdist
pip_install $(ls dist/arvados-python-client-*.tar.gz | tail -n1)
cd /usr/src/arvados/services/fuse
-python setup.py sdist
+$PYCMD setup.py sdist
pip_install $(ls dist/arvados_fuse-*.tar.gz | tail -n1)
cd /usr/src/arvados/sdk/cwl
-python setup.py sdist
+$PYCMD setup.py sdist
pip_install $(ls dist/arvados-cwl-runner-*.tar.gz | tail -n1)
def recursiveMerge(a, b):
if isinstance(a, dict) and isinstance(b, dict):
for k in b:
- print k
+ print(k)
a[k] = recursiveMerge(a.get(k), b[k])
return a
else:
-#!/bin/sh
+#!/bin/bash
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0
-set -e
+set -e -o pipefail
-if test -z "$1" ; then
+if test -z "$1" ; then
echo "$0: Copies Arvados tutorial resources from public data cluster (jutro)"
- echo "Usage: copy-tutorial.sh <dest>"
- echo "<dest> is destination cluster configuration that can be found in ~/.config/arvados"
+ echo "Usage: copy-tutorial.sh <tutorial>"
+ echo "<tutorial> is which tutorial to copy, one of:"
+ echo " bwa-mem Tutorial from https://doc.arvados.org/user/tutorials/tutorial-workflow-workbench.html"
+ echo " whole-genome Whole genome variant calling tutorial workflow (large)"
exit
fi
-echo "Copying from public data cluster (jutro) to $1"
+if test -z "ARVADOS_API_HOST" ; then
+ echo "Please set ARVADOS_API_HOST to the destination cluster"
+ exit
+fi
+
+src=jutro
+tutorial=$1
+
+if ! test -f $HOME/.config/arvados/jutro.conf ; then
+ # Set it up with the anonymous user token.
+ echo "ARVADOS_API_HOST=jutro.arvadosapi.com" > $HOME/.config/arvados/jutro.conf
+ echo "ARVADOS_API_TOKEN=v2/jutro-gj3su-e2o9x84aeg7q005/22idg1m3zna4qe4id3n0b9aw86t72jdw8qu1zj45aboh1mm4ej" >> $HOME/.config/arvados/jutro.conf
+fi
+
+echo
+echo "Copying from public data cluster (jutro) to $ARVADOS_API_HOST"
+echo
+
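+# make_project NAME [OWNER] prints the uuid of an existing project with
+# the given name, creating it first if necessary.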
+make_project() {
+ name="$1"
+ owner="$2"
+ if test -z "$owner" ; then
+ owner=$(arv --format=uuid user current)
+ fi
+ project_uuid=$(arv --format=uuid group list --filters '[["name", "=", "'"$name"'"], ["owner_uuid", "=", "'$owner'"]]')
+ if test -z "$project_uuid" ; then
+ project_uuid=$(arv --format=uuid group create --group '{"name":"'"$name"'", "group_class": "project", "owner_uuid": "'$owner'"}')
+
+ fi
+ echo $project_uuid
+}
-for a in $(cat $HOME/.config/arvados/$1.conf) ; do export $a ; done
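+# Copy the arvados/jobs docker image from jutro unless the destination
+# already has it.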
+copy_jobs_image() {
+ if ! arv-keepdocker | grep "arvados/jobs *latest" ; then
+ arv-copy --project-uuid=$parent_project jutro-4zz18-sxmit0qs6i9n2s4
+ fi
+}
-project_uuid=$(arv --format=uuid group create --group '{"name":"User guide resources", "group_class": "project"}')
+parent_project=$(make_project "Tutorial projects")
+copy_jobs_image
-# Bwa-mem workflow
-arv-copy --src jutro --dst $1 --project-uuid=$project_uuid f141fc27e7cfa7f7b6d208df5e0ee01b+59
-arv-copy --src jutro --dst $1 --project-uuid=$project_uuid jutro-7fd4e-mkmmq53m1ze6apx
+if test "$tutorial" = "bwa-mem" ; then
+ echo
+ echo "Copying bwa mem tutorial"
+ echo
-echo "Data copied to \"User guide resources\" at $project_uuid"
+ arv-copy --project-uuid=$parent_project jutro-j7d0g-rehmt1w5v2p2drp
+
+ echo
+ echo "Finished, data copied to \"User guide resources\" at $parent_project"
+ echo "You can now go to Workbench and choose 'Run a process' and then select 'bwa-mem.cwl'"
+ echo
+fi
+
+if test "$tutorial" = "whole-genome" ; then
+ echo
+ echo "Copying whole genome variant calling tutorial"
+ echo
+
+ arv-copy --project-uuid=$parent_project jutro-j7d0g-n2g87m02rsl4cx2
+
+ echo
+ echo "Finished, data copied to \"WGS Processing Tutorial\" at $parent_project"
+ echo "You can now go to Workbench and choose 'Run a process' and then select 'WGS Processing Tutorial'"
+ echo
+fi