+
+// Return a suitable local interface address for a local keepstore
+// service. Currently this is the numerically lowest non-loopback ipv4
+// address assigned to a local interface that is not in any of the
+// link-local/vpn/loopback ranges 169.254/16, 100.64/10, or 127/8.
+func localKeepstoreAddr() string {
+ var ips []net.IP
+ // Ignore error (proceed with zero IPs)
+ addrs, _ := processIPs(os.Getpid())
+ for addr := range addrs {
+ ip := net.ParseIP(addr)
+ if ip == nil {
+ // invalid
+ continue
+ }
+ if ip.Mask(net.CIDRMask(8, 32)).Equal(net.IPv4(127, 0, 0, 0)) ||
+ ip.Mask(net.CIDRMask(10, 32)).Equal(net.IPv4(100, 64, 0, 0)) ||
+ ip.Mask(net.CIDRMask(16, 32)).Equal(net.IPv4(169, 254, 0, 0)) {
+ // unsuitable
+ continue
+ }
+ ips = append(ips, ip)
+ }
+ if len(ips) == 0 {
+ return "0.0.0.0"
+ }
+ sort.Slice(ips, func(ii, jj int) bool {
+ i, j := ips[ii], ips[jj]
+ if len(i) != len(j) {
+ return len(i) < len(j)
+ }
+ for x := range i {
+ if i[x] != j[x] {
+ return i[x] < j[x]
+ }
+ }
+ return false
+ })
+ return ips[0].String()
+}
+
+func (cr *ContainerRunner) loadPrices() {
+ buf, err := os.ReadFile(filepath.Join(lockdir, pricesfile))
+ if err != nil {
+ if !os.IsNotExist(err) {
+ cr.CrunchLog.Printf("loadPrices: read: %s", err)
+ }
+ return
+ }
+ var prices []cloud.InstancePrice
+ err = json.Unmarshal(buf, &prices)
+ if err != nil {
+ cr.CrunchLog.Printf("loadPrices: decode: %s", err)
+ return
+ }
+ cr.pricesLock.Lock()
+ defer cr.pricesLock.Unlock()
+ var lastKnown time.Time
+ if len(cr.prices) > 0 {
+ lastKnown = cr.prices[0].StartTime
+ }
+ cr.prices = cloud.NormalizePriceHistory(append(prices, cr.prices...))
+ for i := len(cr.prices) - 1; i >= 0; i-- {
+ price := cr.prices[i]
+ if price.StartTime.After(lastKnown) {
+ cr.CrunchLog.Printf("Instance price changed to %#.3g at %s", price.Price, price.StartTime.UTC())
+ }
+ }
+}
+
+func (cr *ContainerRunner) calculateCost(now time.Time) float64 {
+ cr.pricesLock.Lock()
+ defer cr.pricesLock.Unlock()
+
+ // First, make a "prices" slice with the real data as far back
+ // as it goes, and (if needed) a "since the beginning of time"
+ // placeholder containing a reasonable guess about what the
+ // price was between cr.costStartTime and the earliest real
+ // data point.
+ prices := cr.prices
+ if len(prices) == 0 {
+ // use price info in InstanceType record initially
+ // provided by cloud dispatcher
+ var p float64
+ var it arvados.InstanceType
+ if j := os.Getenv("InstanceType"); j != "" && json.Unmarshal([]byte(j), &it) == nil && it.Price > 0 {
+ p = it.Price
+ }
+ prices = []cloud.InstancePrice{{Price: p}}
+ } else if prices[len(prices)-1].StartTime.After(cr.costStartTime) {
+ // guess earlier pricing was the same as the earliest
+ // price we know about
+ filler := prices[len(prices)-1]
+ filler.StartTime = time.Time{}
+ prices = append(prices, filler)
+ }
+
+ // Now that our history of price changes goes back at least as
+ // far as cr.costStartTime, add up the costs for each
+ // interval.
+ cost := 0.0
+ spanEnd := now
+ for _, ip := range prices {
+ spanStart := ip.StartTime
+ if spanStart.After(now) {
+ // pricing information from the future -- not
+ // expected from AWS, but possible in
+ // principle, and exercised by tests.
+ continue
+ }
+ last := false
+ if spanStart.Before(cr.costStartTime) {
+ spanStart = cr.costStartTime
+ last = true
+ }
+ cost += ip.Price * spanEnd.Sub(spanStart).Seconds() / 3600
+ if last {
+ break
+ }
+ spanEnd = spanStart
+ }
+
+ return cost
+}
+
+func (runner *ContainerRunner) handleSIGUSR2(sigchan chan os.Signal) {
+ for range sigchan {
+ runner.loadPrices()
+ update := arvadosclient.Dict{
+ "select": []string{"uuid"},
+ "container": arvadosclient.Dict{
+ "cost": runner.calculateCost(time.Now()),
+ },
+ }
+ runner.DispatcherArvClient.Update("containers", runner.Container.UUID, update, nil)
+ }
+}