5cb982e1bbf9e5de77df1903f278878212a4d443
[arvados.git] / lib / crunchrun / crunchrun_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package crunchrun
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "errors"
13         "fmt"
14         "io"
15         "io/fs"
16         "io/ioutil"
17         "log"
18         "math/rand"
19         "net/http"
20         "net/http/httptest"
21         "net/http/httputil"
22         "net/url"
23         "os"
24         "os/exec"
25         "path"
26         "regexp"
27         "runtime/pprof"
28         "strings"
29         "sync"
30         "sync/atomic"
31         "syscall"
32         "testing"
33         "time"
34
35         "git.arvados.org/arvados.git/lib/cloud"
36         "git.arvados.org/arvados.git/lib/cmd"
37         "git.arvados.org/arvados.git/sdk/go/arvados"
38         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
39         "git.arvados.org/arvados.git/sdk/go/arvadostest"
40         "git.arvados.org/arvados.git/sdk/go/manifest"
41
42         . "gopkg.in/check.v1"
43 )
44
45 // Gocheck boilerplate
46 func TestCrunchExec(t *testing.T) {
47         TestingT(t)
48 }
49
50 const logLineStart = `(?m)(.*\n)*\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z `
51
52 var _ = Suite(&TestSuite{})
53
54 type TestSuite struct {
55         client                   *arvados.Client
56         api                      *ArvTestClient
57         runner                   *ContainerRunner
58         executor                 *stubExecutor
59         keepmount                string
60         keepmountTmp             []string
61         testDispatcherKeepClient KeepTestClient
62         testContainerKeepClient  KeepTestClient
63         debian12MemoryCurrent    int64
64         debian12SwapCurrent      int64
65 }
66
67 func (s *TestSuite) SetUpSuite(c *C) {
68         buf, err := os.ReadFile("../crunchstat/testdata/debian12/sys/fs/cgroup/user.slice/user-1000.slice/session-4.scope/memory.current")
69         c.Assert(err, IsNil)
70         _, err = fmt.Sscanf(string(buf), "%d", &s.debian12MemoryCurrent)
71         c.Assert(err, IsNil)
72
73         buf, err = os.ReadFile("../crunchstat/testdata/debian12/sys/fs/cgroup/user.slice/user-1000.slice/session-4.scope/memory.swap.current")
74         c.Assert(err, IsNil)
75         _, err = fmt.Sscanf(string(buf), "%d", &s.debian12SwapCurrent)
76         c.Assert(err, IsNil)
77 }
78
79 func (s *TestSuite) SetUpTest(c *C) {
80         s.client = arvados.NewClientFromEnv()
81         s.executor = &stubExecutor{}
82         var err error
83         s.api = &ArvTestClient{}
84         s.runner, err = NewContainerRunner(s.client, s.api, &s.testDispatcherKeepClient, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
85         c.Assert(err, IsNil)
86         s.runner.executor = s.executor
87         s.runner.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
88                 return s.api, &s.testContainerKeepClient, s.client, nil
89         }
90         s.runner.RunArvMount = func(cmd []string, tok string) (*exec.Cmd, error) {
91                 s.runner.ArvMountPoint = s.keepmount
92                 for i, opt := range cmd {
93                         if opt == "--mount-tmp" {
94                                 err := os.Mkdir(s.keepmount+"/"+cmd[i+1], 0700)
95                                 if err != nil {
96                                         return nil, err
97                                 }
98                                 s.keepmountTmp = append(s.keepmountTmp, cmd[i+1])
99                         }
100                 }
101                 return nil, nil
102         }
103         s.keepmount = c.MkDir()
104         err = os.Mkdir(s.keepmount+"/by_id", 0755)
105         s.keepmountTmp = nil
106         c.Assert(err, IsNil)
107         err = os.Mkdir(s.keepmount+"/by_id/"+arvadostest.DockerImage112PDH, 0755)
108         c.Assert(err, IsNil)
109         err = ioutil.WriteFile(s.keepmount+"/by_id/"+arvadostest.DockerImage112PDH+"/"+arvadostest.DockerImage112Filename, []byte("#notarealtarball"), 0644)
110         err = os.Mkdir(s.keepmount+"/by_id/"+fakeInputCollectionPDH, 0755)
111         c.Assert(err, IsNil)
112         err = ioutil.WriteFile(s.keepmount+"/by_id/"+fakeInputCollectionPDH+"/input.json", []byte(`{"input":true}`), 0644)
113         c.Assert(err, IsNil)
114         s.runner.ArvMountPoint = s.keepmount
115         os.Setenv("InstanceType", `{"ProviderType":"a1.2xlarge","Price":1.2}`)
116 }
117
118 type ArvTestClient struct {
119         Total   int64
120         Calls   int
121         Content []arvadosclient.Dict
122         arvados.Container
123         secretMounts []byte
124         Logs         map[string]*bytes.Buffer
125         sync.Mutex
126         WasSetRunning bool
127         callraw       bool
128 }
129
// KeepTestClient is an in-memory stub Keep client.
type KeepTestClient struct {
	// Called is set once ManifestFileReader has served one of
	// the file names it knows about.
	Called bool
	// StorageClasses is the value most recently passed to
	// SetStorageClasses.
	StorageClasses []string
	// blocks maps locator strings to block contents ([]byte)
	// written with BlockWrite.
	blocks sync.Map
}
135
136 type stubExecutor struct {
137         imageLoaded bool
138         loaded      string
139         loadErr     error
140         exitCode    int
141         createErr   error
142         created     containerSpec
143         startErr    error
144         waitSleep   time.Duration
145         waitErr     error
146         stopErr     error
147         stopped     bool
148         closed      bool
149         runFunc     func() int
150         exit        chan int
151 }
152
153 func (e *stubExecutor) LoadImage(imageId string, tarball string, container arvados.Container, keepMount string,
154         containerClient *arvados.Client) error {
155         e.loaded = tarball
156         return e.loadErr
157 }
158 func (e *stubExecutor) Runtime() string                 { return "stub" }
159 func (e *stubExecutor) Version() string                 { return "stub " + cmd.Version.String() }
160 func (e *stubExecutor) Create(spec containerSpec) error { e.created = spec; return e.createErr }
161 func (e *stubExecutor) Start() error {
162         e.exit = make(chan int, 1)
163         go func() { e.exit <- e.runFunc() }()
164         return e.startErr
165 }
166 func (e *stubExecutor) Pid() int    { return 1115883 } // matches pid in ../crunchstat/testdata/debian12/proc/
167 func (e *stubExecutor) Stop() error { e.stopped = true; go func() { e.exit <- -1 }(); return e.stopErr }
168 func (e *stubExecutor) Close()      { e.closed = true }
169 func (e *stubExecutor) Wait(context.Context) (int, error) {
170         return <-e.exit, e.waitErr
171 }
172 func (e *stubExecutor) InjectCommand(ctx context.Context, _, _ string, _ bool, _ []string) (*exec.Cmd, error) {
173         return nil, errors.New("unimplemented")
174 }
175 func (e *stubExecutor) IPAddress() (string, error) { return "", errors.New("unimplemented") }
176
// fakeInputCollectionPDH names the fake input collection directory
// that SetUpTest creates under the temporary mount point.
const fakeInputCollectionPDH = "ffffffffaaaaaaaa88888888eeeeeeee+1234"

// Manifests, and the portable data hashes under which the stub API
// clients serve them.
var (
	hwManifest = ". 82ab40c24fc8df01798e57ba66795bb1+841216+Aa124ac75e5168396c73c0a18eda641a4f41791c0@569fa8c3 0:841216:9c31ee32b3d15268a0754e8edc74d4f815ee014b693bc5109058e431dd5caea7.tar\n"
	hwPDH      = "a45557269dcb65a6b78f9ac061c0850b+120"
	hwImageID  = "9c31ee32b3d15268a0754e8edc74d4f815ee014b693bc5109058e431dd5caea7"

	otherManifest = ". 68a84f561b1d1708c6baff5e019a9ab3+46+Ae5d0af96944a3690becb1decdf60cc1c937f556d@5693216f 0:46:md5sum.txt\n"
	otherPDH      = "a3e8f74c6f101eae01fa08bfb4e49b3a+54"

	normalizedManifestWithSubdirs = `. 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0abcdefgh11234567890@569fa8c3 0:9:file1_in_main.txt 9:18:file2_in_main.txt 0:27:zzzzz-8i9sb-bcdefghijkdhvnk.log.txt
./subdir1 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 0:9:file1_in_subdir1.txt 9:18:file2_in_subdir1.txt
./subdir1/subdir2 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0bcdefghijk544332211@569fa8c5 0:9:file1_in_subdir2.txt 9:18:file2_in_subdir2.txt
`
	normalizedWithSubdirsPDH = "a0def87f80dd594d4675809e83bd4f15+367"

	denormalizedManifestWithSubdirs = ". 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0abcdefgh11234567890@569fa8c3 0:9:file1_in_main.txt 9:18:file2_in_main.txt 0:27:zzzzz-8i9sb-bcdefghijkdhvnk.log.txt 0:10:subdir1/file1_in_subdir1.txt 10:17:subdir1/file2_in_subdir1.txt\n"
	denormalizedWithSubdirsPDH      = "b0def87f80dd594d4675809e83bd4f15+367"
)

// Credentials returned by the stub API client's container "auth"
// action.
var (
	fakeAuthUUID  = "zzzzz-gj3su-55pqoyepgi2glem"
	fakeAuthToken = "a3ltuwzqcu2u4sc0q7yhpc2w7s00fdcqecg5d6e0u3pfohmbjt"
)
198
199 func (client *ArvTestClient) Create(resourceType string,
200         parameters arvadosclient.Dict,
201         output interface{}) error {
202
203         client.Mutex.Lock()
204         defer client.Mutex.Unlock()
205
206         client.Calls++
207         client.Content = append(client.Content, parameters)
208
209         if resourceType == "logs" {
210                 et := parameters["log"].(arvadosclient.Dict)["event_type"].(string)
211                 if client.Logs == nil {
212                         client.Logs = make(map[string]*bytes.Buffer)
213                 }
214                 if client.Logs[et] == nil {
215                         client.Logs[et] = &bytes.Buffer{}
216                 }
217                 client.Logs[et].Write([]byte(parameters["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"]))
218         }
219
220         if resourceType == "collections" && output != nil {
221                 mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string)
222                 md5sum := md5.Sum([]byte(mt))
223                 outmap := output.(*arvados.Collection)
224                 outmap.PortableDataHash = fmt.Sprintf("%x+%d", md5sum, len(mt))
225                 outmap.UUID = fmt.Sprintf("zzzzz-4zz18-%015x", md5sum[:7])
226         }
227
228         return nil
229 }
230
231 func (client *ArvTestClient) Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error {
232         switch {
233         case method == "GET" && resourceType == "containers" && action == "auth":
234                 return json.Unmarshal([]byte(`{
235                         "kind": "arvados#api_client_authorization",
236                         "uuid": "`+fakeAuthUUID+`",
237                         "api_token": "`+fakeAuthToken+`"
238                         }`), output)
239         case method == "GET" && resourceType == "containers" && action == "secret_mounts":
240                 if client.secretMounts != nil {
241                         return json.Unmarshal(client.secretMounts, output)
242                 }
243                 return json.Unmarshal([]byte(`{"secret_mounts":{}}`), output)
244         default:
245                 return fmt.Errorf("Not found")
246         }
247 }
248
249 func (client *ArvTestClient) CallRaw(method, resourceType, uuid, action string,
250         parameters arvadosclient.Dict) (reader io.ReadCloser, err error) {
251         var j []byte
252         if method == "GET" && resourceType == "nodes" && uuid == "" && action == "" {
253                 j = []byte(`{
254                         "kind": "arvados#nodeList",
255                         "items": [{
256                                 "uuid": "zzzzz-7ekkf-2z3mc76g2q73aio",
257                                 "hostname": "compute2",
258                                 "properties": {"total_cpu_cores": 16}
259                         }]}`)
260         } else if method == "GET" && resourceType == "containers" && action == "" && !client.callraw {
261                 if uuid == "" {
262                         j, err = json.Marshal(map[string]interface{}{
263                                 "items": []interface{}{client.Container},
264                                 "kind":  "arvados#nodeList",
265                         })
266                 } else {
267                         j, err = json.Marshal(client.Container)
268                 }
269         } else {
270                 j = []byte(`{
271                         "command": ["sleep", "1"],
272                         "container_image": "` + arvadostest.DockerImage112PDH + `",
273                         "cwd": ".",
274                         "environment": {},
275                         "mounts": {"/tmp": {"kind": "tmp"}, "/json": {"kind": "json", "content": {"number": 123456789123456789}}},
276                         "output_path": "/tmp",
277                         "priority": 1,
278                         "runtime_constraints": {}
279                 }`)
280         }
281         return ioutil.NopCloser(bytes.NewReader(j)), err
282 }
283
284 func (client *ArvTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
285         if resourceType == "collections" {
286                 if uuid == hwPDH {
287                         output.(*arvados.Collection).ManifestText = hwManifest
288                 } else if uuid == otherPDH {
289                         output.(*arvados.Collection).ManifestText = otherManifest
290                 } else if uuid == normalizedWithSubdirsPDH {
291                         output.(*arvados.Collection).ManifestText = normalizedManifestWithSubdirs
292                 } else if uuid == denormalizedWithSubdirsPDH {
293                         output.(*arvados.Collection).ManifestText = denormalizedManifestWithSubdirs
294                 }
295         }
296         if resourceType == "containers" {
297                 (*output.(*arvados.Container)) = client.Container
298         }
299         return nil
300 }
301
302 func (client *ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
303         client.Mutex.Lock()
304         defer client.Mutex.Unlock()
305         client.Calls++
306         client.Content = append(client.Content, parameters)
307         if resourceType == "containers" {
308                 if parameters["container"].(arvadosclient.Dict)["state"] == "Running" {
309                         client.WasSetRunning = true
310                 }
311         } else if resourceType == "collections" && output != nil {
312                 mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string)
313                 output.(*arvados.Collection).UUID = uuid
314                 output.(*arvados.Collection).PortableDataHash = fmt.Sprintf("%x", md5.Sum([]byte(mt)))
315         }
316         return nil
317 }
318
// discoveryMap holds the discovery document values served by the
// stub API clients' Discovery methods. All values are float64.
var discoveryMap = map[string]interface{}{
	"crunchLimitLogBytesPerJob":          67108864.0,
	"crunchLogBytesPerEvent":             4096.0,
	"crunchLogPartialLineThrottlePeriod": 5.0,
	"crunchLogSecondsBetweenEvents":      1.0,
	"crunchLogThrottleBytes":             65536.0,
	"crunchLogThrottleLines":             1024.0,
	"crunchLogThrottlePeriod":            60.0,
	"defaultTrashLifetime":               1209600.0,
}
329
330 func (client *ArvTestClient) Discovery(key string) (interface{}, error) {
331         return discoveryMap[key], nil
332 }
333
334 // CalledWith returns the parameters from the first API call whose
335 // parameters match jpath/string. E.g., CalledWith(c, "foo.bar",
336 // "baz") returns parameters with parameters["foo"]["bar"]=="baz". If
337 // no call matches, it returns nil.
338 func (client *ArvTestClient) CalledWith(jpath string, expect interface{}) arvadosclient.Dict {
339 call:
340         for _, content := range client.Content {
341                 var v interface{} = content
342                 for _, k := range strings.Split(jpath, ".") {
343                         if dict, ok := v.(arvadosclient.Dict); !ok {
344                                 continue call
345                         } else {
346                                 v = dict[k]
347                         }
348                 }
349                 if v == expect {
350                         return content
351                 }
352         }
353         return nil
354 }
355
356 func (client *KeepTestClient) LocalLocator(locator string) (string, error) {
357         return locator, nil
358 }
359
360 func (client *KeepTestClient) BlockWrite(_ context.Context, opts arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
361         locator := fmt.Sprintf("%x+%d", md5.Sum(opts.Data), len(opts.Data))
362         client.blocks.Store(locator, append([]byte(nil), opts.Data...))
363         return arvados.BlockWriteResponse{
364                 Locator: locator,
365         }, nil
366 }
367
368 func (client *KeepTestClient) ReadAt(locator string, dst []byte, offset int) (int, error) {
369         loaded, ok := client.blocks.Load(locator)
370         if !ok {
371                 return 0, os.ErrNotExist
372         }
373         data := loaded.([]byte)
374         if offset >= len(data) {
375                 return 0, io.EOF
376         }
377         return copy(dst, data[offset:]), nil
378 }
379
380 func (client *KeepTestClient) Close() {
381         client.blocks.Range(func(locator, value interface{}) bool {
382                 client.blocks.Delete(locator)
383                 return true
384         })
385 }
386
387 func (client *KeepTestClient) SetStorageClasses(sc []string) {
388         client.StorageClasses = sc
389 }
390
391 type FileWrapper struct {
392         io.ReadCloser
393         len int64
394 }
395
396 func (fw FileWrapper) Readdir(n int) ([]os.FileInfo, error) {
397         return nil, errors.New("not implemented")
398 }
399
400 func (fw FileWrapper) Seek(int64, int) (int64, error) {
401         return 0, errors.New("not implemented")
402 }
403
404 func (fw FileWrapper) Size() int64 {
405         return fw.len
406 }
407
408 func (fw FileWrapper) Stat() (os.FileInfo, error) {
409         return nil, errors.New("not implemented")
410 }
411
412 func (fw FileWrapper) Truncate(int64) error {
413         return errors.New("not implemented")
414 }
415
416 func (fw FileWrapper) Write([]byte) (int, error) {
417         return 0, errors.New("not implemented")
418 }
419
420 func (fw FileWrapper) Sync() error {
421         return errors.New("not implemented")
422 }
423
424 func (fw FileWrapper) Snapshot() (*arvados.Subtree, error) {
425         return nil, errors.New("not implemented")
426 }
427
428 func (fw FileWrapper) Splice(*arvados.Subtree) error {
429         return errors.New("not implemented")
430 }
431
432 func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) {
433         if filename == hwImageID+".tar" {
434                 rdr := ioutil.NopCloser(&bytes.Buffer{})
435                 client.Called = true
436                 return FileWrapper{rdr, 1321984}, nil
437         } else if filename == "/file1_in_main.txt" {
438                 rdr := ioutil.NopCloser(strings.NewReader("foo"))
439                 client.Called = true
440                 return FileWrapper{rdr, 3}, nil
441         }
442         return nil, nil
443 }
444
445 type apiStubServer struct {
446         server    *httptest.Server
447         proxy     *httputil.ReverseProxy
448         intercept func(http.ResponseWriter, *http.Request) bool
449
450         container arvados.Container
451         logs      map[string]string
452 }
453
454 func apiStub() (*arvados.Client, *apiStubServer) {
455         client := arvados.NewClientFromEnv()
456         apistub := &apiStubServer{}
457         apistub.server = httptest.NewTLSServer(apistub)
458         apistub.proxy = httputil.NewSingleHostReverseProxy(&url.URL{Scheme: "https", Host: client.APIHost})
459         if client.Insecure {
460                 apistub.proxy.Transport = arvados.InsecureHTTPClient.Transport
461         }
462         client.APIHost = apistub.server.Listener.Addr().String()
463         return client, apistub
464 }
465
466 func (apistub *apiStubServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
467         if apistub.intercept != nil && apistub.intercept(w, r) {
468                 return
469         }
470         if r.Method == "POST" && r.URL.Path == "/arvados/v1/logs" {
471                 var body struct {
472                         Log struct {
473                                 EventType  string `json:"event_type"`
474                                 Properties struct {
475                                         Text string
476                                 }
477                         }
478                 }
479                 json.NewDecoder(r.Body).Decode(&body)
480                 apistub.logs[body.Log.EventType] += body.Log.Properties.Text
481                 return
482         }
483         if r.Method == "GET" && r.URL.Path == "/arvados/v1/collections/"+hwPDH {
484                 json.NewEncoder(w).Encode(arvados.Collection{ManifestText: hwManifest})
485                 return
486         }
487         if r.Method == "GET" && r.URL.Path == "/arvados/v1/collections/"+otherPDH {
488                 json.NewEncoder(w).Encode(arvados.Collection{ManifestText: otherManifest})
489                 return
490         }
491         if r.Method == "GET" && r.URL.Path == "/arvados/v1/collections/"+normalizedWithSubdirsPDH {
492                 json.NewEncoder(w).Encode(arvados.Collection{ManifestText: normalizedManifestWithSubdirs})
493                 return
494         }
495         if r.Method == "GET" && r.URL.Path == "/arvados/v1/collections/"+denormalizedWithSubdirsPDH {
496                 json.NewEncoder(w).Encode(arvados.Collection{ManifestText: denormalizedManifestWithSubdirs})
497                 return
498         }
499         if r.Method == "GET" && r.URL.Path == "/arvados/v1/containers/"+apistub.container.UUID {
500                 json.NewEncoder(w).Encode(apistub.container)
501                 return
502         }
503         apistub.proxy.ServeHTTP(w, r)
504 }
505
506 func (s *TestSuite) TestLoadImage(c *C) {
507         s.runner.Container.ContainerImage = arvadostest.DockerImage112PDH
508         s.runner.Container.Mounts = map[string]arvados.Mount{
509                 "/out": {Kind: "tmp", Writable: true},
510         }
511         s.runner.Container.OutputPath = "/out"
512
513         _, err := s.runner.SetupMounts()
514         c.Assert(err, IsNil)
515
516         imageID, err := s.runner.LoadImage()
517         c.Check(err, IsNil)
518         c.Check(s.executor.loaded, Matches, ".*"+regexp.QuoteMeta(arvadostest.DockerImage112Filename))
519         c.Check(imageID, Equals, strings.TrimSuffix(arvadostest.DockerImage112Filename, ".tar"))
520
521         s.runner.Container.ContainerImage = arvadostest.DockerImage112PDH
522         s.executor.imageLoaded = false
523         s.executor.loaded = ""
524         s.executor.loadErr = errors.New("bork")
525         imageID, err = s.runner.LoadImage()
526         c.Check(err, ErrorMatches, ".*bork")
527         c.Check(s.executor.loaded, Matches, ".*"+regexp.QuoteMeta(arvadostest.DockerImage112Filename))
528
529         s.runner.Container.ContainerImage = fakeInputCollectionPDH
530         s.executor.imageLoaded = false
531         s.executor.loaded = ""
532         s.executor.loadErr = nil
533         imageID, err = s.runner.LoadImage()
534         c.Check(err, ErrorMatches, "image collection does not include a \\.tar image file")
535         c.Check(s.executor.loaded, Equals, "")
536 }
537
538 type ArvErrorTestClient struct{}
539
540 func (ArvErrorTestClient) Create(resourceType string,
541         parameters arvadosclient.Dict,
542         output interface{}) error {
543         return nil
544 }
545
546 func (ArvErrorTestClient) Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error {
547         if method == "GET" && resourceType == "containers" && action == "auth" {
548                 return nil
549         }
550         return errors.New("ArvError")
551 }
552
553 func (ArvErrorTestClient) CallRaw(method, resourceType, uuid, action string,
554         parameters arvadosclient.Dict) (reader io.ReadCloser, err error) {
555         return nil, errors.New("ArvError")
556 }
557
558 func (ArvErrorTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
559         return errors.New("ArvError")
560 }
561
562 func (ArvErrorTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
563         return nil
564 }
565
566 func (ArvErrorTestClient) Discovery(key string) (interface{}, error) {
567         return discoveryMap[key], nil
568 }
569
570 type KeepErrorTestClient struct {
571         KeepTestClient
572 }
573
574 func (*KeepErrorTestClient) ManifestFileReader(manifest.Manifest, string) (arvados.File, error) {
575         return nil, errors.New("KeepError")
576 }
577
578 func (*KeepErrorTestClient) BlockWrite(context.Context, arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
579         return arvados.BlockWriteResponse{}, errors.New("KeepError")
580 }
581
582 func (*KeepErrorTestClient) LocalLocator(string) (string, error) {
583         return "", errors.New("KeepError")
584 }
585
586 type KeepReadErrorTestClient struct {
587         KeepTestClient
588 }
589
590 func (*KeepReadErrorTestClient) ReadAt(string, []byte, int) (int, error) {
591         return 0, errors.New("KeepError")
592 }
593
594 type ErrorReader struct {
595         FileWrapper
596 }
597
598 func (ErrorReader) Read(p []byte) (n int, err error) {
599         return 0, errors.New("ErrorReader")
600 }
601
602 func (ErrorReader) Seek(int64, int) (int64, error) {
603         return 0, errors.New("ErrorReader")
604 }
605
606 func (*KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) {
607         return ErrorReader{}, nil
608 }
609
// ClosableBuffer is a bytes.Buffer with a no-op Close method, so a
// pointer to it can be used as an io.WriteCloser.
type ClosableBuffer struct {
	bytes.Buffer
}

// Close does nothing and returns nil.
func (*ClosableBuffer) Close() error {
	return nil
}

// TestLogs captures what is written to the "stdout" and "stderr"
// log writers in memory.
type TestLogs struct {
	Stdout ClosableBuffer
	Stderr ClosableBuffer
}

// NewTestLoggingWriter returns the buffer for the named stream:
// "stdout" or "stderr". Any other name yields a nil writer and a
// "???" error.
func (tl *TestLogs) NewTestLoggingWriter(logstr string) (io.WriteCloser, error) {
	switch logstr {
	case "stdout":
		return &tl.Stdout, nil
	case "stderr":
		return &tl.Stderr, nil
	default:
		return nil, errors.New("???")
	}
}
632
// dockerLog wraps msg in a Docker multiplexed-stream frame: an
// 8-byte header followed by the payload.
//
// Header byte 0 is the stream identifier fd, bytes 1-3 are zero,
// and bytes 4-7 hold the payload length as a big-endian 32-bit
// integer. Writing all four length bytes (instead of only the last
// one) keeps the header correct for payloads longer than 255 bytes.
func dockerLog(fd byte, msg string) []byte {
	size := len(msg)
	frame := make([]byte, 8+size)
	frame[0] = fd
	frame[4] = byte(size >> 24)
	frame[5] = byte(size >> 16)
	frame[6] = byte(size >> 8)
	frame[7] = byte(size)
	copy(frame[8:], msg)
	return frame
}
641
642 func (s *TestSuite) TestRunContainer(c *C) {
643         s.executor.runFunc = func() int {
644                 fmt.Fprintf(s.executor.created.Stdout, "Hello world\n")
645                 return 0
646         }
647
648         var logs TestLogs
649         s.runner.NewLogWriter = logs.NewTestLoggingWriter
650         s.runner.Container.ContainerImage = arvadostest.DockerImage112PDH
651         s.runner.Container.Command = []string{"./hw"}
652         s.runner.Container.OutputStorageClasses = []string{"default"}
653
654         imageID, err := s.runner.LoadImage()
655         c.Assert(err, IsNil)
656
657         err = s.runner.CreateContainer(imageID, nil)
658         c.Assert(err, IsNil)
659
660         err = s.runner.StartContainer()
661         c.Assert(err, IsNil)
662
663         err = s.runner.WaitFinish()
664         c.Assert(err, IsNil)
665
666         c.Check(logs.Stdout.String(), Matches, ".*Hello world\n")
667         c.Check(logs.Stderr.String(), Equals, "")
668 }
669
670 func (s *TestSuite) TestCommitLogs(c *C) {
671         api := &ArvTestClient{}
672         kc := &KeepTestClient{}
673         defer kc.Close()
674         cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
675         c.Assert(err, IsNil)
676         cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
677
678         cr.CrunchLog.Print("Hello world!")
679         cr.CrunchLog.Print("Goodbye")
680         cr.finalState = "Complete"
681
682         err = cr.CommitLogs()
683         c.Check(err, IsNil)
684
685         c.Check(api.Calls, Equals, 2)
686         c.Check(api.Content[1]["ensure_unique_name"], Equals, true)
687         c.Check(api.Content[1]["collection"].(arvadosclient.Dict)["name"], Equals, "logs for zzzzz-zzzzz-zzzzzzzzzzzzzzz")
688         c.Check(api.Content[1]["collection"].(arvadosclient.Dict)["manifest_text"], Equals, ". 744b2e4553123b02fa7b452ec5c18993+123 0:123:crunch-run.txt\n")
689         c.Check(*cr.LogsPDH, Equals, "63da7bdacf08c40f604daad80c261e9a+60")
690 }
691
692 func (s *TestSuite) TestUpdateContainerRunning(c *C) {
693         api := &ArvTestClient{}
694         kc := &KeepTestClient{}
695         defer kc.Close()
696         cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
697         c.Assert(err, IsNil)
698
699         err = cr.UpdateContainerRunning("")
700         c.Check(err, IsNil)
701
702         c.Check(api.Content[0]["container"].(arvadosclient.Dict)["state"], Equals, "Running")
703 }
704
705 func (s *TestSuite) TestUpdateContainerComplete(c *C) {
706         api := &ArvTestClient{}
707         kc := &KeepTestClient{}
708         defer kc.Close()
709         cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
710         c.Assert(err, IsNil)
711
712         cr.LogsPDH = new(string)
713         *cr.LogsPDH = "d3a229d2fe3690c2c3e75a71a153c6a3+60"
714
715         cr.ExitCode = new(int)
716         *cr.ExitCode = 42
717         cr.finalState = "Complete"
718
719         err = cr.UpdateContainerFinal()
720         c.Check(err, IsNil)
721
722         c.Check(api.Content[0]["container"].(arvadosclient.Dict)["log"], Equals, *cr.LogsPDH)
723         c.Check(api.Content[0]["container"].(arvadosclient.Dict)["exit_code"], Equals, *cr.ExitCode)
724         c.Check(api.Content[0]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
725 }
726
727 func (s *TestSuite) TestUpdateContainerCancelled(c *C) {
728         api := &ArvTestClient{}
729         kc := &KeepTestClient{}
730         defer kc.Close()
731         cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
732         c.Assert(err, IsNil)
733         cr.cCancelled = true
734         cr.finalState = "Cancelled"
735
736         err = cr.UpdateContainerFinal()
737         c.Check(err, IsNil)
738
739         c.Check(api.Content[0]["container"].(arvadosclient.Dict)["log"], IsNil)
740         c.Check(api.Content[0]["container"].(arvadosclient.Dict)["exit_code"], IsNil)
741         c.Check(api.Content[0]["container"].(arvadosclient.Dict)["state"], Equals, "Cancelled")
742 }
743
// Used by the TestFullRun*() tests below to DRY up the boilerplate setup needed
// for a full dress rehearsal of the Run() function, starting from a JSON container record.
746 func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, fn func() int) (*ArvTestClient, *ContainerRunner, string) {
747         err := json.Unmarshal([]byte(record), &s.api.Container)
748         c.Assert(err, IsNil)
749         initialState := s.api.Container.State
750
751         var sm struct {
752                 SecretMounts map[string]arvados.Mount `json:"secret_mounts"`
753         }
754         err = json.Unmarshal([]byte(record), &sm)
755         c.Check(err, IsNil)
756         secretMounts, err := json.Marshal(sm)
757         c.Assert(err, IsNil)
758         c.Logf("SecretMounts decoded %v json %q", sm, secretMounts)
759
760         s.executor.runFunc = fn
761
762         s.runner.statInterval = 100 * time.Millisecond
763         s.runner.containerWatchdogInterval = time.Second
764
765         realTemp := c.MkDir()
766         tempcount := 0
767         s.runner.MkTempDir = func(_, prefix string) (string, error) {
768                 tempcount++
769                 d := fmt.Sprintf("%s/%s%d", realTemp, prefix, tempcount)
770                 err := os.Mkdir(d, os.ModePerm)
771                 if err != nil && strings.Contains(err.Error(), ": file exists") {
772                         // Test case must have pre-populated the tempdir
773                         err = nil
774                 }
775                 return d, err
776         }
777         client, _ := apiStub()
778         s.runner.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
779                 return &ArvTestClient{secretMounts: secretMounts}, &s.testContainerKeepClient, client, nil
780         }
781
782         if extraMounts != nil && len(extraMounts) > 0 {
783                 err := s.runner.SetupArvMountPoint("keep")
784                 c.Check(err, IsNil)
785
786                 for _, m := range extraMounts {
787                         os.MkdirAll(s.runner.ArvMountPoint+"/by_id/"+m, os.ModePerm)
788                 }
789         }
790
791         err = s.runner.Run()
792         if s.api.CalledWith("container.state", "Complete") != nil {
793                 c.Check(err, IsNil)
794         }
795         if s.executor.loadErr == nil && s.executor.createErr == nil && initialState != "Running" {
796                 c.Check(s.api.WasSetRunning, Equals, true)
797                 var lastupdate arvadosclient.Dict
798                 for _, content := range s.api.Content {
799                         if content["container"] != nil {
800                                 lastupdate = content["container"].(arvadosclient.Dict)
801                         }
802                 }
803                 if lastupdate["log"] == nil {
804                         c.Errorf("no container update with non-nil log -- updates were: %v", s.api.Content)
805                 }
806         }
807
808         if err != nil {
809                 for k, v := range s.api.Logs {
810                         c.Log(k)
811                         c.Log(v.String())
812                 }
813         }
814
815         return s.api, s.runner, realTemp
816 }
817
818 func (s *TestSuite) TestFullRunHello(c *C) {
819         s.runner.enableMemoryLimit = true
820         s.runner.networkMode = "default"
821         s.fullRunHelper(c, `{
822     "command": ["echo", "hello world"],
823     "container_image": "`+arvadostest.DockerImage112PDH+`",
824     "cwd": ".",
825     "environment": {"foo":"bar","baz":"waz"},
826     "mounts": {"/tmp": {"kind": "tmp"} },
827     "output_path": "/tmp",
828     "priority": 1,
829     "runtime_constraints": {"vcpus":1,"ram":1000000},
830     "state": "Locked",
831     "output_storage_classes": ["default"]
832 }`, nil, func() int {
833                 c.Check(s.executor.created.Command, DeepEquals, []string{"echo", "hello world"})
834                 c.Check(s.executor.created.Image, Equals, "sha256:d8309758b8fe2c81034ffc8a10c36460b77db7bc5e7b448c4e5b684f9d95a678")
835                 c.Check(s.executor.created.Env, DeepEquals, map[string]string{"foo": "bar", "baz": "waz"})
836                 c.Check(s.executor.created.VCPUs, Equals, 1)
837                 c.Check(s.executor.created.RAM, Equals, int64(1000000))
838                 c.Check(s.executor.created.NetworkMode, Equals, "default")
839                 c.Check(s.executor.created.EnableNetwork, Equals, false)
840                 c.Check(s.executor.created.CUDADeviceCount, Equals, 0)
841                 fmt.Fprintln(s.executor.created.Stdout, "hello world")
842                 return 0
843         })
844
845         c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
846         c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
847         c.Check(s.api.Logs["stdout"].String(), Matches, ".*hello world\n")
848         c.Check(s.testDispatcherKeepClient.StorageClasses, DeepEquals, []string{"default"})
849         c.Check(s.testContainerKeepClient.StorageClasses, DeepEquals, []string{"default"})
850 }
851
852 func (s *TestSuite) TestRunAlreadyRunning(c *C) {
853         var ran bool
854         s.fullRunHelper(c, `{
855     "command": ["sleep", "3"],
856     "container_image": "`+arvadostest.DockerImage112PDH+`",
857     "cwd": ".",
858     "environment": {},
859     "mounts": {"/tmp": {"kind": "tmp"} },
860     "output_path": "/tmp",
861     "priority": 1,
862     "runtime_constraints": {},
863     "scheduling_parameters":{"max_run_time": 1},
864     "state": "Running"
865 }`, nil, func() int {
866                 ran = true
867                 return 2
868         })
869         c.Check(s.api.CalledWith("container.state", "Cancelled"), IsNil)
870         c.Check(s.api.CalledWith("container.state", "Complete"), IsNil)
871         c.Check(ran, Equals, false)
872 }
873
874 func ec2MetadataServerStub(c *C, token *string, failureRate float64, stoptime *atomic.Value) *httptest.Server {
875         failedOnce := false
876         return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
877                 if !failedOnce || rand.Float64() < failureRate {
878                         w.WriteHeader(http.StatusServiceUnavailable)
879                         failedOnce = true
880                         return
881                 }
882                 switch r.URL.Path {
883                 case "/latest/api/token":
884                         fmt.Fprintln(w, *token)
885                 case "/latest/meta-data/spot/instance-action":
886                         if r.Header.Get("X-aws-ec2-metadata-token") != *token {
887                                 w.WriteHeader(http.StatusUnauthorized)
888                         } else if t, _ := stoptime.Load().(time.Time); t.IsZero() {
889                                 w.WriteHeader(http.StatusNotFound)
890                         } else {
891                                 fmt.Fprintf(w, `{"action":"stop","time":"%s"}`, t.Format(time.RFC3339))
892                         }
893                 default:
894                         w.WriteHeader(http.StatusNotFound)
895                 }
896         }))
897 }
898
899 func (s *TestSuite) TestSpotInterruptionNotice(c *C) {
900         s.testSpotInterruptionNotice(c, 0.1)
901 }
902
903 func (s *TestSuite) TestSpotInterruptionNoticeNotAvailable(c *C) {
904         s.testSpotInterruptionNotice(c, 1)
905 }
906
907 func (s *TestSuite) testSpotInterruptionNotice(c *C, failureRate float64) {
908         var stoptime atomic.Value
909         token := "fake-ec2-metadata-token"
910         stub := ec2MetadataServerStub(c, &token, failureRate, &stoptime)
911         defer stub.Close()
912
913         defer func(i time.Duration, u string) {
914                 spotInterruptionCheckInterval = i
915                 ec2MetadataBaseURL = u
916         }(spotInterruptionCheckInterval, ec2MetadataBaseURL)
917         spotInterruptionCheckInterval = time.Second / 8
918         ec2MetadataBaseURL = stub.URL
919
920         go s.runner.checkSpotInterruptionNotices()
921         s.fullRunHelper(c, `{
922     "command": ["sleep", "3"],
923     "container_image": "`+arvadostest.DockerImage112PDH+`",
924     "cwd": ".",
925     "environment": {},
926     "mounts": {"/tmp": {"kind": "tmp"} },
927     "output_path": "/tmp",
928     "priority": 1,
929     "runtime_constraints": {},
930     "state": "Locked"
931 }`, nil, func() int {
932                 time.Sleep(time.Second)
933                 stoptime.Store(time.Now().Add(time.Minute).UTC())
934                 token = "different-fake-ec2-metadata-token"
935                 time.Sleep(time.Second)
936                 return 0
937         })
938         c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Checking for spot interruptions every 125ms using instance metadata at http://.*`)
939         c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Error checking spot interruptions: 503 Service Unavailable.*`)
940         if failureRate == 1 {
941                 c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Giving up on checking spot interruptions after too many consecutive failures.*`)
942         } else {
943                 text := `Cloud provider scheduled instance stop at ` + stoptime.Load().(time.Time).Format(time.RFC3339)
944                 c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*`+text+`.*`)
945                 c.Check(s.api.CalledWith("container.runtime_status.warning", "preemption notice"), NotNil)
946                 c.Check(s.api.CalledWith("container.runtime_status.warningDetail", text), NotNil)
947                 c.Check(s.api.CalledWith("container.runtime_status.preemptionNotice", text), NotNil)
948         }
949 }
950
951 func (s *TestSuite) TestRunTimeExceeded(c *C) {
952         s.fullRunHelper(c, `{
953     "command": ["sleep", "3"],
954     "container_image": "`+arvadostest.DockerImage112PDH+`",
955     "cwd": ".",
956     "environment": {},
957     "mounts": {"/tmp": {"kind": "tmp"} },
958     "output_path": "/tmp",
959     "priority": 1,
960     "runtime_constraints": {},
961     "scheduling_parameters":{"max_run_time": 1},
962     "state": "Locked"
963 }`, nil, func() int {
964                 time.Sleep(3 * time.Second)
965                 return 0
966         })
967
968         c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
969         c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*maximum run time exceeded.*")
970 }
971
972 func (s *TestSuite) TestContainerWaitFails(c *C) {
973         s.fullRunHelper(c, `{
974     "command": ["sleep", "3"],
975     "container_image": "`+arvadostest.DockerImage112PDH+`",
976     "cwd": ".",
977     "mounts": {"/tmp": {"kind": "tmp"} },
978     "output_path": "/tmp",
979     "priority": 1,
980     "state": "Locked"
981 }`, nil, func() int {
982                 s.executor.waitErr = errors.New("Container is not running")
983                 return 0
984         })
985
986         c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
987         c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Container is not running.*")
988 }
989
990 func (s *TestSuite) TestCrunchstat(c *C) {
991         s.runner.crunchstatFakeFS = os.DirFS("../crunchstat/testdata/debian12")
992         s.fullRunHelper(c, `{
993                 "command": ["sleep", "1"],
994                 "container_image": "`+arvadostest.DockerImage112PDH+`",
995                 "cwd": ".",
996                 "environment": {},
997                 "mounts": {"/tmp": {"kind": "tmp"} },
998                 "output_path": "/tmp",
999                 "priority": 1,
1000                 "runtime_constraints": {},
1001                 "state": "Locked"
1002         }`, nil, func() int {
1003                 time.Sleep(time.Second)
1004                 return 0
1005         })
1006
1007         c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
1008         c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
1009
1010         c.Assert(s.api.Logs["crunchstat"], NotNil)
1011         c.Check(s.api.Logs["crunchstat"].String(), Matches, `(?ms).*mem \d+ swap \d+ pgmajfault \d+ rss.*`)
1012
1013         // Check that we called (*crunchstat.Reporter)Stop().
1014         c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Maximum crunch-run memory rss usage was \d+ bytes\n.*`)
1015 }
1016
1017 func (s *TestSuite) TestNodeInfoLog(c *C) {
1018         s.fullRunHelper(c, `{
1019                 "command": ["true"],
1020                 "container_image": "`+arvadostest.DockerImage112PDH+`",
1021                 "cwd": ".",
1022                 "environment": {},
1023                 "mounts": {"/tmp": {"kind": "tmp"} },
1024                 "output_path": "/tmp",
1025                 "priority": 1,
1026                 "runtime_constraints": {},
1027                 "state": "Locked"
1028         }`, nil, func() int {
1029                 return 0
1030         })
1031
1032         c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
1033         c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
1034
1035         buf, err := fs.ReadFile(arvados.FS(s.runner.LogCollection), "/node.json")
1036         c.Assert(err, IsNil)
1037         json := string(buf)
1038         c.Check(json, Matches, `(?ms).*"ProviderType": *"a1\.2xlarge".*`)
1039         c.Check(json, Matches, `(?ms).*"Price": *1\.2.*`)
1040
1041         c.Assert(s.api.Logs["node-info"], NotNil)
1042         json = s.api.Logs["node-info"].String()
1043         c.Check(json, Matches, `(?ms).*Host Information.*`)
1044         c.Check(json, Matches, `(?ms).*CPU Information.*`)
1045         c.Check(json, Matches, `(?ms).*Memory Information.*`)
1046         c.Check(json, Matches, `(?ms).*Disk Space.*`)
1047         c.Check(json, Matches, `(?ms).*Disk INodes.*`)
1048 }
1049
1050 func (s *TestSuite) TestLogVersionAndRuntime(c *C) {
1051         s.fullRunHelper(c, `{
1052                 "command": ["sleep", "1"],
1053                 "container_image": "`+arvadostest.DockerImage112PDH+`",
1054                 "cwd": ".",
1055                 "environment": {},
1056                 "mounts": {"/tmp": {"kind": "tmp"} },
1057                 "output_path": "/tmp",
1058                 "priority": 1,
1059                 "runtime_constraints": {},
1060                 "state": "Locked"
1061         }`, nil, func() int {
1062                 return 0
1063         })
1064
1065         c.Assert(s.api.Logs["crunch-run"], NotNil)
1066         c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*crunch-run \S+ \(go\S+\) start.*`)
1067         c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*crunch-run process has uid=\d+\(.+\) gid=\d+\(.+\) groups=\d+\(.+\)(,\d+\(.+\))*\n.*`)
1068         c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Executing container: zzzzz-zzzzz-zzzzzzzzzzzzzzz.*`)
1069         c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Using container runtime: stub.*`)
1070 }
1071
1072 func (s *TestSuite) testLogRSSThresholds(c *C, ram int64, expected []int, notExpected int) {
1073         s.runner.crunchstatFakeFS = os.DirFS("../crunchstat/testdata/debian12")
1074         s.fullRunHelper(c, `{
1075                 "command": ["true"],
1076                 "container_image": "`+arvadostest.DockerImage112PDH+`",
1077                 "cwd": ".",
1078                 "environment": {},
1079                 "mounts": {"/tmp": {"kind": "tmp"} },
1080                 "output_path": "/tmp",
1081                 "priority": 1,
1082                 "runtime_constraints": {"ram": `+fmt.Sprintf("%d", ram)+`},
1083                 "state": "Locked"
1084         }`, nil, func() int { return 0 })
1085         c.Logf("=== crunchstat logs\n%s\n", s.api.Logs["crunchstat"].String())
1086         logs := s.api.Logs["crunch-run"].String()
1087         pattern := logLineStart + `Container using over %d%% of memory \(rss %d/%d bytes\)`
1088         var threshold int
1089         for _, threshold = range expected {
1090                 c.Check(logs, Matches, fmt.Sprintf(pattern, threshold, s.debian12MemoryCurrent, ram))
1091         }
1092         if notExpected > threshold {
1093                 c.Check(logs, Not(Matches), fmt.Sprintf(pattern, notExpected, s.debian12MemoryCurrent, ram))
1094         }
1095 }
1096
1097 func (s *TestSuite) TestLogNoRSSThresholds(c *C) {
1098         s.testLogRSSThresholds(c, s.debian12MemoryCurrent*10, []int{}, 90)
1099 }
1100
1101 func (s *TestSuite) TestLogSomeRSSThresholds(c *C) {
1102         onePercentRSS := s.debian12MemoryCurrent / 100
1103         s.testLogRSSThresholds(c, 102*onePercentRSS, []int{90, 95}, 99)
1104 }
1105
1106 func (s *TestSuite) TestLogAllRSSThresholds(c *C) {
1107         s.testLogRSSThresholds(c, s.debian12MemoryCurrent, []int{90, 95, 99}, 0)
1108 }
1109
1110 func (s *TestSuite) TestLogMaximaAfterRun(c *C) {
1111         s.runner.crunchstatFakeFS = os.DirFS("../crunchstat/testdata/debian12")
1112         s.runner.parentTemp = c.MkDir()
1113         s.fullRunHelper(c, `{
1114         "command": ["true"],
1115         "container_image": "`+arvadostest.DockerImage112PDH+`",
1116         "cwd": ".",
1117         "environment": {},
1118         "mounts": {"/tmp": {"kind": "tmp"} },
1119         "output_path": "/tmp",
1120         "priority": 1,
1121         "runtime_constraints": {"ram": `+fmt.Sprintf("%d", s.debian12MemoryCurrent*10)+`},
1122         "state": "Locked"
1123     }`, nil, func() int { return 0 })
1124         logs := s.api.Logs["crunch-run"].String()
1125         for _, expected := range []string{
1126                 `Maximum disk usage was \d+%, \d+/\d+ bytes`,
1127                 fmt.Sprintf(`Maximum container memory swap usage was %d bytes`, s.debian12SwapCurrent),
1128                 `Maximum container memory pgmajfault usage was \d+ faults`,
1129                 fmt.Sprintf(`Maximum container memory rss usage was 10%%, %d/%d bytes`, s.debian12MemoryCurrent, s.debian12MemoryCurrent*10),
1130                 `Maximum crunch-run memory rss usage was \d+ bytes`,
1131         } {
1132                 c.Check(logs, Matches, logLineStart+expected)
1133         }
1134 }
1135
1136 func (s *TestSuite) TestCommitNodeInfoBeforeStart(c *C) {
1137         var collection_create, container_update arvadosclient.Dict
1138         s.fullRunHelper(c, `{
1139                 "command": ["true"],
1140                 "container_image": "`+arvadostest.DockerImage112PDH+`",
1141                 "cwd": ".",
1142                 "environment": {},
1143                 "mounts": {"/tmp": {"kind": "tmp"} },
1144                 "output_path": "/tmp",
1145                 "priority": 1,
1146                 "runtime_constraints": {},
1147                 "state": "Locked",
1148                 "uuid": "zzzzz-dz642-202301121543210"
1149         }`, nil, func() int {
1150                 collection_create = s.api.CalledWith("ensure_unique_name", true)
1151                 container_update = s.api.CalledWith("container.state", "Running")
1152                 return 0
1153         })
1154
1155         c.Assert(collection_create, NotNil)
1156         log_collection := collection_create["collection"].(arvadosclient.Dict)
1157         c.Check(log_collection["name"], Equals, "logs for zzzzz-dz642-202301121543210")
1158         manifest_text := log_collection["manifest_text"].(string)
1159         // We check that the file size is at least two digits as an easy way to
1160         // check the file isn't empty.
1161         c.Check(manifest_text, Matches, `\. .+ \d+:\d{2,}:node-info\.txt( .+)?\n`)
1162         c.Check(manifest_text, Matches, `\. .+ \d+:\d{2,}:node\.json( .+)?\n`)
1163
1164         c.Assert(container_update, NotNil)
1165         // As of Arvados 2.5.0, the container update must specify its log in PDH
1166         // format for the API server to propagate it to container requests, which
1167         // is what we care about for this test.
1168         expect_pdh := fmt.Sprintf("%x+%d", md5.Sum([]byte(manifest_text)), len(manifest_text))
1169         c.Check(container_update["container"].(arvadosclient.Dict)["log"], Equals, expect_pdh)
1170 }
1171
1172 func (s *TestSuite) TestContainerRecordLog(c *C) {
1173         s.fullRunHelper(c, `{
1174                 "command": ["sleep", "1"],
1175                 "container_image": "`+arvadostest.DockerImage112PDH+`",
1176                 "cwd": ".",
1177                 "environment": {},
1178                 "mounts": {"/tmp": {"kind": "tmp"} },
1179                 "output_path": "/tmp",
1180                 "priority": 1,
1181                 "runtime_constraints": {},
1182                 "state": "Locked"
1183         }`, nil,
1184                 func() int {
1185                         time.Sleep(time.Second)
1186                         return 0
1187                 })
1188
1189         c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
1190         c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
1191
1192         c.Assert(s.api.Logs["container"], NotNil)
1193         c.Check(s.api.Logs["container"].String(), Matches, `(?ms).*container_image.*`)
1194 }
1195
1196 func (s *TestSuite) TestFullRunStderr(c *C) {
1197         s.fullRunHelper(c, `{
1198     "command": ["/bin/sh", "-c", "echo hello ; echo world 1>&2 ; exit 1"],
1199     "container_image": "`+arvadostest.DockerImage112PDH+`",
1200     "cwd": ".",
1201     "environment": {},
1202     "mounts": {"/tmp": {"kind": "tmp"} },
1203     "output_path": "/tmp",
1204     "priority": 1,
1205     "runtime_constraints": {},
1206     "state": "Locked"
1207 }`, nil, func() int {
1208                 fmt.Fprintln(s.executor.created.Stdout, "hello")
1209                 fmt.Fprintln(s.executor.created.Stderr, "world")
1210                 return 1
1211         })
1212
1213         final := s.api.CalledWith("container.state", "Complete")
1214         c.Assert(final, NotNil)
1215         c.Check(final["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
1216         c.Check(final["container"].(arvadosclient.Dict)["log"], NotNil)
1217
1218         c.Check(s.api.Logs["stdout"].String(), Matches, ".*hello\n")
1219         c.Check(s.api.Logs["stderr"].String(), Matches, ".*world\n")
1220 }
1221
1222 func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
1223         s.fullRunHelper(c, `{
1224     "command": ["pwd"],
1225     "container_image": "`+arvadostest.DockerImage112PDH+`",
1226     "cwd": ".",
1227     "environment": {},
1228     "mounts": {"/tmp": {"kind": "tmp"} },
1229     "output_path": "/tmp",
1230     "priority": 1,
1231     "runtime_constraints": {},
1232     "state": "Locked"
1233 }`, nil, func() int {
1234                 fmt.Fprintf(s.executor.created.Stdout, "workdir=%q", s.executor.created.WorkingDir)
1235                 return 0
1236         })
1237
1238         c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
1239         c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
1240         c.Log(s.api.Logs["stdout"])
1241         c.Check(s.api.Logs["stdout"].String(), Matches, `.*workdir=""\n`)
1242 }
1243
1244 func (s *TestSuite) TestFullRunSetCwd(c *C) {
1245         s.fullRunHelper(c, `{
1246     "command": ["pwd"],
1247     "container_image": "`+arvadostest.DockerImage112PDH+`",
1248     "cwd": "/bin",
1249     "environment": {},
1250     "mounts": {"/tmp": {"kind": "tmp"} },
1251     "output_path": "/tmp",
1252     "priority": 1,
1253     "runtime_constraints": {},
1254     "state": "Locked"
1255 }`, nil, func() int {
1256                 fmt.Fprintln(s.executor.created.Stdout, s.executor.created.WorkingDir)
1257                 return 0
1258         })
1259
1260         c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
1261         c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
1262         c.Check(s.api.Logs["stdout"].String(), Matches, ".*/bin\n")
1263 }
1264
1265 func (s *TestSuite) TestFullRunSetOutputStorageClasses(c *C) {
1266         s.fullRunHelper(c, `{
1267     "command": ["pwd"],
1268     "container_image": "`+arvadostest.DockerImage112PDH+`",
1269     "cwd": "/bin",
1270     "environment": {},
1271     "mounts": {"/tmp": {"kind": "tmp"} },
1272     "output_path": "/tmp",
1273     "priority": 1,
1274     "runtime_constraints": {},
1275     "state": "Locked",
1276     "output_storage_classes": ["foo", "bar"]
1277 }`, nil, func() int {
1278                 fmt.Fprintln(s.executor.created.Stdout, s.executor.created.WorkingDir)
1279                 return 0
1280         })
1281
1282         c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
1283         c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
1284         c.Check(s.api.Logs["stdout"].String(), Matches, ".*/bin\n")
1285         c.Check(s.testDispatcherKeepClient.StorageClasses, DeepEquals, []string{"foo", "bar"})
1286         c.Check(s.testContainerKeepClient.StorageClasses, DeepEquals, []string{"foo", "bar"})
1287 }
1288
1289 func (s *TestSuite) TestEnableCUDADeviceCount(c *C) {
1290         s.fullRunHelper(c, `{
1291     "command": ["pwd"],
1292     "container_image": "`+arvadostest.DockerImage112PDH+`",
1293     "cwd": "/bin",
1294     "environment": {},
1295     "mounts": {"/tmp": {"kind": "tmp"} },
1296     "output_path": "/tmp",
1297     "priority": 1,
1298     "runtime_constraints": {"cuda": {"device_count": 2}},
1299     "state": "Locked",
1300     "output_storage_classes": ["foo", "bar"]
1301 }`, nil, func() int {
1302                 fmt.Fprintln(s.executor.created.Stdout, "ok")
1303                 return 0
1304         })
1305         c.Check(s.executor.created.CUDADeviceCount, Equals, 2)
1306 }
1307
1308 func (s *TestSuite) TestEnableCUDAHardwareCapability(c *C) {
1309         s.fullRunHelper(c, `{
1310     "command": ["pwd"],
1311     "container_image": "`+arvadostest.DockerImage112PDH+`",
1312     "cwd": "/bin",
1313     "environment": {},
1314     "mounts": {"/tmp": {"kind": "tmp"} },
1315     "output_path": "/tmp",
1316     "priority": 1,
1317     "runtime_constraints": {"cuda": {"hardware_capability": "foo"}},
1318     "state": "Locked",
1319     "output_storage_classes": ["foo", "bar"]
1320 }`, nil, func() int {
1321                 fmt.Fprintln(s.executor.created.Stdout, "ok")
1322                 return 0
1323         })
1324         c.Check(s.executor.created.CUDADeviceCount, Equals, 0)
1325 }
1326
1327 func (s *TestSuite) TestStopOnSignal(c *C) {
1328         s.executor.runFunc = func() int {
1329                 s.executor.created.Stdout.Write([]byte("foo\n"))
1330                 s.runner.SigChan <- syscall.SIGINT
1331                 time.Sleep(10 * time.Second)
1332                 return 0
1333         }
1334         s.testStopContainer(c)
1335 }
1336
1337 func (s *TestSuite) TestStopOnArvMountDeath(c *C) {
1338         s.executor.runFunc = func() int {
1339                 s.executor.created.Stdout.Write([]byte("foo\n"))
1340                 s.runner.ArvMountExit <- nil
1341                 close(s.runner.ArvMountExit)
1342                 time.Sleep(10 * time.Second)
1343                 return 0
1344         }
1345         s.runner.ArvMountExit = make(chan error)
1346         s.testStopContainer(c)
1347 }
1348
1349 func (s *TestSuite) testStopContainer(c *C) {
1350         record := `{
1351     "command": ["/bin/sh", "-c", "echo foo && sleep 30 && echo bar"],
1352     "container_image": "` + arvadostest.DockerImage112PDH + `",
1353     "cwd": ".",
1354     "environment": {},
1355     "mounts": {"/tmp": {"kind": "tmp"} },
1356     "output_path": "/tmp",
1357     "priority": 1,
1358     "runtime_constraints": {},
1359     "state": "Locked"
1360 }`
1361
1362         err := json.Unmarshal([]byte(record), &s.api.Container)
1363         c.Assert(err, IsNil)
1364
1365         s.runner.RunArvMount = func([]string, string) (*exec.Cmd, error) { return nil, nil }
1366         s.runner.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
1367                 return &ArvTestClient{}, &KeepTestClient{}, nil, nil
1368         }
1369
1370         done := make(chan error)
1371         go func() {
1372                 done <- s.runner.Run()
1373         }()
1374         select {
1375         case <-time.After(20 * time.Second):
1376                 pprof.Lookup("goroutine").WriteTo(os.Stderr, 1)
1377                 c.Fatal("timed out")
1378         case err = <-done:
1379                 c.Check(err, IsNil)
1380         }
1381         for k, v := range s.api.Logs {
1382                 c.Log(k)
1383                 c.Log(v.String(), "\n")
1384         }
1385
1386         c.Check(s.api.CalledWith("container.log", nil), NotNil)
1387         c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
1388         c.Check(s.api.Logs["stdout"].String(), Matches, "(?ms).*foo\n$")
1389 }
1390
1391 func (s *TestSuite) TestFullRunSetEnv(c *C) {
1392         s.fullRunHelper(c, `{
1393     "command": ["/bin/sh", "-c", "echo $FROBIZ"],
1394     "container_image": "`+arvadostest.DockerImage112PDH+`",
1395     "cwd": "/bin",
1396     "environment": {"FROBIZ": "bilbo"},
1397     "mounts": {"/tmp": {"kind": "tmp"} },
1398     "output_path": "/tmp",
1399     "priority": 1,
1400     "runtime_constraints": {},
1401     "state": "Locked"
1402 }`, nil, func() int {
1403                 fmt.Fprintf(s.executor.created.Stdout, "%v", s.executor.created.Env)
1404                 return 0
1405         })
1406
1407         c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
1408         c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
1409         c.Check(s.api.Logs["stdout"].String(), Matches, `.*map\[FROBIZ:bilbo\]\n`)
1410 }
1411
// ArvMountCmdLine records the arguments of the most recent call to
// its ArvMountTest method, so tests can inspect them afterwards.
type ArvMountCmdLine struct {
	// Cmd is the command line that was passed in.
	Cmd []string
	// token is the token that was passed in.
	token string
}
1416
1417 func (am *ArvMountCmdLine) ArvMountTest(c []string, token string) (*exec.Cmd, error) {
1418         am.Cmd = c
1419         am.token = token
1420         return nil, nil
1421 }
1422
1423 func stubCert(c *C, temp string) string {
1424         path := temp + "/ca-certificates.crt"
1425         err := os.WriteFile(path, []byte{}, 0666)
1426         c.Assert(err, IsNil)
1427         os.Setenv("SSL_CERT_FILE", path)
1428         return path
1429 }
1430
1431 func (s *TestSuite) TestSetupMounts(c *C) {
1432         cr := s.runner
1433         am := &ArvMountCmdLine{}
1434         cr.RunArvMount = am.ArvMountTest
1435         cr.containerClient, _ = apiStub()
1436         cr.ContainerArvClient = &ArvTestClient{}
1437         cr.ContainerKeepClient = &KeepTestClient{}
1438         cr.Container.OutputStorageClasses = []string{"default"}
1439
1440         realTemp := c.MkDir()
1441         certTemp := c.MkDir()
1442         stubCertPath := stubCert(c, certTemp)
1443         cr.parentTemp = realTemp
1444
1445         i := 0
1446         cr.MkTempDir = func(_ string, prefix string) (string, error) {
1447                 i++
1448                 d := fmt.Sprintf("%s/%s%d", realTemp, prefix, i)
1449                 err := os.Mkdir(d, os.ModePerm)
1450                 if err != nil && strings.Contains(err.Error(), ": file exists") {
1451                         // Test case must have pre-populated the tempdir
1452                         err = nil
1453                 }
1454                 return d, err
1455         }
1456
1457         checkEmpty := func() {
1458                 // Should be deleted.
1459                 _, err := os.Stat(realTemp)
1460                 c.Assert(os.IsNotExist(err), Equals, true)
1461
1462                 // Now recreate it for the next test.
1463                 c.Assert(os.Mkdir(realTemp, 0777), IsNil)
1464         }
1465
1466         {
1467                 i = 0
1468                 cr.ArvMountPoint = ""
1469                 cr.Container.Mounts = make(map[string]arvados.Mount)
1470                 cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
1471                 cr.Container.OutputPath = "/tmp"
1472                 cr.statInterval = 5 * time.Second
1473                 bindmounts, err := cr.SetupMounts()
1474                 c.Check(err, IsNil)
1475                 c.Check(am.Cmd, DeepEquals, []string{"arv-mount", "--foreground",
1476                         "--read-write", "--storage-classes", "default", "--crunchstat-interval=5",
1477                         "--mount-by-pdh", "by_id", "--disable-event-listening", "--mount-by-id", "by_uuid", realTemp + "/keep1"})
1478                 c.Check(bindmounts, DeepEquals, map[string]bindmount{"/tmp": {realTemp + "/tmp2", false}})
1479                 os.RemoveAll(cr.ArvMountPoint)
1480                 cr.CleanupDirs()
1481                 checkEmpty()
1482         }
1483
1484         {
1485                 i = 0
1486                 cr.ArvMountPoint = ""
1487                 cr.Container.Mounts = make(map[string]arvados.Mount)
1488                 cr.Container.Mounts["/out"] = arvados.Mount{Kind: "tmp"}
1489                 cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
1490                 cr.Container.OutputPath = "/out"
1491                 cr.Container.OutputStorageClasses = []string{"foo", "bar"}
1492
1493                 bindmounts, err := cr.SetupMounts()
1494                 c.Check(err, IsNil)
1495                 c.Check(am.Cmd, DeepEquals, []string{"arv-mount", "--foreground",
1496                         "--read-write", "--storage-classes", "foo,bar", "--crunchstat-interval=5",
1497                         "--mount-by-pdh", "by_id", "--disable-event-listening", "--mount-by-id", "by_uuid", realTemp + "/keep1"})
1498                 c.Check(bindmounts, DeepEquals, map[string]bindmount{"/out": {realTemp + "/tmp2", false}, "/tmp": {realTemp + "/tmp3", false}})
1499                 os.RemoveAll(cr.ArvMountPoint)
1500                 cr.CleanupDirs()
1501                 checkEmpty()
1502         }
1503
1504         {
1505                 i = 0
1506                 cr.ArvMountPoint = ""
1507                 cr.Container.Mounts = make(map[string]arvados.Mount)
1508                 cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
1509                 cr.Container.OutputPath = "/tmp"
1510                 cr.Container.RuntimeConstraints.API = true
1511                 cr.Container.OutputStorageClasses = []string{"default"}
1512
1513                 bindmounts, err := cr.SetupMounts()
1514                 c.Check(err, IsNil)
1515                 c.Check(am.Cmd, DeepEquals, []string{"arv-mount", "--foreground",
1516                         "--read-write", "--storage-classes", "default", "--crunchstat-interval=5",
1517                         "--mount-by-pdh", "by_id", "--disable-event-listening", "--mount-by-id", "by_uuid", realTemp + "/keep1"})
1518                 c.Check(bindmounts, DeepEquals, map[string]bindmount{"/tmp": {realTemp + "/tmp2", false}, "/etc/arvados/ca-certificates.crt": {stubCertPath, true}})
1519                 os.RemoveAll(cr.ArvMountPoint)
1520                 cr.CleanupDirs()
1521                 checkEmpty()
1522
1523                 cr.Container.RuntimeConstraints.API = false
1524         }
1525
1526         {
1527                 i = 0
1528                 cr.ArvMountPoint = ""
1529                 cr.Container.Mounts = map[string]arvados.Mount{
1530                         "/keeptmp": {Kind: "collection", Writable: true},
1531                 }
1532                 cr.Container.OutputPath = "/keeptmp"
1533
1534                 os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
1535
1536                 bindmounts, err := cr.SetupMounts()
1537                 c.Check(err, IsNil)
1538                 c.Check(am.Cmd, DeepEquals, []string{"arv-mount", "--foreground",
1539                         "--read-write", "--storage-classes", "default", "--crunchstat-interval=5",
1540                         "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "--disable-event-listening", "--mount-by-id", "by_uuid", realTemp + "/keep1"})
1541                 c.Check(bindmounts, DeepEquals, map[string]bindmount{"/keeptmp": {realTemp + "/keep1/tmp0", false}})
1542                 os.RemoveAll(cr.ArvMountPoint)
1543                 cr.CleanupDirs()
1544                 checkEmpty()
1545         }
1546
1547         {
1548                 i = 0
1549                 cr.ArvMountPoint = ""
1550                 cr.Container.Mounts = map[string]arvados.Mount{
1551                         "/keepinp": {Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"},
1552                         "/keepout": {Kind: "collection", Writable: true},
1553                 }
1554                 cr.Container.OutputPath = "/keepout"
1555
1556                 os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
1557                 os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
1558
1559                 bindmounts, err := cr.SetupMounts()
1560                 c.Check(err, IsNil)
1561                 c.Check(am.Cmd, DeepEquals, []string{"arv-mount", "--foreground",
1562                         "--read-write", "--storage-classes", "default", "--crunchstat-interval=5",
1563                         "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "--disable-event-listening", "--mount-by-id", "by_uuid", realTemp + "/keep1"})
1564                 c.Check(bindmounts, DeepEquals, map[string]bindmount{
1565                         "/keepinp": {realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", true},
1566                         "/keepout": {realTemp + "/keep1/tmp0", false},
1567                 })
1568                 os.RemoveAll(cr.ArvMountPoint)
1569                 cr.CleanupDirs()
1570                 checkEmpty()
1571         }
1572
1573         {
1574                 i = 0
1575                 cr.ArvMountPoint = ""
1576                 cr.Container.RuntimeConstraints.KeepCacheRAM = 512
1577                 cr.Container.Mounts = map[string]arvados.Mount{
1578                         "/keepinp": {Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"},
1579                         "/keepout": {Kind: "collection", Writable: true},
1580                 }
1581                 cr.Container.OutputPath = "/keepout"
1582
1583                 os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
1584                 os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
1585
1586                 bindmounts, err := cr.SetupMounts()
1587                 c.Check(err, IsNil)
1588                 c.Check(am.Cmd, DeepEquals, []string{"arv-mount", "--foreground",
1589                         "--read-write", "--storage-classes", "default", "--crunchstat-interval=5", "--ram-cache",
1590                         "--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "--disable-event-listening", "--mount-by-id", "by_uuid", realTemp + "/keep1"})
1591                 c.Check(bindmounts, DeepEquals, map[string]bindmount{
1592                         "/keepinp": {realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", true},
1593                         "/keepout": {realTemp + "/keep1/tmp0", false},
1594                 })
1595                 os.RemoveAll(cr.ArvMountPoint)
1596                 cr.CleanupDirs()
1597                 checkEmpty()
1598         }
1599
1600         for _, test := range []struct {
1601                 in  interface{}
1602                 out string
1603         }{
1604                 {in: "foo", out: `"foo"`},
1605                 {in: nil, out: `null`},
1606                 {in: map[string]int64{"foo": 123456789123456789}, out: `{"foo":123456789123456789}`},
1607         } {
1608                 i = 0
1609                 cr.ArvMountPoint = ""
1610                 cr.Container.Mounts = map[string]arvados.Mount{
1611                         "/mnt/test.json": {Kind: "json", Content: test.in},
1612                 }
1613                 bindmounts, err := cr.SetupMounts()
1614                 c.Check(err, IsNil)
1615                 c.Check(bindmounts, DeepEquals, map[string]bindmount{
1616                         "/mnt/test.json": {realTemp + "/json2/mountdata.json", true},
1617                 })
1618                 content, err := ioutil.ReadFile(realTemp + "/json2/mountdata.json")
1619                 c.Check(err, IsNil)
1620                 c.Check(content, DeepEquals, []byte(test.out))
1621                 os.RemoveAll(cr.ArvMountPoint)
1622                 cr.CleanupDirs()
1623                 checkEmpty()
1624         }
1625
1626         for _, test := range []struct {
1627                 in  interface{}
1628                 out string
1629         }{
1630                 {in: "foo", out: `foo`},
1631                 {in: nil, out: "error"},
1632                 {in: map[string]int64{"foo": 123456789123456789}, out: "error"},
1633         } {
1634                 i = 0
1635                 cr.ArvMountPoint = ""
1636                 cr.Container.Mounts = map[string]arvados.Mount{
1637                         "/mnt/test.txt": {Kind: "text", Content: test.in},
1638                 }
1639                 bindmounts, err := cr.SetupMounts()
1640                 if test.out == "error" {
1641                         c.Check(err.Error(), Equals, "content for mount \"/mnt/test.txt\" must be a string")
1642                 } else {
1643                         c.Check(err, IsNil)
1644                         c.Check(bindmounts, DeepEquals, map[string]bindmount{
1645                                 "/mnt/test.txt": {realTemp + "/text2/mountdata.text", true},
1646                         })
1647                         content, err := ioutil.ReadFile(realTemp + "/text2/mountdata.text")
1648                         c.Check(err, IsNil)
1649                         c.Check(content, DeepEquals, []byte(test.out))
1650                 }
1651                 os.RemoveAll(cr.ArvMountPoint)
1652                 cr.CleanupDirs()
1653                 checkEmpty()
1654         }
1655
1656         // Read-only mount points are allowed underneath output_dir mount point
1657         {
1658                 i = 0
1659                 cr.ArvMountPoint = ""
1660                 cr.Container.Mounts = make(map[string]arvados.Mount)
1661                 cr.Container.Mounts = map[string]arvados.Mount{
1662                         "/tmp":     {Kind: "tmp"},
1663                         "/tmp/foo": {Kind: "collection"},
1664                 }
1665                 cr.Container.OutputPath = "/tmp"
1666
1667                 os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
1668
1669                 bindmounts, err := cr.SetupMounts()
1670                 c.Check(err, IsNil)
1671                 c.Check(am.Cmd, DeepEquals, []string{"arv-mount", "--foreground",
1672                         "--read-write", "--storage-classes", "default", "--crunchstat-interval=5", "--ram-cache",
1673                         "--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "--disable-event-listening", "--mount-by-id", "by_uuid", realTemp + "/keep1"})
1674                 c.Check(bindmounts, DeepEquals, map[string]bindmount{
1675                         "/tmp":     {realTemp + "/tmp2", false},
1676                         "/tmp/foo": {realTemp + "/keep1/tmp0", true},
1677                 })
1678                 os.RemoveAll(cr.ArvMountPoint)
1679                 cr.CleanupDirs()
1680                 checkEmpty()
1681         }
1682
1683         // Writable mount points copied to output_dir mount point
1684         {
1685                 i = 0
1686                 cr.ArvMountPoint = ""
1687                 cr.Container.Mounts = make(map[string]arvados.Mount)
1688                 cr.Container.Mounts = map[string]arvados.Mount{
1689                         "/tmp": {Kind: "tmp"},
1690                         "/tmp/foo": {Kind: "collection",
1691                                 PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53",
1692                                 Writable:         true},
1693                         "/tmp/bar": {Kind: "collection",
1694                                 PortableDataHash: "59389a8f9ee9d399be35462a0f92541d+53",
1695                                 Path:             "baz",
1696                                 Writable:         true},
1697                 }
1698                 cr.Container.OutputPath = "/tmp"
1699
1700                 os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
1701                 os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541d+53/baz", os.ModePerm)
1702
1703                 rf, _ := os.Create(realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541d+53/baz/quux")
1704                 rf.Write([]byte("bar"))
1705                 rf.Close()
1706
1707                 _, err := cr.SetupMounts()
1708                 c.Check(err, IsNil)
1709                 _, err = os.Stat(cr.HostOutputDir + "/foo")
1710                 c.Check(err, IsNil)
1711                 _, err = os.Stat(cr.HostOutputDir + "/bar/quux")
1712                 c.Check(err, IsNil)
1713                 os.RemoveAll(cr.ArvMountPoint)
1714                 cr.CleanupDirs()
1715                 checkEmpty()
1716         }
1717
1718         // Only mount points of kind 'collection' are allowed underneath output_dir mount point
1719         {
1720                 i = 0
1721                 cr.ArvMountPoint = ""
1722                 cr.Container.Mounts = make(map[string]arvados.Mount)
1723                 cr.Container.Mounts = map[string]arvados.Mount{
1724                         "/tmp":     {Kind: "tmp"},
1725                         "/tmp/foo": {Kind: "tmp"},
1726                 }
1727                 cr.Container.OutputPath = "/tmp"
1728
1729                 _, err := cr.SetupMounts()
1730                 c.Check(err, NotNil)
1731                 c.Check(err, ErrorMatches, `only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path.*`)
1732                 os.RemoveAll(cr.ArvMountPoint)
1733                 cr.CleanupDirs()
1734                 checkEmpty()
1735         }
1736
1737         // Only mount point of kind 'collection' is allowed for stdin
1738         {
1739                 i = 0
1740                 cr.ArvMountPoint = ""
1741                 cr.Container.Mounts = make(map[string]arvados.Mount)
1742                 cr.Container.Mounts = map[string]arvados.Mount{
1743                         "stdin": {Kind: "tmp"},
1744                 }
1745
1746                 _, err := cr.SetupMounts()
1747                 c.Check(err, NotNil)
1748                 c.Check(err, ErrorMatches, `unsupported mount kind 'tmp' for stdin.*`)
1749                 os.RemoveAll(cr.ArvMountPoint)
1750                 cr.CleanupDirs()
1751                 checkEmpty()
1752         }
1753 }
1754
1755 func (s *TestSuite) TestStdout(c *C) {
1756         helperRecord := `{
1757                 "command": ["/bin/sh", "-c", "echo $FROBIZ"],
1758                 "container_image": "` + arvadostest.DockerImage112PDH + `",
1759                 "cwd": "/bin",
1760                 "environment": {"FROBIZ": "bilbo"},
1761                 "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"} },
1762                 "output_path": "/tmp",
1763                 "priority": 1,
1764                 "runtime_constraints": {},
1765                 "state": "Locked"
1766         }`
1767
1768         s.fullRunHelper(c, helperRecord, nil, func() int {
1769                 fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
1770                 return 0
1771         })
1772
1773         c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
1774         c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
1775         c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), NotNil)
1776 }
1777
1778 // Used by the TestStdoutWithWrongPath*()
1779 func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func() int) (*ArvTestClient, *ContainerRunner, error) {
1780         err := json.Unmarshal([]byte(record), &s.api.Container)
1781         c.Assert(err, IsNil)
1782         s.executor.runFunc = fn
1783         s.runner.RunArvMount = (&ArvMountCmdLine{}).ArvMountTest
1784         s.runner.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
1785                 return s.api, &KeepTestClient{}, nil, nil
1786         }
1787         return s.api, s.runner, s.runner.Run()
1788 }
1789
1790 func (s *TestSuite) TestStdoutWithWrongPath(c *C) {
1791         _, _, err := s.stdoutErrorRunHelper(c, `{
1792     "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path":"/tmpa.out"} },
1793     "output_path": "/tmp",
1794     "state": "Locked"
1795 }`, func() int { return 0 })
1796         c.Check(err, ErrorMatches, ".*Stdout path does not start with OutputPath.*")
1797 }
1798
1799 func (s *TestSuite) TestStdoutWithWrongKindTmp(c *C) {
1800         _, _, err := s.stdoutErrorRunHelper(c, `{
1801     "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "tmp", "path":"/tmp/a.out"} },
1802     "output_path": "/tmp",
1803     "state": "Locked"
1804 }`, func() int { return 0 })
1805         c.Check(err, ErrorMatches, ".*unsupported mount kind 'tmp' for stdout.*")
1806 }
1807
1808 func (s *TestSuite) TestStdoutWithWrongKindCollection(c *C) {
1809         _, _, err := s.stdoutErrorRunHelper(c, `{
1810     "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "collection", "path":"/tmp/a.out"} },
1811     "output_path": "/tmp",
1812     "state": "Locked"
1813 }`, func() int { return 0 })
1814         c.Check(err, ErrorMatches, ".*unsupported mount kind 'collection' for stdout.*")
1815 }
1816
1817 func (s *TestSuite) TestFullRunWithAPI(c *C) {
1818         s.fullRunHelper(c, `{
1819     "command": ["/bin/sh", "-c", "true $ARVADOS_API_HOST"],
1820     "container_image": "`+arvadostest.DockerImage112PDH+`",
1821     "cwd": "/bin",
1822     "environment": {},
1823     "mounts": {"/tmp": {"kind": "tmp"} },
1824     "output_path": "/tmp",
1825     "priority": 1,
1826     "runtime_constraints": {"API": true},
1827     "state": "Locked"
1828 }`, nil, func() int {
1829                 c.Check(s.executor.created.Env["ARVADOS_API_HOST"], Equals, os.Getenv("ARVADOS_API_HOST"))
1830                 return 3
1831         })
1832         c.Check(s.api.CalledWith("container.exit_code", 3), NotNil)
1833         c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
1834         c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*status code 3\n.*`)
1835 }
1836
1837 func (s *TestSuite) TestFullRunSetOutput(c *C) {
1838         defer os.Setenv("ARVADOS_API_HOST", os.Getenv("ARVADOS_API_HOST"))
1839         os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
1840         s.fullRunHelper(c, `{
1841     "command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
1842     "container_image": "`+arvadostest.DockerImage112PDH+`",
1843     "cwd": "/bin",
1844     "environment": {},
1845     "mounts": {"/tmp": {"kind": "tmp"} },
1846     "output_path": "/tmp",
1847     "priority": 1,
1848     "runtime_constraints": {"API": true},
1849     "state": "Locked"
1850 }`, nil, func() int {
1851                 s.api.Container.Output = arvadostest.DockerImage112PDH
1852                 return 0
1853         })
1854
1855         c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
1856         c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
1857         c.Check(s.api.CalledWith("container.output", arvadostest.DockerImage112PDH), NotNil)
1858 }
1859
1860 func (s *TestSuite) TestArvMountRuntimeStatusWarning(c *C) {
1861         s.runner.RunArvMount = func([]string, string) (*exec.Cmd, error) {
1862                 os.Mkdir(s.runner.ArvMountPoint+"/by_id", 0666)
1863                 ioutil.WriteFile(s.runner.ArvMountPoint+"/by_id/README", nil, 0666)
1864                 return s.runner.ArvMountCmd([]string{"bash", "-c", "echo >&2 Test: Keep write error: I am a teapot; sleep 3"}, "")
1865         }
1866         s.executor.runFunc = func() int {
1867                 time.Sleep(time.Second)
1868                 return 137
1869         }
1870         record := `{
1871     "command": ["sleep", "1"],
1872     "container_image": "` + arvadostest.DockerImage112PDH + `",
1873     "cwd": "/bin",
1874     "environment": {},
1875     "mounts": {"/tmp": {"kind": "tmp"} },
1876     "output_path": "/tmp",
1877     "priority": 1,
1878     "runtime_constraints": {"API": true},
1879     "state": "Locked"
1880 }`
1881         err := json.Unmarshal([]byte(record), &s.api.Container)
1882         c.Assert(err, IsNil)
1883         err = s.runner.Run()
1884         c.Assert(err, IsNil)
1885         c.Check(s.api.CalledWith("container.exit_code", 137), NotNil)
1886         c.Check(s.api.CalledWith("container.runtime_status.warning", "arv-mount: Keep write error"), NotNil)
1887         c.Check(s.api.CalledWith("container.runtime_status.warningDetail", "Test: Keep write error: I am a teapot"), NotNil)
1888         c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
1889         c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Container exited with status code 137 \(signal 9, SIGKILL\).*`)
1890 }
1891
1892 func (s *TestSuite) TestStdoutWithExcludeFromOutputMountPointUnderOutputDir(c *C) {
1893         helperRecord := `{
1894                 "command": ["/bin/sh", "-c", "echo $FROBIZ"],
1895                 "container_image": "` + arvadostest.DockerImage112PDH + `",
1896                 "cwd": "/bin",
1897                 "environment": {"FROBIZ": "bilbo"},
1898                 "mounts": {
1899         "/tmp": {"kind": "tmp"},
1900         "/tmp/foo": {"kind": "collection",
1901                      "portable_data_hash": "a3e8f74c6f101eae01fa08bfb4e49b3a+54",
1902                      "exclude_from_output": true
1903         },
1904         "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
1905     },
1906                 "output_path": "/tmp",
1907                 "priority": 1,
1908                 "runtime_constraints": {},
1909                 "state": "Locked"
1910         }`
1911
1912         extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"}
1913
1914         s.fullRunHelper(c, helperRecord, extraMounts, func() int {
1915                 fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
1916                 return 0
1917         })
1918
1919         c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
1920         c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
1921         c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), NotNil)
1922 }
1923
1924 func (s *TestSuite) TestStdoutWithMultipleMountPointsUnderOutputDir(c *C) {
1925         helperRecord := `{
1926                 "command": ["/bin/sh", "-c", "echo $FROBIZ"],
1927                 "container_image": "` + arvadostest.DockerImage112PDH + `",
1928                 "cwd": "/bin",
1929                 "environment": {"FROBIZ": "bilbo"},
1930                 "mounts": {
1931         "/tmp": {"kind": "tmp"},
1932         "/tmp/foo/bar": {"kind": "collection", "portable_data_hash": "a0def87f80dd594d4675809e83bd4f15+367", "path":"/file2_in_main.txt"},
1933         "/tmp/foo/sub1": {"kind": "collection", "portable_data_hash": "a0def87f80dd594d4675809e83bd4f15+367", "path":"/subdir1"},
1934         "/tmp/foo/sub1file2": {"kind": "collection", "portable_data_hash": "a0def87f80dd594d4675809e83bd4f15+367", "path":"/subdir1/file2_in_subdir1.txt"},
1935         "/tmp/foo/baz/sub2file2": {"kind": "collection", "portable_data_hash": "a0def87f80dd594d4675809e83bd4f15+367", "path":"/subdir1/subdir2/file2_in_subdir2.txt"},
1936         "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
1937     },
1938                 "output_path": "/tmp",
1939                 "priority": 1,
1940                 "runtime_constraints": {},
1941                 "state": "Locked",
1942                 "uuid": "zzzzz-dz642-202301130848001"
1943         }`
1944
1945         extraMounts := []string{
1946                 "a0def87f80dd594d4675809e83bd4f15+367/file2_in_main.txt",
1947                 "a0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt",
1948                 "a0def87f80dd594d4675809e83bd4f15+367/subdir1/subdir2/file2_in_subdir2.txt",
1949         }
1950
1951         api, _, realtemp := s.fullRunHelper(c, helperRecord, extraMounts, func() int {
1952                 fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
1953                 return 0
1954         })
1955
1956         c.Check(s.executor.created.BindMounts, DeepEquals, map[string]bindmount{
1957                 "/tmp":                   {realtemp + "/tmp1", false},
1958                 "/tmp/foo/bar":           {s.keepmount + "/by_id/a0def87f80dd594d4675809e83bd4f15+367/file2_in_main.txt", true},
1959                 "/tmp/foo/baz/sub2file2": {s.keepmount + "/by_id/a0def87f80dd594d4675809e83bd4f15+367/subdir1/subdir2/file2_in_subdir2.txt", true},
1960                 "/tmp/foo/sub1":          {s.keepmount + "/by_id/a0def87f80dd594d4675809e83bd4f15+367/subdir1", true},
1961                 "/tmp/foo/sub1file2":     {s.keepmount + "/by_id/a0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt", true},
1962         })
1963
1964         c.Check(api.CalledWith("container.exit_code", 0), NotNil)
1965         c.Check(api.CalledWith("container.state", "Complete"), NotNil)
1966         output_count := uint(0)
1967         for _, v := range s.runner.ContainerArvClient.(*ArvTestClient).Content {
1968                 if v["collection"] == nil {
1969                         continue
1970                 }
1971                 collection := v["collection"].(arvadosclient.Dict)
1972                 if collection["name"].(string) != "output for zzzzz-dz642-202301130848001" {
1973                         continue
1974                 }
1975                 c.Check(v["ensure_unique_name"], Equals, true)
1976                 c.Check(collection["manifest_text"].(string), Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
1977 ./foo 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0abcdefgh11234567890@569fa8c3 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 9:18:bar 36:18:sub1file2
1978 ./foo/baz 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0bcdefghijk544332211@569fa8c5 9:18:sub2file2
1979 ./foo/sub1 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 0:9:file1_in_subdir1.txt 9:18:file2_in_subdir1.txt
1980 ./foo/sub1/subdir2 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0bcdefghijk544332211@569fa8c5 0:9:file1_in_subdir2.txt 9:18:file2_in_subdir2.txt
1981 `)
1982                 output_count++
1983         }
1984         c.Check(output_count, Not(Equals), uint(0))
1985 }
1986
1987 func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(c *C) {
1988         helperRecord := `{
1989                 "command": ["/bin/sh", "-c", "echo $FROBIZ"],
1990                 "container_image": "` + arvadostest.DockerImage112PDH + `",
1991                 "cwd": "/bin",
1992                 "environment": {"FROBIZ": "bilbo"},
1993                 "mounts": {
1994         "/tmp": {"kind": "tmp"},
1995         "/tmp/foo/bar": {"kind": "collection", "portable_data_hash": "b0def87f80dd594d4675809e83bd4f15+367", "path": "/subdir1/file2_in_subdir1.txt"},
1996         "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
1997     },
1998                 "output_path": "/tmp",
1999                 "priority": 1,
2000                 "runtime_constraints": {},
2001                 "state": "Locked",
2002                 "uuid": "zzzzz-dz642-202301130848002"
2003         }`
2004
2005         extraMounts := []string{
2006                 "b0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt",
2007         }
2008
2009         s.fullRunHelper(c, helperRecord, extraMounts, func() int {
2010                 fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
2011                 return 0
2012         })
2013
2014         c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
2015         c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
2016         output_count := uint(0)
2017         for _, v := range s.runner.ContainerArvClient.(*ArvTestClient).Content {
2018                 if v["collection"] == nil {
2019                         continue
2020                 }
2021                 collection := v["collection"].(arvadosclient.Dict)
2022                 if collection["name"].(string) != "output for zzzzz-dz642-202301130848002" {
2023                         continue
2024                 }
2025                 c.Check(collection["manifest_text"].(string), Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
2026 ./foo 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0abcdefgh11234567890@569fa8c3 10:17:bar
2027 `)
2028                 output_count++
2029         }
2030         c.Check(output_count, Not(Equals), uint(0))
2031 }
2032
2033 func (s *TestSuite) TestOutputError(c *C) {
2034         helperRecord := `{
2035                 "command": ["/bin/sh", "-c", "echo $FROBIZ"],
2036                 "container_image": "` + arvadostest.DockerImage112PDH + `",
2037                 "cwd": "/bin",
2038                 "environment": {"FROBIZ": "bilbo"},
2039                 "mounts": {
2040                         "/tmp": {"kind": "tmp"}
2041                 },
2042                 "output_path": "/tmp",
2043                 "priority": 1,
2044                 "runtime_constraints": {},
2045                 "state": "Locked"
2046         }`
2047         s.fullRunHelper(c, helperRecord, nil, func() int {
2048                 os.Symlink("/etc/hosts", s.runner.HostOutputDir+"/baz")
2049                 return 0
2050         })
2051
2052         c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
2053 }
2054
2055 func (s *TestSuite) TestStdinCollectionMountPoint(c *C) {
2056         helperRecord := `{
2057                 "command": ["/bin/sh", "-c", "echo $FROBIZ"],
2058                 "container_image": "` + arvadostest.DockerImage112PDH + `",
2059                 "cwd": "/bin",
2060                 "environment": {"FROBIZ": "bilbo"},
2061                 "mounts": {
2062         "/tmp": {"kind": "tmp"},
2063         "stdin": {"kind": "collection", "portable_data_hash": "b0def87f80dd594d4675809e83bd4f15+367", "path": "/file1_in_main.txt"},
2064         "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
2065     },
2066                 "output_path": "/tmp",
2067                 "priority": 1,
2068                 "runtime_constraints": {},
2069                 "state": "Locked"
2070         }`
2071
2072         extraMounts := []string{
2073                 "b0def87f80dd594d4675809e83bd4f15+367/file1_in_main.txt",
2074         }
2075
2076         api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, func() int {
2077                 fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
2078                 return 0
2079         })
2080
2081         c.Check(api.CalledWith("container.exit_code", 0), NotNil)
2082         c.Check(api.CalledWith("container.state", "Complete"), NotNil)
2083         for _, v := range api.Content {
2084                 if v["collection"] != nil {
2085                         collection := v["collection"].(arvadosclient.Dict)
2086                         if strings.Index(collection["name"].(string), "output") == 0 {
2087                                 manifest := collection["manifest_text"].(string)
2088                                 c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
2089 `)
2090                         }
2091                 }
2092         }
2093 }
2094
2095 func (s *TestSuite) TestStdinJsonMountPoint(c *C) {
2096         helperRecord := `{
2097                 "command": ["/bin/sh", "-c", "echo $FROBIZ"],
2098                 "container_image": "` + arvadostest.DockerImage112PDH + `",
2099                 "cwd": "/bin",
2100                 "environment": {"FROBIZ": "bilbo"},
2101                 "mounts": {
2102         "/tmp": {"kind": "tmp"},
2103         "stdin": {"kind": "json", "content": "foo"},
2104         "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
2105     },
2106                 "output_path": "/tmp",
2107                 "priority": 1,
2108                 "runtime_constraints": {},
2109                 "state": "Locked"
2110         }`
2111
2112         api, _, _ := s.fullRunHelper(c, helperRecord, nil, func() int {
2113                 fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
2114                 return 0
2115         })
2116
2117         c.Check(api.CalledWith("container.exit_code", 0), NotNil)
2118         c.Check(api.CalledWith("container.state", "Complete"), NotNil)
2119         for _, v := range api.Content {
2120                 if v["collection"] != nil {
2121                         collection := v["collection"].(arvadosclient.Dict)
2122                         if strings.Index(collection["name"].(string), "output") == 0 {
2123                                 manifest := collection["manifest_text"].(string)
2124                                 c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
2125 `)
2126                         }
2127                 }
2128         }
2129 }
2130
2131 func (s *TestSuite) TestStderrMount(c *C) {
2132         api, cr, _ := s.fullRunHelper(c, `{
2133     "command": ["/bin/sh", "-c", "echo hello;exit 1"],
2134     "container_image": "`+arvadostest.DockerImage112PDH+`",
2135     "cwd": ".",
2136     "environment": {},
2137     "mounts": {"/tmp": {"kind": "tmp"},
2138                "stdout": {"kind": "file", "path": "/tmp/a/out.txt"},
2139                "stderr": {"kind": "file", "path": "/tmp/b/err.txt"}},
2140     "output_path": "/tmp",
2141     "priority": 1,
2142     "runtime_constraints": {},
2143     "state": "Locked"
2144 }`, nil, func() int {
2145                 fmt.Fprintln(s.executor.created.Stdout, "hello")
2146                 fmt.Fprintln(s.executor.created.Stderr, "oops")
2147                 return 1
2148         })
2149
2150         final := api.CalledWith("container.state", "Complete")
2151         c.Assert(final, NotNil)
2152         c.Check(final["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
2153         c.Check(final["container"].(arvadosclient.Dict)["log"], NotNil)
2154
2155         c.Check(cr.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", "./a b1946ac92492d2347c6235b4d2611184+6 0:6:out.txt\n./b 38af5c54926b620264ab1501150cf189+5 0:5:err.txt\n"), NotNil)
2156 }
2157
2158 func (s *TestSuite) TestNumberRoundTrip(c *C) {
2159         s.api.callraw = true
2160         err := s.runner.fetchContainerRecord()
2161         c.Assert(err, IsNil)
2162         jsondata, err := json.Marshal(s.runner.Container.Mounts["/json"].Content)
2163         c.Logf("%#v", s.runner.Container)
2164         c.Check(err, IsNil)
2165         c.Check(string(jsondata), Equals, `{"number":123456789123456789}`)
2166 }
2167
2168 func (s *TestSuite) TestFullBrokenDocker(c *C) {
2169         nextState := ""
2170         for _, setup := range []func(){
2171                 func() {
2172                         c.Log("// waitErr = ocl runtime error")
2173                         s.executor.waitErr = errors.New(`Error response from daemon: oci runtime error: container_linux.go:247: starting container process caused "process_linux.go:359: container init caused \"rootfs_linux.go:54: mounting \\\"/tmp/keep453790790/by_id/99999999999999999999999999999999+99999/myGenome\\\" to rootfs \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged\\\" at \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged/keep/99999999999999999999999999999999+99999/myGenome\\\" caused \\\"no such file or directory\\\"\""`)
2174                         nextState = "Cancelled"
2175                 },
2176                 func() {
2177                         c.Log("// loadErr = cannot connect")
2178                         s.executor.loadErr = errors.New("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
2179                         s.runner.brokenNodeHook = c.MkDir() + "/broken-node-hook"
2180                         err := ioutil.WriteFile(s.runner.brokenNodeHook, []byte("#!/bin/sh\nexec echo killme\n"), 0700)
2181                         c.Assert(err, IsNil)
2182                         nextState = "Queued"
2183                 },
2184         } {
2185                 s.SetUpTest(c)
2186                 setup()
2187                 s.fullRunHelper(c, `{
2188     "command": ["echo", "hello world"],
2189     "container_image": "`+arvadostest.DockerImage112PDH+`",
2190     "cwd": ".",
2191     "environment": {},
2192     "mounts": {"/tmp": {"kind": "tmp"} },
2193     "output_path": "/tmp",
2194     "priority": 1,
2195     "runtime_constraints": {},
2196     "state": "Locked"
2197 }`, nil, func() int { return 0 })
2198                 c.Check(s.api.CalledWith("container.state", nextState), NotNil)
2199                 c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
2200                 if s.runner.brokenNodeHook != "" {
2201                         c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*")
2202                         c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*")
2203                         c.Check(s.api.Logs["crunch-run"].String(), Not(Matches), "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
2204                 } else {
2205                         c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
2206                 }
2207         }
2208 }
2209
2210 func (s *TestSuite) TestBadCommand(c *C) {
2211         for _, startError := range []string{
2212                 `panic: standard_init_linux.go:175: exec user process caused "no such file or directory"`,
2213                 `Error response from daemon: Cannot start container 41f26cbc43bcc1280f4323efb1830a394ba8660c9d1c2b564ba42bf7f7694845: [8] System error: no such file or directory`,
2214                 `Error response from daemon: Cannot start container 58099cd76c834f3dc2a4fb76c8028f049ae6d4fdf0ec373e1f2cfea030670c2d: [8] System error: exec: "foobar": executable file not found in $PATH`,
2215         } {
2216                 s.SetUpTest(c)
2217                 s.executor.startErr = errors.New(startError)
2218                 s.fullRunHelper(c, `{
2219     "command": ["echo", "hello world"],
2220     "container_image": "`+arvadostest.DockerImage112PDH+`",
2221     "cwd": ".",
2222     "environment": {},
2223     "mounts": {"/tmp": {"kind": "tmp"} },
2224     "output_path": "/tmp",
2225     "priority": 1,
2226     "runtime_constraints": {},
2227     "state": "Locked"
2228 }`, nil, func() int { return 0 })
2229                 c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
2230                 c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*")
2231         }
2232 }
2233
2234 func (s *TestSuite) TestSecretTextMountPoint(c *C) {
2235         helperRecord := `{
2236                 "command": ["true"],
2237                 "container_image": "` + arvadostest.DockerImage112PDH + `",
2238                 "cwd": "/bin",
2239                 "mounts": {
2240                     "/tmp": {"kind": "tmp"},
2241                     "/tmp/secret.conf": {"kind": "text", "content": "mypassword"}
2242                 },
2243                 "secret_mounts": {
2244                 },
2245                 "output_path": "/tmp",
2246                 "priority": 1,
2247                 "runtime_constraints": {},
2248                 "state": "Locked"
2249         }`
2250
2251         s.fullRunHelper(c, helperRecord, nil, func() int {
2252                 content, err := ioutil.ReadFile(s.runner.HostOutputDir + "/secret.conf")
2253                 c.Check(err, IsNil)
2254                 c.Check(string(content), Equals, "mypassword")
2255                 return 0
2256         })
2257
2258         c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
2259         c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
2260         c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ". 34819d7beeabb9260a5c854bc85b3e44+10 0:10:secret.conf\n"), NotNil)
2261         c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ""), IsNil)
2262
2263         // under secret mounts, not captured in output
2264         helperRecord = `{
2265                 "command": ["true"],
2266                 "container_image": "` + arvadostest.DockerImage112PDH + `",
2267                 "cwd": "/bin",
2268                 "mounts": {
2269                     "/tmp": {"kind": "tmp"}
2270                 },
2271                 "secret_mounts": {
2272                     "/tmp/secret.conf": {"kind": "text", "content": "mypassword"}
2273                 },
2274                 "output_path": "/tmp",
2275                 "priority": 1,
2276                 "runtime_constraints": {},
2277                 "state": "Locked"
2278         }`
2279
2280         s.SetUpTest(c)
2281         s.fullRunHelper(c, helperRecord, nil, func() int {
2282                 content, err := ioutil.ReadFile(s.runner.HostOutputDir + "/secret.conf")
2283                 c.Check(err, IsNil)
2284                 c.Check(string(content), Equals, "mypassword")
2285                 return 0
2286         })
2287
2288         c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
2289         c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
2290         c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ". 34819d7beeabb9260a5c854bc85b3e44+10 0:10:secret.conf\n"), IsNil)
2291         c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ""), NotNil)
2292
2293         // under secret mounts, output dir is a collection, not captured in output
2294         helperRecord = `{
2295                 "command": ["true"],
2296                 "container_image": "` + arvadostest.DockerImage112PDH + `",
2297                 "cwd": "/bin",
2298                 "mounts": {
2299                     "/tmp": {"kind": "collection", "writable": true}
2300                 },
2301                 "secret_mounts": {
2302                     "/tmp/secret.conf": {"kind": "text", "content": "mypassword"}
2303                 },
2304                 "output_path": "/tmp",
2305                 "priority": 1,
2306                 "runtime_constraints": {},
2307                 "state": "Locked"
2308         }`
2309
2310         s.SetUpTest(c)
2311         _, _, realtemp := s.fullRunHelper(c, helperRecord, nil, func() int {
2312                 // secret.conf should be provisioned as a separate
2313                 // bind mount, i.e., it should not appear in the
2314                 // (fake) fuse filesystem as viewed from the host.
2315                 content, err := ioutil.ReadFile(s.runner.HostOutputDir + "/secret.conf")
2316                 if !c.Check(errors.Is(err, os.ErrNotExist), Equals, true) {
2317                         c.Logf("secret.conf: content %q, err %#v", content, err)
2318                 }
2319                 err = ioutil.WriteFile(s.runner.HostOutputDir+"/.arvados#collection", []byte(`{"manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n"}`), 0700)
2320                 c.Check(err, IsNil)
2321                 return 0
2322         })
2323
2324         content, err := ioutil.ReadFile(realtemp + "/text1/mountdata.text")
2325         c.Check(err, IsNil)
2326         c.Check(string(content), Equals, "mypassword")
2327         c.Check(s.executor.created.BindMounts["/tmp/secret.conf"], DeepEquals, bindmount{realtemp + "/text1/mountdata.text", true})
2328         c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
2329         c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
2330         c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n"), NotNil)
2331 }
2332
2333 func (s *TestSuite) TestCalculateCost(c *C) {
2334         defer func(s string) { lockdir = s }(lockdir)
2335         lockdir = c.MkDir()
2336         now := time.Now()
2337         cr := s.runner
2338         cr.costStartTime = now.Add(-time.Hour)
2339         var logbuf bytes.Buffer
2340         cr.CrunchLog.Immediate = log.New(&logbuf, "", 0)
2341
2342         // if there's no InstanceType env var, cost is calculated as 0
2343         os.Unsetenv("InstanceType")
2344         cost := cr.calculateCost(now)
2345         c.Check(cost, Equals, 0.0)
2346
2347         // with InstanceType env var and loadPrices() hasn't run (or
2348         // hasn't found any data), cost is calculated based on
2349         // InstanceType env var
2350         os.Setenv("InstanceType", `{"Price":1.2}`)
2351         cost = cr.calculateCost(now)
2352         c.Check(cost, Equals, 1.2)
2353
2354         // first update tells us the spot price was $1/h until 30
2355         // minutes ago when it increased to $2/h
2356         j, err := json.Marshal([]cloud.InstancePrice{
2357                 {StartTime: now.Add(-4 * time.Hour), Price: 1.0},
2358                 {StartTime: now.Add(-time.Hour / 2), Price: 2.0},
2359         })
2360         c.Assert(err, IsNil)
2361         os.WriteFile(lockdir+"/"+pricesfile, j, 0777)
2362         cr.loadPrices()
2363         cost = cr.calculateCost(now)
2364         c.Check(cost, Equals, 1.5)
2365
2366         // next update (via --list + SIGUSR2) tells us the spot price
2367         // increased to $3/h 15 minutes ago
2368         j, err = json.Marshal([]cloud.InstancePrice{
2369                 {StartTime: now.Add(-time.Hour / 3), Price: 2.0}, // dup of -time.Hour/2 price
2370                 {StartTime: now.Add(-time.Hour / 4), Price: 3.0},
2371         })
2372         c.Assert(err, IsNil)
2373         os.WriteFile(lockdir+"/"+pricesfile, j, 0777)
2374         cr.loadPrices()
2375         cost = cr.calculateCost(now)
2376         c.Check(cost, Equals, 1.0/2+2.0/4+3.0/4)
2377
2378         cost = cr.calculateCost(now.Add(-time.Hour / 2))
2379         c.Check(cost, Equals, 0.5)
2380
2381         c.Logf("%s", logbuf.String())
2382         c.Check(logbuf.String(), Matches, `(?ms).*Instance price changed to 1\.00 at 20.* changed to 2\.00 .* changed to 3\.00 .*`)
2383         c.Check(logbuf.String(), Not(Matches), `(?ms).*changed to 2\.00 .* changed to 2\.00 .*`)
2384 }
2385
2386 func (s *TestSuite) TestSIGUSR2CostUpdate(c *C) {
2387         pid := os.Getpid()
2388         now := time.Now()
2389         pricesJSON, err := json.Marshal([]cloud.InstancePrice{
2390                 {StartTime: now.Add(-4 * time.Hour), Price: 2.4},
2391                 {StartTime: now.Add(-2 * time.Hour), Price: 2.6},
2392         })
2393         c.Assert(err, IsNil)
2394
2395         os.Setenv("InstanceType", `{"Price":2.2}`)
2396         defer func(s string) { lockdir = s }(lockdir)
2397         lockdir = c.MkDir()
2398
2399         // We can't use s.api.CalledWith because timing differences will yield
2400         // different cost values across runs. getCostUpdate iterates over API
2401         // calls until it finds one that sets the cost, then writes that value
2402         // to the next index of costUpdates.
2403         deadline := now.Add(time.Second)
2404         costUpdates := make([]float64, 2)
2405         costIndex := 0
2406         apiIndex := 0
2407         getCostUpdate := func() {
2408                 for ; time.Now().Before(deadline); time.Sleep(time.Second / 10) {
2409                         for apiIndex < len(s.api.Content) {
2410                                 update := s.api.Content[apiIndex]
2411                                 apiIndex++
2412                                 var ok bool
2413                                 var cost float64
2414                                 if update, ok = update["container"].(arvadosclient.Dict); !ok {
2415                                         continue
2416                                 }
2417                                 if cost, ok = update["cost"].(float64); !ok {
2418                                         continue
2419                                 }
2420                                 c.Logf("API call #%d updates cost to %v", apiIndex-1, cost)
2421                                 costUpdates[costIndex] = cost
2422                                 costIndex++
2423                                 return
2424                         }
2425                 }
2426         }
2427
2428         s.fullRunHelper(c, `{
2429                 "command": ["true"],
2430                 "container_image": "`+arvadostest.DockerImage112PDH+`",
2431                 "cwd": ".",
2432                 "environment": {},
2433                 "mounts": {"/tmp": {"kind": "tmp"} },
2434                 "output_path": "/tmp",
2435                 "priority": 1,
2436                 "runtime_constraints": {},
2437                 "state": "Locked",
2438                 "uuid": "zzzzz-dz642-20230320101530a"
2439         }`, nil, func() int {
2440                 s.runner.costStartTime = now.Add(-3 * time.Hour)
2441                 err := syscall.Kill(pid, syscall.SIGUSR2)
2442                 c.Check(err, IsNil, Commentf("error sending first SIGUSR2 to runner"))
2443                 getCostUpdate()
2444
2445                 err = os.WriteFile(path.Join(lockdir, pricesfile), pricesJSON, 0o700)
2446                 c.Check(err, IsNil, Commentf("error writing JSON prices file"))
2447                 err = syscall.Kill(pid, syscall.SIGUSR2)
2448                 c.Check(err, IsNil, Commentf("error sending second SIGUSR2 to runner"))
2449                 getCostUpdate()
2450
2451                 return 0
2452         })
2453         // Comparing with format strings makes it easy to ignore minor variations
2454         // in cost across runs while keeping diagnostics pretty.
2455         c.Check(fmt.Sprintf("%.3f", costUpdates[0]), Equals, "6.600")
2456         c.Check(fmt.Sprintf("%.3f", costUpdates[1]), Equals, "7.600")
2457 }
2458
// FakeProcess is a test double for a process: it carries a fixed
// command line and reports it on request.
type FakeProcess struct {
	// cmdLine is the argument vector returned by CmdlineSlice.
	cmdLine []string
}

// CmdlineSlice returns the stored command line unchanged. The error
// result is always nil.
func (p FakeProcess) CmdlineSlice() ([]string, error) {
	args := p.cmdLine
	return args, nil
}