3761: update GetContent function to use read_content test data info.
[arvados.git] / services / keepstore / keepstore.go
index 2437638cff8aa2bc9491a8af4ed86e4605f07ff8..1e8c3d1e0fbd62f9dea1a2efce54a0a73b911a41 100644 (file)
@@ -13,6 +13,8 @@ import (
        "strings"
        "syscall"
        "time"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
 )
 
 // ======================
@@ -91,12 +93,16 @@ func (e *KeepError) Error() string {
 // Initialized by the --volumes flag (or by FindKeepVolumes).
 var KeepVM VolumeManager
 
-// The pull list queue is a singleton pull list (a list of blocks
-// that the current keepstore process should be pulling from remote
-// keepstore servers in order to increase data replication) with
-// atomic update methods that are safe to use from multiple
-// goroutines.
+// The pull list manager and trash queue are threadsafe queues which
+// support atomic update operations. The PullHandler and TrashHandler
+// store results from Data Manager /pull and /trash requests here.
+//
+// See the Keep and Data Manager design documents for more details:
+// https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
+// https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
+//
 var pullq *WorkQueue
+var trashq *WorkQueue
 
 // TODO(twp): continue moving as much code as possible out of main
 // so it can be effectively tested. Esp. handling and postprocessing
@@ -258,9 +264,11 @@ func main() {
        // Start a round-robin VolumeManager with the volumes we have found.
        KeepVM = MakeRRVolumeManager(goodvols)
 
-       // Tell the built-in HTTP server to direct all requests to the REST
-       // router.
-       http.Handle("/", MakeRESTRouter())
+       // Tell the built-in HTTP server to direct all requests to the REST router.
+       loggingRouter := MakeLoggingRESTRouter()
+       http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
+               loggingRouter.ServeHTTP(resp, req)
+       })
 
        // Set up a TCP listener.
        listener, err := net.Listen("tcp", listen)
@@ -268,6 +276,20 @@ func main() {
                log.Fatal(err)
        }
 
+       // Initialize Pull queue and worker
+       arv, err := arvadosclient.MakeArvadosClient()
+       if err != nil {
+               log.Fatalf("Error setting up arvados client %s", err.Error())
+       }
+
+       keepClient, err := keepclient.MakeKeepClient(&arv)
+       if err != nil {
+               log.Fatalf("Error setting up keep client %s", err.Error())
+       }
+
+       pullq = NewWorkQueue()
+       go RunPullWorker(pullq, keepClient)
+
        // Shut down the server gracefully (by closing the listener)
        // if SIGTERM is received.
        term := make(chan os.Signal, 1)