X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/6263189c05b8692cf5944fea8af3b0e66305c56f..ee844e2001c61fc404e004b8f8f4e4968ea7f9ad:/daemons/ganeti-noded?ds=sidebyside diff --git a/daemons/ganeti-noded b/daemons/ganeti-noded index 7582597..30087f1 100755 --- a/daemons/ganeti-noded +++ b/daemons/ganeti-noded @@ -21,12 +21,16 @@ """Ganeti node daemon""" -# functions in this module need to have a given name structure, so: -# pylint: disable-msg=C0103 +# pylint: disable-msg=C0103,W0142 + +# C0103: Functions in this module need to have a given name structure, +# and the name of the daemon doesn't match + +# W0142: Used * or ** magic, since we do use it extensively in this +# module import os import sys -import SocketServer import logging import signal @@ -42,12 +46,31 @@ from ganeti import http from ganeti import utils from ganeti import storage -import ganeti.http.server +import ganeti.http.server # pylint: disable-msg=W0611 queue_lock = None +def _PrepareQueueLock(): + """Try to prepare the queue lock. + + @return: None for success, otherwise an exception object + + """ + global queue_lock # pylint: disable-msg=W0603 + + if queue_lock is not None: + return None + + # Prepare job queue + try: + queue_lock = jstore.InitAndVerifyQueue(must_lock=False) + return None + except EnvironmentError, err: + return err + + def _RequireJobQueueLock(fn): """Decorator for job queue manipulating functions. @@ -57,6 +80,9 @@ def _RequireJobQueueLock(fn): def wrapper(*args, **kwargs): # Locking in exclusive, blocking mode because there could be several # children running at the same time. Waiting up to 10 seconds. + if _PrepareQueueLock() is not None: + raise errors.JobQueueError("Job queue failed initialization," + " cannot update jobs") queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT) try: return fn(*args, **kwargs) @@ -72,6 +98,9 @@ class NodeHttpServer(http.server.HttpServer): This class holds all methods exposed over the RPC interface. """ + # too many public methods, and unused args - all methods get params + # due to the API + # pylint: disable-msg=R0904,W0613 def __init__(self, *args, **kwargs): http.server.HttpServer.__init__(self, *args, **kwargs) self.noded_pid = os.getpid() @@ -316,8 +345,9 @@ class NodeHttpServer(http.server.HttpServer): instance = objects.Instance.FromDict(params[2]) cluster_name = params[3] dev_idx = params[4] + debug = params[5] return backend.ExportSnapshot(disk, dest_node, instance, - cluster_name, dev_idx) + cluster_name, dev_idx, debug) @staticmethod def perspective_finalize_export(params): @@ -422,26 +452,27 @@ class NodeHttpServer(http.server.HttpServer): inst_s = params[0] inst = objects.Instance.FromDict(inst_s) reinstall = params[1] - return backend.InstanceOsAdd(inst, reinstall) + debug = params[2] + return backend.InstanceOsAdd(inst, reinstall, debug) @staticmethod def perspective_instance_run_rename(params): """Runs the OS rename script for an instance. """ - inst_s, old_name = params + inst_s, old_name, debug = params inst = objects.Instance.FromDict(inst_s) - return backend.RunRenameInstance(inst, old_name) + return backend.RunRenameInstance(inst, old_name, debug) @staticmethod def perspective_instance_os_import(params): """Run the import function of an OS onto a given instance. """ - inst_s, src_node, src_images, cluster_name = params + inst_s, src_node, src_images, cluster_name, debug = params inst = objects.Instance.FromDict(inst_s) return backend.ImportOSIntoInstance(inst, src_node, src_images, - cluster_name) + cluster_name, debug) @staticmethod def perspective_instance_shutdown(params): @@ -502,7 +533,8 @@ class NodeHttpServer(http.server.HttpServer): """ instance = objects.Instance.FromDict(params[0]) reboot_type = params[1] - return backend.InstanceReboot(instance, reboot_type) + shutdown_timeout = params[2] + return backend.InstanceReboot(instance, reboot_type, shutdown_timeout) @staticmethod def perspective_instance_info(params): @@ -592,7 +624,7 @@ class NodeHttpServer(http.server.HttpServer): """Cleanup after leaving a cluster. """ - return backend.LeaveCluster() + return backend.LeaveCluster(params[0]) @staticmethod def perspective_node_volumes(params): @@ -781,12 +813,20 @@ class NodeHttpServer(http.server.HttpServer): return backend.ValidateHVParams(hvname, hvparams) -def ExecNoded(options, args): - """Main node daemon function, executed with the PID file held. +def CheckNoded(_, args): + """Initial checks whether to run or exit with a failure. """ - global queue_lock + if args: # noded doesn't take any arguments + print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" % + sys.argv[0]) + sys.exit(constants.EXIT_FAILURE) + +def ExecNoded(options, _): + """Main node daemon function, executed with the PID file held. + + """ # Read SSL certificate if options.ssl: ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key, @@ -794,8 +834,12 @@ def ExecNoded(options, args): else: ssl_params = None - # Prepare job queue - queue_lock = jstore.InitAndVerifyQueue(must_lock=False) + err = _PrepareQueueLock() + if err is not None: + # this might be some kind of file-system/permission error; while + # this breaks the job queue functionality, we shouldn't prevent + # startup of the whole node daemon because of this + logging.critical("Can't init/verify the queue, proceeding anyway: %s", err) mainloop = daemon.Mainloop() server = NodeHttpServer(mainloop, options.bind_address, options.port, @@ -818,7 +862,7 @@ def main(): dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS] dirs.append((constants.LOG_OS_DIR, 0750)) dirs.append((constants.LOCK_DIR, 1777)) - daemon.GenericMain(constants.NODED, parser, dirs, None, ExecNoded) + daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded) if __name__ == '__main__':