Move function cleaning directory to module level
[ganeti-local] / lib / rpc.py
index b840e72..68ac0bc 100644 (file)
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#
 #
 
 # Copyright (C) 2006, 2007 Google Inc.
 # pylint: disable-msg=C0103
 
 import os
+import socket
+import httplib
 
-from twisted.internet.pollreactor import PollReactor
-
-class ReReactor(PollReactor):
-  """A re-startable Reactor implementation.
-
-  """
-  def run(self, installSignalHandlers=1):
-    """Custom run method.
-
-    This is customized run that, before calling Reactor.run, will
-    reinstall the shutdown events and re-create the threadpool in case
-    these are not present (as will happen on the second run of the
-    reactor).
-
-    """
-    if not 'shutdown' in self._eventTriggers:
-      # the shutdown queue has been killed, we are most probably
-      # at the second run, thus recreate the queue
-      self.addSystemEventTrigger('during', 'shutdown', self.crash)
-      self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
-    if self.threadpool is not None and self.threadpool.joined == 1:
-      # in case the threadpool has been stopped, re-start it
-      # and add a trigger to stop it at reactor shutdown
-      self.threadpool.start()
-      self.addSystemEventTrigger('during', 'shutdown', self.threadpool.stop)
-
-    return PollReactor.run(self, installSignalHandlers)
-
-
-import twisted.internet.main
-twisted.internet.main.installReactor(ReReactor())
-
-from twisted.spread import pb
-from twisted.internet import reactor
-from twisted.cred import credentials
-from OpenSSL import SSL, crypto
+import simplejson
 
 from ganeti import logger
 from ganeti import utils
-from ganeti import errors
-from ganeti import constants
 from ganeti import objects
 from ganeti import ssconf
 
+
 class NodeController:
   """Node-handling class.
 
@@ -82,112 +48,46 @@ class NodeController:
   def __init__(self, parent, node):
     self.parent = parent
     self.node = node
+    self.failed = False
 
-  def _check_end(self):
-    """Stop the reactor if we got all the results.
-
-    """
-    if len(self.parent.results) == len(self.parent.nc):
-      reactor.stop()
-
-  def cb_call(self, obj):
-    """Callback for successful connect.
-
-    If the connect and login sequence succeeded, we proceed with
-    making the actual call.
-
-    """
-    deferred = obj.callRemote(self.parent.procedure, self.parent.args)
-    deferred.addCallbacks(self.cb_done, self.cb_err2)
-
-  def cb_done(self, result):
-    """Callback for successful call.
-
-    When we receive the result from a call, we check if it was an
-    error and if so we raise a generic RemoteError (we can't pass yet
-    the actual exception over). If there was no error, we store the
-    result.
-
-    """
-    tb, self.parent.results[self.node] = result
-    self._check_end()
-    if tb:
-      raise errors.RemoteError("Remote procedure error calling %s on %s:"
-                               "\n%s" % (self.parent.procedure,
-                                         self.node,
-                                         tb))
-
-  def cb_err1(self, reason):
-    """Error callback for unsuccessful connect.
-
-    """
-    logger.Error("caller_connect: could not connect to remote host %s,"
-                 " reason %s" % (self.node, reason))
-    self.parent.results[self.node] = False
-    self._check_end()
-
-  def cb_err2(self, reason):
-    """Error callback for unsuccessful call.
-
-    This is when the call didn't return anything, not even an error,
-    or when it time out, etc.
-
-    """
-    logger.Error("caller_call: could not call %s on node %s,"
-                 " reason %s" % (self.parent.procedure, self.node, reason))
-    self.parent.results[self.node] = False
-    self._check_end()
-
-
-class MirrorContextFactory:
-  """Certificate verifier factory.
-
-  This factory creates contexts that verify if the remote end has a
-  specific certificate (i.e. our own certificate).
-
-  The checks we do are that the PEM dump of the certificate is the
-  same as our own and (somewhat redundantly) that the SHA checksum is
-  the same.
-
-  """
-  isClient = 1
-
-  def __init__(self):
+    self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
     try:
-      fd = open(constants.SSL_CERT_FILE, 'r')
-      try:
-        data = fd.read(16384)
-      finally:
-        fd.close()
-    except EnvironmentError, err:
-      raise errors.ConfigurationError("missing SSL certificate: %s" %
-                                      str(err))
-    self.mycert = crypto.load_certificate(crypto.FILETYPE_PEM, data)
-    self.mypem = crypto.dump_certificate(crypto.FILETYPE_PEM, self.mycert)
-    self.mydigest = self.mycert.digest('SHA')
-
-  def verifier(self, conn, x509, errno, err_depth, retcode):
-    """Certificate verify method.
+      hc.connect()
+      hc.putrequest('PUT', "/%s" % self.parent.procedure,
+                    skip_accept_encoding=True)
+      hc.putheader('Content-Length', str(len(parent.body)))
+      hc.endheaders()
+      hc.send(parent.body)
+    except socket.error, err:
+      logger.Error("Error connecting to %s: %s" % (node, str(err)))
+      self.failed = True
+
+  def get_response(self):
+    """Try to process the response from the node.
 
     """
-    if self.mydigest != x509.digest('SHA'):
+    if self.failed:
+      # we already failed in connect
       return False
-    if crypto.dump_certificate(crypto.FILETYPE_PEM, x509) != self.mypem:
+    resp = self.http_conn.getresponse()
+    if resp.status != 200:
       return False
-    return True
-
-  def getContext(self):
-    """Context generator.
+    try:
+      length = int(resp.getheader('Content-Length', '0'))
+    except ValueError:
+      return False
+    if not length:
+      logger.Error("Zero-length reply from %s" % self.node)
+      return False
+    payload = resp.read(length)
+    unload = simplejson.loads(payload)
+    return unload
 
-    """
-    context = SSL.Context(SSL.TLSv1_METHOD)
-    context.set_verify(SSL.VERIFY_PEER, self.verifier)
-    return context
 
 class Client:
   """RPC Client class.
 
-  This class, given a (remote) ethod name, a list of parameters and a
+  This class, given a (remote) method name, a list of parameters and a
   list of nodes, will contact (in parallel) all nodes, and return a
   dict of results (key: node name, value: result).
 
@@ -208,6 +108,7 @@ class Client:
     self.results = {}
     self.procedure = procedure
     self.args = args
+    self.body = simplejson.dumps(args)
 
   #--- generic connector -------------
 
@@ -222,13 +123,7 @@ class Client:
     """Add a node to the target list.
 
     """
-    factory = pb.PBClientFactory()
     self.nc[connect_node] = nc = NodeController(self, connect_node)
-    reactor.connectSSL(connect_node, self.port, factory,
-                       MirrorContextFactory())
-    #d = factory.getRootObject()
-    d = factory.login(credentials.UsernamePassword("master_node", self.nodepw))
-    d.addCallbacks(nc.cb_call, nc.cb_err1)
 
   def getresult(self):
     """Return the results of the call.
@@ -243,8 +138,8 @@ class Client:
     queued, otherwise it does nothing.
 
     """
-    if self.nc:
-      reactor.run()
+    for node, nc in self.nc.items():
+      self.results[node] = nc.get_response()
 
 
 def call_volume_list(node_list, vg_name):
@@ -288,7 +183,7 @@ def call_bridges_exist(node, bridges_list):
 
 
 def call_instance_start(node, instance, extra_args):
-  """Stars an instance.
+  """Starts an instance.
 
   This is a single-node call.
 
@@ -311,6 +206,30 @@ def call_instance_shutdown(node, instance):
   return c.getresult().get(node, False)
 
 
+def call_instance_migrate(node, instance, target, live):
+  """Migrate an instance.
+
+  This is a single-node call.
+
+  """
+  c = Client("instance_migrate", [instance.name, target, live])
+  c.connect(node)
+  c.run()
+  return c.getresult().get(node, False)
+
+
+def call_instance_reboot(node, instance, reboot_type, extra_args):
+  """Reboots an instance.
+
+  This is a single-node call.
+
+  """
+  c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
+  c.connect(node)
+  c.run()
+  return c.getresult().get(node, False)
+
+
 def call_instance_os_add(node, inst, osdev, swapdev):
   """Installs an OS on the given instance.
 
@@ -373,6 +292,18 @@ def call_instance_list(node_list):
   return c.getresult()
 
 
+def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
+  """Do a TcpPing on the remote node
+
+  This is a single-node call.
+  """
+  c = Client("node_tcp_ping", [source, target, port, timeout,
+                               live_port_needed])
+  c.connect(node)
+  c.run()
+  return c.getresult().get(node, False)
+
+
 def call_node_info(node_list, vg_name):
   """Return node information.
 
@@ -429,25 +360,25 @@ def call_node_verify(node_list, checkdict):
   return c.getresult()
 
 
-def call_node_start_master(node):
+def call_node_start_master(node, start_daemons):
   """Tells a node to activate itself as a master.
 
   This is a single-node call.
 
   """
-  c = Client("node_start_master", [])
+  c = Client("node_start_master", [start_daemons])
   c.connect(node)
   c.run()
   return c.getresult().get(node, False)
 
 
-def call_node_stop_master(node):
+def call_node_stop_master(node, stop_daemons):
   """Tells a node to demote itself from master status.
 
   This is a single-node call.
 
   """
-  c = Client("node_stop_master", [])
+  c = Client("node_stop_master", [stop_daemons])
   c.connect(node)
   c.run()
   return c.getresult().get(node, False)
@@ -465,13 +396,13 @@ def call_version(node_list):
   return c.getresult()
 
 
-def call_blockdev_create(node, bdev, size, on_primary, info):
+def call_blockdev_create(node, bdev, size, owner, on_primary, info):
   """Request creation of a given block device.
 
   This is a single-node call.
 
   """
-  params = [bdev.ToDict(), size, on_primary, info]
+  params = [bdev.ToDict(), size, owner, on_primary, info]
   c = Client("blockdev_create", params)
   c.connect(node)
   c.run()
@@ -490,13 +421,26 @@ def call_blockdev_remove(node, bdev):
   return c.getresult().get(node, False)
 
 
-def call_blockdev_assemble(node, disk, on_primary):
+def call_blockdev_rename(node, devlist):
+  """Request rename of the given block devices.
+
+  This is a single-node call.
+
+  """
+  params = [(d.ToDict(), uid) for d, uid in devlist]
+  c = Client("blockdev_rename", params)
+  c.connect(node)
+  c.run()
+  return c.getresult().get(node, False)
+
+
+def call_blockdev_assemble(node, disk, owner, on_primary):
   """Request assembling of a given block device.
 
   This is a single-node call.
 
   """
-  params = [disk.ToDict(), on_primary]
+  params = [disk.ToDict(), owner, on_primary]
   c = Client("blockdev_assemble", params)
   c.connect(node)
   c.run()
@@ -515,27 +459,27 @@ def call_blockdev_shutdown(node, disk):
   return c.getresult().get(node, False)
 
 
-def call_blockdev_addchild(node, bdev, ndev):
-  """Request adding a new child to a (mirroring) device.
+def call_blockdev_addchildren(node, bdev, ndevs):
+  """Request adding a list of children to a (mirroring) device.
 
   This is a single-node call.
 
   """
-  params = [bdev.ToDict(), ndev.ToDict()]
-  c = Client("blockdev_addchild", params)
+  params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
+  c = Client("blockdev_addchildren", params)
   c.connect(node)
   c.run()
   return c.getresult().get(node, False)
 
 
-def call_blockdev_removechild(node, bdev, ndev):
-  """Request removing a new child from a (mirroring) device.
+def call_blockdev_removechildren(node, bdev, ndevs):
+  """Request removing a list of children from a (mirroring) device.
 
   This is a single-node call.
 
   """
-  params = [bdev.ToDict(), ndev.ToDict()]
-  c = Client("blockdev_removechild", params)
+  params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
+  c = Client("blockdev_removechildren", params)
   c.connect(node)
   c.run()
   return c.getresult().get(node, False)
@@ -566,6 +510,19 @@ def call_blockdev_find(node, disk):
   return c.getresult().get(node, False)
 
 
+def call_blockdev_close(node, disks):
+  """Closes the given block devices.
+
+  This is a single-node call.
+
+  """
+  params = [cf.ToDict() for cf in disks]
+  c = Client("blockdev_close", params)
+  c.connect(node)
+  c.run()
+  return c.getresult().get(node, False)
+
+
 def call_upload_file(node_list, file_name):
   """Upload a file.
 
@@ -601,41 +558,28 @@ def call_os_diagnose(node_list):
   result = c.getresult()
   new_result = {}
   for node_name in result:
-    nr = []
     if result[node_name]:
-      for data in result[node_name]:
-        if data:
-          if isinstance(data, dict):
-            nr.append(objects.OS.FromDict(data))
-          elif isinstance(data, tuple) and len(data) == 3:
-            nr.append(errors.InvalidOS(data[0], data[1], data[2]))
-          else:
-            raise errors.ProgrammerError("Invalid data from"
-                                         " xcserver.os_diagnose")
+      nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
+    else:
+      nr = []
     new_result[node_name] = nr
   return new_result
 
 
-def call_os_get(node_list, name):
+def call_os_get(node, name):
   """Returns an OS definition.
 
-  This is a multi-node call.
+  This is a single-node call.
 
   """
   c = Client("os_get", [name])
-  c.connect_list(node_list)
+  c.connect(node)
   c.run()
-  result = c.getresult()
-  new_result = {}
-  for node_name in result:
-    data = result[node_name]
-    if isinstance(data, dict):
-      new_result[node_name] = objects.OS.FromDict(data)
-    elif isinstance(data, tuple) and len(data) == 3:
-      new_result[node_name] = errors.InvalidOS(data[0], data[1], data[2])
-    else:
-      new_result[node_name] = data
-  return new_result
+  result = c.getresult().get(node, False)
+  if isinstance(result, dict):
+    return objects.OS.FromDict(result)
+  else:
+    return result
 
 
 def call_hooks_runner(node_list, hpath, phase, env):
@@ -656,6 +600,36 @@ def call_hooks_runner(node_list, hpath, phase, env):
   return result
 
 
+def call_iallocator_runner(node, name, idata):
+  """Call an iallocator on a remote node
+
+  Args:
+    - name: the iallocator name
+    - input: the json-encoded input string
+
+  This is a single-node call.
+
+  """
+  params = [name, idata]
+  c = Client("iallocator_runner", params)
+  c.connect(node)
+  c.run()
+  result = c.getresult().get(node, False)
+  return result
+
+
+def call_blockdev_grow(node, cf_bdev, amount):
+  """Request a snapshot of the given block device.
+
+  This is a single-node call.
+
+  """
+  c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
+  c.connect(node)
+  c.run()
+  return c.getresult().get(node, False)
+
+
 def call_blockdev_snapshot(node, cf_bdev):
   """Request a snapshot of the given block device.
 
@@ -711,7 +685,7 @@ def call_export_info(node, path):
   result = c.getresult().get(node, False)
   if not result:
     return result
-  return objects.SerializableConfigParser.Loads(result)
+  return objects.SerializableConfigParser.Loads(str(result))
 
 
 def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
@@ -777,3 +751,53 @@ def call_node_volumes(node_list):
   c.connect_list(node_list)
   c.run()
   return c.getresult()
+
+
+def call_test_delay(node_list, duration):
+  """Sleep for a fixed time on given node(s).
+
+  This is a multi-node call.
+
+  """
+  c = Client("test_delay", [duration])
+  c.connect_list(node_list)
+  c.run()
+  return c.getresult()
+
+
+def call_file_storage_dir_create(node, file_storage_dir):
+  """Create the given file storage directory.
+
+  This is a single-node call.
+
+  """
+  c = Client("file_storage_dir_create", [file_storage_dir])
+  c.connect(node)
+  c.run()
+  return c.getresult().get(node, False)
+
+
+def call_file_storage_dir_remove(node, file_storage_dir):
+  """Remove the given file storage directory.
+
+  This is a single-node call.
+
+  """
+  c = Client("file_storage_dir_remove", [file_storage_dir])
+  c.connect(node)
+  c.run()
+  return c.getresult().get(node, False)
+
+
+def call_file_storage_dir_rename(node, old_file_storage_dir,
+                                 new_file_storage_dir):
+  """Rename file storage directory.
+
+  This is a single-node call.
+
+  """
+  c = Client("file_storage_dir_rename",
+             [old_file_storage_dir, new_file_storage_dir])
+  c.connect(node)
+  c.run()
+  return c.getresult().get(node, False)