4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module dealing with command line parsing"""
30 from cStringIO import StringIO
32 from ganeti import utils
33 from ganeti import errors
34 from ganeti import constants
35 from ganeti import opcodes
36 from ganeti import luxi
37 from ganeti import ssconf
38 from ganeti import rpc
39 from ganeti import ssh
41 from optparse import (OptionParser, TitledHelpFormatter,
42 Option, OptionValueError)
# Public names exported by this module, grouped by category.
# NOTE(review): the "__all__ = [" opener and many entries are not visible
# in this chunk -- these lines appear to be the interior of that list.
  # Command line options
  "CLUSTER_DOMAIN_SECRET_OPT",
  "FILESTORE_DRIVER_OPT",
  "IGNORE_FAILURES_OPT",
  "IGNORE_REMOVE_FAILURES_OPT",
  "IGNORE_SECONDARIES_OPT",
  "NEW_CLUSTER_CERT_OPT",
  "NEW_CLUSTER_DOMAIN_SECRET_OPT",
  "NEW_CONFD_HMAC_KEY_OPT",
  "NOMODIFY_ETCHOSTS_OPT",
  "NOMODIFY_SSH_SETUP_OPT",
  "NOSSH_KEYCHECK_OPT",
  "REMOVE_INSTANCE_OPT",
  "SHUTDOWN_TIMEOUT_OPT",
  # Generic functions for CLI programs
  "GenericInstanceCreate",
  "JobSubmittedException",
  "RunWhileClusterStopped",
  # Formatting functions
  "ToStderr", "ToStdout",
  # command line options support infrastructure
  "ARGS_MANY_INSTANCES",
  "OPT_COMPL_INST_ADD_NODES",
  "OPT_COMPL_MANY_NODES",
  "OPT_COMPL_ONE_IALLOCATOR",
  "OPT_COMPL_ONE_INSTANCE",
  "OPT_COMPL_ONE_NODE",
  # NOTE(review): this chunk is incomplete -- the enclosing class statement
  # and the __init__ body (presumably storing min/max on self) are not
  # visible, and the "return" below looks like the body of a separate
  # __repr__ method whose "def" line is absent. Left byte-identical
  # pending comparison against the full file.
  def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
    return ("<%s min=%s max=%s>" %
            (self.__class__.__name__, self.min, self.max))
class ArgSuggest(_Argument):
  """Suggesting argument.

  Value can be any of the ones passed to the constructor.

  """
  # pylint: disable-msg=W0622
  def __init__(self, min=0, max=None, choices=None):
    _Argument.__init__(self, min=min, max=max)
    # choices is used as completion suggestions, not enforced
    self.choices = choices
    # NOTE(review): the return below looks like the body of a __repr__
    # method whose "def" line is missing from this chunk -- verify
    # against the full file.
    return ("<%s min=%s max=%s choices=%r>" %
            (self.__class__.__name__, self.min, self.max, self.choices))
class ArgChoice(ArgSuggest):
  """Choice argument.

  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
  but value must be one of the choices.

  """


class ArgUnknown(_Argument):
  """Unknown argument to program (e.g. determined at runtime).

  """


class ArgInstance(_Argument):
  """Instances argument.

  """


class ArgNode(_Argument):
  """Node argument."""


class ArgJobId(_Argument):
  """Job ID argument."""


class ArgFile(_Argument):
  """File path argument.

  """


class ArgCommand(_Argument):
  """Command argument."""


class ArgHost(_Argument):
  """Host argument."""


class ArgOs(_Argument):
  """OS argument."""
# Pre-built argument-list definitions for use in command tables:
# "MANY" variants accept any number of values, "ONE" variants exactly one.
ARGS_MANY_INSTANCES = [ArgInstance()]
ARGS_MANY_NODES = [ArgNode()]
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
def _ExtractTagsObject(opts, args):
  """Extract the tag type object.

  Note that this function will modify its args parameter.

  """
  # NOTE(review): several lines appear to be missing from this chunk
  # (e.g. reading opts.tag_type into "kind", the per-kind name
  # extraction and the final return); left byte-identical pending
  # the full file.
  if not hasattr(opts, "tag_type"):
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
  if kind == constants.TAG_CLUSTER:
  elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
      raise errors.OpPrereqError("no arguments passed to the command")
    raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
def _ExtendTags(opts, args):
  """Extend the args if a source file has been given.

  This function will extend the tags with the contents of the file
  passed in the 'tags_source' attribute of the opts parameter. A file
  named '-' will be replaced by stdin.

  """
  # NOTE(review): this chunk is missing several lines (the early return
  # when no source file is given, the stdin special case, the read loop
  # control, new_data initialization and the file close); left
  # byte-identical pending the full file.
  fname = opts.tags_source
  new_fh = open(fname, "r")
  # we don't use the nice 'new_data = [line.strip() for line in fh]'
  # because of python bug 1633941
  line = new_fh.readline()
  new_data.append(line.strip())
  args.extend(new_data)
def ListTags(opts, args):
  """List the tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  # NOTE(review): "cl" is not created in this chunk (a client-creation
  # call appears to be missing), and the result sorting/printing after
  # the list() conversion is absent.
  result = cl.QueryTags(kind, name)
  result = list(result)
def AddTags(opts, args):
  """Add tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  # NOTE(review): the guard ("if not args:") before this raise and the
  # opcode submission after building "op" are not visible in this chunk.
  raise errors.OpPrereqError("No tags to be added")
  op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
def RemoveTags(opts, args):
  """Remove tags from a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  # NOTE(review): the guard ("if not args:") before this raise and the
  # opcode submission after building "op" are not visible in this chunk.
  raise errors.OpPrereqError("No tags to be removed")
  op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
380 def check_unit(option, opt, value): # pylint: disable-msg=W0613
381 """OptParsers custom converter for units.
385 return utils.ParseUnit(value)
386 except errors.UnitParseError, err:
387 raise OptionValueError("option %s: %s" % (opt, err))
def _SplitKeyVal(opt, data):
  """Convert a KeyVal string into a dict.

  This function will convert a key=val[,...] string into a dict. Empty
  values will be converted specially: keys which have the prefix 'no_'
  will have the value=False and the prefix stripped, the others will
  have value=True.

  @param opt: a string holding the option name for which we process the
      data, used in building error messages
  @param data: a string of the format key=val,key=val,...
  @return: {key=val, key=val}
  @raises errors.ParameterError: if there are duplicate keys

  """
  # NOTE(review): several lines are missing from this chunk (the result
  # dict initialization, the "=" membership test / else structure, the
  # duplicate-key check, the dict assignment, the raise continuation
  # and the final return); left byte-identical pending the full file.
  for elem in utils.UnescapeAndSplit(data, sep=","):
      key, val = elem.split("=", 1)
      if elem.startswith(NO_PREFIX):
        key, val = elem[len(NO_PREFIX):], False
      elif elem.startswith(UN_PREFIX):
        key, val = elem[len(UN_PREFIX):], None
        key, val = elem, True
      raise errors.ParameterError("Duplicate key '%s' in option %s" %
def check_ident_key_val(option, opt, value): # pylint: disable-msg=W0613
  """Custom parser for ident:key=val,key=val options.

  This will store the parsed values as a tuple (ident, {key: val}). As such,
  multiple uses of this option via action=append is possible.

  """
  # NOTE(review): this chunk is missing the ":"-presence test selecting
  # between the two assignments below, guards inside the prefix
  # branches, the final else branch and the return statement; left
  # byte-identical pending the full file.
  ident, rest = value, ''
    ident, rest = value.split(":", 1)

  if ident.startswith(NO_PREFIX):
      msg = "Cannot pass options when removing parameter groups: %s" % value
      raise errors.ParameterError(msg)
    retval = (ident[len(NO_PREFIX):], False)
  elif ident.startswith(UN_PREFIX):
      msg = "Cannot pass options when removing parameter groups: %s" % value
      raise errors.ParameterError(msg)
    retval = (ident[len(UN_PREFIX):], None)
    kv_dict = _SplitKeyVal(opt, rest)
    retval = (ident, kv_dict)
def check_key_val(option, opt, value): # pylint: disable-msg=W0613
  """Custom parser class for key=val,key=val options.

  This will store the parsed values as a dict {key: val}.

  @param option: the option object being processed (unused, required by
      the optparse TYPE_CHECKER signature)
  @param opt: the option name, used for error reporting
  @param value: the raw key=val,key=val string from the user

  """
  return _SplitKeyVal(opt, value)
# completion_suggestion is normally a list. Using numeric values not evaluating
# to False for dynamic completion.
# NOTE(review): the tuple below unpacks range(100, 106) -- six values --
# but only four names are visible in this chunk; OPT_COMPL_ONE_NODE and
# OPT_COMPL_ONE_OS appear to be missing both here and from the frozenset,
# whose closing bracket is also absent.
(OPT_COMPL_MANY_NODES,
 OPT_COMPL_ONE_INSTANCE,
 OPT_COMPL_ONE_IALLOCATOR,
 OPT_COMPL_INST_ADD_NODES) = range(100, 106)

OPT_COMPL_ALL = frozenset([
  OPT_COMPL_MANY_NODES,
  OPT_COMPL_ONE_INSTANCE,
  OPT_COMPL_ONE_IALLOCATOR,
  OPT_COMPL_INST_ADD_NODES,
class CliOption(Option):
  """Custom option class for optparse.

  Extends the standard optparse Option with an extra attribute
  (completion_suggest, used by shell completion machinery) and with
  the custom value types registered in TYPE_CHECKER below.

  """
  ATTRS = Option.ATTRS + [
    "completion_suggest",
  # NOTE(review): the closing bracket of ATTRS and the custom type name
  # strings (plus closing paren) of TYPES are not visible in this chunk.
  TYPES = Option.TYPES + (
  # map each custom type name to its converter function
  TYPE_CHECKER = Option.TYPE_CHECKER.copy()
  TYPE_CHECKER["identkeyval"] = check_ident_key_val
  TYPE_CHECKER["keyval"] = check_key_val
  TYPE_CHECKER["unit"] = check_unit


# optparse.py sets make_option, so we do it for our own option class, too
cli_option = CliOption
# Choice tuple for yes/no flag options.
# NOTE(review): several cli_option calls in this chunk are missing
# continuation lines (unbalanced parentheses); option texts left
# byte-identical pending the full file.
_YESNO = ("yes", "no")

# Generic output/behaviour options shared by most client scripts
DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
                       help="Increase debugging level")

NOHDR_OPT = cli_option("--no-headers", default=False,
                       action="store_true", dest="no_headers",
                       help="Don't display column headers")

SEP_OPT = cli_option("--separator", default=None,
                     action="store", dest="separator",
                     help=("Separator between output fields"
                           " (defaults to one space)"))

USEUNITS_OPT = cli_option("--units", default=None,
                          dest="units", choices=('h', 'm', 'g', 't'),
                          help="Specify units for output (one of hmgt)")

FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
                        type="string", metavar="FIELDS",
                        help="Comma separated list of output fields")

FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
                       default=False, help="Force the operation")

CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
                         default=False, help="Do not require confirmation")

TAG_SRC_OPT = cli_option("--from", dest="tags_source",
                         default=None, help="File with tag names")

SUBMIT_OPT = cli_option("--submit", dest="submit_only",
                        default=False, action="store_true",
                        help=("Submit the job and return the job ID, but"
                              " don't wait for the job to finish"))

SYNC_OPT = cli_option("--sync", dest="do_locking",
                      default=False, action="store_true",
                      help=("Grab locks while doing the queries"
                            " in order to ensure more consistent results"))

# internal option, added to every parser in _ParseArgs
_DRY_RUN_OPT = cli_option("--dry-run", default=False,
                          help=("Do not execute the operation, just run the"
                                " check steps and verify it it could be"

VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
                         help="Increase the verbosity of the operation")

DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
                              action="store_true", dest="simulate_errors",
                              help="Debugging option that makes the operation"
                              " treat most runtime checks as failed")

NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
                        default=True, action="store_false",
                        help="Don't wait for sync (DANGEROUS!)")

DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
                               help="Custom disk setup (diskless, file,"
                               default=None, metavar="TEMPL",
                               choices=list(constants.DISK_TEMPLATES))
# Instance creation / OS / storage options.
# NOTE(review): some cli_option calls in this chunk are missing
# continuation lines (unbalanced parentheses); option texts left
# byte-identical pending the full file.
NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
                        help="Do not create any network cards for"

FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                               help="Relative path under default cluster-wide"
                               " file storage dir to store file-based disks",
                               default=None, metavar="<DIR>")

FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
                                  help="Driver to use for image files",
                                  default="loop", metavar="<DRIVER>",
                                  choices=list(constants.FILE_DRIVER))

IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
                            help="Select nodes for the instance automatically"
                            " using the <NAME> iallocator plugin",
                            default=None, type="string",
                            completion_suggest=OPT_COMPL_ONE_IALLOCATOR)

OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
                    completion_suggest=OPT_COMPL_ONE_OS)

FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
                               action="store_true", default=False,
                               help="Force an unknown variant")

NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
                            action="store_true", default=False,
                            help="Do not install the OS (will"

BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
                         type="keyval", default={},
                         help="Backend parameters")

HVOPTS_OPT = cli_option("-H", "--hypervisor-parameters", type="keyval",
                        default={}, dest="hvparams",
                        help="Hypervisor parameters")

HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
                            help="Hypervisor and hypervisor options, in the"
                            " format hypervisor:option=value,option=value,...",
                            default=None, type="identkeyval")

HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
                        help="Hypervisor and hypervisor options, in the"
                        " format hypervisor:option=value,option=value,...",
                        default=[], action="append", type="identkeyval")

NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
                           action="store_false",
                           help="Don't check that the instance's IP"

NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
                             default=True, action="store_false",
                             help="Don't check that the instance's name"

NET_OPT = cli_option("--net",
                     help="NIC parameters", default=[],
                     dest="nics", action="append", type="identkeyval")

DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
                      dest="disks", action="append", type="identkeyval")

DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
                         help="Comma-separated list of disks"
                         " indices to act on (e.g. 0,2) (optional,"
                         " defaults to all disks)")

OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
                         help="Enforces a single-disk configuration using the"
                         " given disk size, in MiB unless a suffix is used",
                         default=None, type="unit", metavar="<size>")

IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
                                dest="ignore_consistency",
                                action="store_true", default=False,
                                help="Ignore the consistency of the disks on"

NONLIVE_OPT = cli_option("--non-live", dest="live",
                         default=True, action="store_false",
                         help="Do a non-live migration (this usually means"
                         " freeze the instance, save the state, transfer and"
                         " only then resume running on the secondary node)")

NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
                                help="Target node and optional secondary node",
                                metavar="<pnode>[:<snode>]",
                                completion_suggest=OPT_COMPL_INST_ADD_NODES)
# Node selection / instance lifecycle / disk replacement options.
# NOTE(review): some cli_option calls in this chunk are missing
# continuation lines (unbalanced parentheses); option texts left
# byte-identical pending the full file.
NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
                           action="append", metavar="<node>",
                           help="Use only this node (can be used multiple"
                           " times, if not given defaults to all nodes)",
                           completion_suggest=OPT_COMPL_ONE_NODE)

SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
                             completion_suggest=OPT_COMPL_ONE_NODE)

NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
                         action="store_false",
                         help="Don't start the instance after creation")

SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
                         action="store_true", default=False,
                         help="Show command instead of executing it")

CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
                         default=False, action="store_true",
                         help="Instead of performing the migration, try to"
                         " recover from a failed cleanup. This is safe"
                         " to run even if the instance is healthy, but it"
                         " will create extra replication traffic and "
                         " disrupt briefly the replication (like during the"

STATIC_OPT = cli_option("-s", "--static", dest="static",
                        action="store_true", default=False,
                        help="Only show configuration data, not runtime data")

ALL_OPT = cli_option("--all", dest="show_all",
                     default=False, action="store_true",
                     help="Show info on all instances on the cluster."
                     " This can take a long time to run, use wisely")

SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
                           action="store_true", default=False,
                           help="Interactive OS reinstall, lists available"
                           " OS templates for selection")

IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
                                 action="store_true", default=False,
                                 help="Remove the instance from the cluster"
                                 " configuration even if there are failures"
                                 " during the removal process")

IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
                                        dest="ignore_remove_failures",
                                        action="store_true", default=False,
                                        help="Remove the instance from the"
                                        " cluster configuration even if there"
                                        " are failures during the removal"

REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
                                 action="store_true", default=False,
                                 help="Remove the instance from the cluster")

NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
                               help="Specifies the new secondary node",
                               metavar="NODE", default=None,
                               completion_suggest=OPT_COMPL_ONE_NODE)

ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
                            default=False, action="store_true",
                            help="Replace the disk(s) on the primary"
                            " node (only for the drbd template)")

ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
                              default=False, action="store_true",
                              help="Replace the disk(s) on the secondary"
                              " node (only for the drbd template)")

AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
                              default=False, action="store_true",
                              help="Lock all nodes and auto-promote as needed"

AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
                              default=False, action="store_true",
                              help="Automatically replace faulty disks"
                              " (only for the drbd template)")

IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
                             default=False, action="store_true",
                             help="Ignore current recorded size"
                             " (useful for forcing activation when"
                             " the recorded size is wrong)")
# Import/export, node flags and cluster-init options.
# NOTE(review): some cli_option calls in this chunk are missing
# continuation lines (unbalanced parentheses), and the _YORNO metavar
# constant used below is not visible here; option texts left
# byte-identical pending the full file.
SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
                          completion_suggest=OPT_COMPL_ONE_NODE)

SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",

SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
                              help="Specify the secondary ip for the node",
                              metavar="ADDRESS", default=None)

READD_OPT = cli_option("--readd", dest="readd",
                       default=False, action="store_true",
                       help="Readd old node after replacing it")

NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
                                default=True, action="store_false",
                                help="Disable SSH key fingerprint checking")


MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
                    choices=_YESNO, default=None, metavar=_YORNO,
                    help="Set the master_candidate flag on the node")

OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
                         choices=_YESNO, default=None,
                         help="Set the offline flag on the node")

DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
                         choices=_YESNO, default=None,
                         help="Set the drained flag on the node")

ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
                             choices=_YESNO, default=None, metavar=_YORNO,
                             help="Set the allocatable flag on a volume")

NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
                               help="Disable support for lvm based instances"
                               action="store_false", default=True)

ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
                            dest="enabled_hypervisors",
                            help="Comma-separated list of hypervisors",
                            type="string", default=None)

NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
                            type="keyval", default={},
                            help="NIC parameters")

CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
                         dest="candidate_pool_size", type="int",
                         help="Set the candidate pool size")

VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
                         help="Enables LVM and specifies the volume group"
                         " name (cluster-wide) for disk allocation [xenvg]",
                         metavar="VG", default=None)

YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
                          help="Destroy cluster", action="store_true")

NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
                          help="Skip node agreement check (dangerous)",
                          action="store_true", default=False)

MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
                            help="Specify the mac prefix for the instance IP"
                            " addresses, in the format XX:XX:XX",

MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
                               help="Specify the node interface (cluster-wide)"
                               " on which the master IP address will be added "
                               " [%s]" % constants.DEFAULT_BRIDGE,
                               default=constants.DEFAULT_BRIDGE)

GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
                                help="Specify the default directory (cluster-"
                                "wide) for storing the file-based disks [%s]" %
                                constants.DEFAULT_FILE_STORAGE_DIR,
                                default=constants.DEFAULT_FILE_STORAGE_DIR)

NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
                                   help="Don't modify /etc/hosts",
                                   action="store_false", default=True)

NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
                                    help="Don't initialize SSH keys",
                                    action="store_false", default=True)
# Verification, shutdown/timeout and certificate-renewal options.
# NOTE(review): some cli_option calls in this chunk are missing
# continuation lines (unbalanced parentheses); option texts left
# byte-identical pending the full file.
ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
                             help="Enable parseable error messages",
                             action="store_true", default=False)

NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
                          help="Skip N+1 memory redundancy tests",
                          action="store_true", default=False)

REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
                             help="Type of reboot: soft/hard/full",
                             default=constants.INSTANCE_REBOOT_HARD,
                             choices=list(constants.REBOOT_TYPES))

IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
                                    dest="ignore_secondaries",
                                    default=False, action="store_true",
                                    help="Ignore errors from secondaries")

NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
                            action="store_false", default=True,
                            help="Don't shutdown the instance (unsafe)")

TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
                         help="Maximum time to wait")

SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
                                  dest="shutdown_timeout", type="int",
                                  default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
                                  help="Maximum time to wait for instance shutdown")

EARLY_RELEASE_OPT = cli_option("--early-release",
                               dest="early_release", default=False,
                               help="Release the locks on the secondary"

NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
                                  dest="new_cluster_cert",
                                  default=False, action="store_true",
                                  help="Generate a new cluster certificate")

RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
                           help="File containing new RAPI certificate")

NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
                               default=None, action="store_true",
                               help=("Generate a new self-signed RAPI"

NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
                                    dest="new_confd_hmac_key",
                                    default=False, action="store_true",
                                    help=("Create a new HMAC key for %s" %
# Load the cluster domain secret from a user-supplied file.
# Fixed duplicated word ("new new") in the user-visible help text.
CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
                                       dest="cluster_domain_secret",
                                       help=("Load new cluster domain"
                                             " secret from file"))
# Cluster-domain-secret renewal and replication-network options.
# NOTE(review): the first cli_option call below is missing its closing
# continuation line (unbalanced parentheses); option texts left
# byte-identical pending the full file.
NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret",
                                           dest="new_cluster_domain_secret",
                                           default=False, action="store_true",
                                           help=("Create a new cluster domain"

USE_REPL_NET_OPT = cli_option("--use-replication-network",
                              dest="use_replication_network",
                              help="Whether to use the replication network"
                              " for talking to the nodes",
                              action="store_true", default=False)
def _ParseArgs(argv, commands, aliases):
  """Parser for the command line arguments.

  This function parses the arguments and returns the function which
  must be executed together with its (modified) arguments.

  @param argv: the command line
  @param commands: dictionary with special contents, see the design
      doc for cmdline handling
  @param aliases: dictionary with command aliases {'alias': 'target, ...}

  """
  # NOTE(review): a number of lines are missing from this chunk (the
  # sys.exit calls after the version/usage output, popping the command
  # name off argv, the alias-lookup block structure and the sort of
  # sortedcmds); left byte-identical pending the full file.
  binary = argv[0].split("/")[-1]

  if len(argv) > 1 and argv[1] == "--version":
    ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
    # Quit right away. That way we don't have to care about this special
    # argument. optparse.py does it the same.

  if len(argv) < 2 or not (argv[1] in commands or
    # let's do a nice thing
    sortedcmds = commands.keys()

    ToStdout("Usage: %s {command} [options...] [argument...]", binary)
    ToStdout("%s <command> --help to see details, or man %s", binary, binary)

    # compute the max line length for cmd + usage
    mlen = max([len(" %s" % cmd) for cmd in commands])
    mlen = min(60, mlen) # should not get here...

    # and format a nice command list
    ToStdout("Commands:")
    for cmd in sortedcmds:
      cmdstr = " %s" % (cmd,)
      help_text = commands[cmd][4]
      help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
      ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
      for line in help_lines:
        ToStdout("%-*s %s", mlen, "", line)

    return None, None, None

  # get command, unalias it, and look it up in commands
      raise errors.ProgrammerError("Alias '%s' overrides an existing"
    if aliases[cmd] not in commands:
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
                                   " command '%s'" % (cmd, aliases[cmd]))

  func, args_def, parser_opts, usage, description = commands[cmd]
  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
                        description=description,
                        formatter=TitledHelpFormatter(),
                        usage="%%prog %s %s" % (cmd, usage))
  parser.disable_interspersed_args()
  options, args = parser.parse_args()

  if not _CheckArguments(cmd, args_def, args):
    return None, None, None

  return func, options, args
def _CheckArguments(cmd, args_def, args):
  """Verifies the arguments using the argument definition.

  Algorithm:

    1. Abort with error if values specified by user but none expected.

    1. For each argument in definition

      1. Keep running count of minimum number of values (min_count)
      1. Keep running count of maximum number of values (max_count)
      1. If it has an unlimited number of values

        1. Abort with error if it's not the last argument in the definition

    1. If last argument has limited number of values

      1. Abort with error if number of values doesn't match or is too large

    1. Abort with error if user didn't pass enough values (min_count)

  """
  # NOTE(review): this chunk is missing several lines (the early True
  # returns, the min_count/max_count initializations and first
  # assignments, the "if idx == last_idx" guard and the False returns
  # after each error message); left byte-identical pending the full file.
  if args and not args_def:
    ToStderr("Error: Command %s expects no arguments", cmd)

  last_idx = len(args_def) - 1

  for idx, arg in enumerate(args_def):
    if min_count is None:
    elif arg.min is not None:
      min_count += arg.min

    if max_count is None:
    elif arg.max is not None:
      max_count += arg.max

      check_max = (arg.max is not None)

    elif arg.max is None:
      raise errors.ProgrammerError("Only the last argument can have max=None")

  # Command with exact number of arguments
  if (min_count is not None and max_count is not None and
      min_count == max_count and len(args) != min_count):
    ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)

  # Command with limited number of arguments
  if max_count is not None and len(args) > max_count:
    ToStderr("Error: Command %s expects only %d argument(s)",

  # Command with some required arguments
  if min_count is not None and len(args) < min_count:
    ToStderr("Error: Command %s expects at least %d argument(s)",
def SplitNodeOption(value):
  """Splits the value of a --node option.

  @type value: string or None
  @param value: the value of a "-n pnode[:snode]" style option
  @return: a (primary, secondary) pair; when no ":" separator is
      present (or value is empty/None), the secondary element is None

  """
  # the docstring terminator was missing in the reviewed fragment,
  # which made the function body part of an unterminated string
  if value and ':' in value:
    return value.split(':', 1)
  return (value, None)
def CalculateOSNames(os_name, os_variants):
  """Calculates all the names an OS can be called, according to its variants.

  @type os_name: string
  @param os_name: base name of the os
  @type os_variants: list or None
  @param os_variants: list of supported variants
  @rtype: list
  @return: list of valid names

  """
  # NOTE(review): the branch structure is missing from this chunk -- the
  # guard on os_variants being non-empty and the plain-[os_name]
  # fallback are not visible; left byte-identical pending the full file.
  return ['%s+%s' % (os_name, v) for v in os_variants]
  # NOTE(review): this looks like the inner function of a decorator whose
  # enclosing definition (and any setup/teardown around the call) is not
  # visible in this chunk; left byte-identical pending the full file.
  def wrapper(*args, **kwargs):
    return fn(*args, **kwargs)
def AskUser(text, choices=None):
  """Ask the user a question.

  @param text: the question to ask
  @param choices: list with elements tuples (input_char, return_value,
      description); if not given, it will default to: [('y', True,
      'Perform the operation'), ('n', False, 'Do no do the operation')];
      note that the '?' char is reserved for help
  @return: one of the return values from the choices list; if input is
      not possible (i.e. not running with a tty, we return the last
      entry of the list

  """
  # NOTE(review): several lines are missing from this chunk (the
  # "choices is None" guard, new_text initialization, the try/except
  # around opening /dev/tty, the prompt loop and the final return);
  # left byte-identical pending the full file.
    choices = [('y', True, 'Perform the operation'),
               ('n', False, 'Do not perform the operation')]
  if not choices or not isinstance(choices, list):
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
  for entry in choices:
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
      raise errors.ProgrammerError("Invalid choices element to AskUser")

  answer = choices[-1][1]
  for line in text.splitlines():
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
  text = "\n".join(new_text)
    f = file("/dev/tty", "a+")
    chars = [entry[0] for entry in choices]
    chars[-1] = "[%s]" % chars[-1]
    maps = dict([(entry[0], entry[1]) for entry in choices])
      f.write("/".join(chars))
      line = f.readline(2).strip().lower()
        for entry in choices:
          f.write(" %s - %s\n" % (entry[0], entry[2]))
class JobSubmittedException(Exception):
  """Job was submitted, client should exit.

  This exception has one argument, the ID of the job that was
  submitted. The handler should print this ID.

  This is not an error, just a structured way to exit from clients.

  """
def SendJob(ops, cl=None):
  """Function to submit an opcode without waiting for the results.

  @type ops: list
  @param ops: list of opcodes
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
      if None, a new client will be created

  """
  # NOTE(review): the client-creation fallback ("if cl is None: ...")
  # and the "return job_id" are not visible in this chunk; left
  # byte-identical pending the full file.
  job_id = cl.SubmitJob(ops)
def PollJob(job_id, cl=None, feedback_fn=None):
  """Function to poll for the result of a job.

  @type job_id: job identified
  @param job_id: the job to poll for results
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
      if None, a new client will be created

  """
  # NOTE(review): a number of lines are missing from this chunk (client
  # creation, the "while True" loop header, the continuation of the
  # WaitForJobChangeOnce call, the JOB_NOTCHANGED/JobLost branch
  # structure, the loop's continue/break statements and the
  # success-path "return result"); left byte-identical pending the
  # full file.
  prev_job_info = None
  prev_logmsg_serial = None

  notified_queued = False
  notified_waitlock = False

    result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
      # job not found, go away!
      raise errors.JobLost("Job with id %s lost" % job_id)
    elif result == constants.JOB_NOTCHANGED:
      if status is not None and not callable(feedback_fn):
        if status == constants.JOB_STATUS_QUEUED and not notified_queued:
          ToStderr("Job %s is waiting in queue", job_id)
          notified_queued = True
        elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
          ToStderr("Job %s is trying to acquire all necessary locks", job_id)
          notified_waitlock = True

    # Split result, a tuple of (field values, log entries)
    (job_info, log_entries) = result
    (status, ) = job_info

      for log_entry in log_entries:
        (serial, timestamp, _, message) = log_entry
        if callable(feedback_fn):
          feedback_fn(log_entry[1:])
          encoded = utils.SafeEncode(message)
          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
        prev_logmsg_serial = max(prev_logmsg_serial, serial)

    # TODO: Handle canceled and archived jobs
    elif status in (constants.JOB_STATUS_SUCCESS,
                    constants.JOB_STATUS_ERROR,
                    constants.JOB_STATUS_CANCELING,
                    constants.JOB_STATUS_CANCELED):

    prev_job_info = job_info

  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
    raise errors.JobLost("Job with id %s lost" % job_id)

  status, opstatus, result = jobs[0]
  if status == constants.JOB_STATUS_SUCCESS:
  elif status in (constants.JOB_STATUS_CANCELING,
                  constants.JOB_STATUS_CANCELED):
    raise errors.OpExecError("Job was canceled")

  for idx, (status, msg) in enumerate(zip(opstatus, result)):
    if status == constants.OP_STATUS_SUCCESS:
    elif status == constants.OP_STATUS_ERROR:
      errors.MaybeRaise(msg)
        raise errors.OpExecError("partial failure (opcode %d): %s" %
      raise errors.OpExecError(str(msg))
  # default failure mode
  raise errors.OpExecError(result)
# Submit a single opcode as a one-opcode job and block until it is done.
1288 def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
1289 """Legacy function to submit an opcode.
1291 This is just a simple wrapper over the construction of the processor
1292 instance. It should be extended to better handle feedback and
1293 interaction functions.
# Propagate generic CLI flags (debug, dry-run, ...) onto the opcode.
# NOTE(review): a "cl is None -> create client" guard appears elided.
1299 SetGenericOpcodeOpts([op], opts)
1301 job_id = SendJob([op], cl)
1303 op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
# Only one opcode was submitted, so return just its result.
1305 return op_results[0]
# Dispatch helper behind the --submit option: either wait for the opcode
# result, or fire-and-forget and report the job ID via exception.
1308 def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
1309 """Wrapper around SubmitOpCode or SendJob.
1311 This function will decide, based on the 'opts' parameter, whether to
1312 submit and wait for the result of the opcode (and return it), or
1313 whether to just send the job and print its identifier. It is used in
1314 order to simplify the implementation of the '--submit' option.
1316 It will also process the opcodes if we're sending the via SendJob
1317 (otherwise SubmitOpCode does it).
1320 if opts and opts.submit_only:
# NOTE(review): an elided line here presumably builds "job" as [op].
1322 SetGenericOpcodeOpts(job, opts)
1323 job_id = SendJob(job, cl=cl)
# Structured exit, not an error: caller prints the job ID and returns.
1324 raise JobSubmittedException(job_id)
1326 return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
1329 def SetGenericOpcodeOpts(opcode_list, options):
1330 """Processor for generic options.
1332 This function updates the given opcodes based on generic command
1333 line options (like debug, dry-run, etc.).
1335 @param opcode_list: list of opcodes
1336 @param options: command line options or None
1337 @return: None (in-place modification)
# NOTE(review): an elided guard presumably returns early when options
# is falsy, since the loop below dereferences it unconditionally.
1342 for op in opcode_list:
# In-place: copy the generic CLI flags onto every opcode.
1343 op.dry_run = options.dry_run
1344 op.debug_level = options.debug
# NOTE(review): the enclosing "def" line is elided from this view; this
# appears to be the body of a GetClient()-style helper that constructs a
# luxi client and, on failure, produces a diagnostic explaining whether
# the cluster is uninitialized or we are simply not on the master node.
1348 # TODO: Cache object?
1350 client = luxi.Client()
# No master daemon reachable: figure out why, using local ssconf data.
1351 except luxi.NoMasterError:
1352 ss = ssconf.SimpleStore()
1354 # Try to read ssconf file
1357 except errors.ConfigurationError:
1358 raise errors.OpPrereqError("Cluster not initialized or this machine is"
1359 " not part of a cluster")
# Distinguish "master daemon is down" from "wrong node entirely".
1361 master, myself = ssconf.GetMasterAndMyself(ss=ss)
1362 if master != myself:
1363 raise errors.OpPrereqError("This is not the master node, please connect"
1364 " to node '%s' and rerun the command" %
# Central exception-to-message translator used by GenericMain and the
# JobExecutor: maps each known exception type to a human-readable text.
1370 def FormatError(err):
1371 """Return a formatted error message for a given error.
1373 This function takes an exception instance and returns a tuple
1374 consisting of two values: first, the recommended exit code, and
1375 second, a string describing the error message (not
1376 newline-terminated).
# NOTE(review): elided lines here presumably initialize retcode, msg
# (str(err)?) and obuf (a StringIO) used throughout the chain below.
# The isinstance chain is ordered most-specific-first; GenericError and
# the bare "else" act as catch-alls at the end.
1382 if isinstance(err, errors.ConfigurationError):
1383 txt = "Corrupt configuration file: %s" % msg
1385 obuf.write(txt + "\n")
1386 obuf.write("Aborting.")
1388 elif isinstance(err, errors.HooksAbort):
1389 obuf.write("Failure: hooks execution failed:\n")
1390 for node, script, out in err.args[0]:
1392 obuf.write(" node: %s, script: %s, output: %s\n" %
1393 (node, script, out))
1395 obuf.write(" node: %s, script: %s (no output)\n" %
1397 elif isinstance(err, errors.HooksFailure):
1398 obuf.write("Failure: hooks general failure: %s" % msg)
1399 elif isinstance(err, errors.ResolverError):
# Special-case resolving our own hostname vs. a remote one.
1400 this_host = utils.HostInfo.SysName()
1401 if err.args[0] == this_host:
1402 msg = "Failure: can't resolve my own hostname ('%s')"
1404 msg = "Failure: can't resolve hostname '%s'"
1405 obuf.write(msg % err.args[0])
1406 elif isinstance(err, errors.OpPrereqError):
# Two-argument form carries a structured (details, error-type) pair.
1407 if len(err.args) == 2:
1408 obuf.write("Failure: prerequisites not met for this"
1409 " operation:\nerror type: %s, error details:\n%s" %
1410 (err.args[1], err.args[0]))
1412 obuf.write("Failure: prerequisites not met for this"
1413 " operation:\n%s" % msg)
1414 elif isinstance(err, errors.OpExecError):
1415 obuf.write("Failure: command execution error:\n%s" % msg)
1416 elif isinstance(err, errors.TagError):
1417 obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
1418 elif isinstance(err, errors.JobQueueDrainError):
1419 obuf.write("Failure: the job queue is marked for drain and doesn't"
1420 " accept new requests\n")
1421 elif isinstance(err, errors.JobQueueFull):
1422 obuf.write("Failure: the job queue is full and doesn't accept new"
1423 " job submissions until old jobs are archived\n")
1424 elif isinstance(err, errors.TypeEnforcementError):
1425 obuf.write("Parameter Error: %s" % msg)
1426 elif isinstance(err, errors.ParameterError):
1427 obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
1428 elif isinstance(err, luxi.NoMasterError):
1429 obuf.write("Cannot communicate with the master daemon.\nIs it running"
1430 " and listening for connections?")
1431 elif isinstance(err, luxi.TimeoutError):
1432 obuf.write("Timeout while talking to the master daemon. Error:\n"
1434 elif isinstance(err, luxi.ProtocolError):
1435 obuf.write("Unhandled protocol error while talking to the master daemon:\n"
1437 elif isinstance(err, errors.GenericError):
1438 obuf.write("Unhandled Ganeti error: %s" % msg)
# Job submission is reported as plain output, not as a failure.
1439 elif isinstance(err, JobSubmittedException):
1440 obuf.write("JobID: %s\n" % err.args[0])
1443 obuf.write("Unhandled exception: %s" % msg)
# Return the accumulated message without a trailing newline.
1444 return retcode, obuf.getvalue().rstrip('\n')
# Shared main() for all gnt-* scripts: parse argv, set up logging, run
# the selected command function and translate exceptions to exit codes.
1447 def GenericMain(commands, override=None, aliases=None):
1448 """Generic main function for all the gnt-* commands.
1451 - commands: a dictionary with a special structure, see the design doc
1452 for command line handling.
1453 - override: if not None, we expect a dictionary with keys that will
1454 override command line options; this can be used to pass
1455 options from the scripts to generic functions
1456 - aliases: dictionary with command aliases {'alias': 'target, ...}
1459 # save the program name and the entire command line for later logging
# "binary" becomes e.g. "gnt-instance add" for the log line below.
1461 binary = os.path.basename(sys.argv[0]) or sys.argv[0]
1462 if len(sys.argv) >= 2:
1463 binary += " " + sys.argv[1]
1464 old_cmdline = " ".join(sys.argv[2:])
1468 binary = "<unknown program>"
1475 func, options, args = _ParseArgs(sys.argv, commands, aliases)
# Bad CLI usage is formatted like any other error.  NOTE(review): the
# lines printing err_msg / returning are elided from this view.
1476 except errors.ParameterError, err:
1477 result, err_msg = FormatError(err)
1481 if func is None: # parse error
# Script-supplied overrides win over what was parsed from argv.
1484 if override is not None:
1485 for key, val in override.iteritems():
1486 setattr(options, key, val)
# Logging must be configured before the command function runs.
1488 utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
1489 stderr_logging=True, program=binary)
1492 logging.info("run with arguments '%s'", old_cmdline)
1494 logging.info("run with no arguments")
1497 result = func(options, args)
# Known/structured failures become formatted messages + exit codes;
# JobSubmittedException is the --submit success path.
1498 except (errors.GenericError, luxi.ProtocolError,
1499 JobSubmittedException), err:
1500 result, err_msg = FormatError(err)
1501 logging.exception("Error during command processing")
# Shared implementation behind "gnt-instance add" and "gnt-backup import":
# validates NIC/disk option lists and submits one OpCreateInstance.
1507 def GenericInstanceCreate(mode, opts, args):
1508 """Add an instance to the cluster via either creation or import.
1510 @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
1511 @param opts: the command line options selected by the user
1513 @param args: should contain only one element, the new instance name
1515 @return: the desired exit code
1520 (pnode, snode) = SplitNodeOption(opts.node)
1525 hypervisor, hvparams = opts.hypervisor
# --- NIC handling: opts.nics is a list of (index, dict) pairs; build a
# dense list sized by the highest index.
1529 nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
1530 except ValueError, err:
1531 raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
1532 nics = [{}] * nic_max
1533 for nidx, ndict in opts.nics:
1535 if not isinstance(ndict, dict):
1536 msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
1537 raise errors.OpPrereqError(msg)
1543 # default of one nic, all auto
# --- Disk handling: mutually exclusive --disk / -s (sd_size) options.
1546 if opts.disk_template == constants.DT_DISKLESS:
1547 if opts.disks or opts.sd_size is not None:
1548 raise errors.OpPrereqError("Diskless instance but disk"
1549 " information passed")
1552 if not opts.disks and not opts.sd_size:
1553 raise errors.OpPrereqError("No disk information specified")
1554 if opts.disks and opts.sd_size is not None:
1555 raise errors.OpPrereqError("Please use either the '--disk' or"
# Legacy single-disk option is normalized into the --disk format.
1557 if opts.sd_size is not None:
1558 opts.disks = [(0, {"size": opts.sd_size})]
1560 disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
1561 except ValueError, err:
1562 raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
1563 disks = [{}] * disk_max
1564 for didx, ddict in opts.disks:
1566 if not isinstance(ddict, dict):
1567 msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
1568 raise errors.OpPrereqError(msg)
# A disk is specified either by "size" or by "adopt", never both.
1569 elif "size" in ddict:
1570 if "adopt" in ddict:
1571 raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
1572 " (disk %d)" % didx)
1574 ddict["size"] = utils.ParseUnit(ddict["size"])
1575 except ValueError, err:
1576 raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
1578 elif "adopt" in ddict:
1579 if mode == constants.INSTANCE_IMPORT:
1580 raise errors.OpPrereqError("Disk adoption not allowed for instance"
1584 raise errors.OpPrereqError("Missing size or adoption source for"
# Enforce declared types on backend/hypervisor parameter dicts.
1588 utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
1589 utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
# Mode-specific fields.  NOTE(review): the INSTANCE_CREATE branch lines
# setting start/os/src_* defaults are partly elided from this view.
1591 if mode == constants.INSTANCE_CREATE:
1596 no_install = opts.no_install
1597 elif mode == constants.INSTANCE_IMPORT:
1600 src_node = opts.src_node
1601 src_path = opts.src_dir
1604 raise errors.ProgrammerError("Invalid creation mode %s" % mode)
1606 op = opcodes.OpCreateInstance(instance_name=instance,
1608 disk_template=opts.disk_template,
1610 pnode=pnode, snode=snode,
1611 ip_check=opts.ip_check,
1612 name_check=opts.name_check,
1613 wait_for_sync=opts.wait_for_sync,
1614 file_storage_dir=opts.file_storage_dir,
1615 file_driver=opts.file_driver,
1616 iallocator=opts.iallocator,
1617 hypervisor=hypervisor,
1619 beparams=opts.beparams,
1625 no_install=no_install)
# Honors --submit via the SubmitOrSend wrapper.
1627 SubmitOrSend(op, opts)
# Internal helper: knows how to stop all cluster daemons, run a callback,
# and restart everything (master node last) even on failure.
1631 class _RunWhileClusterStoppedHelper:
1632 """Helper class for L{RunWhileClusterStopped} to simplify state management
1635 def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
1636 """Initializes this class.
1638 @type feedback_fn: callable
1639 @param feedback_fn: Feedback function
1640 @type cluster_name: string
1641 @param cluster_name: Cluster name
1642 @type master_node: string
1643 @param master_node Master node name
1644 @type online_nodes: list
1645 @param online_nodes: List of names of online nodes
1648 self.feedback_fn = feedback_fn
1649 self.cluster_name = cluster_name
1650 self.master_node = master_node
1651 self.online_nodes = online_nodes
# SSH runner used for commands on non-master nodes.
1653 self.ssh = ssh.SshRunner(self.cluster_name)
1655 self.nonmaster_nodes = [name for name in online_nodes
1656 if name != master_node]
1658 assert self.master_node not in self.nonmaster_nodes
1660 def _RunCmd(self, node_name, cmd):
1661 """Runs a command on the local or a remote machine.
1663 @type node_name: string
1664 @param node_name: Machine name
# node_name None (or the master itself) means "run locally"; anything
# else goes through SSH as root.
1669 if node_name is None or node_name == self.master_node:
1670 # No need to use SSH
1671 result = utils.RunCmd(cmd)
1673 result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))
# NOTE(review): the "if result.failed:" guard appears elided here.
1676 errmsg = ["Failed to run command %s" % result.cmd]
1678 errmsg.append("on node %s" % node_name)
1679 errmsg.append(": exitcode %s and error %s" %
1680 (result.exit_code, result.output))
1681 raise errors.OpExecError(" ".join(errmsg))
1683 def Call(self, fn, *args):
1684 """Call function while all daemons are stopped.
1687 @param fn: Function to be called
1690 # Pause watcher by acquiring an exclusive lock on watcher state file
# Prevents ganeti-watcher from restarting the daemons underneath us.
1691 self.feedback_fn("Blocking watcher")
1692 watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
1694 # TODO: Currently, this just blocks. There's no timeout.
1695 # TODO: Should it be a shared lock?
1696 watcher_block.Exclusive(blocking=True)
1698 # Stop master daemons, so that no new jobs can come in and all running
1700 self.feedback_fn("Stopping master daemons")
1701 self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
1703 # Stop daemons on all nodes
1704 for node_name in self.online_nodes:
1705 self.feedback_fn("Stopping daemons on %s" % node_name)
1706 self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])
1708 # All daemons are shut down now
# Run the actual payload with the cluster quiesced.
1710 return fn(self, *args)
1711 except Exception, err:
1712 _, errmsg = FormatError(err)
1713 logging.exception("Caught exception")
1714 self.feedback_fn(errmsg)
# NOTE(review): elided lines here presumably re-raise and open the
# "finally:" cleanup block that the restart code below belongs to.
1717 # Start cluster again, master node last
1718 for node_name in self.nonmaster_nodes + [self.master_node]:
1719 self.feedback_fn("Starting daemons on %s" % node_name)
1720 self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
# Release the watcher lock last, once daemons are running again.
1723 watcher_block.Close()
# Public entry point: gather cluster facts while the master daemon is
# still up, then delegate the stop/run/restart cycle to the helper.
1726 def RunWhileClusterStopped(feedback_fn, fn, *args):
1727 """Calls a function while all cluster daemons are stopped.
1729 @type feedback_fn: callable
1730 @param feedback_fn: Feedback function
1732 @param fn: Function to be called when daemons are stopped
1735 feedback_fn("Gathering cluster information")
1737 # This ensures we're running on the master daemon
# NOTE(review): the "cl = GetClient()" line appears elided here.
1740 (cluster_name, master_node) = \
1741 cl.QueryConfigValues(["cluster_name", "master_node"])
1743 online_nodes = GetOnlineNodes([], cl=cl)
1745 # Don't keep a reference to the client. The master daemon will go away.
1748 assert master_node in online_nodes
1750 return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
1751 online_nodes).Call(fn, *args)
# Render tabular data either with computed column widths ("smart" mode,
# separator=None) or with a literal separator string between fields.
1754 def GenerateTable(headers, fields, separator, data,
1755 numfields=None, unitfields=None,
1757 """Prints a table with headers and different fields.
1760 @param headers: dictionary mapping field names to headers for
1763 @param fields: the field names corresponding to each row in
1765 @param separator: the separator to be used; if this is None,
1766 the default 'smart' algorithm is used which computes optimal
1767 field width, otherwise just the separator is used between
1770 @param data: a list of lists, each sublist being one row to be output
1771 @type numfields: list
1772 @param numfields: a list with the fields that hold numeric
1773 values and thus should be right-aligned
1774 @type unitfields: list
1775 @param unitfields: a list with the fields that hold numeric
1776 values that should be formatted with the units field
1777 @type units: string or None
1778 @param units: the units we should use for formatting, or None for
1779 automatic choice (human-readable for non-separator usage, otherwise
1780 megabytes); this is a one-letter string
# Normalize numfields/unitfields into FieldSet matchers (empty default).
1789 if numfields is None:
1791 if unitfields is None:
1794 numfields = utils.FieldSet(*numfields) # pylint: disable-msg=W0142
1795 unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142
# Build one %-format specifier per column; numeric columns are
# right-aligned ("%*s"), everything else left-aligned ("%-*s").
1798 for field in fields:
1799 if headers and field not in headers:
1800 # TODO: handle better unknown fields (either revert to old
1801 # style of raising exception, or deal more intelligently with
1803 headers[field] = field
1804 if separator is not None:
1805 format_fields.append("%s")
1806 elif numfields.Matches(field):
1807 format_fields.append("%*s")
1809 format_fields.append("%-*s")
1811 if separator is None:
# mlens tracks the maximum width seen per column (smart mode only).
1812 mlens = [0 for name in fields]
1813 format = ' '.join(format_fields)
# Escape literal % in the user separator before using it in a format.
1815 format = separator.replace("%", "%%").join(format_fields)
# First pass over the data: stringify values (applying unit formatting
# where requested) and record column widths.
1820 for idx, val in enumerate(row):
1821 if unitfields.Matches(fields[idx]):
1824 except (TypeError, ValueError):
1827 val = row[idx] = utils.FormatUnit(val, units)
1828 val = row[idx] = str(val)
1829 if separator is None:
1830 mlens[idx] = max(mlens[idx], len(val))
# Header row: widths also account for the header text itself.
1835 for idx, name in enumerate(fields):
1837 if separator is None:
1838 mlens[idx] = max(mlens[idx], len(hdr))
1839 args.append(mlens[idx])
1841 result.append(format % tuple(args))
1843 if separator is None:
1844 assert len(mlens) == len(fields)
# Last column need not be padded unless it is right-aligned.
1846 if fields and not numfields.Matches(fields[-1]):
# NOTE(review): this '-'-filled row looks like part of an elided
# branch (possibly a "show units/separator line" mode) — the lines
# assembling "args" per data row are not visible here.
1852 line = ['-' for _ in fields]
1853 for idx in range(len(fields)):
1854 if separator is None:
1855 args.append(mlens[idx])
1856 args.append(line[idx])
1857 result.append(format % tuple(args))
def FormatTimestamp(ts):
  """Render a timeval-style timestamp as human-readable local time.

  @type ts: tuple or list
  @param ts: a (seconds, microseconds) pair
  @rtype: string
  @return: "YYYY-MM-DD HH:MM:SS.uuuuuu" in local time, or "?" when the
      input is not a two-element sequence

  """
  # Anything that is not exactly a (sec, usec) pair renders as unknown.
  if isinstance(ts, (tuple, list)) and len(ts) == 2:
    (sec, usec) = ts
    base = time.strftime("%F %T", time.localtime(sec))
    return "%s.%06d" % (base, usec)
  return '?'
# Convert a string like "2d", "3h" or plain "42" into seconds.
1878 def ParseTimespec(value):
1879 """Parse a time specification.
1881 The following suffixed will be recognized:
1889 Without any suffix, the value will be taken to be in seconds.
1894 raise errors.OpPrereqError("Empty time specification passed")
# NOTE(review): the suffix_map construction (mapping suffix letters to
# multipliers) and the plain-integer fast path are elided from view.
1902 if value[-1] not in suffix_map:
1905 except (TypeError, ValueError):
1906 raise errors.OpPrereqError("Invalid time specification '%s'" % value)
1908 multiplier = suffix_map[value[-1]]
# NOTE(review): an elided line presumably strips the suffix character.
1910 if not value: # no data left after stripping the suffix
1911 raise errors.OpPrereqError("Invalid time specification (only"
1914 value = int(value) * multiplier
1915 except (TypeError, ValueError):
1916 raise errors.OpPrereqError("Invalid time specification '%s'" % value)
# Query the master for node names (or secondary IPs), skipping offline
# nodes and optionally the master node itself.
1920 def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
1921 filter_master=False):
1922 """Returns the names of online nodes.
1924 This function will also log a warning on stderr with the names of
1927 @param nodes: if not empty, use only this subset of nodes (minus the
1929 @param cl: if not None, luxi client to use
1930 @type nowarn: boolean
1931 @param nowarn: by default, this function will output a note with the
1932 offline nodes that are skipped; if this parameter is True the
1933 note is not displayed
1934 @type secondary_ips: boolean
1935 @param secondary_ips: if True, return the secondary IPs instead of the
1936 names, useful for doing network traffic over the replication interface
1938 @type filter_master: boolean
1939 @param filter_master: if True, do not return the master node in the list
1940 (useful in coordination with secondary_ips where we cannot check our
1941 node name against the list)
# NOTE(review): elided lines presumably create a default client and set
# name_idx (0 for names, 2 for secondary IPs) used in the return below.
1953 master_node = cl.QueryConfigValues(["master_node"])[0]
1954 filter_fn = lambda x: x != master_node
# No master filtering requested: accept every node.
1956 filter_fn = lambda _: True
1958 result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
# row layout: [name, offline, sip]; offline nodes are reported once.
1960 offline = [row[0] for row in result if row[1]]
1961 if offline and not nowarn:
1962 ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
1963 return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
# Low-level writer behind ToStdout/ToStderr: %-formats txt with args and
# writes it directly to the stream, bypassing logging.
1966 def _ToStream(stream, txt, *args):
1967 """Write a message to a stream, bypassing the logging system
1969 @type stream: file object
1970 @param stream: the file to which we should write
1972 @param txt: the message
# NOTE(review): the surrounding "if args:" branch, the no-args write,
# the trailing newline and any IOError handling are elided from view.
1977 stream.write(txt % args)
def ToStdout(txt, *args):
  """Print a message on standard output, bypassing the logging system.

  Thin convenience wrapper around L{_ToStream}.

  @type txt: str
  @param txt: the message (used as a format string when args are given)

  """
  _ToStream(sys.stdout, txt, *args)
def ToStderr(txt, *args):
  """Print a message on standard error, bypassing the logging system.

  Thin convenience wrapper around L{_ToStream}.

  @type txt: str
  @param txt: the message (used as a format string when args are given)

  """
  _ToStream(sys.stderr, txt, *args)
# Batches multiple jobs: queue opcodes, submit them all at once via
# SubmitManyJobs, then poll each job and collect (success, result) pairs.
2008 class JobExecutor(object):
2009 """Class which manages the submission and execution of multiple jobs.
2011 Note that instances of this class should not be reused between
2015 def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
# NOTE(review): elided lines presumably create a default client and
# initialize self.queue/self.jobs/self.opts.
2020 self.verbose = verbose
2023 self.feedback_fn = feedback_fn
2025 def QueueJob(self, name, *ops):
2026 """Record a job for later submit.
2029 @param name: a description of the job, will be used in WaitJobSet
# Generic CLI flags are applied at queue time, not at submit time.
2031 SetGenericOpcodeOpts(ops, self.opts)
2032 self.queue.append((name, ops))
2034 def SubmitPending(self):
2035 """Submit all pending jobs.
# One luxi round-trip for all queued jobs; each result is a
# (success-status, job-id-or-error) pair.
2038 results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
2039 for (idx, ((status, data), (name, _))) in enumerate(zip(results,
2041 self.jobs.append((idx, status, data, name))
2043 def _ChooseJob(self):
2044 """Choose a non-waiting/queued job to poll next.
2047 assert self.jobs, "_ChooseJob called with empty job list"
# Prefer a job already running/finished so PollJob doesn't block on one
# that hasn't even acquired its locks yet.
2049 result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
2052 for job_data, status in zip(self.jobs, result):
2053 if status[0] in (constants.JOB_STATUS_QUEUED,
2054 constants.JOB_STATUS_WAITLOCK,
2055 constants.JOB_STATUS_CANCELING):
2056 # job is still waiting
2058 # good candidate found
2059 self.jobs.remove(job_data)
# Fallback: everything is still waiting, take the first job anyway.
2063 return self.jobs.pop(0)
2065 def GetResults(self):
2066 """Wait for and return the results of all jobs.
2069 @return: list of tuples (success, job results), in the same order
2070 as the submitted jobs; if a job has failed, instead of the result
2071 there will be the error message
# NOTE(review): an elided guard presumably submits the queue only when
# it is still pending.
2075 self.SubmitPending()
2078 ok_jobs = [row[2] for row in self.jobs if row[1]]
2080 ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
2082 # first, remove any non-submitted jobs
# Submission failures are recorded as failed results immediately; only
# successfully submitted jobs are polled below.
2083 self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
2084 for idx, _, jid, name in failures:
2085 ToStderr("Failed to submit job for %s: %s", name, jid)
2086 results.append((idx, False, jid))
2089 (idx, _, jid, name) = self._ChooseJob()
2090 ToStdout("Waiting for job %s for %s...", jid, name)
2092 job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
2094 except (errors.GenericError, luxi.ProtocolError), err:
2095 _, job_result = FormatError(err)
2097 # the error message will always be shown, verbose or not
2098 ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
2100 results.append((idx, success, job_result))
2102 # sort based on the index, then drop it
# Restore the original queueing order before returning to the caller.
2104 results = [i[1:] for i in results]
2108 def WaitOrShow(self, wait):
2109 """Wait for job results or only print the job IDs.
2112 @param wait: whether to wait or not
2116 return self.GetResults()
# Fire-and-forget mode: submit, then print per-job ID or failure.
2119 self.SubmitPending()
2120 for status, result, name in self.jobs:
2122 ToStdout("%s: %s", result, name)
2124 ToStderr("Failure for %s: %s", name, result)