#
#
-# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# C0103: Invalid name gnt-node
import itertools
+import errno
+import tempfile
from ganeti.cli import *
from ganeti import cli
from ganeti import constants
from ganeti import errors
from ganeti import netutils
+from ganeti import pathutils
+from ganeti import serializer
+from ganeti import ssh
from cStringIO import StringIO
+from ganeti import confd
+from ganeti.confd import client as confd_client
#: default list of field for L{ListNodes}
_LIST_DEF_FIELDS = [
errors.ECODE_INVAL)
-def _RunSetupSSH(options, nodes):
- """Wrapper around utils.RunCmd to call setup-ssh
+def _TryReadFile(path):
+ """Tries to read a file.
- @param options: The command line options
- @param nodes: The nodes to setup
+ If the file is not found, C{None} is returned.
+
+ @type path: string
+ @param path: Filename
+ @rtype: None or string
+ @todo: Consider adding a generic ENOENT wrapper
+
+ """
+ try:
+ return utils.ReadFile(path)
+ except EnvironmentError, err:
+ if err.errno == errno.ENOENT:
+ return None
+ else:
+ raise
+
+
+def _ReadSshKeys(keyfiles, _tostderr_fn=ToStderr):
+ """Reads SSH keys according to C{keyfiles}.
+
+ @type keyfiles: dict
+  @param keyfiles: Dictionary with keys of L{constants.SSHK_ALL} and two-value
+ tuples (private and public key file)
+ @rtype: list
+  @return: List of three-value tuples (L{constants.SSHK_ALL}, private and
+ public key as strings)
+
+ """
+ result = []
+
+ for (kind, (private_file, public_file)) in keyfiles.items():
+ private_key = _TryReadFile(private_file)
+ public_key = _TryReadFile(public_file)
+
+ if public_key and private_key:
+ result.append((kind, private_key, public_key))
+ elif public_key or private_key:
+ _tostderr_fn("Couldn't find a complete set of keys for kind '%s'; files"
+ " '%s' and '%s'", kind, private_file, public_file)
+
+ return result
+
+
+def _SetupSSH(options, cluster_name, node):
+ """Configures a destination node's SSH daemon.
+
+ @param options: Command line options
+  @type cluster_name: string
+ @param cluster_name: Cluster name
+ @type node: string
+ @param node: Destination node name
"""
- cmd = [constants.SETUP_SSH]
+ if options.force_join:
+ ToStderr("The \"--force-join\" option is no longer supported and will be"
+ " ignored.")
+
+ cmd = [pathutils.PREPARE_NODE_JOIN]
- # Pass --debug|--verbose to the external script if set on our invocation
- # --debug overrides --verbose
+ # Pass --debug/--verbose to the external script if set on our invocation
if options.debug:
cmd.append("--debug")
- elif options.verbose:
+
+ if options.verbose:
cmd.append("--verbose")
- if not options.ssh_key_check:
- cmd.append("--no-ssh-key-check")
- if options.force_join:
- cmd.append("--force-join")
- cmd.extend(nodes)
+ host_keys = _ReadSshKeys(constants.SSH_DAEMON_KEYFILES)
+
+ (_, root_keyfiles) = \
+ ssh.GetAllUserFiles(constants.SSH_LOGIN_USER, mkdir=False, dircheck=False)
+
+ root_keys = _ReadSshKeys(root_keyfiles)
+
+ (_, cert_pem) = \
+ utils.ExtractX509Certificate(utils.ReadFile(pathutils.NODED_CERT_FILE))
+
+ data = {
+ constants.SSHS_CLUSTER_NAME: cluster_name,
+ constants.SSHS_NODE_DAEMON_CERTIFICATE: cert_pem,
+ constants.SSHS_SSH_HOST_KEY: host_keys,
+ constants.SSHS_SSH_ROOT_KEY: root_keys,
+ }
+
+ srun = ssh.SshRunner(cluster_name)
+ scmd = srun.BuildCmd(node, constants.SSH_LOGIN_USER,
+ utils.ShellQuoteArgs(cmd),
+ batch=False, ask_key=options.ssh_key_check,
+ strict_host_check=options.ssh_key_check, quiet=False,
+ use_cluster_key=False)
+
+ tempfh = tempfile.TemporaryFile()
+ try:
+ tempfh.write(serializer.DumpJson(data))
+ tempfh.seek(0)
- result = utils.RunCmd(cmd, interactive=True)
+ result = utils.RunCmd(scmd, interactive=True, input_fd=tempfh)
+ finally:
+ tempfh.close()
if result.failed:
- errmsg = ("Command '%s' failed with exit code %s; output %r" %
- (result.cmd, result.exit_code, result.output))
- raise errors.OpExecError(errmsg)
+ raise errors.OpExecError("Command '%s' failed: %s" %
+ (result.cmd, result.fail_reason))
@UsesRPC
sip = opts.secondary_ip
# read the cluster name from the master
- output = cl.QueryConfigValues(["cluster_name"])
- cluster_name = output[0]
+ (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
if not readd and opts.node_setup:
ToStderr("-- WARNING -- \n"
"and grant full intra-cluster ssh root access to/from it\n", node)
if opts.node_setup:
- _RunSetupSSH(opts, [node])
+ _SetupSSH(opts, cluster_name, node)
bootstrap.SetupNodeDaemon(cluster_name, node, opts.ssh_key_check)
fmtoverride = dict.fromkeys(["pinst_list", "sinst_list", "tags"],
(",".join, False))
+ cl = GetClient(query=True)
+
return GenericList(constants.QR_NODE, selected_fields, args, opts.units,
opts.separator, not opts.no_headers,
format_override=fmtoverride, verbose=opts.verbose,
- force_filter=opts.force_filter)
+ force_filter=opts.force_filter, cl=cl)
def ListNodeFields(opts, args):
@return: the desired exit code
"""
+ cl = GetClient(query=True)
+
return GenericListFields(constants.QR_NODE, args, opts.separator,
- not opts.no_headers)
+ not opts.no_headers, cl=cl)
def EvacuateNode(opts, args):
cl = GetClient()
- result = cl.QueryNodes(names=args, fields=fields, use_locking=False)
+ qcl = GetClient(query=True)
+ result = qcl.QueryNodes(names=args, fields=fields, use_locking=False)
+ qcl.Close()
+
instances = set(itertools.chain(*itertools.chain(*itertools.chain(result))))
if not instances:
remote_node=opts.dst_node,
iallocator=opts.iallocator,
early_release=opts.early_release)
- result = SubmitOpCode(op, cl=cl, opts=opts)
+ result = SubmitOrSend(op, opts, cl=cl)
# Keep track of submitted jobs
jex = JobExecutor(cl=cl, opts=opts)
# these fields are static data anyway, so it doesn't matter, but
# locking=True should be safer
+  # Use the query-socket client for the read-only node query and close it
+  # once the result is in hand; job submission still goes through cl.
+  qcl = GetClient(query=True)
-result = cl.QueryNodes(names=args, fields=selected_fields,
-                        use_locking=False)
+  result = qcl.QueryNodes(names=args, fields=selected_fields,
+                          use_locking=False)
+  qcl.Close()
node, pinst = result[0]
if not pinst:
force = opts.force
selected_fields = ["name", "pinst_list"]
+  # Use the query-socket client for the read-only node query and close it
+  # once the result is in hand; job submission still goes through cl.
+  qcl = GetClient(query=True)
-result = cl.QueryNodes(names=args, fields=selected_fields, use_locking=False)
+  result = qcl.QueryNodes(names=args, fields=selected_fields, use_locking=False)
+  qcl.Close()
((node, pinst), ) = result
if not pinst:
allow_runtime_changes=opts.allow_runtime_chgs,
ignore_ipolicy=opts.ignore_ipolicy)
- result = SubmitOpCode(op, cl=cl, opts=opts)
+ result = SubmitOrSend(op, opts, cl=cl)
# Keep track of submitted jobs
jex = JobExecutor(cl=cl, opts=opts)
@return: the desired exit code
"""
- cl = GetClient()
+ cl = GetClient(query=True)
result = cl.QueryNodes(fields=["name", "pip", "sip",
"pinst_list", "sinst_list",
"master_candidate", "drained", "offline",
return 2
op = opcodes.OpNodePowercycle(node_name=node, force=opts.force)
- result = SubmitOpCode(op, opts=opts)
+ result = SubmitOrSend(op, opts)
if result:
ToStderr(result)
return 0
storage_type=storage_type,
name=volume_name,
changes=changes)
- SubmitOpCode(op, opts=opts)
+ SubmitOrSend(op, opts)
else:
ToStderr("No changes to perform, exiting.")
storage_type=storage_type,
name=volume_name,
ignore_consistency=opts.ignore_consistency)
- SubmitOpCode(op, opts=opts)
+ SubmitOrSend(op, opts)
def SetNodeParams(opts, args):
return 0
+class ReplyStatus(object):
+ """Class holding a reply status for synchronous confd clients.
+
+ """
+ def __init__(self):
+ self.failure = True
+ self.answer = False
+
+
+def ListDrbd(opts, args):
+  """Lists the DRBD minors in use on a node.
+
+ @param opts: the command line options selected by the user
+ @type args: list
+ @param args: should contain only one element, the node name
+ @rtype: int
+ @return: the desired exit code
+
+ """
+ if len(args) != 1:
+ ToStderr("Please give one (and only one) node.")
+ return constants.EXIT_FAILURE
+
+ if not constants.ENABLE_CONFD:
+ ToStderr("Error: this command requires confd support, but it has not"
+ " been enabled at build time.")
+ return constants.EXIT_FAILURE
+
+ status = ReplyStatus()
+
+ def ListDrbdConfdCallback(reply):
+ """Callback for confd queries"""
+ if reply.type == confd_client.UPCALL_REPLY:
+ answer = reply.server_reply.answer
+ reqtype = reply.orig_request.type
+ if reqtype == constants.CONFD_REQ_NODE_DRBD:
+ if reply.server_reply.status != constants.CONFD_REPL_STATUS_OK:
+ ToStderr("Query gave non-ok status '%s': %s" %
+ (reply.server_reply.status,
+ reply.server_reply.answer))
+ status.failure = True
+ return
+ if not confd.HTNodeDrbd(answer):
+ ToStderr("Invalid response from server: expected %s, got %s",
+ confd.HTNodeDrbd, answer)
+ status.failure = True
+ else:
+ status.failure = False
+ status.answer = answer
+ else:
+ ToStderr("Unexpected reply %s!?", reqtype)
+ status.failure = True
+
+ node = args[0]
+ hmac = utils.ReadFile(pathutils.CONFD_HMAC_KEY)
+ filter_callback = confd_client.ConfdFilterCallback(ListDrbdConfdCallback)
+ counting_callback = confd_client.ConfdCountingCallback(filter_callback)
+ cf_client = confd_client.ConfdClient(hmac, [constants.IP4_ADDRESS_LOCALHOST],
+ counting_callback)
+ req = confd_client.ConfdClientRequest(type=constants.CONFD_REQ_NODE_DRBD,
+ query=node)
+
+ def DoConfdRequestReply(req):
+ counting_callback.RegisterQuery(req.rsalt)
+ cf_client.SendRequest(req, async=False)
+ while not counting_callback.AllAnswered():
+ if not cf_client.ReceiveReply():
+ ToStderr("Did not receive all expected confd replies")
+ break
+
+ DoConfdRequestReply(req)
+
+ if status.failure:
+ return constants.EXIT_FAILURE
+
+ fields = ["node", "minor", "instance", "disk", "role", "peer"]
+ if opts.no_headers:
+ headers = None
+ else:
+ headers = {"node": "Node", "minor": "Minor", "instance": "Instance",
+ "disk": "Disk", "role": "Role", "peer": "PeerNode"}
+
+ data = GenerateTable(separator=opts.separator, headers=headers,
+ fields=fields, data=sorted(status.answer),
+ numfields=["minor"])
+ for line in data:
+ ToStdout(line)
+
+ return constants.EXIT_SUCCESS
+
commands = {
"add": (
AddNode, [ArgHost(min=1, max=1)],
"evacuate": (
EvacuateNode, ARGS_ONE_NODE,
[FORCE_OPT, IALLOCATOR_OPT, NEW_SECONDARY_OPT, EARLY_RELEASE_OPT,
- PRIORITY_OPT, PRIMARY_ONLY_OPT, SECONDARY_ONLY_OPT],
- "[-f] {-I <iallocator> | -n <dst>} <node>",
- "Relocate the secondary instances from a node"
- " to other nodes"),
+ PRIORITY_OPT, PRIMARY_ONLY_OPT, SECONDARY_ONLY_OPT, SUBMIT_OPT],
+ "[-f] {-I <iallocator> | -n <dst>} [-p | -s] [options...] <node>",
+ "Relocate the primary and/or secondary instances from a node"),
"failover": (
FailoverNode, ARGS_ONE_NODE, [FORCE_OPT, IGNORE_CONSIST_OPT,
IALLOCATOR_OPT, PRIORITY_OPT],
MigrateNode, ARGS_ONE_NODE,
[FORCE_OPT, NONLIVE_OPT, MIGRATION_MODE_OPT, DST_NODE_OPT,
IALLOCATOR_OPT, PRIORITY_OPT, IGNORE_IPOLICY_OPT,
- NORUNTIME_CHGS_OPT],
+ NORUNTIME_CHGS_OPT, SUBMIT_OPT],
"[-f] <node>",
"Migrate all the primary instance on a node away from it"
" (only for instances of type drbd)"),
"<node_name>", "Alters the parameters of a node"),
"powercycle": (
PowercycleNode, ARGS_ONE_NODE,
- [FORCE_OPT, CONFIRM_OPT, DRY_RUN_OPT, PRIORITY_OPT],
+ [FORCE_OPT, CONFIRM_OPT, DRY_RUN_OPT, PRIORITY_OPT, SUBMIT_OPT],
"<node_name>", "Tries to forcefully powercycle a node"),
"power": (
PowerNode,
[ArgNode(min=1, max=1),
ArgChoice(min=1, max=1, choices=_MODIFIABLE_STORAGE_TYPES),
ArgFile(min=1, max=1)],
- [ALLOCATABLE_OPT, DRY_RUN_OPT, PRIORITY_OPT],
+ [ALLOCATABLE_OPT, DRY_RUN_OPT, PRIORITY_OPT, SUBMIT_OPT],
"<node_name> <storage_type> <name>", "Modify storage volume on a node"),
"repair-storage": (
RepairStorage,
[ArgNode(min=1, max=1),
ArgChoice(min=1, max=1, choices=_REPAIRABLE_STORAGE_TYPES),
ArgFile(min=1, max=1)],
- [IGNORE_CONSIST_OPT, DRY_RUN_OPT, PRIORITY_OPT],
+ [IGNORE_CONSIST_OPT, DRY_RUN_OPT, PRIORITY_OPT, SUBMIT_OPT],
"<node_name> <storage_type> <name>",
"Repairs a storage volume on a node"),
"list-tags": (
ListTags, ARGS_ONE_NODE, [],
"<node_name>", "List the tags of the given node"),
"add-tags": (
- AddTags, [ArgNode(min=1, max=1), ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT],
+ AddTags, [ArgNode(min=1, max=1), ArgUnknown()],
+ [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
"<node_name> tag...", "Add tags to the given node"),
"remove-tags": (
RemoveTags, [ArgNode(min=1, max=1), ArgUnknown()],
- [TAG_SRC_OPT, PRIORITY_OPT],
+ [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
"<node_name> tag...", "Remove tags from the given node"),
"health": (
Health, ARGS_MANY_NODES,
- [NOHDR_OPT, SEP_OPT, SUBMIT_OPT, PRIORITY_OPT, OOB_TIMEOUT_OPT],
+ [NOHDR_OPT, SEP_OPT, PRIORITY_OPT, OOB_TIMEOUT_OPT],
"[<node_name>...]", "List health of node(s) using out-of-band"),
+ "list-drbd": (
+ ListDrbd, ARGS_ONE_NODE,
+ [NOHDR_OPT, SEP_OPT],
+ "[<node_name>]", "Query the list of used DRBD minors on the given node"),
+ }
+
+#: dictionary with aliases for commands
+aliases = {
+ "show": "info",
}
def Main():
- return GenericMain(commands, override={"tag_type": constants.TAG_NODE},
+ return GenericMain(commands, aliases=aliases,
+ override={"tag_type": constants.TAG_NODE},
env_override=_ENV_OVERRIDE)