7492: cleanup error checking in keepproxy
[arvados.git] / services / datamanager / experimental / datamanager.py
index 4bd2a264fe29b348e7ea1cdd88ac4cfe606619f5..8207bdcd5cb77b232b5fa582d9b487f08a98251b 100755 (executable)
@@ -4,6 +4,8 @@ import arvados
 
 import argparse
 import cgi
+import csv
+import json
 import logging
 import math
 import pprint
@@ -26,6 +28,14 @@ def fileSizeFormat(value):
   return "%7.2f %-3s" % (float(value) / pow(1024, exponent),
                          byteunits[exponent])
 
+def percentageFloor(x):
+  """ Returns a float which is the input rounded down to the neared 0.01.
+
+e.g. precentageFloor(0.941354) = 0.94
+"""
+  return math.floor(x*100) / 100.0
+
+
 def byteSizeFromValidUuid(valid_uuid):
   return int(valid_uuid.split('+')[1])
 
@@ -325,14 +335,15 @@ def logUserStorageUsage():
     # the object_type field since we don't know which we have.
     body['object_uuid'] = user
     body['event_type'] = args.user_storage_log_event_type
-    info = {}
-    info['read_collections_total_bytes'] = usage[UNWEIGHTED_READ_SIZE_COL]
-    info['read_collections_weighted_bytes'] = usage[WEIGHTED_READ_SIZE_COL]
-    info['persisted_collections_total_bytes'] = (
+    properties = {}
+    properties['read_collections_total_bytes'] = usage[UNWEIGHTED_READ_SIZE_COL]
+    properties['read_collections_weighted_bytes'] = (
+      usage[WEIGHTED_READ_SIZE_COL])
+    properties['persisted_collections_total_bytes'] = (
       usage[UNWEIGHTED_PERSIST_SIZE_COL])
-    info['persisted_collections_weighted_bytes'] = (
+    properties['persisted_collections_weighted_bytes'] = (
       usage[WEIGHTED_PERSIST_SIZE_COL])
-    body['properties'] = info
+    body['properties'] = properties
     # TODO(misha): Confirm that this will throw an exception if it
     # fails to create the log entry.
     arv.logs().create(body=body).execute()
@@ -347,11 +358,37 @@ def getKeepBlocks(keep_servers):
   blocks = []
   for host,port in keep_servers:
     response = urllib2.urlopen('http://%s:%d/index' % (host, port))
-    blocks.append([line.split(' ')
-                   for line in response.read().split('\n')
-                   if line])
+    server_blocks = [line.split(' ')
+                     for line in response.read().split('\n')
+                     if line]
+    server_blocks = [(block_id, int(mtime))
+                     for block_id, mtime in server_blocks]
+    blocks.append(server_blocks)
   return blocks
 
+def getKeepStats(keep_servers):
+  MOUNT_COLUMN = 5
+  TOTAL_COLUMN = 1
+  FREE_COLUMN = 3
+  DISK_BLOCK_SIZE = 1024
+  stats = []
+  for host,port in keep_servers:
+    response = urllib2.urlopen('http://%s:%d/status.json' % (host, port))
+
+    parsed_json = json.load(response)
+    df_entries = [line.split()
+                  for line in parsed_json['df'].split('\n')
+                  if line]
+    keep_volumes = [columns
+                    for columns in df_entries
+                    if 'keep' in columns[MOUNT_COLUMN]]
+    total_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(TOTAL_COLUMN),
+                                                  keep_volumes)))
+    free_space =  DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(FREE_COLUMN),
+                                                  keep_volumes)))
+    stats.append([total_space, free_space])
+  return stats
+
 
 def computeReplication(keep_blocks):
   for server_blocks in keep_blocks:
@@ -376,15 +413,54 @@ def computeGarbageCollectionCandidates():
   for block,mtime in garbage_collection_priority:
     disk_size = blockDiskUsage(block)
     cumulative_disk_size += disk_size
-    garbage_collection_report.append((block,
-                                      mtime,
-                                      disk_size,
-                                      cumulative_disk_size))
+    garbage_collection_report.append(
+      (block,
+       mtime,
+       disk_size,
+       cumulative_disk_size,
+       float(free_keep_space + cumulative_disk_size)/total_keep_space))
 
   print 'The oldest Garbage Collection Candidates: '
   pprint.pprint(garbage_collection_report[:20])
 
 
+def outputGarbageCollectionReport(filename):
+  with open(filename, 'wb') as csvfile:
+    gcwriter = csv.writer(csvfile)
+    gcwriter.writerow(['block uuid', 'latest mtime', 'disk size',
+                       'cumulative size', 'disk free'])
+    for line in garbage_collection_report:
+      gcwriter.writerow(line)
+
+def computeGarbageCollectionHistogram():
+  # TODO(misha): Modify this to allow users to specify the number of
+  # histogram buckets through a flag.
+  histogram = []
+  last_percentage = -1
+  for _,mtime,_,_,disk_free in garbage_collection_report:
+    curr_percentage = percentageFloor(disk_free)
+    if curr_percentage > last_percentage:
+      histogram.append( (mtime, curr_percentage) )
+    last_percentage = curr_percentage
+
+  log.info('Garbage collection histogram is: %s', histogram)
+
+  return histogram
+
+
+def logGarbageCollectionHistogram():
+  body = {}
+  # TODO(misha): Decide whether we should specify an object_uuid in
+  # the body and if so, which uuid to use.
+  body['event_type'] = args.block_age_free_space_histogram_log_event_type
+  properties = {}
+  properties['histogram'] = garbage_collection_histogram
+  body['properties'] = properties
+  # TODO(misha): Confirm that this will throw an exception if it
+  # fails to create the log entry.
+  arv.logs().create(body=body).execute()
+
+
 def detectReplicationProblems():
   blocks_not_in_any_collections.update(
     set(block_to_replication.keys()).difference(block_to_collections.keys()))
@@ -465,6 +541,14 @@ parser.add_argument('--user-storage-log-event-type',
                     default='user-storage-report',
                     help=('The event type to set when logging user '
                           'storage usage to workbench.'))
+parser.add_argument('--block-age-free-space-histogram-log-event-type',
+                    default='block-age-free-space-histogram',
+                    help=('The event type to set when logging user '
+                          'storage usage to workbench.'))
+parser.add_argument('--garbage-collection-file',
+                    default='',
+                    help=('The file to write a garbage collection report, or '
+                          'leave empty for no report.'))
 
 args = None
 
@@ -498,6 +582,10 @@ user_to_usage = defaultdict(lambda : [0,]*NUM_COLS)
 
 keep_servers = []
 keep_blocks = []
+keep_stats = []
+total_keep_space = 0
+free_keep_space =  0
+
 block_to_replication = defaultdict(lambda: 0)
 block_to_latest_mtime = maxdict()
 
@@ -513,9 +601,19 @@ cumulative size)
 multiplied by current replication level)
 * cumulative disk size: The sum of this block's disk size and all the
 blocks listed above it
-* TODO: disk free: The proportion of our disk space that would be free
-if we deleted this block and all the above. So this is (current disk
-space used - cumulative disk size) / total disk capacity
+* disk free: The proportion of our disk space that would be free if we
+deleted this block and all the above. So this is (free disk space +
+cumulative disk size) / total disk capacity
+"""
+
+garbage_collection_histogram = []
+""" Shows the tradeoff of keep block age vs keep disk free space.
+
+Each entry is of the form (mtime, Disk Proportion).
+
+An entry of the form (1388747781, 0.52) means that if we deleted the
+oldest non-presisted blocks until we had 52% of the disk free, then
+all blocks with an mtime greater than 1388747781 would be preserved.
 """
 
 # Stuff to report on
@@ -559,14 +657,42 @@ def loadAllData():
   global keep_blocks
   keep_blocks = getKeepBlocks(keep_servers)
 
-  computeReplication(keep_blocks)
+  log.info('Getting Stats from each Keep Server.')
+  global keep_stats, total_keep_space, free_keep_space
+  keep_stats = getKeepStats(keep_servers)
 
-  computeGarbageCollectionCandidates()
+  total_keep_space = sum(map(itemgetter(0), keep_stats))
+  free_keep_space = sum(map(itemgetter(1), keep_stats))
+
+  # TODO(misha): Delete this hack when the keep servers are fixed!
+  # This hack deals with the fact that keep servers report each other's disks.
+  total_keep_space /= len(keep_stats)
+  free_keep_space /= len(keep_stats)
+
+  log.info('Total disk space: %s, Free disk space: %s (%d%%).' %
+           (fileSizeFormat(total_keep_space),
+            fileSizeFormat(free_keep_space),
+            100*free_keep_space/total_keep_space))
+
+  computeReplication(keep_blocks)
 
   log.info('average replication level is %f',
            (float(sum(block_to_replication.values())) /
             len(block_to_replication)))
 
+  computeGarbageCollectionCandidates()
+
+  if args.garbage_collection_file:
+    log.info('Writing garbage Collection report to %s',
+             args.garbage_collection_file)
+    outputGarbageCollectionReport(args.garbage_collection_file)
+
+  global garbage_collection_histogram
+  garbage_collection_histogram = computeGarbageCollectionHistogram()
+
+  if args.log_to_workbench:
+    logGarbageCollectionHistogram()
+
   detectReplicationProblems()
 
   computeUserStorageUsage()