#
#
-# Copyright (C) 2006, 2007, 2010 Google Inc.
+# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
"""
-# pylint: disable-msg=C0103
+# pylint: disable=C0103
# C0103: Invalid name ganeti-masterd
import grp
class ClientRequestWorker(workerpool.BaseWorker):
- # pylint: disable-msg=W0221
+ # pylint: disable=W0221
def RunTask(self, server, message, client):
"""Process the request.
client.send_message(reply)
# awake the main thread so that it can write out the data.
server.awaker.signal()
- except: # pylint: disable-msg=W0702
+ except: # pylint: disable=W0702
logging.exception("Send error")
client.close_log()
"""
_MAX_UNHANDLED = 1
+
def __init__(self, server, connected_socket, client_address, family):
daemon.AsyncTerminatedMessageStream.__init__(self, connected_socket,
client_address,
def __init__(self, server):
self.server = server
- def handle_request(self, method, args): # pylint: disable-msg=R0911
+ def handle_request(self, method, args): # pylint: disable=R0911
queue = self.server.context.jobqueue
# TODO: Parameter validation
+ if not isinstance(args, (tuple, list)):
+ logging.info("Received invalid arguments of type '%s'", type(args))
+ raise ValueError("Invalid arguments type '%s'" % type(args))
# TODO: Rewrite to not exit in each 'if/elif' branch
return queue.SubmitManyJobs(jobs)
elif method == luxi.REQ_CANCEL_JOB:
- job_id = args
+ (job_id, ) = args
logging.info("Received job cancel request for %s", job_id)
return queue.CancelJob(job_id)
elif method == luxi.REQ_ARCHIVE_JOB:
- job_id = args
+ (job_id, ) = args
logging.info("Received job archive request for %s", job_id)
return queue.ArchiveJob(job_id)
prev_log_serial, timeout)
elif method == luxi.REQ_QUERY:
- req = objects.QueryRequest.FromDict(args)
+ (what, fields, qfilter) = args
+ req = objects.QueryRequest(what=what, fields=fields, qfilter=qfilter)
- if req.what in constants.QR_OP_QUERY:
+ if req.what in constants.QR_VIA_OP:
result = self._Query(opcodes.OpQuery(what=req.what, fields=req.fields,
- filter=req.filter))
+ qfilter=req.qfilter))
elif req.what == constants.QR_LOCK:
- if req.filter is not None:
+ if req.qfilter is not None:
raise errors.OpPrereqError("Lock queries can't be filtered")
return self.server.context.glm.QueryLocks(req.fields)
- elif req.what in constants.QR_OP_LUXI:
+ elif req.what in constants.QR_VIA_LUXI:
raise NotImplementedError
else:
raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
return result
elif method == luxi.REQ_QUERY_FIELDS:
- req = objects.QueryFieldsRequest.FromDict(args)
+ (what, fields) = args
+ req = objects.QueryFieldsRequest(what=what, fields=fields)
- if req.what in constants.QR_OP_QUERY:
- result = self._Query(opcodes.OpQueryFields(what=req.what,
- fields=req.fields))
- elif req.what == constants.QR_LOCK:
- return query.QueryFields(query.LOCK_FIELDS, req.fields)
- elif req.what in constants.QR_OP_LUXI:
- raise NotImplementedError
- else:
+ try:
+ fielddefs = query.ALL_FIELDS[req.what]
+ except KeyError:
raise errors.OpPrereqError("Resource type '%s' unknown" % req.what,
errors.ECODE_INVAL)
- return result
+ return query.QueryFields(fielddefs, req.fields)
elif method == luxi.REQ_QUERY_JOBS:
(job_ids, fields) = args
if use_locking:
raise errors.OpPrereqError("Sync queries are not allowed",
errors.ECODE_INVAL)
- op = opcodes.OpQueryInstances(names=names, output_fields=fields,
- use_locking=use_locking)
+ op = opcodes.OpInstanceQuery(names=names, output_fields=fields,
+ use_locking=use_locking)
return self._Query(op)
elif method == luxi.REQ_QUERY_NODES:
if use_locking:
raise errors.OpPrereqError("Sync queries are not allowed",
errors.ECODE_INVAL)
- op = opcodes.OpQueryNodes(names=names, output_fields=fields,
- use_locking=use_locking)
+ op = opcodes.OpNodeQuery(names=names, output_fields=fields,
+ use_locking=use_locking)
return self._Query(op)
elif method == luxi.REQ_QUERY_GROUPS:
if use_locking:
raise errors.OpPrereqError("Sync queries are not allowed",
errors.ECODE_INVAL)
- op = opcodes.OpQueryGroups(names=names, output_fields=fields)
+ op = opcodes.OpGroupQuery(names=names, output_fields=fields)
return self._Query(op)
elif method == luxi.REQ_QUERY_EXPORTS:
- nodes, use_locking = args
+ (nodes, use_locking) = args
if use_locking:
raise errors.OpPrereqError("Sync queries are not allowed",
errors.ECODE_INVAL)
return self._Query(op)
elif method == luxi.REQ_QUERY_CONFIG_VALUES:
- fields = args
+ (fields, ) = args
logging.info("Received config values query request for %s", fields)
op = opcodes.OpClusterConfigQuery(output_fields=fields)
return self._Query(op)
return self._Query(op)
elif method == luxi.REQ_QUERY_TAGS:
- kind, name = args
+ (kind, name) = args
logging.info("Received tags query request")
- op = opcodes.OpGetTags(kind=kind, name=name)
+ op = opcodes.OpTagsGet(kind=kind, name=name)
return self._Query(op)
elif method == luxi.REQ_QUERY_LOCKS:
return self.server.context.glm.OldStyleQueryLocks(fields)
elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
- drain_flag = args
+ (drain_flag, ) = args
logging.info("Received queue drain flag change request to %s",
drain_flag)
return queue.SetDrainFlag(drain_flag)
This class creates and holds common objects shared by all threads.
"""
- # pylint: disable-msg=W0212
+ # pylint: disable=W0212
# we do want to ensure a singleton here
_instance = None
# Job queue
self.jobqueue = jqueue.JobQueue(self)
+ # RPC runner
+ self.rpc = rpc.RpcRunner(self)
+
# setting this also locks the class against attribute modifications
self.__class__._instance = self
# Add the new node to the Ganeti Lock Manager
self.glm.add(locking.LEVEL_NODE, node.name)
+ self.glm.add(locking.LEVEL_NODE_RES, node.name)
def ReaddNode(self, node):
"""Updates a node that's already in the configuration
# Remove the node from the Ganeti Lock Manager
self.glm.remove(locking.LEVEL_NODE, name)
+ self.glm.remove(locking.LEVEL_NODE_RES, name)
def _SetWatcherPause(until):
@rpc.RunWithRPC
def ActivateMasterIP():
# activate ip
- master_node = ssconf.SimpleStore().GetMasterNode()
- result = rpc.RpcRunner.call_node_start_master(master_node, False, False)
+ cfg = config.ConfigWriter()
+ (master, ip, dev, netmask, family) = cfg.GetMasterNetworkParameters()
+ runner = rpc.BootstrapRunner()
+ result = runner.call_node_activate_master_ip(master, ip, netmask, dev, family)
+
msg = result.fail_msg
if msg:
logging.error("Can't activate master IP address: %s", msg)
# If CheckMaster didn't fail we believe we are the master, but we have to
# confirm with the other nodes.
if options.no_voting:
- if options.yes_do_it:
- return
+ if not options.yes_do_it:
+ sys.stdout.write("The 'no voting' option has been selected.\n")
+ sys.stdout.write("This is dangerous, please confirm by"
+ " typing uppercase 'yes': ")
+ sys.stdout.flush()
- sys.stdout.write("The 'no voting' option has been selected.\n")
- sys.stdout.write("This is dangerous, please confirm by"
- " typing uppercase 'yes': ")
- sys.stdout.flush()
+ confirmation = sys.stdin.readline().strip()
+ if confirmation != "YES":
+ print >> sys.stderr, "Aborting."
+ sys.exit(constants.EXIT_FAILURE)
- confirmation = sys.stdin.readline().strip()
- if confirmation != "YES":
- print >> sys.stderr, "Aborting."
+ else:
+ # CheckAgreement uses RPC and threads, hence it needs to be run in
+ # a separate process before we call utils.Daemonize in the current
+ # process.
+ if not utils.RunInSeparateProcess(CheckAgreement):
sys.exit(constants.EXIT_FAILURE)
- return
-
- # CheckAgreement uses RPC and threads, hence it needs to be run in a separate
- # process before we call utils.Daemonize in the current process.
- if not utils.RunInSeparateProcess(CheckAgreement):
- sys.exit(constants.EXIT_FAILURE)
-
# ActivateMasterIP also uses RPC/threads, so we run it again via a
# separate process.
return (mainloop, master)
-def ExecMasterd(options, args, prep_data): # pylint: disable-msg=W0613
+def ExecMasterd(options, args, prep_data): # pylint: disable=W0613
"""Main master daemon function, executed with the PID file held.
"""