// IKeepClient is the minimal set of Keep API methods used by crunch-run.
type IKeepClient interface {
- PutB(buf []byte) (string, int, error)
+ BlockWrite(context.Context, arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error)
ReadAt(locator string, p []byte, off int) (int, error)
ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
LocalLocator(locator string) (string, error)
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
- executor containerExecutor
+ executor containerExecutor
+ executorStdin io.Closer
+ executorStdout io.Closer
+ executorStderr io.Closer
// Dispatcher client is initialized with the Dispatcher token.
// This is a privileged token used to manage container status
ExitCode *int
NewLogWriter NewLogWriter
CrunchLog *ThrottledLogger
- Stdout io.WriteCloser
- Stderr io.WriteCloser
logUUID string
logMtx sync.Mutex
LogCollection arvados.CollectionFileSystem
// CreateContainer creates the container using the runner's executor.
func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error {
- var stdin io.ReadCloser
+ var stdin io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
if mnt, ok := runner.Container.Mounts["stdin"]; ok {
switch mnt.Kind {
case "collection":
if !runner.enableMemoryLimit {
ram = 0
}
+ runner.executorStdin = stdin
+ runner.executorStdout = stdout
+ runner.executorStderr = stderr
return runner.executor.Create(containerSpec{
Image: imageID,
VCPUs: runner.Container.RuntimeConstraints.VCPUs,
}
runner.ExitCode = &exitcode
+ var returnErr error
+ if err = runner.executorStdin.Close(); err != nil {
+ err = fmt.Errorf("error closing container stdin: %s", err)
+ runner.CrunchLog.Printf("%s", err)
+ returnErr = err
+ }
+ if err = runner.executorStdout.Close(); err != nil {
+ err = fmt.Errorf("error closing container stdout: %s", err)
+ runner.CrunchLog.Printf("%s", err)
+ if returnErr == nil {
+ returnErr = err
+ }
+ }
+ if err = runner.executorStderr.Close(); err != nil {
+ err = fmt.Errorf("error closing container stderr: %s", err)
+ runner.CrunchLog.Printf("%s", err)
+ if returnErr == nil {
+ returnErr = err
+ }
+ }
+
if runner.statReporter != nil {
runner.statReporter.Stop()
err = runner.statLogger.Close()
runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
}
}
- return nil
+ return returnErr
}
func (runner *ContainerRunner) updateLogs() {
// Stop marks the stub as stopped, sends exit code -1 on e.exit from a
// separate goroutine, and returns the stub's stopErr.
func (e *stubExecutor) Stop() error {
	e.stopped = true
	// The send happens in its own goroutine so that Stop returns
	// without waiting for a receiver on e.exit.
	go func() {
		e.exit <- -1
	}()
	return e.stopErr
}
// Close records that the stub executor has been closed.
func (e *stubExecutor) Close() {
	e.closed = true
}
// Wait blocks until a value is received from e.exit and returns it as the
// exit code, together with the stub's waitErr. The context argument is
// ignored.
func (e *stubExecutor) Wait(context.Context) (int, error) {
- defer e.created.Stdout.Close()
- defer e.created.Stderr.Close()
return <-e.exit, e.waitErr
}
return locator, nil
}
// BlockWrite (which takes the place of PutB in this change) stores the
// written data in client.Content and returns a response whose Locator is
// the hex MD5 of the data followed by "+" and the data length. It always
// returns a nil error; the context argument is ignored.
-func (client *KeepTestClient) PutB(buf []byte) (string, int, error) {
- client.Content = buf
- return fmt.Sprintf("%x+%d", md5.Sum(buf), len(buf)), len(buf), nil
+func (client *KeepTestClient) BlockWrite(_ context.Context, opts arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
+ client.Content = opts.Data
+ return arvados.BlockWriteResponse{
+ Locator: fmt.Sprintf("%x+%d", md5.Sum(opts.Data), len(opts.Data)),
+ }, nil
}
func (client *KeepTestClient) ReadAt(string, []byte, int) (int, error) {
return nil, errors.New("KeepError")
}
// BlockWrite (which takes the place of PutB in this change) always fails:
// it returns an empty response and a "KeepError" error, ignoring both
// arguments.
-func (*KeepErrorTestClient) PutB(buf []byte) (string, int, error) {
- return "", 0, errors.New("KeepError")
+func (*KeepErrorTestClient) BlockWrite(context.Context, arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
+ return arvados.BlockWriteResponse{}, errors.New("KeepError")
}
func (*KeepErrorTestClient) LocalLocator(string) (string, error) {
func (s *TestSuite) TestRunContainer(c *C) {
s.executor.runFunc = func() {
fmt.Fprintf(s.executor.created.Stdout, "Hello world\n")
- s.executor.created.Stdout.Close()
- s.executor.created.Stderr.Close()
s.executor.exit <- 0
}