X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/2be7273ccdad115169ab0f8765f3fe66d0aeff47..802ed2aa856d2cee888f8972e5ac7e865f1d936c:/lib/server/noded.py diff --git a/lib/server/noded.py b/lib/server/noded.py index 0090efb..aedd321 100644 --- a/lib/server/noded.py +++ b/lib/server/noded.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007, 2010, 2011 Google Inc. +# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -21,7 +21,7 @@ """Ganeti node daemon""" -# pylint: disable-msg=C0103,W0142 +# pylint: disable=C0103,W0142 # C0103: Functions in this module need to have a given name structure, # and the name of the daemon doesn't match @@ -33,6 +33,7 @@ import os import sys import logging import signal +import codecs from optparse import OptionParser @@ -44,23 +45,35 @@ from ganeti import jstore from ganeti import daemon from ganeti import http from ganeti import utils -from ganeti import storage +from ganeti.storage import container from ganeti import serializer from ganeti import netutils +from ganeti import pathutils +from ganeti import ssconf -import ganeti.http.server # pylint: disable-msg=W0611 +import ganeti.http.server # pylint: disable=W0611 queue_lock = None +def _extendReasonTrail(trail, source, reason=""): + """Extend the reason trail with noded information + + The trail is extended by appending the name of the noded functionality + """ + assert trail is not None + trail_source = "%s:%s" % (constants.OPCODE_REASON_SRC_NODED, source) + trail.append((trail_source, reason, utils.EpochNano())) + + def _PrepareQueueLock(): """Try to prepare the queue lock. @return: None for success, otherwise an exception object """ - global queue_lock # pylint: disable-msg=W0603 + global queue_lock # pylint: disable=W0603 if queue_lock is not None: return None @@ -109,9 +122,25 @@ def _DecodeImportExportIO(ieio, ieioargs): return ieioargs +def _DefaultAlternative(value, default): + """Returns value or, if evaluating to False, a default value. + + Returns the given value, unless it evaluates to False. In the latter case the + default value is returned. + + @param value: Value to return if it doesn't evaluate to False + @param default: Default value + @return: Given value or the default + + """ + if value: + return value + + return default + + class MlockallRequestExecutor(http.server.HttpServerRequestExecutor): - """Custom Request Executor class that ensures NodeHttpServer children are - locked in ram. + """Subclass ensuring request handlers are locked in RAM. """ def __init__(self, *args, **kwargs): @@ -120,7 +149,7 @@ class MlockallRequestExecutor(http.server.HttpServerRequestExecutor): http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs) -class NodeHttpServer(http.server.HttpServer): +class NodeRequestHandler(http.server.HttpServerHandler): """The server implementation. This class holds all methods exposed over the RPC interface. @@ -128,17 +157,17 @@ class NodeHttpServer(http.server.HttpServer): """ # too many public methods, and unused args - all methods get params # due to the API - # pylint: disable-msg=R0904,W0613 - def __init__(self, *args, **kwargs): - http.server.HttpServer.__init__(self, *args, **kwargs) + # pylint: disable=R0904,W0613 + def __init__(self): + http.server.HttpServerHandler.__init__(self) self.noded_pid = os.getpid() def HandleRequest(self, req): """Handle a request. """ - if req.request_method.upper() != http.HTTP_PUT: - raise http.HttpBadRequest() + if req.request_method.upper() != http.HTTP_POST: + raise http.HttpBadRequest("Only the POST method is supported") path = req.request_path if path.startswith("/"): @@ -168,7 +197,7 @@ class NodeHttpServer(http.server.HttpServer): logging.exception("Error in RPC call") result = (False, "Error while executing backend function: %s" % str(err)) - return serializer.DumpJson(result, indent=False) + return serializer.DumpJson(result) # the new block devices -------------------------- @@ -177,11 +206,12 @@ class NodeHttpServer(http.server.HttpServer): """Create a block device. """ - bdev_s, size, owner, on_primary, info = params + (bdev_s, size, owner, on_primary, info, excl_stor) = params bdev = objects.Disk.FromDict(bdev_s) if bdev is None: raise ValueError("can't unserialize data!") - return backend.BlockdevCreate(bdev, size, owner, on_primary, info) + return backend.BlockdevCreate(bdev, size, owner, on_primary, info, + excl_stor) @staticmethod def perspective_blockdev_pause_resume_sync(params): @@ -215,7 +245,7 @@ class NodeHttpServer(http.server.HttpServer): """Remove a block device. """ - devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params] + devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params[0]] return backend.BlockdevRename(devlist) @staticmethod @@ -276,7 +306,7 @@ class NodeHttpServer(http.server.HttpServer): """ disks = [objects.Disk.FromDict(dsk_s) - for dsk_s in params] + for dsk_s in params[0]] return [status.ToDict() for status in backend.BlockdevGetmirrorstatus(disks)] @@ -287,10 +317,7 @@ class NodeHttpServer(http.server.HttpServer): """ (node_disks, ) = params - node_name = netutils.Hostname.GetSysName() - - disks = [objects.Disk.FromDict(dsk_s) - for dsk_s in node_disks.get(node_name, [])] + disks = [objects.Disk.FromDict(dsk_s) for dsk_s in node_disks] result = [] @@ -334,9 +361,14 @@ class NodeHttpServer(http.server.HttpServer): """Grow a stack of devices. """ + if len(params) < 4: + raise ValueError("Received only 3 parameters in blockdev_grow," + " old master?") cfbd = objects.Disk.FromDict(params[0]) amount = params[1] - return backend.BlockdevGrow(cfbd, amount) + dryrun = params[2] + backingstore = params[3] + return backend.BlockdevGrow(cfbd, amount, dryrun, backingstore) @staticmethod def perspective_blockdev_close(params): @@ -347,12 +379,12 @@ class NodeHttpServer(http.server.HttpServer): return backend.BlockdevClose(params[0], disks) @staticmethod - def perspective_blockdev_getsize(params): + def perspective_blockdev_getdimensions(params): """Compute the sizes of the given block devices. """ disks = [objects.Disk.FromDict(cf) for cf in params[0]] - return backend.BlockdevGetsize(disks) + return backend.BlockdevGetdimensions(disks) @staticmethod def perspective_blockdev_export(params): @@ -363,6 +395,15 @@ class NodeHttpServer(http.server.HttpServer): dest_node, dest_path, cluster_name = params[1:] return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name) + @staticmethod + def perspective_blockdev_setinfo(params): + """Sets metadata information on the given block device. + + """ + (disk, info) = params + disk = objects.Disk.FromDict(disk) + return backend.BlockdevSetInfo(disk, info) + # blockdev/drbd specific methods ---------- @staticmethod @@ -444,7 +485,7 @@ class NodeHttpServer(http.server.HttpServer): Note that as opposed to export_info, which may query data about an export in any path, this only queries the standard Ganeti path - (constants.EXPORT_DIR). + (pathutils.EXPORT_DIR). """ return backend.ListExports() @@ -491,7 +532,7 @@ class NodeHttpServer(http.server.HttpServer): """ (su_name, su_args, name, fields) = params - return storage.GetStorage(su_name, *su_args).List(name, fields) + return container.GetStorage(su_name, *su_args).List(name, fields) @staticmethod def perspective_storage_modify(params): @@ -499,7 +540,7 @@ class NodeHttpServer(http.server.HttpServer): """ (su_name, su_args, name, changes) = params - return storage.GetStorage(su_name, *su_args).Modify(name, changes) + return container.GetStorage(su_name, *su_args).Modify(name, changes) @staticmethod def perspective_storage_execute(params): @@ -507,7 +548,7 @@ class NodeHttpServer(http.server.HttpServer): """ (su_name, su_args, name, op) = params - return storage.GetStorage(su_name, *su_args).Execute(name, op) + return container.GetStorage(su_name, *su_args).Execute(name, op) # bridge -------------------------- @@ -548,15 +589,19 @@ class NodeHttpServer(http.server.HttpServer): """ instance = objects.Instance.FromDict(params[0]) timeout = params[1] - return backend.InstanceShutdown(instance, timeout) + trail = params[2] + _extendReasonTrail(trail, "shutdown") + return backend.InstanceShutdown(instance, timeout, trail) @staticmethod def perspective_instance_start(params): """Start an instance. """ - instance = objects.Instance.FromDict(params[0]) - return backend.StartInstance(instance) + (instance_name, startup_paused, trail) = params + instance = objects.Instance.FromDict(instance_name) + _extendReasonTrail(trail, "start") + return backend.StartInstance(instance, startup_paused, trail) @staticmethod def perspective_migration_info(params): @@ -576,13 +621,13 @@ class NodeHttpServer(http.server.HttpServer): return backend.AcceptInstance(instance, info, target) @staticmethod - def perspective_finalize_migration(params): - """Finalize the instance migration. + def perspective_instance_finalize_migration_dst(params): + """Finalize the instance migration on the destination node. """ instance, info, success = params instance = objects.Instance.FromDict(instance) - return backend.FinalizeMigration(instance, info, success) + return backend.FinalizeMigrationDst(instance, info, success) @staticmethod def perspective_instance_migrate(params): @@ -594,6 +639,23 @@ class NodeHttpServer(http.server.HttpServer): return backend.MigrateInstance(instance, target, live) @staticmethod + def perspective_instance_finalize_migration_src(params): + """Finalize the instance migration on the source node. + + """ + instance, success, live = params + instance = objects.Instance.FromDict(instance) + return backend.FinalizeMigrationSource(instance, success, live) + + @staticmethod + def perspective_instance_get_migration_status(params): + """Reports migration status. + + """ + instance = objects.Instance.FromDict(params[0]) + return backend.GetMigrationStatus(instance).ToDict() + + @staticmethod def perspective_instance_reboot(params): """Reboot an instance. @@ -601,7 +663,19 @@ class NodeHttpServer(http.server.HttpServer): instance = objects.Instance.FromDict(params[0]) reboot_type = params[1] shutdown_timeout = params[2] - return backend.InstanceReboot(instance, reboot_type, shutdown_timeout) + trail = params[3] + _extendReasonTrail(trail, "reboot") + return backend.InstanceReboot(instance, reboot_type, shutdown_timeout, + trail) + + @staticmethod + def perspective_instance_balloon_memory(params): + """Modify instance runtime memory. + + """ + instance_dict, memory = params + instance = objects.Instance.FromDict(instance_dict) + return backend.InstanceBalloonMemory(instance, memory) @staticmethod def perspective_instance_info(params): @@ -635,14 +709,6 @@ class NodeHttpServer(http.server.HttpServer): # node -------------------------- @staticmethod - def perspective_node_tcp_ping(params): - """Do a TcpPing on the remote node. - - """ - return netutils.TcpPing(params[1], params[2], timeout=params[3], - live_port_needed=params[4], source=params[0]) - - @staticmethod def perspective_node_has_ip_address(params): """Checks if a node has the given ip address. @@ -654,8 +720,8 @@ class NodeHttpServer(http.server.HttpServer): """Query node information. """ - vgname, hypervisor_type = params - return backend.GetNodeInfo(vgname, hypervisor_type) + (storage_units, hv_names, excl_stor) = params + return backend.GetNodeInfo(storage_units, hv_names, excl_stor) @staticmethod def perspective_etc_hosts_modify(params): @@ -673,19 +739,51 @@ class NodeHttpServer(http.server.HttpServer): """ return backend.VerifyNode(params[0], params[1]) + @classmethod + def perspective_node_verify_light(cls, params): + """Run a light verify sequence on this node. + + """ + # So far it's the same as the normal node_verify + return cls.perspective_node_verify(params) + + @staticmethod + def perspective_node_start_master_daemons(params): + """Start the master daemons on this node. + + """ + return backend.StartMasterDaemons(params[0]) + @staticmethod - def perspective_node_start_master(params): - """Promote this node to master status. + def perspective_node_activate_master_ip(params): + """Activate the master IP on this node. """ - return backend.StartMaster(params[0], params[1]) + master_params = objects.MasterNetworkParameters.FromDict(params[0]) + return backend.ActivateMasterIp(master_params, params[1]) + + @staticmethod + def perspective_node_deactivate_master_ip(params): + """Deactivate the master IP on this node. + + """ + master_params = objects.MasterNetworkParameters.FromDict(params[0]) + return backend.DeactivateMasterIp(master_params, params[1]) @staticmethod def perspective_node_stop_master(params): - """Demote this node from master status. + """Stops master daemons on this node. + + """ + return backend.StopMasterDaemons() + + @staticmethod + def perspective_node_change_master_netmask(params): + """Change the master IP netmask. """ - return backend.StopMaster(params[0]) + return backend.ChangeMasterNetmask(params[0], params[1], params[2], + params[3]) @staticmethod def perspective_node_leave_cluster(params): @@ -708,7 +806,6 @@ class NodeHttpServer(http.server.HttpServer): """ return backend.DemoteFromMC() - @staticmethod def perspective_node_powercycle(params): """Tries to powercycle the nod. @@ -717,7 +814,6 @@ class NodeHttpServer(http.server.HttpServer): hypervisor_type = params[0] return backend.PowercycleNode(hypervisor_type) - # cluster -------------------------- @staticmethod @@ -735,7 +831,7 @@ class NodeHttpServer(http.server.HttpServer): files are accepted. """ - return backend.UploadFile(*params) + return backend.UploadFile(*(params[0])) @staticmethod def perspective_master_info(params): @@ -757,12 +853,36 @@ class NodeHttpServer(http.server.HttpServer): return result @staticmethod + def perspective_restricted_command(params): + """Runs a restricted command. + + """ + (cmd, ) = params + + return backend.RunRestrictedCmd(cmd) + + @staticmethod def perspective_write_ssconf_files(params): """Write ssconf files. """ (values,) = params - return backend.WriteSsconfFiles(values) + return ssconf.WriteSsconfFiles(values) + + @staticmethod + def perspective_get_watcher_pause(params): + """Get watcher pause end. + + """ + return utils.ReadWatcherPauseFile(pathutils.WATCHER_PAUSEFILE) + + @staticmethod + def perspective_set_watcher_pause(params): + """Set watcher pause. + + """ + (until, ) = params + return backend.SetWatcherPause(until) # os ----------------------- @@ -790,6 +910,15 @@ class NodeHttpServer(http.server.HttpServer): required, name, checks, params = params return backend.ValidateOS(required, name, checks, params) + # extstorage ----------------------- + + @staticmethod + def perspective_extstorage_diagnose(params): + """Query detailed information about existing extstorage providers. + + """ + return backend.DiagnoseExtStorage() + # hooks ----------------------- @staticmethod @@ -879,7 +1008,17 @@ class NodeHttpServer(http.server.HttpServer): """ # TODO: What if a file fails to rename? - return [backend.JobQueueRename(old, new) for old, new in params] + return [backend.JobQueueRename(old, new) for old, new in params[0]] + + @staticmethod + @_RequireJobQueueLock + def perspective_jobqueue_set_drain_flag(params): + """Set job queue's drain flag. + + """ + (flag, ) = params + + return jstore.SetDrainFlag(flag) # hypervisor --------------- @@ -916,14 +1055,14 @@ class NodeHttpServer(http.server.HttpServer): """Starts an import daemon. """ - (opts_s, instance, dest, dest_args) = params + (opts_s, instance, component, (dest, dest_args)) = params opts = objects.ImportExportOptions.FromDict(opts_s) return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts, None, None, objects.Instance.FromDict(instance), - dest, + component, dest, _DecodeImportExportIO(dest, dest_args)) @@ -932,14 +1071,14 @@ class NodeHttpServer(http.server.HttpServer): """Starts an export daemon. """ - (opts_s, host, port, instance, source, source_args) = params + (opts_s, host, port, instance, component, (source, source_args)) = params opts = objects.ImportExportOptions.FromDict(opts_s) return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts, host, port, objects.Instance.FromDict(instance), - source, + component, source, _DecodeImportExportIO(source, source_args)) @@ -973,6 +1112,13 @@ def CheckNoded(_, args): print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" % sys.argv[0]) sys.exit(constants.EXIT_FAILURE) + try: + codecs.lookup("string-escape") + except LookupError: + print >> sys.stderr, ("Can't load the string-escape code which is part" + " of the Python installation. Is your installation" + " complete/correct? Aborting.") + sys.exit(constants.EXIT_FAILURE) def PrepNoded(options, _): @@ -1003,15 +1149,19 @@ def PrepNoded(options, _): # startup of the whole node daemon because of this logging.critical("Can't init/verify the queue, proceeding anyway: %s", err) + handler = NodeRequestHandler() + mainloop = daemon.Mainloop() - server = NodeHttpServer(mainloop, options.bind_address, options.port, - ssl_params=ssl_params, ssl_verify_peer=True, - request_executor_class=request_executor_class) + server = \ + http.server.HttpServer(mainloop, options.bind_address, options.port, + handler, ssl_params=ssl_params, ssl_verify_peer=True, + request_executor_class=request_executor_class) server.Start() + return (mainloop, server) -def ExecNoded(options, args, prep_data): # pylint: disable-msg=W0613 +def ExecNoded(options, args, prep_data): # pylint: disable=W0613 """Main node daemon function, executed with the PID file held. """ @@ -1027,7 +1177,8 @@ def Main(): """ parser = OptionParser(description="Ganeti node daemon", - usage="%prog [-f] [-d] [-p port] [-b ADDRESS]", + usage="%prog [-f] [-d] [-p port] [-b ADDRESS]\ + \ [-i INTERFACE]", version="%%prog (ganeti) %s" % constants.RELEASE_VERSION) parser.add_option("--no-mlock", dest="mlock", @@ -1035,6 +1186,6 @@ def Main(): default=True, action="store_false") daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded, - default_ssl_cert=constants.NODED_CERT_FILE, - default_ssl_key=constants.NODED_CERT_FILE, + default_ssl_cert=pathutils.NODED_CERT_FILE, + default_ssl_key=pathutils.NODED_CERT_FILE, console_logging=True)