#
#
-# Copyright (C) 2006, 2007, 2010 Google Inc.
+# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
"""Ganeti node daemon"""
-# pylint: disable-msg=C0103,W0142
+# pylint: disable=C0103,W0142
# C0103: Functions in this module need to have a given name structure,
# and the name of the daemon doesn't match
import sys
import logging
import signal
+import codecs
from optparse import OptionParser
from ganeti import daemon
from ganeti import http
from ganeti import utils
-from ganeti import storage
+from ganeti.storage import container
from ganeti import serializer
from ganeti import netutils
+from ganeti import pathutils
+from ganeti import ssconf
-import ganeti.http.server # pylint: disable-msg=W0611
+import ganeti.http.server # pylint: disable=W0611
queue_lock = None
+def _extendReasonTrail(trail, source, reason=""):
+ """Extend the reason trail with noded information
+
+  The trail is extended by appending the name of the noded functionality.
+ """
+ assert trail is not None
+ trail_source = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source)
+ trail.append((trail_source, reason, utils.EpochNano()))
+
+
def _PrepareQueueLock():
"""Try to prepare the queue lock.
@return: None for success, otherwise an exception object
"""
- global queue_lock # pylint: disable-msg=W0603
+ global queue_lock # pylint: disable=W0603
if queue_lock is not None:
return None
return ieioargs
+def _DefaultAlternative(value, default):
+ """Returns value or, if evaluating to False, a default value.
+
+ Returns the given value, unless it evaluates to False. In the latter case the
+ default value is returned.
+
+ @param value: Value to return if it doesn't evaluate to False
+ @param default: Default value
+ @return: Given value or the default
+
+ """
+ if value:
+ return value
+
+ return default
+
+
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
- """Custom Request Executor class that ensures NodeHttpServer children are
- locked in ram.
+ """Subclass ensuring request handlers are locked in RAM.
"""
def __init__(self, *args, **kwargs):
http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
-class NodeHttpServer(http.server.HttpServer):
+class NodeRequestHandler(http.server.HttpServerHandler):
"""The server implementation.
This class holds all methods exposed over the RPC interface.
"""
# too many public methods, and unused args - all methods get params
# due to the API
- # pylint: disable-msg=R0904,W0613
- def __init__(self, *args, **kwargs):
- http.server.HttpServer.__init__(self, *args, **kwargs)
+ # pylint: disable=R0904,W0613
+ def __init__(self):
+ http.server.HttpServerHandler.__init__(self)
self.noded_pid = os.getpid()
def HandleRequest(self, req):
"""Handle a request.
"""
- if req.request_method.upper() != http.HTTP_PUT:
- raise http.HttpBadRequest()
+ if req.request_method.upper() != http.HTTP_POST:
+ raise http.HttpBadRequest("Only the POST method is supported")
path = req.request_path
if path.startswith("/"):
logging.exception("Error in RPC call")
result = (False, "Error while executing backend function: %s" % str(err))
- return serializer.DumpJson(result, indent=False)
+ return serializer.DumpJson(result)
# the new block devices --------------------------
"""Create a block device.
"""
- bdev_s, size, owner, on_primary, info = params
+ (bdev_s, size, owner, on_primary, info, excl_stor) = params
bdev = objects.Disk.FromDict(bdev_s)
if bdev is None:
raise ValueError("can't unserialize data!")
- return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
+ return backend.BlockdevCreate(bdev, size, owner, on_primary, info,
+ excl_stor)
@staticmethod
def perspective_blockdev_pause_resume_sync(params):
"""Remove a block device.
"""
- devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
+ devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]]
return backend.BlockdevRename(devlist)
@staticmethod
"""Assemble a block device.
"""
- bdev_s, owner, on_primary = params
+ bdev_s, owner, on_primary, idx = params
bdev = objects.Disk.FromDict(bdev_s)
if bdev is None:
raise ValueError("can't unserialize data!")
- return backend.BlockdevAssemble(bdev, owner, on_primary)
+ return backend.BlockdevAssemble(bdev, owner, on_primary, idx)
@staticmethod
def perspective_blockdev_shutdown(params):
"""
disks = [objects.Disk.FromDict(dsk_s)
- for dsk_s in params]
+ for dsk_s in params[0]]
return [status.ToDict()
for status in backend.BlockdevGetmirrorstatus(disks)]
"""
(node_disks, ) = params
- node_name = netutils.Hostname.GetSysName()
-
- disks = [objects.Disk.FromDict(dsk_s)
- for dsk_s in node_disks.get(node_name, [])]
+ disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks]
result = []
"""Grow a stack of devices.
"""
+ if len(params) < 4:
+ raise ValueError("Received only 3 parameters in blockdev_grow,"
+ " old master?")
cfbd = objects.Disk.FromDict(params[0])
amount = params[1]
- return backend.BlockdevGrow(cfbd, amount)
+ dryrun = params[2]
+ backingstore = params[3]
+ return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore)
@staticmethod
def perspective_blockdev_close(params):
return backend.BlockdevClose(params[0], disks)
@staticmethod
- def perspective_blockdev_getsize(params):
+ def perspective_blockdev_getdimensions(params):
"""Compute the sizes of the given block devices.
"""
disks = [objects.Disk.FromDict(cf) for cf in params[0]]
- return backend.BlockdevGetsize(disks)
+ return backend.BlockdevGetdimensions(disks)
@staticmethod
def perspective_blockdev_export(params):
dest_node, dest_path, cluster_name = params[1:]
return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
+ @staticmethod
+ def perspective_blockdev_setinfo(params):
+ """Sets metadata information on the given block device.
+
+ """
+ (disk, info) = params
+ disk = objects.Disk.FromDict(disk)
+ return backend.BlockdevSetInfo(disk, info)
+
# blockdev/drbd specific methods ----------
@staticmethod
Note that as opposed to export_info, which may query data about an
export in any path, this only queries the standard Ganeti path
- (constants.EXPORT_DIR).
+ (pathutils.EXPORT_DIR).
"""
return backend.ListExports()
export = params[0]
return backend.RemoveExport(export)
+ # block device ---------------------
+ @staticmethod
+ def perspective_bdev_sizes(params):
+ """Query the list of block devices
+
+ """
+ devices = params[0]
+ return backend.GetBlockDevSizes(devices)
+
# volume --------------------------
@staticmethod
"""
(su_name, su_args, name, fields) = params
- return storage.GetStorage(su_name, *su_args).List(name, fields)
+ return container.GetStorage(su_name, *su_args).List(name, fields)
@staticmethod
def perspective_storage_modify(params):
"""
(su_name, su_args, name, changes) = params
- return storage.GetStorage(su_name, *su_args).Modify(name, changes)
+ return container.GetStorage(su_name, *su_args).Modify(name, changes)
@staticmethod
def perspective_storage_execute(params):
"""
(su_name, su_args, name, op) = params
- return storage.GetStorage(su_name, *su_args).Execute(name, op)
+ return container.GetStorage(su_name, *su_args).Execute(name, op)
# bridge --------------------------
"""
instance = objects.Instance.FromDict(params[0])
timeout = params[1]
- return backend.InstanceShutdown(instance, timeout)
+ trail = params[2]
+ _extendReasonTrail(trail, "shutdown")
+ return backend.InstanceShutdown(instance, timeout, trail)
@staticmethod
def perspective_instance_start(params):
"""Start an instance.
"""
- instance = objects.Instance.FromDict(params[0])
- return backend.StartInstance(instance)
+ (instance_name, startup_paused, trail) = params
+ instance = objects.Instance.FromDict(instance_name)
+ _extendReasonTrail(trail, "start")
+ return backend.StartInstance(instance, startup_paused, trail)
@staticmethod
def perspective_migration_info(params):
return backend.AcceptInstance(instance, info, target)
@staticmethod
- def perspective_finalize_migration(params):
- """Finalize the instance migration.
+ def perspective_instance_finalize_migration_dst(params):
+ """Finalize the instance migration on the destination node.
"""
instance, info, success = params
instance = objects.Instance.FromDict(instance)
- return backend.FinalizeMigration(instance, info, success)
+ return backend.FinalizeMigrationDst(instance, info, success)
@staticmethod
def perspective_instance_migrate(params):
return backend.MigrateInstance(instance, target, live)
@staticmethod
+ def perspective_instance_finalize_migration_src(params):
+ """Finalize the instance migration on the source node.
+
+ """
+ instance, success, live = params
+ instance = objects.Instance.FromDict(instance)
+ return backend.FinalizeMigrationSource(instance, success, live)
+
+ @staticmethod
+ def perspective_instance_get_migration_status(params):
+ """Reports migration status.
+
+ """
+ instance = objects.Instance.FromDict(params[0])
+ return backend.GetMigrationStatus(instance).ToDict()
+
+ @staticmethod
def perspective_instance_reboot(params):
"""Reboot an instance.
instance = objects.Instance.FromDict(params[0])
reboot_type = params[1]
shutdown_timeout = params[2]
- return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
+ trail = params[3]
+ _extendReasonTrail(trail, "reboot")
+ return backend.InstanceReboot(instance, reboot_type, shutdown_timeout,
+ trail)
+
+ @staticmethod
+ def perspective_instance_balloon_memory(params):
+ """Modify instance runtime memory.
+
+ """
+ instance_dict, memory = params
+ instance = objects.Instance.FromDict(instance_dict)
+ return backend.InstanceBalloonMemory(instance, memory)
@staticmethod
def perspective_instance_info(params):
# node --------------------------
@staticmethod
- def perspective_node_tcp_ping(params):
- """Do a TcpPing on the remote node.
-
- """
- return netutils.TcpPing(params[1], params[2], timeout=params[3],
- live_port_needed=params[4], source=params[0])
-
- @staticmethod
def perspective_node_has_ip_address(params):
"""Checks if a node has the given ip address.
"""Query node information.
"""
- vgname, hypervisor_type = params
- return backend.GetNodeInfo(vgname, hypervisor_type)
+ (storage_units, hv_names, excl_stor) = params
+ return backend.GetNodeInfo(storage_units, hv_names, excl_stor)
@staticmethod
def perspective_etc_hosts_modify(params):
"""
return backend.VerifyNode(params[0], params[1])
+ @classmethod
+ def perspective_node_verify_light(cls, params):
+ """Run a light verify sequence on this node.
+
+ """
+ # So far it's the same as the normal node_verify
+ return cls.perspective_node_verify(params)
+
+ @staticmethod
+ def perspective_node_start_master_daemons(params):
+ """Start the master daemons on this node.
+
+ """
+ return backend.StartMasterDaemons(params[0])
+
@staticmethod
- def perspective_node_start_master(params):
- """Promote this node to master status.
+ def perspective_node_activate_master_ip(params):
+ """Activate the master IP on this node.
"""
- return backend.StartMaster(params[0], params[1])
+ master_params = objects.MasterNetworkParameters.FromDict(params[0])
+ return backend.ActivateMasterIp(master_params, params[1])
+
+ @staticmethod
+ def perspective_node_deactivate_master_ip(params):
+ """Deactivate the master IP on this node.
+
+ """
+ master_params = objects.MasterNetworkParameters.FromDict(params[0])
+ return backend.DeactivateMasterIp(master_params, params[1])
@staticmethod
def perspective_node_stop_master(params):
- """Demote this node from master status.
+ """Stops master daemons on this node.
+
+ """
+ return backend.StopMasterDaemons()
+
+ @staticmethod
+ def perspective_node_change_master_netmask(params):
+ """Change the master IP netmask.
"""
- return backend.StopMaster(params[0])
+ return backend.ChangeMasterNetmask(params[0], params[1], params[2],
+ params[3])
@staticmethod
def perspective_node_leave_cluster(params):
"""
return backend.DemoteFromMC()
-
@staticmethod
def perspective_node_powercycle(params):
"""Tries to powercycle the nod.
hypervisor_type = params[0]
return backend.PowercycleNode(hypervisor_type)
-
# cluster --------------------------
@staticmethod
files are accepted.
"""
- return backend.UploadFile(*params)
+ return backend.UploadFile(*(params[0]))
@staticmethod
def perspective_master_info(params):
return result
@staticmethod
+ def perspective_restricted_command(params):
+ """Runs a restricted command.
+
+ """
+ (cmd, ) = params
+
+ return backend.RunRestrictedCmd(cmd)
+
+ @staticmethod
def perspective_write_ssconf_files(params):
"""Write ssconf files.
"""
(values,) = params
- return backend.WriteSsconfFiles(values)
+ return ssconf.WriteSsconfFiles(values)
+
+ @staticmethod
+ def perspective_get_watcher_pause(params):
+ """Get watcher pause end.
+
+ """
+ return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE)
+
+ @staticmethod
+ def perspective_set_watcher_pause(params):
+ """Set watcher pause.
+
+ """
+ (until, ) = params
+ return backend.SetWatcherPause(until)
# os -----------------------
required, name, checks, params = params
return backend.ValidateOS(required, name, checks, params)
+ # extstorage -----------------------
+
+ @staticmethod
+ def perspective_extstorage_diagnose(params):
+ """Query detailed information about existing extstorage providers.
+
+ """
+ return backend.DiagnoseExtStorage()
+
# hooks -----------------------
@staticmethod
"""
# TODO: What if a file fails to rename?
- return [backend.JobQueueRename(old, new) for old, new in params]
+ return [backend.JobQueueRename(old, new) for old, new in params[0]]
+
+ @staticmethod
+ @_RequireJobQueueLock
+ def perspective_jobqueue_set_drain_flag(params):
+ """Set job queue's drain flag.
+
+ """
+ (flag, ) = params
+
+ return jstore.SetDrainFlag(flag)
# hypervisor ---------------
"""Starts an import daemon.
"""
- (opts_s, instance, dest, dest_args) = params
+ (opts_s, instance, component, (dest, dest_args)) = params
opts = objects.ImportExportOptions.FromDict(opts_s)
return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
None, None,
objects.Instance.FromDict(instance),
- dest,
+ component, dest,
_DecodeImportExportIO(dest,
dest_args))
"""Starts an export daemon.
"""
- (opts_s, host, port, instance, source, source_args) = params
+ (opts_s, host, port, instance, component, (source, source_args)) = params
opts = objects.ImportExportOptions.FromDict(opts_s)
return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
host, port,
objects.Instance.FromDict(instance),
- source,
+ component, source,
_DecodeImportExportIO(source,
source_args))
print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
sys.argv[0])
sys.exit(constants.EXIT_FAILURE)
+ try:
+ codecs.lookup("string-escape")
+ except LookupError:
+ print >> sys.stderr, ("Can't load the string-escape code which is part"
+ " of the Python installation. Is your installation"
+ " complete/correct? Aborting.")
+ sys.exit(constants.EXIT_FAILURE)
def PrepNoded(options, _):
# startup of the whole node daemon because of this
logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
+ handler = NodeRequestHandler()
+
mainloop = daemon.Mainloop()
- server = NodeHttpServer(mainloop, options.bind_address, options.port,
- ssl_params=ssl_params, ssl_verify_peer=True,
- request_executor_class=request_executor_class)
+ server = \
+ http.server.HttpServer(mainloop, options.bind_address, options.port,
+ handler, ssl_params=ssl_params, ssl_verify_peer=True,
+ request_executor_class=request_executor_class)
server.Start()
+
return (mainloop, server)
-def ExecNoded(options, args, prep_data): # pylint: disable-msg=W0613
+def ExecNoded(options, args, prep_data): # pylint: disable=W0613
"""Main node daemon function, executed with the PID file held.
"""
"""
parser = OptionParser(description="Ganeti node daemon",
- usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
+ usage="%prog [-f] [-d] [-p port] [-b ADDRESS]\
+ \ [-i INTERFACE]",
version="%%prog (ganeti) %s" %
constants.RELEASE_VERSION)
parser.add_option("--no-mlock", dest="mlock",
default=True, action="store_false")
daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
- default_ssl_cert=constants.NODED_CERT_FILE,
- default_ssl_key=constants.NODED_CERT_FILE,
+ default_ssl_cert=pathutils.NODED_CERT_FILE,
+ default_ssl_key=pathutils.NODED_CERT_FILE,
console_logging=True)