# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module dealing with command line parsing"""
import sys
import textwrap
import time

from cStringIO import StringIO
from optparse import (OptionParser, TitledHelpFormatter,
                      Option, OptionValueError)

from ganeti import utils
from ganeti import errors
from ganeti import constants
from ganeti import opcodes
from ganeti import luxi
from ganeti import ssconf
from ganeti import rpc
from ganeti import ssh
46 # Command line options
66 "FILESTORE_DRIVER_OPT",
75 "IGNORE_FAILURES_OPT",
76 "IGNORE_SECONDARIES_OPT",
82 "NEW_CLUSTER_CERT_OPT",
83 "NEW_CONFD_HMAC_KEY_OPT",
94 "NOMODIFY_ETCHOSTS_OPT",
95 "NOMODIFY_SSH_SETUP_OPT",
101 "NOSSH_KEYCHECK_OPT",
116 "SHUTDOWN_TIMEOUT_OPT",
130 # Generic functions for CLI programs
132 "GenericInstanceCreate",
136 "JobSubmittedException",
138 "RunWhileClusterStopped",
142 # Formatting functions
143 "ToStderr", "ToStdout",
152 # command line options support infrastructure
153 "ARGS_MANY_INSTANCES",
169 "OPT_COMPL_INST_ADD_NODES",
170 "OPT_COMPL_MANY_NODES",
171 "OPT_COMPL_ONE_IALLOCATOR",
172 "OPT_COMPL_ONE_INSTANCE",
173 "OPT_COMPL_ONE_NODE",
185 def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
190 return ("<%s min=%s max=%s>" %
191 (self.__class__.__name__, self.min, self.max))
class ArgSuggest(_Argument):
  """Suggesting argument.

  Value can be any of the ones passed to the constructor.

  """
  # pylint: disable-msg=W0622
  def __init__(self, min=0, max=None, choices=None):
    _Argument.__init__(self, min=min, max=max)
    self.choices = choices

  def __repr__(self):
    return ("<%s min=%s max=%s choices=%r>" %
            (self.__class__.__name__, self.min, self.max, self.choices))
class ArgChoice(ArgSuggest):
  """Choice argument.

  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
  but value must be one of the choices.

  """


class ArgUnknown(_Argument):
  """Unknown argument to program (e.g. determined at runtime).

  """


class ArgInstance(_Argument):
  """Instances argument.

  """


class ArgNode(_Argument):
  """Node argument.

  """


class ArgJobId(_Argument):
  """Job ID argument.

  """


class ArgFile(_Argument):
  """File path argument.

  """


class ArgCommand(_Argument):
  """Command argument.

  """


class ArgHost(_Argument):
  """Host argument.

  """


class ArgOs(_Argument):
  """OS argument.

  """
# Common shorthands for argument definitions used by the commands
ARGS_MANY_INSTANCES = [ArgInstance()]
ARGS_MANY_NODES = [ArgNode()]
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
def _ExtractTagsObject(opts, args):
  """Extract the tag type object.

  Note that this function will modify its args parameter.

  """
  if not hasattr(opts, "tag_type"):
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
  kind = opts.tag_type
  if kind == constants.TAG_CLUSTER:
    # cluster tags need no object name
    retval = kind, kind
  elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
    # node/instance tags take the object name as first argument
    if not args:
      raise errors.OpPrereqError("no arguments passed to the command")
    name = args.pop(0)
    retval = kind, name
  else:
    raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
  return retval
295 def _ExtendTags(opts, args):
296 """Extend the args if a source file has been given.
298 This function will extend the tags with the contents of the file
299 passed in the 'tags_source' attribute of the opts parameter. A file
300 named '-' will be replaced by stdin.
303 fname = opts.tags_source
309 new_fh = open(fname, "r")
312 # we don't use the nice 'new_data = [line.strip() for line in fh]'
313 # because of python bug 1633941
315 line = new_fh.readline()
318 new_data.append(line.strip())
321 args.extend(new_data)
def ListTags(opts, args):
  """List the tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  cl = GetClient()
  result = cl.QueryTags(kind, name)
  result = list(result)
  result.sort()
  # NOTE(review): output format reconstructed — verify one-tag-per-line
  # against the gnt-* clients
  for tag in result:
    ToStdout(tag)
def AddTags(opts, args):
  """Add tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be added")
  op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
  SubmitOpCode(op)
def RemoveTags(opts, args):
  """Remove tags from a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be removed")
  op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
  SubmitOpCode(op)
376 def check_unit(option, opt, value): # pylint: disable-msg=W0613
377 """OptParsers custom converter for units.
381 return utils.ParseUnit(value)
382 except errors.UnitParseError, err:
383 raise OptionValueError("option %s: %s" % (opt, err))
def _SplitKeyVal(opt, data):
  """Convert a KeyVal string into a dict.

  This function will convert a key=val[,...] string into a dict. Empty
  values will be converted specially: keys which have the prefix 'no_'
  will have the value=False and the prefix stripped, keys with the
  prefix 'un_' get value=None, the others will have value=True.

  @type opt: string
  @param opt: a string holding the option name for which we process the
      data, used in building error messages
  @type data: string
  @param data: a string of the format key=val,key=val,...
  @rtype: dict
  @return: {key=val, key=val}
  @raises errors.ParameterError: if there are duplicate keys

  """
  kv_dict = {}
  if data:
    for elem in utils.UnescapeAndSplit(data, sep=","):
      if "=" in elem:
        key, val = elem.split("=", 1)
      else:
        # value-less entry: interpret the prefix
        if elem.startswith(NO_PREFIX):
          key, val = elem[len(NO_PREFIX):], False
        elif elem.startswith(UN_PREFIX):
          key, val = elem[len(UN_PREFIX):], None
        else:
          key, val = elem, True
      if key in kv_dict:
        raise errors.ParameterError("Duplicate key '%s' in option %s" %
                                    (key, opt))
      kv_dict[key] = val
  return kv_dict
def check_ident_key_val(option, opt, value): # pylint: disable-msg=W0613
  """Custom parser for ident:key=val,key=val options.

  This will store the parsed values as a tuple (ident, {key: val}). As such,
  multiple uses of this option via action=append is possible.

  """
  if ":" not in value:
    ident, rest = value, ''
  else:
    ident, rest = value.split(":", 1)

  if ident.startswith(NO_PREFIX):
    # "no_<group>" removes the whole group; options make no sense then
    if rest:
      msg = "Cannot pass options when removing parameter groups: %s" % value
      raise errors.ParameterError(msg)
    retval = (ident[len(NO_PREFIX):], False)
  elif ident.startswith(UN_PREFIX):
    # "un_<group>" resets the group to defaults; options make no sense then
    if rest:
      msg = "Cannot pass options when removing parameter groups: %s" % value
      raise errors.ParameterError(msg)
    retval = (ident[len(UN_PREFIX):], None)
  else:
    kv_dict = _SplitKeyVal(opt, rest)
    retval = (ident, kv_dict)

  return retval
def check_key_val(option, opt, value): # pylint: disable-msg=W0613
  """Custom parser class for key=val,key=val options.

  This will store the parsed values as a dict {key: val}.

  """
  return _SplitKeyVal(opt, value)
# completion_suggestion is normally a list. Using numeric values not evaluating
# to False for dynamic completion.
(OPT_COMPL_MANY_NODES,
 OPT_COMPL_ONE_NODE,
 OPT_COMPL_ONE_INSTANCE,
 OPT_COMPL_ONE_OS,
 OPT_COMPL_ONE_IALLOCATOR,
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)

OPT_COMPL_ALL = frozenset([
  OPT_COMPL_MANY_NODES,
  OPT_COMPL_ONE_NODE,
  OPT_COMPL_ONE_INSTANCE,
  OPT_COMPL_ONE_OS,
  OPT_COMPL_ONE_IALLOCATOR,
  OPT_COMPL_INST_ADD_NODES,
  ])
class CliOption(Option):
  """Custom option class for optparse.

  Extends the stock optparse Option with the custom "identkeyval",
  "keyval" and "unit" types and with a completion_suggest attribute
  used by the bash completion machinery.

  """
  ATTRS = Option.ATTRS + [
    "completion_suggest",
    ]
  # custom types must be listed in TYPES and given a checker below
  TYPES = Option.TYPES + (
    "identkeyval",
    "keyval",
    "unit",
    )
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
  TYPE_CHECKER["keyval"] = check_key_val
  TYPE_CHECKER["unit"] = check_unit


# optparse.py sets make_option, so we do it for our own option class, too
cli_option = CliOption
# Choice values and metavar for the boolean yes/no flag options below
_YESNO = ("yes", "no")
_YORNO = "yes|no"


DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
                       help="Increase debugging level")

NOHDR_OPT = cli_option("--no-headers", default=False,
                       action="store_true", dest="no_headers",
                       help="Don't display column headers")

SEP_OPT = cli_option("--separator", default=None,
                     action="store", dest="separator",
                     help=("Separator between output fields"
                           " (defaults to one space)"))

USEUNITS_OPT = cli_option("--units", default=None,
                          dest="units", choices=('h', 'm', 'g', 't'),
                          help="Specify units for output (one of hmgt)")

FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
                        type="string", metavar="FIELDS",
                        help="Comma separated list of output fields")

FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
                       default=False, help="Force the operation")

CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
                         default=False, help="Do not require confirmation")

TAG_SRC_OPT = cli_option("--from", dest="tags_source",
                         default=None, help="File with tag names")

SUBMIT_OPT = cli_option("--submit", dest="submit_only",
                        default=False, action="store_true",
                        help=("Submit the job and return the job ID, but"
                              " don't wait for the job to finish"))

SYNC_OPT = cli_option("--sync", dest="do_locking",
                      default=False, action="store_true",
                      help=("Grab locks while doing the queries"
                            " in order to ensure more consistent results"))

_DRY_RUN_OPT = cli_option("--dry-run", default=False,
                          action="store_true",
                          help=("Do not execute the operation, just run the"
                                " check steps and verify it it could be"
                                " executed"))

VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
                         action="store_true",
                         help="Increase the verbosity of the operation")

DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
                              action="store_true", dest="simulate_errors",
                              help="Debugging option that makes the operation"
                              " treat most runtime checks as failed")

NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
                        default=True, action="store_false",
                        help="Don't wait for sync (DANGEROUS!)")

DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
                               help="Custom disk setup (diskless, file,"
                               " plain or drbd)",
                               default=None, metavar="TEMPL",
                               choices=list(constants.DISK_TEMPLATES))

NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
                        help="Do not create any network cards for"
                        " the instance")

FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                               help="Relative path under default cluster-wide"
                               " file storage dir to store file-based disks",
                               default=None, metavar="<DIR>")

FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
                                  help="Driver to use for image files",
                                  default="loop", metavar="<DRIVER>",
                                  choices=list(constants.FILE_DRIVER))

IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
                            help="Select nodes for the instance automatically"
                            " using the <NAME> iallocator plugin",
                            default=None, type="string",
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)

OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
                    metavar="<os>",
                    completion_suggest=OPT_COMPL_ONE_OS)

FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
                               action="store_true", default=False,
                               help="Force an unknown variant")

NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
                            action="store_true", default=False,
                            help="Do not install the OS (will"
                            " enable no-installation mode)")

BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
                         type="keyval", default={},
                         help="Backend parameters")

HVOPTS_OPT = cli_option("-H", "--hypervisor-parameters", type="keyval",
                        default={}, dest="hvparams",
                        help="Hypervisor parameters")

HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
                            help="Hypervisor and hypervisor options, in the"
                            " format hypervisor:option=value,option=value,...",
                            default=None, type="identkeyval")

HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
                        help="Hypervisor and hypervisor options, in the"
                        " format hypervisor:option=value,option=value,...",
                        default=[], action="append", type="identkeyval")

NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
                           action="store_false",
                           help="Don't check that the instance's IP"
                           " is alive")

NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
                             default=True, action="store_false",
                             help="Don't check that the instance's name"
                             " is resolvable")

NET_OPT = cli_option("--net",
                     help="NIC parameters", default=[],
                     dest="nics", action="append", type="identkeyval")

DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
                      dest="disks", action="append", type="identkeyval")

DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
                         help="Comma-separated list of disks"
                         " indices to act on (e.g. 0,2) (optional,"
                         " defaults to all disks)")

OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
                         help="Enforces a single-disk configuration using the"
                         " given disk size, in MiB unless a suffix is used",
                         default=None, type="unit", metavar="<size>")

IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
                                dest="ignore_consistency",
                                action="store_true", default=False,
                                help="Ignore the consistency of the disks on"
                                " the secondary")

NONLIVE_OPT = cli_option("--non-live", dest="live",
                         default=True, action="store_false",
                         help="Do a non-live migration (this usually means"
                         " freeze the instance, save the state, transfer and"
                         " only then resume running on the secondary node)")

NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
                                help="Target node and optional secondary node",
                                metavar="<pnode>[:<snode>]",
                                completion_suggest=OPT_COMPL_INST_ADD_NODES)

NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
                           action="append", metavar="<node>",
                           help="Use only this node (can be used multiple"
                           " times, if not given defaults to all nodes)",
                           completion_suggest=OPT_COMPL_ONE_NODE)

SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
                             metavar="<node>",
                             completion_suggest=OPT_COMPL_ONE_NODE)

NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
                         action="store_false",
                         help="Don't start the instance after creation")

SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
                         action="store_true", default=False,
                         help="Show command instead of executing it")

CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
                         default=False, action="store_true",
                         help="Instead of performing the migration, try to"
                         " recover from a failed cleanup. This is safe"
                         " to run even if the instance is healthy, but it"
                         " will create extra replication traffic and "
                         " disrupt briefly the replication (like during the"
                         " migration)")

STATIC_OPT = cli_option("-s", "--static", dest="static",
                        action="store_true", default=False,
                        help="Only show configuration data, not runtime data")

ALL_OPT = cli_option("--all", dest="show_all",
                     default=False, action="store_true",
                     help="Show info on all instances on the cluster."
                     " This can take a long time to run, use wisely")

SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
                           action="store_true", default=False,
                           help="Interactive OS reinstall, lists available"
                           " OS templates for selection")

IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
                                 action="store_true", default=False,
                                 help="Remove the instance from the cluster"
                                 " configuration even if there are failures"
                                 " during the removal process")

NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
                               help="Specifies the new secondary node",
                               metavar="NODE", default=None,
                               completion_suggest=OPT_COMPL_ONE_NODE)

ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
                            default=False, action="store_true",
                            help="Replace the disk(s) on the primary"
                            " node (only for the drbd template)")

ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
                              default=False, action="store_true",
                              help="Replace the disk(s) on the secondary"
                              " node (only for the drbd template)")

AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
                              default=False, action="store_true",
                              help="Lock all nodes and auto-promote as needed"
                              " to MC status")

AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
                              default=False, action="store_true",
                              help="Automatically replace faulty disks"
                              " (only for the drbd template)")

IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
                             default=False, action="store_true",
                             help="Ignore current recorded size"
                             " (useful for forcing activation when"
                             " the recorded size is wrong)")

SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
                          metavar="<node>",
                          completion_suggest=OPT_COMPL_ONE_NODE)

SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
                         metavar="<dir>")

SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
                              help="Specify the secondary ip for the node",
                              metavar="ADDRESS", default=None)

READD_OPT = cli_option("--readd", dest="readd",
                       default=False, action="store_true",
                       help="Readd old node after replacing it")

NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
                                default=True, action="store_false",
                                help="Disable SSH key fingerprint checking")


MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
                    choices=_YESNO, default=None, metavar=_YORNO,
                    help="Set the master_candidate flag on the node")

OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
                         choices=_YESNO, default=None,
                         help="Set the offline flag on the node")

DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
                         choices=_YESNO, default=None,
                         help="Set the drained flag on the node")

ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
                             choices=_YESNO, default=None, metavar=_YORNO,
                             help="Set the allocatable flag on a volume")

NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
                               help="Disable support for lvm based instances"
                               " (cluster-wide)",
                               action="store_false", default=True)

ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
                            dest="enabled_hypervisors",
                            help="Comma-separated list of hypervisors",
                            type="string", default=None)

NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
                            type="keyval", default={},
                            help="NIC parameters")

CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
                         dest="candidate_pool_size", type="int",
                         help="Set the candidate pool size")

VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
                         help="Enables LVM and specifies the volume group"
                         " name (cluster-wide) for disk allocation [xenvg]",
                         metavar="VG", default=None)

YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
                          help="Destroy cluster", action="store_true")

NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
                          help="Skip node agreement check (dangerous)",
                          action="store_true", default=False)

MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
                            help="Specify the mac prefix for the instance IP"
                            " addresses, in the format XX:XX:XX",
                            metavar="PREFIX",
                            default=None)

MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
                               help="Specify the node interface (cluster-wide)"
                               " on which the master IP address will be added "
                               " [%s]" % constants.DEFAULT_BRIDGE,
                               metavar="NETDEV",
                               default=constants.DEFAULT_BRIDGE)

GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                                help="Specify the default directory (cluster-"
                                "wide) for storing the file-based disks [%s]" %
                                constants.DEFAULT_FILE_STORAGE_DIR,
                                metavar="DIR",
                                default=constants.DEFAULT_FILE_STORAGE_DIR)

NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
                                   help="Don't modify /etc/hosts",
                                   action="store_false", default=True)

NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
                                    help="Don't initialize SSH keys",
                                    action="store_false", default=True)

ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
                             help="Enable parseable error messages",
                             action="store_true", default=False)

NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
                          help="Skip N+1 memory redundancy tests",
                          action="store_true", default=False)

REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
                             help="Type of reboot: soft/hard/full",
                             default=constants.INSTANCE_REBOOT_HARD,
                             metavar="<REBOOT>",
                             choices=list(constants.REBOOT_TYPES))

IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
                                    dest="ignore_secondaries",
                                    default=False, action="store_true",
                                    help="Ignore errors from secondaries")

NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
                            action="store_false", default=True,
                            help="Don't shutdown the instance (unsafe)")

TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
                         help="Maximum time to wait")

SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
                                  dest="shutdown_timeout", type="int",
                                  default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
                                  help="Maximum time to wait for instance shutdown")

EARLY_RELEASE_OPT = cli_option("--early-release",
                               dest="early_release", default=False,
                               action="store_true",
                               help="Release the locks on the secondary"
                               " node(s) early")

NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
                                  dest="new_cluster_cert",
                                  default=False, action="store_true",
                                  help="Generate a new cluster certificate")

RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
                           default=None,
                           help="File containing new RAPI certificate")

NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
                               default=None, action="store_true",
                               help=("Generate a new self-signed RAPI"
                                     " certificate"))

NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
                                    dest="new_confd_hmac_key",
                                    default=False, action="store_true",
                                    help=("Create a new HMAC key for %s" %
                                          constants.CONFD_HMAC_KEY))

USE_REPL_NET_OPT = cli_option("--use-replication-network",
                              dest="use_replication_network",
                              help="Whether to use the replication network"
                              " for talking to the nodes",
                              action="store_true", default=False)
def _ParseArgs(argv, commands, aliases):
  """Parser for the command line arguments.

  This function parses the arguments and returns the function which
  must be executed together with its (modified) arguments.

  @param argv: the command line
  @param commands: dictionary with special contents, see the design
      doc for cmdline handling
  @param aliases: dictionary with command aliases {'alias': 'target, ...}

  """
  binary = argv[0].split("/")[-1]

  if len(argv) > 1 and argv[1] == "--version":
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
    # Quit right away. That way we don't have to care about this special
    # argument. optparse.py does it the same.
    sys.exit(0)

  if len(argv) < 2 or not (argv[1] in commands or
                           argv[1] in aliases):
    # let's do a nice thing
    sortedcmds = commands.keys()
    sortedcmds.sort()

    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)
    ToStdout("")

    # compute the max line length for cmd + usage
    mlen = max([len(" %s" % cmd) for cmd in commands])
    mlen = min(60, mlen) # should not get here...

    # and format a nice command list
    ToStdout("Commands:")
    for cmd in sortedcmds:
      cmdstr = " %s" % (cmd,)
      help_text = commands[cmd][4]
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
      for line in help_lines:
        ToStdout("%-*s %s", mlen, "", line)

    ToStdout("")

    return None, None, None

  # get command, unalias it, and look it up in commands
  cmd = argv.pop(1)
  if cmd in aliases:
    if cmd in commands:
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
                                   " command!" % cmd)

    if aliases[cmd] not in commands:
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
                                   " command '%s'" % (cmd, aliases[cmd]))

    cmd = aliases[cmd]

  func, args_def, parser_opts, usage, description = commands[cmd]
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
                        description=description,
                        formatter=TitledHelpFormatter(),
                        usage="%%prog %s %s" % (cmd, usage))
  parser.disable_interspersed_args()
  options, args = parser.parse_args()

  if not _CheckArguments(cmd, args_def, args):
    return None, None, None

  return func, options, args
979 def _CheckArguments(cmd, args_def, args):
980 """Verifies the arguments using the argument definition.
984 1. Abort with error if values specified by user but none expected.
986 1. For each argument in definition
988 1. Keep running count of minimum number of values (min_count)
989 1. Keep running count of maximum number of values (max_count)
990 1. If it has an unlimited number of values
992 1. Abort with error if it's not the last argument in the definition
994 1. If last argument has limited number of values
996 1. Abort with error if number of values doesn't match or is too large
998 1. Abort with error if user didn't pass enough values (min_count)
1001 if args and not args_def:
1002 ToStderr("Error: Command %s expects no arguments", cmd)
1009 last_idx = len(args_def) - 1
1011 for idx, arg in enumerate(args_def):
1012 if min_count is None:
1014 elif arg.min is not None:
1015 min_count += arg.min
1017 if max_count is None:
1019 elif arg.max is not None:
1020 max_count += arg.max
1023 check_max = (arg.max is not None)
1025 elif arg.max is None:
1026 raise errors.ProgrammerError("Only the last argument can have max=None")
1029 # Command with exact number of arguments
1030 if (min_count is not None and max_count is not None and
1031 min_count == max_count and len(args) != min_count):
1032 ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1035 # Command with limited number of arguments
1036 if max_count is not None and len(args) > max_count:
1037 ToStderr("Error: Command %s expects only %d argument(s)",
1041 # Command with some required arguments
1042 if min_count is not None and len(args) < min_count:
1043 ToStderr("Error: Command %s expects at least %d argument(s)",
def SplitNodeOption(value):
  """Splits the value of a --node option.

  @return: a (primary, secondary) pair; secondary is None when no
      colon separator is present (note: the split case returns a
      2-element list, preserved for backwards compatibility)

  """
  if value and ':' in value:
    return value.split(':', 1)
  else:
    return (value, None)
def CalculateOSNames(os_name, os_variants):
  """Calculates all the names an OS can be called, according to its variants.

  @type os_name: string
  @param os_name: base name of the os
  @type os_variants: list or None
  @param os_variants: list of supported variants
  @rtype: list
  @return: list of valid names

  """
  if os_variants:
    return ['%s+%s' % (os_name, v) for v in os_variants]
  else:
    # no variants: the base name is the only valid name
    return [os_name]
1078 def wrapper(*args, **kwargs):
1081 return fn(*args, **kwargs)
def AskUser(text, choices=None):
  """Ask the user a question.

  @param text: the question to ask

  @param choices: list with elements tuples (input_char, return_value,
      description); if not given, it will default to: [('y', True,
      'Perform the operation'), ('n', False, 'Do no do the operation')];
      note that the '?' char is reserved for help

  @return: one of the return values from the choices list; if input is
      not possible (i.e. not running with a tty, we return the last
      entry from the list

  """
  if choices is None:
    choices = [('y', True, 'Perform the operation'),
               ('n', False, 'Do not perform the operation')]
  if not choices or not isinstance(choices, list):
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
  for entry in choices:
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
      raise errors.ProgrammerError("Invalid choices element to AskUser")

  # default to the last choice's value when no tty is available
  answer = choices[-1][1]
  new_text = []
  for line in text.splitlines():
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
  text = "\n".join(new_text)
  try:
    f = file("/dev/tty", "a+")
  except IOError:
    return answer
  try:
    chars = [entry[0] for entry in choices]
    chars[-1] = "[%s]" % chars[-1]
    chars.append('?')
    maps = dict([(entry[0], entry[1]) for entry in choices])
    while True:
      f.write(text)
      f.write('\n')
      f.write("/".join(chars))
      f.write(": ")
      # NOTE(review): prompt loop reconstructed from fragments — verify
      # the '?' help handling against the interactive clients
      line = f.readline(2).strip().lower()
      if line in maps:
        answer = maps[line]
        break
      elif line == '?':
        for entry in choices:
          f.write(" %s - %s\n" % (entry[0], entry[2]))
        f.write("\n")
        continue
  finally:
    f.close()
  return answer
class JobSubmittedException(Exception):
  """Job was submitted, client should exit.

  This exception has one argument, the ID of the job that was
  submitted. The handler should print this ID.

  This is not an error, just a structured way to exit from clients.

  """
def SendJob(ops, cl=None):
  """Function to submit an opcode without waiting for the results.

  @type ops: list
  @param ops: list of opcodes
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
      if None, a new client will be created

  @return: the job ID assigned by the master

  """
  if cl is None:
    cl = GetClient()

  job_id = cl.SubmitJob(ops)

  return job_id
def PollJob(job_id, cl=None, feedback_fn=None):
  """Function to poll for the result of a job.

  @type job_id: job identified
  @param job_id: the job to poll for results
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
      if None, a new client will be created

  """
  if cl is None:
    cl = GetClient()

  prev_job_info = None
  prev_logmsg_serial = None

  status = None

  notified_queued = False
  notified_waitlock = False

  while True:
    result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
                                     prev_logmsg_serial)
    if not result:
      # job not found, go away!
      raise errors.JobLost("Job with id %s lost" % job_id)
    elif result == constants.JOB_NOTCHANGED:
      if status is not None and not callable(feedback_fn):
        if status == constants.JOB_STATUS_QUEUED and not notified_queued:
          ToStderr("Job %s is waiting in queue", job_id)
          notified_queued = True
        elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
          ToStderr("Job %s is trying to acquire all necessary locks", job_id)
          notified_waitlock = True

      # Wait again
      continue

    # Split result, a tuple of (field values, log entries)
    (job_info, log_entries) = result
    (status, ) = job_info

    if log_entries:
      for log_entry in log_entries:
        (serial, timestamp, _, message) = log_entry
        if callable(feedback_fn):
          feedback_fn(log_entry[1:])
        else:
          encoded = utils.SafeEncode(message)
          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
        prev_logmsg_serial = max(prev_logmsg_serial, serial)

    # TODO: Handle canceled and archived jobs
    elif status in (constants.JOB_STATUS_SUCCESS,
                    constants.JOB_STATUS_ERROR,
                    constants.JOB_STATUS_CANCELING,
                    constants.JOB_STATUS_CANCELED):
      break

    prev_job_info = job_info

  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
  if not jobs:
    raise errors.JobLost("Job with id %s lost" % job_id)

  status, opstatus, result = jobs[0]
  if status == constants.JOB_STATUS_SUCCESS:
    return result
  elif status in (constants.JOB_STATUS_CANCELING,
                  constants.JOB_STATUS_CANCELED):
    raise errors.OpExecError("Job was canceled")
  else:
    has_ok = False
    for idx, (status, msg) in enumerate(zip(opstatus, result)):
      if status == constants.OP_STATUS_SUCCESS:
        has_ok = True
      elif status == constants.OP_STATUS_ERROR:
        errors.MaybeRaise(msg)
        if has_ok:
          raise errors.OpExecError("partial failure (opcode %d): %s" %
                                   (idx, msg))
        else:
          raise errors.OpExecError(str(msg))
    # default failure mode
    raise errors.OpExecError(result)
def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
  """Legacy function to submit an opcode.

  This is just a simple wrapper over the construction of the processor
  instance. It should be extended to better handle feedback and
  interaction functions.

  @param op: the opcode to submit
  @param cl: luxi client to use; a new one is created when None
  @param feedback_fn: optional callable receiving job log messages
  @param opts: command line options, used to set generic opcode flags
  @return: the result of the first (and only) opcode of the job

  """
  if cl is None:
    cl = GetClient()

  # Propagate generic flags (debug, dry-run, ...) onto the opcode
  SetGenericOpcodeOpts([op], opts)

  job_id = SendJob([op], cl)

  # Block until the job finishes and collect the per-opcode results
  op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)

  return op_results[0]
def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
  """Wrapper around SubmitOpCode or SendJob.

  This function will decide, based on the 'opts' parameter, whether to
  submit and wait for the result of the opcode (and return it), or
  whether to just send the job and print its identifier. It is used in
  order to simplify the implementation of the '--submit' option.

  It will also process the opcodes if we're sending them via SendJob
  (otherwise SubmitOpCode does it).

  @raise JobSubmittedException: carrying the job id, when the options
      request submit-only mode

  """
  if opts and opts.submit_only:
    # submit-only mode: send the job and report its id via exception
    job = [op]
    SetGenericOpcodeOpts(job, opts)
    job_id = SendJob(job, cl=cl)
    raise JobSubmittedException(job_id)
  else:
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
def SetGenericOpcodeOpts(opcode_list, options):
  """Processor for generic options.

  This function updates the given opcodes based on generic command
  line options (like debug, dry-run, etc.).

  @param opcode_list: list of opcodes
  @param options: command line options or None
  @return: None (in-place modification)

  """
  if not options:
    # nothing to propagate (e.g. called with options=None)
    return
  for op in opcode_list:
    op.dry_run = options.dry_run
    op.debug_level = options.debug
def GetClient():
  """Connects to the master daemon and returns a luxi client.

  When the master daemon cannot be contacted, the local ssconf data is
  inspected in order to produce a more helpful error message (cluster
  not initialized, or command run on a non-master node).

  @raise errors.OpPrereqError: if the cluster is not initialized or this
      node is not the master
  @raise luxi.NoMasterError: re-raised when this node is the master but
      the master daemon is simply not running

  """
  # TODO: Cache object?
  try:
    client = luxi.Client()
  except luxi.NoMasterError:
    ss = ssconf.SimpleStore()

    # Try to read ssconf file; this fails with ConfigurationError when
    # the node was never part of a cluster
    try:
      ss.GetMasterNode()
    except errors.ConfigurationError:
      raise errors.OpPrereqError("Cluster not initialized or this machine is"
                                 " not part of a cluster")

    master, myself = ssconf.GetMasterAndMyself(ss=ss)
    if master != myself:
      raise errors.OpPrereqError("This is not the master node, please connect"
                                 " to node '%s' and rerun the command" %
                                 master)
    # we are the master but the daemon is unreachable: propagate
    raise
  return client
# FormatError: map an exception instance to (exit code, message).
# NOTE(review): the embedded numbers below are original-file line numbers and
# are non-contiguous — this extract is missing lines (e.g. the initialisation
# of 'retcode', 'obuf' and 'msg' before the isinstance chain, and several
# else/elif branches).  Verify against the complete file before editing.
1343 def FormatError(err):
1344   """Return a formatted error message for a given error.
1346   This function takes an exception instance and returns a tuple
1347   consisting of two values: first, the recommended exit code, and
1348   second, a string describing the error message (not
1349   newline-terminated).
# Most specific error classes are tested first; errors.GenericError and the
# final branch act as catch-alls.
1355   if isinstance(err, errors.ConfigurationError):
1356     txt = "Corrupt configuration file: %s" % msg
1358     obuf.write(txt + "\n")
1359     obuf.write("Aborting.")
1361   elif isinstance(err, errors.HooksAbort):
1362     obuf.write("Failure: hooks execution failed:\n")
1363     for node, script, out in err.args[0]:
1365       obuf.write("  node: %s, script: %s, output: %s\n" %
1366                  (node, script, out))
1368       obuf.write("  node: %s, script: %s (no output)\n" %
1370   elif isinstance(err, errors.HooksFailure):
1371     obuf.write("Failure: hooks general failure: %s" % msg)
1372   elif isinstance(err, errors.ResolverError):
1373     this_host = utils.HostInfo.SysName()
1374     if err.args[0] == this_host:
1375       msg = "Failure: can't resolve my own hostname ('%s')"
1377       msg = "Failure: can't resolve hostname '%s'"
1378     obuf.write(msg % err.args[0])
1379   elif isinstance(err, errors.OpPrereqError):
# Two-argument OpPrereqError carries (details, error type)
1380     if len(err.args) == 2:
1381       obuf.write("Failure: prerequisites not met for this"
1382                  " operation:\nerror type: %s, error details:\n%s" %
1383                  (err.args[1], err.args[0]))
1385       obuf.write("Failure: prerequisites not met for this"
1386                  " operation:\n%s" % msg)
1387   elif isinstance(err, errors.OpExecError):
1388     obuf.write("Failure: command execution error:\n%s" % msg)
1389   elif isinstance(err, errors.TagError):
1390     obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
1391   elif isinstance(err, errors.JobQueueDrainError):
1392     obuf.write("Failure: the job queue is marked for drain and doesn't"
1393                " accept new requests\n")
1394   elif isinstance(err, errors.JobQueueFull):
1395     obuf.write("Failure: the job queue is full and doesn't accept new"
1396                " job submissions until old jobs are archived\n")
1397   elif isinstance(err, errors.TypeEnforcementError):
1398     obuf.write("Parameter Error: %s" % msg)
1399   elif isinstance(err, errors.ParameterError):
1400     obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
1401   elif isinstance(err, errors.GenericError):
1402     obuf.write("Unhandled Ganeti error: %s" % msg)
# luxi transport errors are checked after the Ganeti error hierarchy
1403   elif isinstance(err, luxi.NoMasterError):
1404     obuf.write("Cannot communicate with the master daemon.\nIs it running"
1405                " and listening for connections?")
1406   elif isinstance(err, luxi.TimeoutError):
1407     obuf.write("Timeout while talking to the master daemon. Error:\n"
1409   elif isinstance(err, luxi.ProtocolError):
1410     obuf.write("Unhandled protocol error while talking to the master daemon:\n"
1412   elif isinstance(err, JobSubmittedException):
1413     obuf.write("JobID: %s\n" % err.args[0])
1416     obuf.write("Unhandled exception: %s" % msg)
1417   return retcode, obuf.getvalue().rstrip('\n')
# GenericMain: shared entry point for all gnt-* command line tools — parses
# the command line, applies overrides, sets up logging and dispatches to the
# selected command function.
# NOTE(review): the embedded numbers below are original-file line numbers and
# are non-contiguous — this extract is missing lines (the surrounding
# try/except scaffolding, error returns and the final return).  Verify against
# the complete file before editing.
1420 def GenericMain(commands, override=None, aliases=None):
1421   """Generic main function for all the gnt-* commands.
1424   - commands: a dictionary with a special structure, see the design doc
1425     for command line handling.
1426   - override: if not None, we expect a dictionary with keys that will
1427     override command line options; this can be used to pass
1428     options from the scripts to generic functions
1429   - aliases: dictionary with command aliases {'alias': 'target, ...}
1432   # save the program name and the entire command line for later logging
1434     binary = os.path.basename(sys.argv[0]) or sys.argv[0]
1435     if len(sys.argv) >= 2:
1436       binary += " " + sys.argv[1]
1437       old_cmdline = " ".join(sys.argv[2:])
1441     binary = "<unknown program>"
1448     func, options, args = _ParseArgs(sys.argv, commands, aliases)
1449   except errors.ParameterError, err:
1450     result, err_msg = FormatError(err)
1454   if func is None: # parse error
# Apply caller-supplied option overrides after parsing
1457   if override is not None:
1458     for key, val in override.iteritems():
1459       setattr(options, key, val)
1461   utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
1462                      stderr_logging=True, program=binary)
1465     logging.info("run with arguments '%s'", old_cmdline)
1467     logging.info("run with no arguments")
1470     result = func(options, args)
1471   except (errors.GenericError, luxi.ProtocolError,
1472           JobSubmittedException), err:
1473     result, err_msg = FormatError(err)
1474     logging.exception("Error during command processing")
# GenericInstanceCreate: shared implementation for "add instance" and
# "import instance" — validates NIC/disk options and builds/submits a single
# OpCreateInstance opcode.
# NOTE(review): the embedded numbers below are original-file line numbers and
# are non-contiguous — this extract is missing lines (try:/else: scaffolding,
# the default nic/disk branches and roughly half of the OpCreateInstance
# keyword arguments).  Verify against the complete file before editing.
1480 def GenericInstanceCreate(mode, opts, args):
1481   """Add an instance to the cluster via either creation or import.
1483   @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
1484   @param opts: the command line options selected by the user
1486   @param args: should contain only one element, the new instance name
1488   @return: the desired exit code
1493   (pnode, snode) = SplitNodeOption(opts.node)
1498   hypervisor, hvparams = opts.hypervisor
# NIC validation: indices must be integers, values must be dicts
1502     nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
1503   except ValueError, err:
1504     raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
1505   nics = [{}] * nic_max
1506   for nidx, ndict in opts.nics:
1508     if not isinstance(ndict, dict):
1509       msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
1510       raise errors.OpPrereqError(msg)
1516     # default of one nic, all auto
# Disk validation: --disk and -s (sd_size) are mutually exclusive; diskless
# templates may not carry any disk information
1519   if opts.disk_template == constants.DT_DISKLESS:
1520     if opts.disks or opts.sd_size is not None:
1521       raise errors.OpPrereqError("Diskless instance but disk"
1522                                  " information passed")
1525     if not opts.disks and not opts.sd_size:
1526       raise errors.OpPrereqError("No disk information specified")
1527     if opts.disks and opts.sd_size is not None:
1528       raise errors.OpPrereqError("Please use either the '--disk' or"
1530     if opts.sd_size is not None:
1531       opts.disks = [(0, {"size": opts.sd_size})]
1533       disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
1534     except ValueError, err:
1535       raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
1536     disks = [{}] * disk_max
1537     for didx, ddict in opts.disks:
1539       if not isinstance(ddict, dict):
1540         msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
1541         raise errors.OpPrereqError(msg)
# Each disk needs exactly one of "size" (create) or "adopt" (take over an
# existing volume); adoption is not valid when importing
1542       elif "size" in ddict:
1543         if "adopt" in ddict:
1544           raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
1545                                      " (disk %d)" % didx)
1547           ddict["size"] = utils.ParseUnit(ddict["size"])
1548         except ValueError, err:
1549           raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
1551       elif "adopt" in ddict:
1552         if mode == constants.INSTANCE_IMPORT:
1553           raise errors.OpPrereqError("Disk adoption not allowed for instance"
1557         raise errors.OpPrereqError("Missing size or adoption source for"
1561   utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
1562   utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
# Mode-specific parameters: creation installs an OS, import copies from a
# source node/path
1564   if mode == constants.INSTANCE_CREATE:
1569     no_install = opts.no_install
1570   elif mode == constants.INSTANCE_IMPORT:
1573     src_node = opts.src_node
1574     src_path = opts.src_dir
1577     raise errors.ProgrammerError("Invalid creation mode %s" % mode)
1579   op = opcodes.OpCreateInstance(instance_name=instance,
1581                                 disk_template=opts.disk_template,
1583                                 pnode=pnode, snode=snode,
1584                                 ip_check=opts.ip_check,
1585                                 name_check=opts.name_check,
1586                                 wait_for_sync=opts.wait_for_sync,
1587                                 file_storage_dir=opts.file_storage_dir,
1588                                 file_driver=opts.file_driver,
1589                                 iallocator=opts.iallocator,
1590                                 hypervisor=hypervisor,
1592                                 beparams=opts.beparams,
1598                                 no_install=no_install)
1600   SubmitOrSend(op, opts)
# _RunWhileClusterStoppedHelper: stops the watcher and all cluster daemons,
# runs a caller-supplied function, then restarts everything (master last).
# NOTE(review): the embedded numbers below are original-file line numbers and
# are non-contiguous — this extract is missing lines (try/finally scaffolding
# in Call(), the 'if result.failed:' guard in _RunCmd(), and parts of the
# constructor's docstring).  Verify against the complete file before editing.
1604 class _RunWhileClusterStoppedHelper:
1605   """Helper class for L{RunWhileClusterStopped} to simplify state management
1608   def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
1609     """Initializes this class.
1611     @type feedback_fn: callable
1612     @param feedback_fn: Feedback function
1613     @type cluster_name: string
1614     @param cluster_name: Cluster name
1615     @type master_node: string
1616     @param master_node Master node name
1617     @type online_nodes: list
1618     @param online_nodes: List of names of online nodes
1621     self.feedback_fn = feedback_fn
1622     self.cluster_name = cluster_name
1623     self.master_node = master_node
1624     self.online_nodes = online_nodes
1626     self.ssh = ssh.SshRunner(self.cluster_name)
# All online nodes except the master; the master is handled separately so it
# can be restarted last
1628     self.nonmaster_nodes = [name for name in online_nodes
1629                             if name != master_node]
1631     assert self.master_node not in self.nonmaster_nodes
1633   def _RunCmd(self, node_name, cmd):
1634     """Runs a command on the local or a remote machine.
1636     @type node_name: string
1637     @param node_name: Machine name
# Commands for the master node (or node_name=None) run locally; everything
# else goes over SSH as root
1642     if node_name is None or node_name == self.master_node:
1643       # No need to use SSH
1644       result = utils.RunCmd(cmd)
1646       result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))
1649       errmsg = ["Failed to run command %s" % result.cmd]
1651       errmsg.append("on node %s" % node_name)
1652       errmsg.append(": exitcode %s and error %s" %
1653                     (result.exit_code, result.output))
1654       raise errors.OpExecError(" ".join(errmsg))
1656   def Call(self, fn, *args):
1657     """Call function while all daemons are stopped.
1660     @param fn: Function to be called
1663     # Pause watcher by acquiring an exclusive lock on watcher state file
1664     self.feedback_fn("Blocking watcher")
1665     watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
1667     # TODO: Currently, this just blocks. There's no timeout.
1668     # TODO: Should it be a shared lock?
1669     watcher_block.Exclusive(blocking=True)
1671     # Stop master daemons, so that no new jobs can come in and all running
1673     self.feedback_fn("Stopping master daemons")
1674     self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
1676     # Stop daemons on all nodes
1677     for node_name in self.online_nodes:
1678       self.feedback_fn("Stopping daemons on %s" % node_name)
1679       self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])
1681     # All daemons are shut down now
1683     return fn(self, *args)
1684     except Exception, err:
1685       _, errmsg = FormatError(err)
1686       logging.exception("Caught exception")
1687       self.feedback_fn(errmsg)
1690     # Start cluster again, master node last
1691     for node_name in self.nonmaster_nodes + [self.master_node]:
1692       self.feedback_fn("Starting daemons on %s" % node_name)
1693       self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
# Release the watcher lock again, whatever happened above
1696     watcher_block.Close()
def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Calls a function while all cluster daemons are stopped.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped
  @return: whatever C{fn} returns

  """
  feedback_fn("Gathering cluster information")

  # This ensures we're running on the master daemon
  cl = GetClient()

  (cluster_name, master_node) = \
    cl.QueryConfigValues(["cluster_name", "master_node"])

  online_nodes = GetOnlineNodes([], cl=cl)

  # Don't keep a reference to the client. The master daemon will go away.
  del cl

  assert master_node in online_nodes

  return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
                                       online_nodes).Call(fn, *args)
# GenerateTable: render rows of data as an aligned text table (or a simple
# separator-joined listing) and return the lines as a list of strings.
# NOTE(review): the embedded numbers below are original-file line numbers and
# are non-contiguous — this extract is missing lines (the 'units' parameter in
# the signature, initialisation of 'format_fields'/'result'/'args', several
# loop headers and else branches).  Verify against the complete file before
# editing.
1727 def GenerateTable(headers, fields, separator, data,
1728                   numfields=None, unitfields=None,
1730   """Prints a table with headers and different fields.
1733   @param headers: dictionary mapping field names to headers for
1736   @param fields: the field names corresponding to each row in
1738   @param separator: the separator to be used; if this is None,
1739       the default 'smart' algorithm is used which computes optimal
1740       field width, otherwise just the separator is used between
1743   @param data: a list of lists, each sublist being one row to be output
1744   @type numfields: list
1745   @param numfields: a list with the fields that hold numeric
1746       values and thus should be right-aligned
1747   @type unitfields: list
1748   @param unitfields: a list with the fields that hold numeric
1749       values that should be formatted with the units field
1750   @type units: string or None
1751   @param units: the units we should use for formatting, or None for
1752       automatic choice (human-readable for non-separator usage, otherwise
1753       megabytes); this is a one-letter string
1762   if numfields is None:
1764   if unitfields is None:
1767   numfields = utils.FieldSet(*numfields) # pylint: disable-msg=W0142
1768   unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142
# Build one %-format specifier per column: plain "%s" when a separator is
# used, width-aware "%*s"/"%-*s" for smart alignment
1771   for field in fields:
1772     if headers and field not in headers:
1773       # TODO: handle better unknown fields (either revert to old
1774       # style of raising exception, or deal more intelligently with
1776       headers[field] = field
1777     if separator is not None:
1778       format_fields.append("%s")
1779     elif numfields.Matches(field):
1780       format_fields.append("%*s")
1782       format_fields.append("%-*s")
1784   if separator is None:
1785     mlens = [0 for name in fields]
1786     format = ' '.join(format_fields)
1788     format = separator.replace("%", "%%").join(format_fields)
# First pass over the data: unit-format/stringify values and track the
# maximum column widths (mlens) for smart alignment
1793     for idx, val in enumerate(row):
1794       if unitfields.Matches(fields[idx]):
1797         except (TypeError, ValueError):
1800           val = row[idx] = utils.FormatUnit(val, units)
1801       val = row[idx] = str(val)
1802       if separator is None:
1803         mlens[idx] = max(mlens[idx], len(val))
# Header row: widths must also account for the header strings
1808   for idx, name in enumerate(fields):
1810     if separator is None:
1811       mlens[idx] = max(mlens[idx], len(hdr))
1812       args.append(mlens[idx])
1814   result.append(format % tuple(args))
1816   if separator is None:
1817     assert len(mlens) == len(fields)
# The last column needs no padding unless it is right-aligned (numeric)
1819     if fields and not numfields.Matches(fields[-1]):
1825     line = ['-' for _ in fields]
1826     for idx in range(len(fields)):
1827       if separator is None:
1828         args.append(mlens[idx])
1829       args.append(line[idx])
1830     result.append(format % tuple(args))
def FormatTimestamp(ts):
  """Formats a given timestamp.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds

  @rtype: string
  @return: a string with the formatted timestamp

  """
  if not isinstance(ts, (tuple, list)) or len(ts) != 2:
    # anything that isn't a (seconds, microseconds) pair can't be formatted
    return '?'
  sec, usec = ts
  # date/time down to the second, plus a six-digit microsecond suffix
  return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
def ParseTimespec(value):
  """Parse a time specification.

  The following suffixed will be recognized:

    - s: seconds
    - m: minutes
    - h: hours
    - d: day
    - w: weeks

  Without any suffix, the value will be taken to be in seconds.

  @return: the number of seconds as an integer
  @raise errors.OpPrereqError: on empty or unparseable specifications

  """
  value = str(value)
  if not value:
    raise errors.OpPrereqError("Empty time specification passed")
  # multiplier (in seconds) for each recognized suffix
  suffix_map = {
    's': 1,
    'm': 60,
    'h': 3600,
    'd': 86400,
    'w': 604800,
    }
  if value[-1] not in suffix_map:
    # no known suffix: the whole string must be an integer (seconds)
    try:
      value = int(value)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  else:
    multiplier = suffix_map[value[-1]]
    value = value[:-1]
    if not value: # no data left after stripping the suffix
      raise errors.OpPrereqError("Invalid time specification (only"
                                 " suffix passed)")
    try:
      value = int(value) * multiplier
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
  return value
def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
                   filter_master=False):
  """Returns the names of online nodes.

  This function will also log a warning on stderr with the names of
  the online nodes, unless C{nowarn} is True.

  @param nodes: if not empty, use only this subset of nodes (minus the
      offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
      offline nodes that are skipped; if this parameter is True the
      note is not displayed
  @type secondary_ips: boolean
  @param secondary_ips: if True, return the secondary IPs instead of the
      names, useful for doing network traffic over the replication interface
  @type filter_master: boolean
  @param filter_master: if True, do not return the master node in the list
      (useful in coordination with secondary_ips where we cannot check our
      node name against the list)

  """
  if cl is None:
    cl = GetClient()

  # which query column to report back: node name or its secondary IP
  if secondary_ips:
    name_idx = 2
  else:
    name_idx = 0

  if filter_master:
    master_node = cl.QueryConfigValues(["master_node"])[0]
    filter_fn = lambda x: x != master_node
  else:
    filter_fn = lambda _: True

  result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
                         use_locking=False)
  offline = [row[0] for row in result if row[1]]
  if offline and not nowarn:
    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
  return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
1939 def _ToStream(stream, txt, *args):
1940 """Write a message to a stream, bypassing the logging system
1942 @type stream: file object
1943 @param stream: the file to which we should write
1945 @param txt: the message
1950 stream.write(txt % args)
def ToStdout(txt, *args):
  """Write a message to stdout only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message
  @param args: optional %-interpolation arguments for C{txt}

  """
  _ToStream(sys.stdout, txt, *args)
def ToStderr(txt, *args):
  """Write a message to stderr only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message
  @param args: optional %-interpolation arguments for C{txt}

  """
  _ToStream(sys.stderr, txt, *args)
# JobExecutor: queue multiple jobs, submit them in one luxi call and poll
# them to completion, reporting per-job success/failure.
# NOTE(review): the embedded numbers below are original-file line numbers and
# are non-contiguous — this extract is missing lines (most of __init__'s
# body, several loop headers, the 'success' assignment in GetResults and the
# final returns).  Verify against the complete file before editing.
1981 class JobExecutor(object):
1982   """Class which manages the submission and execution of multiple jobs.
1984   Note that instances of this class should not be reused between
1988   def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
1993     self.verbose = verbose
1996     self.feedback_fn = feedback_fn
1998   def QueueJob(self, name, *ops):
1999     """Record a job for later submit.
2002     @param name: a description of the job, will be used in WaitJobSet
2004     SetGenericOpcodeOpts(ops, self.opts)
2005     self.queue.append((name, ops))
2007   def SubmitPending(self):
2008     """Submit all pending jobs.
# Single luxi call submitting every queued job; results are
# (status, data) pairs in queue order
2011     results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
2012     for (idx, ((status, data), (name, _))) in enumerate(zip(results,
2014       self.jobs.append((idx, status, data, name))
2016   def _ChooseJob(self):
2017     """Choose a non-waiting/queued job to poll next.
2020     assert self.jobs, "_ChooseJob called with empty job list"
2022     result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
# Prefer a job that has already left the queued/waiting states so that
# polling does not block behind jobs that haven't started yet
2025     for job_data, status in zip(self.jobs, result):
2026       if status[0] in (constants.JOB_STATUS_QUEUED,
2027                        constants.JOB_STATUS_WAITLOCK,
2028                        constants.JOB_STATUS_CANCELING):
2029         # job is still waiting
2031       # good candidate found
2032       self.jobs.remove(job_data)
# fallback: no running job found, take the first one
2036     return self.jobs.pop(0)
2038   def GetResults(self):
2039     """Wait for and return the results of all jobs.
2042     @return: list of tuples (success, job results), in the same order
2043         as the submitted jobs; if a job has failed, instead of the result
2044         there will be the error message
2048       self.SubmitPending()
2051       ok_jobs = [row[2] for row in self.jobs if row[1]]
2053       ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
2055     # first, remove any non-submitted jobs
2056     self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
2057     for idx, _, jid, name in failures:
2058       ToStderr("Failed to submit job for %s: %s", name, jid)
2059       results.append((idx, False, jid))
2062       (idx, _, jid, name) = self._ChooseJob()
2063       ToStdout("Waiting for job %s for %s...", jid, name)
2065       job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
2067       except (errors.GenericError, luxi.ProtocolError), err:
2068         _, job_result = FormatError(err)
2070         # the error message will always be shown, verbose or not
2071         ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
2073       results.append((idx, success, job_result))
2075     # sort based on the index, then drop it
2077     results = [i[1:] for i in results]
2081   def WaitOrShow(self, wait):
2082     """Wait for job results or only print the job IDs.
2085     @param wait: whether to wait or not
2089       return self.GetResults()
2092       self.SubmitPending()
2093       for status, result, name in self.jobs:
2095           ToStdout("%s: %s", result, name)
2097           ToStderr("Failure for %s: %s", name, result)