-logging.info("start writing output to keep")
-
-done = False
-resume_cache = put.ResumeCache(os.path.join(arvados.current_task().tmpdir, "upload-output-checkpoint"))
-reporter = put.progress_writer(machine_progress)
-bytes_expected = put.expected_bytes_for(".")
-while not done:
- try:
- out = put.ArvPutCollectionWriter.from_cache(resume_cache, reporter, bytes_expected)
- out.do_queued_work()
- out.write_directory_tree(".", max_manifest_depth=0)
- outuuid = out.finish()
- api.job_tasks().update(uuid=arvados.current_task()['uuid'],
- body={
- 'output':outuuid,
- 'success': (rcode == 0),
- 'progress':1.0
- }).execute()
- done = True
- except KeyboardInterrupt:
- logging.critical("terminating on signal 2")
- sys.exit(2)
- except Exception as e:
- logging.exception("caught exception:")
- time.sleep(5)
+logger.info("start writing output to keep")
+
+if "task.vwd" in taskp:
+ if "task.foreach" in jobp:
+ # "task.foreach" is present in jobp, so this is a subtask: check in with merge=False and leave merging with the original collection until the end.
+ outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=False).manifest_text()
+ else:
+ # No "task.foreach" in jobp, so this is just a single task: check in with merge=True to merge with the original collection now.
+ outcollection = vwd.checkin(subst.do_substitution(taskp, taskp["task.vwd"]), outdir, merge=True).manifest_text()
+else:
+ outcollection = robust_put.upload(outdir, logger)
+
+api.job_tasks().update(uuid=arvados.current_task()['uuid'],
+ body={
+ 'output': outcollection,
+ 'success': (rcode == 0),
+ 'progress':1.0
+ }).execute()