Index nodes by their UUID
[ganeti-local] / lib / server / noded.py
index 0377ba9..386e568 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
-# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -45,9 +45,11 @@ from ganeti import jstore
 from ganeti import daemon
 from ganeti import http
 from ganeti import utils
-from ganeti import storage
+from ganeti.storage import container
 from ganeti import serializer
 from ganeti import netutils
+from ganeti import pathutils
+from ganeti import ssconf
 
 import ganeti.http.server # pylint: disable=W0611
 
@@ -55,6 +57,16 @@ import ganeti.http.server # pylint: disable=W0611
 queue_lock = None
 
 
+def _extendReasonTrail(trail, source, reason=""):
+  """Extend the reason trail with noded information
+
+  The trail is extended by appending the name of the noded functionality
+  """
+  assert trail is not None
+  trail_source = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source)
+  trail.append((trail_source, reason, utils.EpochNano()))
+
+
 def _PrepareQueueLock():
   """Try to prepare the queue lock.
 
@@ -110,9 +122,25 @@ def _DecodeImportExportIO(ieio, ieioargs):
   return ieioargs
 
 
+def _DefaultAlternative(value, default):
+  """Returns value or, if evaluating to False, a default value.
+
+  Returns the given value, unless it evaluates to False. In the latter case the
+  default value is returned.
+
+  @param value: Value to return if it doesn't evaluate to False
+  @param default: Default value
+  @return: Given value or the default
+
+  """
+  if value:
+    return value
+
+  return default
+
+
 class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
-  """Custom Request Executor class that ensures NodeHttpServer children are
-  locked in ram.
+  """Subclass ensuring request handlers are locked in RAM.
 
   """
   def __init__(self, *args, **kwargs):
@@ -121,7 +149,7 @@ class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
     http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
 
 
-class NodeHttpServer(http.server.HttpServer):
+class NodeRequestHandler(http.server.HttpServerHandler):
   """The server implementation.
 
   This class holds all methods exposed over the RPC interface.
@@ -130,17 +158,16 @@ class NodeHttpServer(http.server.HttpServer):
   # too many public methods, and unused args - all methods get params
   # due to the API
   # pylint: disable=R0904,W0613
-  def __init__(self, *args, **kwargs):
-    http.server.HttpServer.__init__(self, *args, **kwargs)
+  def __init__(self):
+    http.server.HttpServerHandler.__init__(self)
     self.noded_pid = os.getpid()
 
   def HandleRequest(self, req):
     """Handle a request.
 
     """
-    # FIXME: Remove HTTP_PUT in Ganeti 2.7
-    if req.request_method.upper() not in (http.HTTP_PUT, http.HTTP_POST):
-      raise http.HttpBadRequest("Only PUT and POST methods are supported")
+    if req.request_method.upper() != http.HTTP_POST:
+      raise http.HttpBadRequest("Only the POST method is supported")
 
     path = req.request_path
     if path.startswith("/"):
@@ -170,7 +197,7 @@ class NodeHttpServer(http.server.HttpServer):
       logging.exception("Error in RPC call")
       result = (False, "Error while executing backend function: %s" % str(err))
 
-    return serializer.DumpJson(result, indent=False)
+    return serializer.DumpJson(result)
 
   # the new block devices  --------------------------
 
@@ -179,11 +206,12 @@ class NodeHttpServer(http.server.HttpServer):
     """Create a block device.
 
     """
-    bdev_s, size, owner, on_primary, info = params
+    (bdev_s, size, owner, on_primary, info, excl_stor) = params
     bdev = objects.Disk.FromDict(bdev_s)
     if bdev is None:
       raise ValueError("can't unserialize data!")
-    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
+    return backend.BlockdevCreate(bdev, size, owner, on_primary, info,
+                                  excl_stor)
 
   @staticmethod
   def perspective_blockdev_pause_resume_sync(params):
@@ -217,7 +245,7 @@ class NodeHttpServer(http.server.HttpServer):
     """Remove a block device.
 
     """
-    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
+    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
     return backend.BlockdevRename(devlist)
 
   @staticmethod
@@ -278,7 +306,7 @@ class NodeHttpServer(http.server.HttpServer):
 
     """
     disks = [objects.Disk.FromDict(dsk_s)
-             for dsk_s in params]
+             for dsk_s in params[0]]
     return [status.ToDict()
             for status in backend.BlockdevGetmirrorstatus(disks)]
 
@@ -289,10 +317,7 @@ class NodeHttpServer(http.server.HttpServer):
     """
     (node_disks, ) = params
 
-    node_name = netutils.Hostname.GetSysName()
-
-    disks = [objects.Disk.FromDict(dsk_s)
-             for dsk_s in node_disks.get(node_name, [])]
+    disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]
 
     result = []
 
@@ -336,10 +361,14 @@ class NodeHttpServer(http.server.HttpServer):
     """Grow a stack of devices.
 
     """
+    if len(params) < 4:
+      raise ValueError("Received only 3 parameters in blockdev_grow,"
+                       " old master?")
     cfbd = objects.Disk.FromDict(params[0])
     amount = params[1]
     dryrun = params[2]
-    return backend.BlockdevGrow(cfbd, amount, dryrun)
+    backingstore = params[3]
+    return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore)
 
   @staticmethod
   def perspective_blockdev_close(params):
@@ -350,12 +379,12 @@ class NodeHttpServer(http.server.HttpServer):
     return backend.BlockdevClose(params[0], disks)
 
   @staticmethod
-  def perspective_blockdev_getsize(params):
+  def perspective_blockdev_getdimensions(params):
     """Compute the sizes of the given block devices.
 
     """
     disks = [objects.Disk.FromDict(cf) for cf in params[0]]
-    return backend.BlockdevGetsize(disks)
+    return backend.BlockdevGetdimensions(disks)
 
   @staticmethod
   def perspective_blockdev_export(params):
@@ -366,6 +395,15 @@ class NodeHttpServer(http.server.HttpServer):
     dest_node, dest_path, cluster_name = params[1:]
     return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
 
+  @staticmethod
+  def perspective_blockdev_setinfo(params):
+    """Sets metadata information on the given block device.
+
+    """
+    (disk, info) = params
+    disk = objects.Disk.FromDict(disk)
+    return backend.BlockdevSetInfo(disk, info)
+
   # blockdev/drbd specific methods ----------
 
   @staticmethod
@@ -376,9 +414,9 @@ class NodeHttpServer(http.server.HttpServer):
     disk list must all be drbd devices.
 
     """
-    nodes_ip, disks = params
+    nodes_ip, disks, target_node_uuid = params
     disks = [objects.Disk.FromDict(cf) for cf in disks]
-    return backend.DrbdDisconnectNet(nodes_ip, disks)
+    return backend.DrbdDisconnectNet(target_node_uuid, nodes_ip, disks)
 
   @staticmethod
   def perspective_drbd_attach_net(params):
@@ -388,10 +426,10 @@ class NodeHttpServer(http.server.HttpServer):
     disk list must all be drbd devices.
 
     """
-    nodes_ip, disks, instance_name, multimaster = params
+    nodes_ip, disks, instance_name, multimaster, target_node_uuid = params
     disks = [objects.Disk.FromDict(cf) for cf in disks]
-    return backend.DrbdAttachNet(nodes_ip, disks,
-                                     instance_name, multimaster)
+    return backend.DrbdAttachNet(target_node_uuid, nodes_ip, disks,
+                                 instance_name, multimaster)
 
   @staticmethod
   def perspective_drbd_wait_sync(params):
@@ -401,9 +439,9 @@ class NodeHttpServer(http.server.HttpServer):
     disk list must all be drbd devices.
 
     """
-    nodes_ip, disks = params
+    nodes_ip, disks, target_node_uuid = params
     disks = [objects.Disk.FromDict(cf) for cf in disks]
-    return backend.DrbdWaitSync(nodes_ip, disks)
+    return backend.DrbdWaitSync(target_node_uuid, nodes_ip, disks)
 
   @staticmethod
   def perspective_drbd_helper(params):
@@ -447,7 +485,7 @@ class NodeHttpServer(http.server.HttpServer):
 
     Note that as opposed to export_info, which may query data about an
     export in any path, this only queries the standard Ganeti path
-    (constants.EXPORT_DIR).
+    (pathutils.EXPORT_DIR).
 
     """
     return backend.ListExports()
@@ -494,7 +532,7 @@ class NodeHttpServer(http.server.HttpServer):
 
     """
     (su_name, su_args, name, fields) = params
-    return storage.GetStorage(su_name, *su_args).List(name, fields)
+    return container.GetStorage(su_name, *su_args).List(name, fields)
 
   @staticmethod
   def perspective_storage_modify(params):
@@ -502,7 +540,7 @@ class NodeHttpServer(http.server.HttpServer):
 
     """
     (su_name, su_args, name, changes) = params
-    return storage.GetStorage(su_name, *su_args).Modify(name, changes)
+    return container.GetStorage(su_name, *su_args).Modify(name, changes)
 
   @staticmethod
   def perspective_storage_execute(params):
@@ -510,7 +548,7 @@ class NodeHttpServer(http.server.HttpServer):
 
     """
     (su_name, su_args, name, op) = params
-    return storage.GetStorage(su_name, *su_args).Execute(name, op)
+    return container.GetStorage(su_name, *su_args).Execute(name, op)
 
   # bridge  --------------------------
 
@@ -551,16 +589,19 @@ class NodeHttpServer(http.server.HttpServer):
     """
     instance = objects.Instance.FromDict(params[0])
     timeout = params[1]
-    return backend.InstanceShutdown(instance, timeout)
+    trail = params[2]
+    _extendReasonTrail(trail, "shutdown")
+    return backend.InstanceShutdown(instance, timeout, trail)
 
   @staticmethod
   def perspective_instance_start(params):
     """Start an instance.
 
     """
-    (instance_name, startup_paused) = params
+    (instance_name, startup_paused, trail) = params
     instance = objects.Instance.FromDict(instance_name)
-    return backend.StartInstance(instance, startup_paused)
+    _extendReasonTrail(trail, "start")
+    return backend.StartInstance(instance, startup_paused, trail)
 
   @staticmethod
   def perspective_migration_info(params):
@@ -580,22 +621,39 @@ class NodeHttpServer(http.server.HttpServer):
     return backend.AcceptInstance(instance, info, target)
 
   @staticmethod
-  def perspective_finalize_migration(params):
-    """Finalize the instance migration.
+  def perspective_instance_finalize_migration_dst(params):
+    """Finalize the instance migration on the destination node.
 
     """
     instance, info, success = params
     instance = objects.Instance.FromDict(instance)
-    return backend.FinalizeMigration(instance, info, success)
+    return backend.FinalizeMigrationDst(instance, info, success)
 
   @staticmethod
   def perspective_instance_migrate(params):
     """Migrates an instance.
 
     """
-    instance, target, live = params
+    cluster_name, instance, target, live = params
     instance = objects.Instance.FromDict(instance)
-    return backend.MigrateInstance(instance, target, live)
+    return backend.MigrateInstance(cluster_name, instance, target, live)
+
+  @staticmethod
+  def perspective_instance_finalize_migration_src(params):
+    """Finalize the instance migration on the source node.
+
+    """
+    instance, success, live = params
+    instance = objects.Instance.FromDict(instance)
+    return backend.FinalizeMigrationSource(instance, success, live)
+
+  @staticmethod
+  def perspective_instance_get_migration_status(params):
+    """Reports migration status.
+
+    """
+    instance = objects.Instance.FromDict(params[0])
+    return backend.GetMigrationStatus(instance).ToDict()
 
   @staticmethod
   def perspective_instance_reboot(params):
@@ -605,14 +663,27 @@ class NodeHttpServer(http.server.HttpServer):
     instance = objects.Instance.FromDict(params[0])
     reboot_type = params[1]
     shutdown_timeout = params[2]
-    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
+    trail = params[3]
+    _extendReasonTrail(trail, "reboot")
+    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout,
+                                  trail)
+
+  @staticmethod
+  def perspective_instance_balloon_memory(params):
+    """Modify instance runtime memory.
+
+    """
+    instance_dict, memory = params
+    instance = objects.Instance.FromDict(instance_dict)
+    return backend.InstanceBalloonMemory(instance, memory)
 
   @staticmethod
   def perspective_instance_info(params):
     """Query instance information.
 
     """
-    return backend.GetInstanceInfo(params[0], params[1])
+    (instance_name, hypervisor_name, hvparams) = params
+    return backend.GetInstanceInfo(instance_name, hypervisor_name, hvparams)
 
   @staticmethod
   def perspective_instance_migratable(params):
@@ -627,26 +698,20 @@ class NodeHttpServer(http.server.HttpServer):
     """Query information about all instances.
 
     """
-    return backend.GetAllInstancesInfo(params[0])
+    (hypervisor_list, all_hvparams) = params
+    return backend.GetAllInstancesInfo(hypervisor_list, all_hvparams)
 
   @staticmethod
   def perspective_instance_list(params):
     """Query the list of running instances.
 
     """
-    return backend.GetInstanceList(params[0])
+    (hypervisor_list, hvparams) = params
+    return backend.GetInstanceList(hypervisor_list, hvparams)
 
   # node --------------------------
 
   @staticmethod
-  def perspective_node_tcp_ping(params):
-    """Do a TcpPing on the remote node.
-
-    """
-    return netutils.TcpPing(params[1], params[2], timeout=params[3],
-                            live_port_needed=params[4], source=params[0])
-
-  @staticmethod
   def perspective_node_has_ip_address(params):
     """Checks if a node has the given ip address.
 
@@ -658,8 +723,8 @@ class NodeHttpServer(http.server.HttpServer):
     """Query node information.
 
     """
-    vgname, hypervisor_type = params
-    return backend.GetNodeInfo(vgname, hypervisor_type)
+    (storage_units, hv_specs, excl_stor) = params
+    return backend.GetNodeInfo(storage_units, hv_specs, excl_stor)
 
   @staticmethod
   def perspective_etc_hosts_modify(params):
@@ -675,7 +740,16 @@ class NodeHttpServer(http.server.HttpServer):
     """Run a verify sequence on this node.
 
     """
-    return backend.VerifyNode(params[0], params[1])
+    (what, cluster_name, hvparams) = params
+    return backend.VerifyNode(what, cluster_name, hvparams)
+
+  @classmethod
+  def perspective_node_verify_light(cls, params):
+    """Run a light verify sequence on this node.
+
+    """
+    # So far it's the same as the normal node_verify
+    return cls.perspective_node_verify(params)
 
   @staticmethod
   def perspective_node_start_master_daemons(params):
@@ -689,27 +763,33 @@ class NodeHttpServer(http.server.HttpServer):
     """Activate the master IP on this node.
 
     """
-    return backend.ActivateMasterIp()
+    master_params = objects.MasterNetworkParameters.FromDict(params[0])
+    return backend.ActivateMasterIp(master_params, params[1])
 
   @staticmethod
   def perspective_node_deactivate_master_ip(params):
     """Deactivate the master IP on this node.
 
     """
-    return backend.DeactivateMasterIp()
+    master_params = objects.MasterNetworkParameters.FromDict(params[0])
+    return backend.DeactivateMasterIp(master_params, params[1])
 
   @staticmethod
   def perspective_node_stop_master(params):
-    """Deactivate the master IP and stops master daemons on this node.
-
-    Sometimes both operations need to be executed at the same time (doing one of
-    the two would make impossible to do the other one).
+    """Stops master daemons on this node.
 
     """
-    backend.DeactivateMasterIp()
     return backend.StopMasterDaemons()
 
   @staticmethod
+  def perspective_node_change_master_netmask(params):
+    """Change the master IP netmask.
+
+    """
+    return backend.ChangeMasterNetmask(params[0], params[1], params[2],
+                                       params[3])
+
+  @staticmethod
   def perspective_node_leave_cluster(params):
     """Cleanup after leaving a cluster.
 
@@ -735,8 +815,8 @@ class NodeHttpServer(http.server.HttpServer):
     """Tries to powercycle the nod.
 
     """
-    hypervisor_type = params[0]
-    return backend.PowercycleNode(hypervisor_type)
+    (hypervisor_type, hvparams) = params
+    return backend.PowercycleNode(hypervisor_type, hvparams)
 
   # cluster --------------------------
 
@@ -755,7 +835,7 @@ class NodeHttpServer(http.server.HttpServer):
     files are accepted.
 
     """
-    return backend.UploadFile(*params)
+    return backend.UploadFile(*(params[0]))
 
   @staticmethod
   def perspective_master_info(params):
@@ -777,12 +857,36 @@ class NodeHttpServer(http.server.HttpServer):
     return result
 
   @staticmethod
+  def perspective_restricted_command(params):
+    """Runs a restricted command.
+
+    """
+    (cmd, ) = params
+
+    return backend.RunRestrictedCmd(cmd)
+
+  @staticmethod
   def perspective_write_ssconf_files(params):
     """Write ssconf files.
 
     """
     (values,) = params
-    return backend.WriteSsconfFiles(values)
+    return ssconf.WriteSsconfFiles(values)
+
+  @staticmethod
+  def perspective_get_watcher_pause(params):
+    """Get watcher pause end.
+
+    """
+    return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
+
+  @staticmethod
+  def perspective_set_watcher_pause(params):
+    """Set watcher pause.
+
+    """
+    (until, ) = params
+    return backend.SetWatcherPause(until)
 
   # os -----------------------
 
@@ -810,6 +914,15 @@ class NodeHttpServer(http.server.HttpServer):
     required, name, checks, params = params
     return backend.ValidateOS(required, name, checks, params)
 
+  # extstorage -----------------------
+
+  @staticmethod
+  def perspective_extstorage_diagnose(params):
+    """Query detailed information about existing extstorage providers.
+
+    """
+    return backend.DiagnoseExtStorage()
+
   # hooks -----------------------
 
   @staticmethod
@@ -899,7 +1012,17 @@ class NodeHttpServer(http.server.HttpServer):
 
     """
     # TODO: What if a file fails to rename?
-    return [backend.JobQueueRename(old, new) for old, new in params]
+    return [backend.JobQueueRename(old, new) for old, new in params[0]]
+
+  @staticmethod
+  @_RequireJobQueueLock
+  def perspective_jobqueue_set_drain_flag(params):
+    """Set job queue's drain flag.
+
+    """
+    (flag, ) = params
+
+    return jstore.SetDrainFlag(flag)
 
   # hypervisor ---------------
 
@@ -936,7 +1059,7 @@ class NodeHttpServer(http.server.HttpServer):
     """Starts an import daemon.
 
     """
-    (opts_s, instance, component, dest, dest_args) = params
+    (opts_s, instance, component, (dest, dest_args)) = params
 
     opts = objects.ImportExportOptions.FromDict(opts_s)
 
@@ -952,7 +1075,7 @@ class NodeHttpServer(http.server.HttpServer):
     """Starts an export daemon.
 
     """
-    (opts_s, host, port, instance, component, source, source_args) = params
+    (opts_s, host, port, instance, component, (source, source_args)) = params
 
     opts = objects.ImportExportOptions.FromDict(opts_s)
 
@@ -1030,11 +1153,15 @@ def PrepNoded(options, _):
     # startup of the whole node daemon because of this
     logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
 
+  handler = NodeRequestHandler()
+
   mainloop = daemon.Mainloop()
-  server = NodeHttpServer(mainloop, options.bind_address, options.port,
-                          ssl_params=ssl_params, ssl_verify_peer=True,
-                          request_executor_class=request_executor_class)
+  server = \
+    http.server.HttpServer(mainloop, options.bind_address, options.port,
+                           handler, ssl_params=ssl_params, ssl_verify_peer=True,
+                           request_executor_class=request_executor_class)
   server.Start()
+
   return (mainloop, server)
 
 
@@ -1054,7 +1181,8 @@ def Main():
 
   """
   parser = OptionParser(description="Ganeti node daemon",
-                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
+                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]\
+                              \ [-i INTERFACE]",
                         version="%%prog (ganeti) %s" %
                         constants.RELEASE_VERSION)
   parser.add_option("--no-mlock", dest="mlock",
@@ -1062,6 +1190,6 @@ def Main():
                     default=True, action="store_false")
 
   daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
-                     default_ssl_cert=constants.NODED_CERT_FILE,
-                     default_ssl_key=constants.NODED_CERT_FILE,
+                     default_ssl_cert=pathutils.NODED_CERT_FILE,
+                     default_ssl_key=pathutils.NODED_CERT_FILE,
                      console_logging=True)