Merge branch '9374-go-sdk'
[arvados.git] / sdk / go / dispatch / dispatch.go
index 54d596fee86ed400d595432fd7e75d72b0c086e7..ce536de47a07be129d7f9e1f9ed730bdf30f6918 100644 (file)
@@ -4,6 +4,7 @@
 package dispatch
 
 import (
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "log"
        "os"
@@ -13,39 +14,14 @@ import (
        "time"
 )
 
-// Constants for container states
 const (
-       Queued    = "Queued"
-       Locked    = "Locked"
-       Running   = "Running"
-       Complete  = "Complete"
-       Cancelled = "Cancelled"
+       Queued    = arvados.ContainerStateQueued
+       Locked    = arvados.ContainerStateLocked
+       Running   = arvados.ContainerStateRunning
+       Complete  = arvados.ContainerStateComplete
+       Cancelled = arvados.ContainerStateCancelled
 )
 
-type apiClientAuthorization struct {
-       UUID     string `json:"uuid"`
-       APIToken string `json:"api_token"`
-}
-
-type apiClientAuthorizationList struct {
-       Items []apiClientAuthorization `json:"items"`
-}
-
-// Represents an Arvados container record
-type Container struct {
-       UUID               string           `json:"uuid"`
-       State              string           `json:"state"`
-       Priority           int              `json:"priority"`
-       RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
-       LockedByUUID       string           `json:"locked_by_uuid"`
-}
-
-// ContainerList is a list of the containers from api
-type ContainerList struct {
-       Items          []Container `json:"items"`
-       ItemsAvailable int         `json:"items_available"`
-}
-
 // Dispatcher holds the state of the dispatcher
 type Dispatcher struct {
        // The Arvados client
@@ -63,7 +39,7 @@ type Dispatcher struct {
        // handled by this dispatcher and the goroutine should terminate.  The
        // goroutine is responsible for draining the 'status' channel, failure
        // to do so may deadlock the dispatcher.
-       RunContainer func(*Dispatcher, Container, chan Container)
+       RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
 
        // Amount of time to wait between polling for updates.
        PollInterval time.Duration
@@ -72,22 +48,22 @@ type Dispatcher struct {
        DoneProcessing chan struct{}
 
        mineMutex  sync.Mutex
-       mineMap    map[string]chan Container
-       Auth       apiClientAuthorization
-       containers chan Container
+       mineMap    map[string]chan arvados.Container
+       Auth       arvados.APIClientAuthorization
+       containers chan arvados.Container
 }
 
 // Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
 // for which this process is actively starting/monitoring.  Returns channel to
 // be used to send container status updates.
-func (dispatcher *Dispatcher) setMine(uuid string) chan Container {
+func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container {
        dispatcher.mineMutex.Lock()
        defer dispatcher.mineMutex.Unlock()
        if ch, ok := dispatcher.mineMap[uuid]; ok {
                return ch
        }
 
-       ch := make(chan Container)
+       ch := make(chan arvados.Container)
        dispatcher.mineMap[uuid] = ch
        return ch
 }
@@ -102,10 +78,10 @@ func (dispatcher *Dispatcher) notMine(uuid string) {
        }
 }
 
-// checkMine returns true/false if there is a channel for updates associated
+// checkMine returns true if there is a channel for updates associated
 // with container c.  If update is true, also send the container record on
 // the channel.
-func (dispatcher *Dispatcher) checkMine(c Container, update bool) bool {
+func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool {
        dispatcher.mineMutex.Lock()
        defer dispatcher.mineMutex.Unlock()
        ch, ok := dispatcher.mineMap[c.UUID]
@@ -119,7 +95,7 @@ func (dispatcher *Dispatcher) checkMine(c Container, update bool) bool {
 }
 
 func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
-       var containers ContainerList
+       var containers arvados.ContainerList
        err := dispatcher.Arv.List("containers", params, &containers)
        if err != nil {
                log.Printf("Error getting list of containers: %q", err)
@@ -175,7 +151,7 @@ func (dispatcher *Dispatcher) pollContainers() {
        }
 }
 
-func (dispatcher *Dispatcher) handleUpdate(container Container) {
+func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
        if container.State == Queued && dispatcher.checkMine(container, false) {
                // If we previously started the job, something failed, and it
                // was re-queued, this dispatcher might still be monitoring it.
@@ -216,7 +192,7 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) {
 }
 
 // UpdateState makes an API call to change the state of a container.
-func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
+func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error {
        err := dispatcher.Arv.Update("containers", uuid,
                arvadosclient.Dict{
                        "container": arvadosclient.Dict{"state": newState}},
@@ -237,8 +213,8 @@ func (dispatcher *Dispatcher) RunDispatcher() (err error) {
                return
        }
 
-       dispatcher.mineMap = make(map[string]chan Container)
-       dispatcher.containers = make(chan Container)
+       dispatcher.mineMap = make(map[string]chan arvados.Container)
+       dispatcher.containers = make(chan arvados.Container)
 
        // Graceful shutdown on signal
        sigChan := make(chan os.Signal)