#
#
-# Copyright (C) 2006, 2007, 2010 Google Inc.
+# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
"""
-# pylint: disable-msg=C0103
+# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd
import grp
class ClientRequestWorker(workerpool.BaseWorker):
- # pylint: disable-msg=W0221
+ # pylint: disable=W0221
def RunTask(self, server, message, client):
"""Process the request.
client.send_message(reply)
# awake the main thread so that it can write out the data.
server.awaker.signal()
- except: # pylint: disable-msg=W0702
+ except: # pylint: disable=W0702
logging.exception("Send error")
client.close_log()
"""
_MAX_UNHANDLED = 1
+
def __init__(self, server, connected_socket, client_address, family):
daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
client_address,
self.server.request_workers.AddTask((self.server, message, self))
+class _MasterShutdownCheck:
+ """Logic for master daemon shutdown.
+
+ """
+ #: How long to wait between checks
+ _CHECK_INTERVAL = 5.0
+
+ #: How long to wait after all jobs are done (e.g. to give clients time to
+ #: retrieve the job status)
+ _SHUTDOWN_LINGER = 5.0
+
+ def __init__(self):
+ """Initializes this class.
+
+ """
+ self._had_active_jobs = None
+ self._linger_timeout = None
+
+ def __call__(self, jq_prepare_result):
+ """Determines if master daemon is ready for shutdown.
+
+ @param jq_prepare_result: Result of L{jqueue.JobQueue.PrepareShutdown}
+ @rtype: None or number
+ @return: None if master daemon is ready, timeout if the check must be
+ repeated
+
+ """
+ if jq_prepare_result:
+ # Check again shortly
+ logging.info("Job queue has been notified for shutdown but is still"
+ " busy; next check in %s seconds", self._CHECK_INTERVAL)
+ self._had_active_jobs = True
+ return self._CHECK_INTERVAL
+
+ if not self._had_active_jobs:
+ # Can shut down as there were no active jobs on the first check
+ return None
+
+    # No jobs are running anymore, but clients may still want to collect job
+    # status information. Give them a short grace period.
+ if self._linger_timeout is None:
+ self._linger_timeout = utils.RunningTimeout(self._SHUTDOWN_LINGER, True)
+
+ remaining = self._linger_timeout.Remaining()
+
+ logging.info("Job queue no longer busy; shutting down master daemon"
+ " in %s seconds", remaining)
+
+ # TODO: Should the master daemon socket be closed at this point? Doing so
+ # wouldn't affect existing connections.
+
+ if remaining < 0:
+ return None
+ else:
+ return remaining
+
+
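The class above is a callable with a simple contract: a numeric return value means "not ready, check again after that many seconds", while None means the daemon may stop. The sketch below shows one way such a callback could be driven; it is an illustration of the contract only, not daemon.Mainloop's actual implementation, and prepare_fn stands in for jqueue.JobQueue.PrepareShutdown:

  import time

  def drive_shutdown_check(check, prepare_fn):
    # Poll until the callback reports readiness. A number means "ask again
    # in that many seconds"; None means shutdown may proceed.
    while True:
      timeout = check(prepare_fn())
      if timeout is None:
        return
      time.sleep(timeout)

With the patch applied, mainloop.Run(shutdown_wait_fn=master.WaitForShutdown) performs the equivalent polling as part of its termination path (see ExecMasterd below).
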
class MasterServer(daemon.AsyncStreamServer):
"""Master Server.
"""
family = socket.AF_UNIX
- def __init__(self, mainloop, address, uid, gid):
+ def __init__(self, address, uid, gid):
"""MasterServer constructor
- @type mainloop: ganeti.daemon.Mainloop
- @param mainloop: Mainloop used to poll for I/O events
@param address: the unix socket address to bind the MasterServer to
@param uid: The uid of the owner of the socket
@param gid: The gid of the owner of the socket
os.chown(temp_name, uid, gid)
os.rename(temp_name, address)
- self.mainloop = mainloop
self.awaker = daemon.AsyncAwaker()
# We'll only start threads once we've forked.
self.context = None
self.request_workers = None
+ self._shutdown_check = None
+
def handle_connection(self, connected_socket, client_address):
# TODO: add connection count and limit the number of open connections to a
# maximum number to avoid breaking for lack of file descriptors or memory.
CLIENT_REQUEST_WORKERS,
ClientRequestWorker)
+ def WaitForShutdown(self):
+ """Prepares server for shutdown.
+
+ """
+ if self._shutdown_check is None:
+ self._shutdown_check = _MasterShutdownCheck()
+
+ return self._shutdown_check(self.context.jobqueue.PrepareShutdown())
+
def server_cleanup(self):
"""Cleanup the server.
def __init__(self, server):
self.server = server
- def handle_request(self, method, args): # pylint: disable-msg=R0911
+ def handle_request(self, method, args): # pylint: disable=R0911
queue = self.server.context.jobqueue
# TODO: Parameter validation
+ if not isinstance(args, (tuple, list)):
+ logging.info("Received invalid arguments of type '%s'", type(args))
+ raise ValueError("Invalid arguments type '%s'" % type(args))
# TODO: Rewrite to not exit in each 'if/elif' branch
return queue.SubmitManyJobs(jobs)
elif method == luxi.REQ_CANCEL_JOB:
- job_id = args
+ (job_id, ) = args
logging.info("Received job cancel request for %s", job_id)
return queue.CancelJob(job_id)
elif method == luxi.REQ_ARCHIVE_JOB:
- job_id = args
+ (job_id, ) = args
logging.info("Received job archive request for %s", job_id)
return queue.ArchiveJob(job_id)
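
Several branches here switch from plain assignment (job_id = args) to one-element tuple unpacking ((job_id, ) = args). The difference is deliberate: unpacking raises ValueError unless exactly one argument was sent, instead of silently binding the whole argument list. A standalone illustration, using plain Python and hypothetical names:

  def unpack_one(args):
    # Strict arity check: succeeds only for a one-element sequence.
    (value, ) = args
    return value

  assert unpack_one(["job-123"]) == "job-123"
  # unpack_one([]) and unpack_one(["a", "b"]) raise ValueError, whereas
  # "value = args" would happily bind the whole list.

Together with the isinstance() check at the top of handle_request, this gives every LUXI method a basic arity check on its arguments.
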
prev_log_serial, timeout)
elif method == luxi.REQ_QUERY:
- req = objects.QueryRequest.FromDict(args)
+ (what, fields, qfilter) = args
+ req = objects.QueryRequest(what=what, fields=fields, qfilter=qfilter)
- if req.what in constants.QR_OP_QUERY:
+ if req.what in constants.QR_VIA_OP:
result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
- filter=req.filter))
+ qfilter=req.qfilter))
elif req.what == constants.QR_LOCK:
- if req.filter is not None:
+ if req.qfilter is not None:
raise errors.OpPrereqError("Lock queries can't be filtered")
return self.server.context.glm.QueryLocks(req.fields)
- elif req.what in constants.QR_OP_LUXI:
+ elif req.what in constants.QR_VIA_LUXI:
raise NotImplementedError
else:
raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
return result
elif method == luxi.REQ_QUERY_FIELDS:
- req = objects.QueryFieldsRequest.FromDict(args)
+ (what, fields) = args
+ req = objects.QueryFieldsRequest(what=what, fields=fields)
- if req.what in constants.QR_OP_QUERY:
- result = self._Query(opcodes.OpQueryFields(what=req.what,
- fields=req.fields))
- elif req.what == constants.QR_LOCK:
- return query.QueryFields(query.LOCK_FIELDS, req.fields)
- elif req.what in constants.QR_OP_LUXI:
- raise NotImplementedError
- else:
+ try:
+ fielddefs = query.ALL_FIELDS[req.what]
+ except KeyError:
raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
errors.ECODE_INVAL)
- return result
+ return query.QueryFields(fielddefs, req.fields)
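
The rewritten REQ_QUERY_FIELDS branch drops the per-resource elif chain in favour of a single table lookup: query.ALL_FIELDS maps each resource kind to its field definitions, so unknown kinds are rejected in exactly one place. The same pattern in miniature, with made-up table contents rather than Ganeti's real ones:

  _ALL_FIELDS = {
    "node": ("name", "ip"),
    "lock": ("name", "mode"),
  }

  def lookup_fields(what):
    # One dictionary lookup replaces a chain of resource-specific
    # branches; unknown kinds fail uniformly.
    try:
      return _ALL_FIELDS[what]
    except KeyError:
      raise ValueError("Resource type '%s' unknown" % what)
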
elif method == luxi.REQ_QUERY_JOBS:
(job_ids, fields) = args
if use_locking:
raise errors.OpPrereqError("Sync queries are not allowed",
errors.ECODE_INVAL)
- op = opcodes.OpQueryInstances(names=names, output_fields=fields,
- use_locking=use_locking)
+ op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
+ use_locking=use_locking)
return self._Query(op)
elif method == luxi.REQ_QUERY_NODES:
if use_locking:
raise errors.OpPrereqError("Sync queries are not allowed",
errors.ECODE_INVAL)
- op = opcodes.OpQueryNodes(names=names, output_fields=fields,
- use_locking=use_locking)
+ op = opcodes.OpNodeQuery(names=names, output_fields=fields,
+ use_locking=use_locking)
return self._Query(op)
elif method == luxi.REQ_QUERY_GROUPS:
return self._Query(op)
elif method == luxi.REQ_QUERY_EXPORTS:
- nodes, use_locking = args
+ (nodes, use_locking) = args
if use_locking:
raise errors.OpPrereqError("Sync queries are not allowed",
errors.ECODE_INVAL)
return self._Query(op)
elif method == luxi.REQ_QUERY_CONFIG_VALUES:
- fields = args
+ (fields, ) = args
logging.info("Received config values query request for %s", fields)
op = opcodes.OpClusterConfigQuery(output_fields=fields)
return self._Query(op)
return self._Query(op)
elif method == luxi.REQ_QUERY_TAGS:
- kind, name = args
+ (kind, name) = args
logging.info("Received tags query request")
- op = opcodes.OpGetTags(kind=kind, name=name)
+ op = opcodes.OpTagsGet(kind=kind, name=name)
return self._Query(op)
elif method == luxi.REQ_QUERY_LOCKS:
return self.server.context.glm.OldStyleQueryLocks(fields)
elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
- drain_flag = args
+ (drain_flag, ) = args
logging.info("Received queue drain flag change request to %s",
drain_flag)
return queue.SetDrainFlag(drain_flag)
This class creates and holds common objects shared by all threads.
"""
- # pylint: disable-msg=W0212
+ # pylint: disable=W0212
# we do want to ensure a singleton here
_instance = None
self.cfg.GetNodeGroupList(),
self.cfg.GetInstanceList())
+ self.cfg.SetContext(self)
+
+ # RPC runner
+ self.rpc = rpc.RpcRunner(self.cfg, self.glm.AddToLockMonitor)
+
# Job queue
self.jobqueue = jqueue.JobQueue(self)
# Add the new node to the Ganeti Lock Manager
self.glm.add(locking.LEVEL_NODE, node.name)
+ self.glm.add(locking.LEVEL_NODE_RES, node.name)
def ReaddNode(self, node):
"""Updates a node that's already in the configuration
# Remove the node from the Ganeti Lock Manager
self.glm.remove(locking.LEVEL_NODE, name)
+ self.glm.remove(locking.LEVEL_NODE_RES, name)
def _SetWatcherPause(until):
@rpc.RunWithRPC
def ActivateMasterIP():
# activate ip
- master_node = ssconf.SimpleStore().GetMasterNode()
- result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
+ cfg = config.ConfigWriter()
+ master_params = cfg.GetMasterNetworkParameters()
+ ems = cfg.GetUseExternalMipScript()
+ runner = rpc.BootstrapRunner()
+ result = runner.call_node_activate_master_ip(master_params.name,
+ master_params, ems)
+
msg = result.fail_msg
if msg:
logging.error("Can't activate master IP address: %s", msg)
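
The error handling above follows the RPC result convention already visible in this file: fail_msg is empty (falsy) when the call succeeded and carries the error text otherwise. A minimal stand-in showing the shape, with a hypothetical class rather than ganeti's real result object:

  import logging

  class FakeRpcResult(object):
    # fail_msg is "" on success, an error string on failure.
    def __init__(self, fail_msg=""):
      self.fail_msg = fail_msg

  result = FakeRpcResult(fail_msg="node unreachable")
  if result.fail_msg:
    # Mirrors the logging.error(...) branch above.
    logging.error("Can't activate master IP address: %s", result.fail_msg)
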
# If CheckMaster didn't fail we believe we are the master, but we have to
# confirm with the other nodes.
if options.no_voting:
-    if options.yes_do_it:
-      return
-
-    sys.stdout.write("The 'no voting' option has been selected.\n")
-    sys.stdout.write("This is dangerous, please confirm by"
-                     " typing uppercase 'yes': ")
-    sys.stdout.flush()
-
-    confirmation = sys.stdin.readline().strip()
-    if confirmation != "YES":
-      print >> sys.stderr, "Aborting."
-      sys.exit(constants.EXIT_FAILURE)
-
-    return
-
-  # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
-  # process before we call utils.Daemonize in the current process.
-  if not utils.RunInSeparateProcess(CheckAgreement):
-    sys.exit(constants.EXIT_FAILURE)
-
+    if not options.yes_do_it:
+      sys.stdout.write("The 'no voting' option has been selected.\n")
+      sys.stdout.write("This is dangerous, please confirm by"
+                       " typing uppercase 'yes': ")
+      sys.stdout.flush()
+
+      confirmation = sys.stdin.readline().strip()
+      if confirmation != "YES":
+        print >> sys.stderr, "Aborting."
+        sys.exit(constants.EXIT_FAILURE)
+  else:
+    # CheckAgreement uses RPC and threads, hence it needs to be run in
+    # a separate process before we call utils.Daemonize in the current
+    # process.
+    if not utils.RunInSeparateProcess(CheckAgreement):
+      sys.exit(constants.EXIT_FAILURE)
+
# ActivateMasterIP also uses RPC/threads, so we run it again via a
# separate process.
utils.RemoveFile(constants.MASTER_SOCKET)
mainloop = daemon.Mainloop()
- master = MasterServer(mainloop, constants.MASTER_SOCKET,
- options.uid, options.gid)
+ master = MasterServer(constants.MASTER_SOCKET, options.uid, options.gid)
return (mainloop, master)
-def ExecMasterd(options, args, prep_data): # pylint: disable-msg=W0613
+def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
"""Main master daemon function, executed with the PID file held.
"""
try:
master.setup_queue()
try:
- mainloop.Run()
+ mainloop.Run(shutdown_wait_fn=master.WaitForShutdown)
finally:
master.server_cleanup()
finally:
finally:
utils.RemoveFile(constants.MASTER_SOCKET)
+ logging.info("Clean master daemon shutdown")
+
def Main():
"""Main function"""