cluster verify and instance disks on offline nodes
[ganeti-local] / lib / cli.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module dealing with command line parsing"""
23
24
25 import sys
26 import textwrap
27 import os.path
28 import time
29 import logging
30 from cStringIO import StringIO
31
32 from ganeti import utils
33 from ganeti import errors
34 from ganeti import constants
35 from ganeti import opcodes
36 from ganeti import luxi
37 from ganeti import ssconf
38 from ganeti import rpc
39 from ganeti import ssh
40 from ganeti import compat
41 from ganeti import netutils
42 from ganeti import qlang
43
44 from optparse import (OptionParser, TitledHelpFormatter,
45                       Option, OptionValueError)
46
47
48 __all__ = [
49   # Command line options
50   "ADD_UIDS_OPT",
51   "ALLOCATABLE_OPT",
52   "ALLOC_POLICY_OPT",
53   "ALL_OPT",
54   "AUTO_PROMOTE_OPT",
55   "AUTO_REPLACE_OPT",
56   "BACKEND_OPT",
57   "BLK_OS_OPT",
58   "CAPAB_MASTER_OPT",
59   "CAPAB_VM_OPT",
60   "CLEANUP_OPT",
61   "CLUSTER_DOMAIN_SECRET_OPT",
62   "CONFIRM_OPT",
63   "CP_SIZE_OPT",
64   "DEBUG_OPT",
65   "DEBUG_SIMERR_OPT",
66   "DISKIDX_OPT",
67   "DISK_OPT",
68   "DISK_TEMPLATE_OPT",
69   "DRAINED_OPT",
70   "DRY_RUN_OPT",
71   "DRBD_HELPER_OPT",
72   "EARLY_RELEASE_OPT",
73   "ENABLED_HV_OPT",
74   "ERROR_CODES_OPT",
75   "FIELDS_OPT",
76   "FILESTORE_DIR_OPT",
77   "FILESTORE_DRIVER_OPT",
78   "FORCE_OPT",
79   "FORCE_VARIANT_OPT",
80   "GLOBAL_FILEDIR_OPT",
81   "HID_OS_OPT",
82   "HVLIST_OPT",
83   "HVOPTS_OPT",
84   "HYPERVISOR_OPT",
85   "IALLOCATOR_OPT",
86   "DEFAULT_IALLOCATOR_OPT",
87   "IDENTIFY_DEFAULTS_OPT",
88   "IGNORE_CONSIST_OPT",
89   "IGNORE_FAILURES_OPT",
90   "IGNORE_OFFLINE_OPT",
91   "IGNORE_REMOVE_FAILURES_OPT",
92   "IGNORE_SECONDARIES_OPT",
93   "IGNORE_SIZE_OPT",
94   "INTERVAL_OPT",
95   "MAC_PREFIX_OPT",
96   "MAINTAIN_NODE_HEALTH_OPT",
97   "MASTER_NETDEV_OPT",
98   "MC_OPT",
99   "MIGRATION_MODE_OPT",
100   "NET_OPT",
101   "NEW_CLUSTER_CERT_OPT",
102   "NEW_CLUSTER_DOMAIN_SECRET_OPT",
103   "NEW_CONFD_HMAC_KEY_OPT",
104   "NEW_RAPI_CERT_OPT",
105   "NEW_SECONDARY_OPT",
106   "NIC_PARAMS_OPT",
107   "NODE_FORCE_JOIN_OPT",
108   "NODE_LIST_OPT",
109   "NODE_PLACEMENT_OPT",
110   "NODEGROUP_OPT",
111   "NODE_PARAMS_OPT",
112   "NODE_POWERED_OPT",
113   "NODRBD_STORAGE_OPT",
114   "NOHDR_OPT",
115   "NOIPCHECK_OPT",
116   "NO_INSTALL_OPT",
117   "NONAMECHECK_OPT",
118   "NOLVM_STORAGE_OPT",
119   "NOMODIFY_ETCHOSTS_OPT",
120   "NOMODIFY_SSH_SETUP_OPT",
121   "NONICS_OPT",
122   "NONLIVE_OPT",
123   "NONPLUS1_OPT",
124   "NOSHUTDOWN_OPT",
125   "NOSTART_OPT",
126   "NOSSH_KEYCHECK_OPT",
127   "NOVOTING_OPT",
128   "NWSYNC_OPT",
129   "ON_PRIMARY_OPT",
130   "ON_SECONDARY_OPT",
131   "OFFLINE_OPT",
132   "OSPARAMS_OPT",
133   "OS_OPT",
134   "OS_SIZE_OPT",
135   "PREALLOC_WIPE_DISKS_OPT",
136   "PRIMARY_IP_VERSION_OPT",
137   "PRIORITY_OPT",
138   "RAPI_CERT_OPT",
139   "READD_OPT",
140   "REBOOT_TYPE_OPT",
141   "REMOVE_INSTANCE_OPT",
142   "REMOVE_UIDS_OPT",
143   "RESERVED_LVS_OPT",
144   "ROMAN_OPT",
145   "SECONDARY_IP_OPT",
146   "SELECT_OS_OPT",
147   "SEP_OPT",
148   "SHOWCMD_OPT",
149   "SHUTDOWN_TIMEOUT_OPT",
150   "SINGLE_NODE_OPT",
151   "SRC_DIR_OPT",
152   "SRC_NODE_OPT",
153   "SUBMIT_OPT",
154   "STATIC_OPT",
155   "SYNC_OPT",
156   "TAG_SRC_OPT",
157   "TIMEOUT_OPT",
158   "UIDPOOL_OPT",
159   "USEUNITS_OPT",
160   "USE_REPL_NET_OPT",
161   "VERBOSE_OPT",
162   "VG_NAME_OPT",
163   "YES_DOIT_OPT",
164   # Generic functions for CLI programs
165   "GenericMain",
166   "GenericInstanceCreate",
167   "GenericList",
168   "GenericListFields",
169   "GetClient",
170   "GetOnlineNodes",
171   "JobExecutor",
172   "JobSubmittedException",
173   "ParseTimespec",
174   "RunWhileClusterStopped",
175   "SubmitOpCode",
176   "SubmitOrSend",
177   "UsesRPC",
178   # Formatting functions
179   "ToStderr", "ToStdout",
180   "FormatError",
181   "FormatQueryResult",
182   "FormatParameterDict",
183   "GenerateTable",
184   "AskUser",
185   "FormatTimestamp",
186   "FormatLogMessage",
187   # Tags functions
188   "ListTags",
189   "AddTags",
190   "RemoveTags",
191   # command line options support infrastructure
192   "ARGS_MANY_INSTANCES",
193   "ARGS_MANY_NODES",
194   "ARGS_MANY_GROUPS",
195   "ARGS_NONE",
196   "ARGS_ONE_INSTANCE",
197   "ARGS_ONE_NODE",
198   "ARGS_ONE_GROUP",
199   "ARGS_ONE_OS",
200   "ArgChoice",
201   "ArgCommand",
202   "ArgFile",
203   "ArgGroup",
204   "ArgHost",
205   "ArgInstance",
206   "ArgJobId",
207   "ArgNode",
208   "ArgOs",
209   "ArgSuggest",
210   "ArgUnknown",
211   "OPT_COMPL_INST_ADD_NODES",
212   "OPT_COMPL_MANY_NODES",
213   "OPT_COMPL_ONE_IALLOCATOR",
214   "OPT_COMPL_ONE_INSTANCE",
215   "OPT_COMPL_ONE_NODE",
216   "OPT_COMPL_ONE_NODEGROUP",
217   "OPT_COMPL_ONE_OS",
218   "cli_option",
219   "SplitNodeOption",
220   "CalculateOSNames",
221   "ParseFields",
222   "COMMON_CREATE_OPTS",
223   ]
224
225 NO_PREFIX = "no_"
226 UN_PREFIX = "-"
227
228 #: Priorities (sorted)
229 _PRIORITY_NAMES = [
230   ("low", constants.OP_PRIO_LOW),
231   ("normal", constants.OP_PRIO_NORMAL),
232   ("high", constants.OP_PRIO_HIGH),
233   ]
234
235 #: Priority dictionary for easier lookup
236 # TODO: Replace this and _PRIORITY_NAMES with a single sorted dictionary once
237 # we migrate to Python 2.6
238 _PRIONAME_TO_VALUE = dict(_PRIORITY_NAMES)
239
240 # Query result status for clients
241 (QR_NORMAL,
242  QR_UNKNOWN,
243  QR_INCOMPLETE) = range(3)
244
245
246 class _Argument:
247   def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
248     self.min = min
249     self.max = max
250
251   def __repr__(self):
252     return ("<%s min=%s max=%s>" %
253             (self.__class__.__name__, self.min, self.max))
254
255
256 class ArgSuggest(_Argument):
257   """Suggesting argument.
258
259   Value can be any of the ones passed to the constructor.
260
261   """
262   # pylint: disable-msg=W0622
263   def __init__(self, min=0, max=None, choices=None):
264     _Argument.__init__(self, min=min, max=max)
265     self.choices = choices
266
267   def __repr__(self):
268     return ("<%s min=%s max=%s choices=%r>" %
269             (self.__class__.__name__, self.min, self.max, self.choices))
270
271
272 class ArgChoice(ArgSuggest):
273   """Choice argument.
274
275   Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
276   but value must be one of the choices.
277
278   """
279
280
281 class ArgUnknown(_Argument):
282   """Unknown argument to program (e.g. determined at runtime).
283
284   """
285
286
287 class ArgInstance(_Argument):
288   """Instances argument.
289
290   """
291
292
293 class ArgNode(_Argument):
294   """Node argument.
295
296   """
297
298
299 class ArgGroup(_Argument):
300   """Node group argument.
301
302   """
303
304
305 class ArgJobId(_Argument):
306   """Job ID argument.
307
308   """
309
310
311 class ArgFile(_Argument):
312   """File path argument.
313
314   """
315
316
317 class ArgCommand(_Argument):
318   """Command argument.
319
320   """
321
322
323 class ArgHost(_Argument):
324   """Host argument.
325
326   """
327
328
329 class ArgOs(_Argument):
330   """OS argument.
331
332   """
333
334
335 ARGS_NONE = []
336 ARGS_MANY_INSTANCES = [ArgInstance()]
337 ARGS_MANY_NODES = [ArgNode()]
338 ARGS_MANY_GROUPS = [ArgGroup()]
339 ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
340 ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
341 ARGS_ONE_GROUP = [ArgInstance(min=1, max=1)]
342 ARGS_ONE_OS = [ArgOs(min=1, max=1)]
343
344
345 def _ExtractTagsObject(opts, args):
346   """Extract the tag type object.
347
348   Note that this function will modify its args parameter.
349
350   """
351   if not hasattr(opts, "tag_type"):
352     raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
353   kind = opts.tag_type
354   if kind == constants.TAG_CLUSTER:
355     retval = kind, kind
356   elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
357     if not args:
358       raise errors.OpPrereqError("no arguments passed to the command")
359     name = args.pop(0)
360     retval = kind, name
361   else:
362     raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
363   return retval
364
365
366 def _ExtendTags(opts, args):
367   """Extend the args if a source file has been given.
368
369   This function will extend the tags with the contents of the file
370   passed in the 'tags_source' attribute of the opts parameter. A file
371   named '-' will be replaced by stdin.
372
373   """
374   fname = opts.tags_source
375   if fname is None:
376     return
377   if fname == "-":
378     new_fh = sys.stdin
379   else:
380     new_fh = open(fname, "r")
381   new_data = []
382   try:
383     # we don't use the nice 'new_data = [line.strip() for line in fh]'
384     # because of python bug 1633941
385     while True:
386       line = new_fh.readline()
387       if not line:
388         break
389       new_data.append(line.strip())
390   finally:
391     new_fh.close()
392   args.extend(new_data)
393
394
395 def ListTags(opts, args):
396   """List the tags on a given object.
397
398   This is a generic implementation that knows how to deal with all
399   three cases of tag objects (cluster, node, instance). The opts
400   argument is expected to contain a tag_type field denoting what
401   object type we work on.
402
403   """
404   kind, name = _ExtractTagsObject(opts, args)
405   cl = GetClient()
406   result = cl.QueryTags(kind, name)
407   result = list(result)
408   result.sort()
409   for tag in result:
410     ToStdout(tag)
411
412
413 def AddTags(opts, args):
414   """Add tags on a given object.
415
416   This is a generic implementation that knows how to deal with all
417   three cases of tag objects (cluster, node, instance). The opts
418   argument is expected to contain a tag_type field denoting what
419   object type we work on.
420
421   """
422   kind, name = _ExtractTagsObject(opts, args)
423   _ExtendTags(opts, args)
424   if not args:
425     raise errors.OpPrereqError("No tags to be added")
426   op = opcodes.OpTagsSet(kind=kind, name=name, tags=args)
427   SubmitOpCode(op, opts=opts)
428
429
430 def RemoveTags(opts, args):
431   """Remove tags from a given object.
432
433   This is a generic implementation that knows how to deal with all
434   three cases of tag objects (cluster, node, instance). The opts
435   argument is expected to contain a tag_type field denoting what
436   object type we work on.
437
438   """
439   kind, name = _ExtractTagsObject(opts, args)
440   _ExtendTags(opts, args)
441   if not args:
442     raise errors.OpPrereqError("No tags to be removed")
443   op = opcodes.OpTagsDel(kind=kind, name=name, tags=args)
444   SubmitOpCode(op, opts=opts)
445
446
447 def check_unit(option, opt, value): # pylint: disable-msg=W0613
448   """OptParsers custom converter for units.
449
450   """
451   try:
452     return utils.ParseUnit(value)
453   except errors.UnitParseError, err:
454     raise OptionValueError("option %s: %s" % (opt, err))
455
456
457 def _SplitKeyVal(opt, data):
458   """Convert a KeyVal string into a dict.
459
460   This function will convert a key=val[,...] string into a dict. Empty
461   values will be converted specially: keys which have the prefix 'no_'
462   will have the value=False and the prefix stripped, the others will
463   have value=True.
464
465   @type opt: string
466   @param opt: a string holding the option name for which we process the
467       data, used in building error messages
468   @type data: string
469   @param data: a string of the format key=val,key=val,...
470   @rtype: dict
471   @return: {key=val, key=val}
472   @raises errors.ParameterError: if there are duplicate keys
473
474   """
475   kv_dict = {}
476   if data:
477     for elem in utils.UnescapeAndSplit(data, sep=","):
478       if "=" in elem:
479         key, val = elem.split("=", 1)
480       else:
481         if elem.startswith(NO_PREFIX):
482           key, val = elem[len(NO_PREFIX):], False
483         elif elem.startswith(UN_PREFIX):
484           key, val = elem[len(UN_PREFIX):], None
485         else:
486           key, val = elem, True
487       if key in kv_dict:
488         raise errors.ParameterError("Duplicate key '%s' in option %s" %
489                                     (key, opt))
490       kv_dict[key] = val
491   return kv_dict
492
493
494 def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
495   """Custom parser for ident:key=val,key=val options.
496
497   This will store the parsed values as a tuple (ident, {key: val}). As such,
498   multiple uses of this option via action=append is possible.
499
500   """
501   if ":" not in value:
502     ident, rest = value, ''
503   else:
504     ident, rest = value.split(":", 1)
505
506   if ident.startswith(NO_PREFIX):
507     if rest:
508       msg = "Cannot pass options when removing parameter groups: %s" % value
509       raise errors.ParameterError(msg)
510     retval = (ident[len(NO_PREFIX):], False)
511   elif ident.startswith(UN_PREFIX):
512     if rest:
513       msg = "Cannot pass options when removing parameter groups: %s" % value
514       raise errors.ParameterError(msg)
515     retval = (ident[len(UN_PREFIX):], None)
516   else:
517     kv_dict = _SplitKeyVal(opt, rest)
518     retval = (ident, kv_dict)
519   return retval
520
521
522 def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
523   """Custom parser class for key=val,key=val options.
524
525   This will store the parsed values as a dict {key: val}.
526
527   """
528   return _SplitKeyVal(opt, value)
529
530
531 def check_bool(option, opt, value): # pylint: disable-msg=W0613
532   """Custom parser for yes/no options.
533
534   This will store the parsed value as either True or False.
535
536   """
537   value = value.lower()
538   if value == constants.VALUE_FALSE or value == "no":
539     return False
540   elif value == constants.VALUE_TRUE or value == "yes":
541     return True
542   else:
543     raise errors.ParameterError("Invalid boolean value '%s'" % value)
544
545
546 # completion_suggestion is normally a list. Using numeric values not evaluating
547 # to False for dynamic completion.
548 (OPT_COMPL_MANY_NODES,
549  OPT_COMPL_ONE_NODE,
550  OPT_COMPL_ONE_INSTANCE,
551  OPT_COMPL_ONE_OS,
552  OPT_COMPL_ONE_IALLOCATOR,
553  OPT_COMPL_INST_ADD_NODES,
554  OPT_COMPL_ONE_NODEGROUP) = range(100, 107)
555
556 OPT_COMPL_ALL = frozenset([
557   OPT_COMPL_MANY_NODES,
558   OPT_COMPL_ONE_NODE,
559   OPT_COMPL_ONE_INSTANCE,
560   OPT_COMPL_ONE_OS,
561   OPT_COMPL_ONE_IALLOCATOR,
562   OPT_COMPL_INST_ADD_NODES,
563   OPT_COMPL_ONE_NODEGROUP,
564   ])
565
566
567 class CliOption(Option):
568   """Custom option class for optparse.
569
570   """
571   ATTRS = Option.ATTRS + [
572     "completion_suggest",
573     ]
574   TYPES = Option.TYPES + (
575     "identkeyval",
576     "keyval",
577     "unit",
578     "bool",
579     )
580   TYPE_CHECKER = Option.TYPE_CHECKER.copy()
581   TYPE_CHECKER["identkeyval"] = check_ident_key_val
582   TYPE_CHECKER["keyval"] = check_key_val
583   TYPE_CHECKER["unit"] = check_unit
584   TYPE_CHECKER["bool"] = check_bool
585
586
587 # optparse.py sets make_option, so we do it for our own option class, too
588 cli_option = CliOption
589
590
591 _YORNO = "yes|no"
592
593 DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
594                        help="Increase debugging level")
595
596 NOHDR_OPT = cli_option("--no-headers", default=False,
597                        action="store_true", dest="no_headers",
598                        help="Don't display column headers")
599
600 SEP_OPT = cli_option("--separator", default=None,
601                      action="store", dest="separator",
602                      help=("Separator between output fields"
603                            " (defaults to one space)"))
604
605 USEUNITS_OPT = cli_option("--units", default=None,
606                           dest="units", choices=('h', 'm', 'g', 't'),
607                           help="Specify units for output (one of h/m/g/t)")
608
609 FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
610                         type="string", metavar="FIELDS",
611                         help="Comma separated list of output fields")
612
613 FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
614                        default=False, help="Force the operation")
615
616 CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
617                          default=False, help="Do not require confirmation")
618
619 IGNORE_OFFLINE_OPT = cli_option("--ignore-offline", dest="ignore_offline",
620                                   action="store_true", default=False,
621                                   help=("Ignore offline nodes and do as much"
622                                         " as possible"))
623
624 TAG_SRC_OPT = cli_option("--from", dest="tags_source",
625                          default=None, help="File with tag names")
626
627 SUBMIT_OPT = cli_option("--submit", dest="submit_only",
628                         default=False, action="store_true",
629                         help=("Submit the job and return the job ID, but"
630                               " don't wait for the job to finish"))
631
632 SYNC_OPT = cli_option("--sync", dest="do_locking",
633                       default=False, action="store_true",
634                       help=("Grab locks while doing the queries"
635                             " in order to ensure more consistent results"))
636
637 DRY_RUN_OPT = cli_option("--dry-run", default=False,
638                          action="store_true",
639                          help=("Do not execute the operation, just run the"
640                                " check steps and verify it it could be"
641                                " executed"))
642
643 VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
644                          action="store_true",
645                          help="Increase the verbosity of the operation")
646
647 DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
648                               action="store_true", dest="simulate_errors",
649                               help="Debugging option that makes the operation"
650                               " treat most runtime checks as failed")
651
652 NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
653                         default=True, action="store_false",
654                         help="Don't wait for sync (DANGEROUS!)")
655
656 DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
657                                help="Custom disk setup (diskless, file,"
658                                " plain or drbd)",
659                                default=None, metavar="TEMPL",
660                                choices=list(constants.DISK_TEMPLATES))
661
662 NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
663                         help="Do not create any network cards for"
664                         " the instance")
665
666 FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
667                                help="Relative path under default cluster-wide"
668                                " file storage dir to store file-based disks",
669                                default=None, metavar="<DIR>")
670
671 FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
672                                   help="Driver to use for image files",
673                                   default="loop", metavar="<DRIVER>",
674                                   choices=list(constants.FILE_DRIVER))
675
676 IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
677                             help="Select nodes for the instance automatically"
678                             " using the <NAME> iallocator plugin",
679                             default=None, type="string",
680                             completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
681
682 DEFAULT_IALLOCATOR_OPT = cli_option("-I", "--default-iallocator",
683                             metavar="<NAME>",
684                             help="Set the default instance allocator plugin",
685                             default=None, type="string",
686                             completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
687
688 OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
689                     metavar="<os>",
690                     completion_suggest=OPT_COMPL_ONE_OS)
691
692 OSPARAMS_OPT = cli_option("-O", "--os-parameters", dest="osparams",
693                          type="keyval", default={},
694                          help="OS parameters")
695
696 FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
697                                action="store_true", default=False,
698                                help="Force an unknown variant")
699
700 NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
701                             action="store_true", default=False,
702                             help="Do not install the OS (will"
703                             " enable no-start)")
704
705 BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
706                          type="keyval", default={},
707                          help="Backend parameters")
708
709 HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
710                          default={}, dest="hvparams",
711                          help="Hypervisor parameters")
712
713 HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
714                             help="Hypervisor and hypervisor options, in the"
715                             " format hypervisor:option=value,option=value,...",
716                             default=None, type="identkeyval")
717
718 HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
719                         help="Hypervisor and hypervisor options, in the"
720                         " format hypervisor:option=value,option=value,...",
721                         default=[], action="append", type="identkeyval")
722
723 NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
724                            action="store_false",
725                            help="Don't check that the instance's IP"
726                            " is alive")
727
728 NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
729                              default=True, action="store_false",
730                              help="Don't check that the instance's name"
731                              " is resolvable")
732
733 NET_OPT = cli_option("--net",
734                      help="NIC parameters", default=[],
735                      dest="nics", action="append", type="identkeyval")
736
737 DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
738                       dest="disks", action="append", type="identkeyval")
739
740 DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
741                          help="Comma-separated list of disks"
742                          " indices to act on (e.g. 0,2) (optional,"
743                          " defaults to all disks)")
744
745 OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
746                          help="Enforces a single-disk configuration using the"
747                          " given disk size, in MiB unless a suffix is used",
748                          default=None, type="unit", metavar="<size>")
749
750 IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
751                                 dest="ignore_consistency",
752                                 action="store_true", default=False,
753                                 help="Ignore the consistency of the disks on"
754                                 " the secondary")
755
756 NONLIVE_OPT = cli_option("--non-live", dest="live",
757                          default=True, action="store_false",
758                          help="Do a non-live migration (this usually means"
759                          " freeze the instance, save the state, transfer and"
760                          " only then resume running on the secondary node)")
761
762 MIGRATION_MODE_OPT = cli_option("--migration-mode", dest="migration_mode",
763                                 default=None,
764                                 choices=list(constants.HT_MIGRATION_MODES),
765                                 help="Override default migration mode (choose"
766                                 " either live or non-live")
767
768 NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
769                                 help="Target node and optional secondary node",
770                                 metavar="<pnode>[:<snode>]",
771                                 completion_suggest=OPT_COMPL_INST_ADD_NODES)
772
773 NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
774                            action="append", metavar="<node>",
775                            help="Use only this node (can be used multiple"
776                            " times, if not given defaults to all nodes)",
777                            completion_suggest=OPT_COMPL_ONE_NODE)
778
779 NODEGROUP_OPT = cli_option("-g", "--node-group",
780                            dest="nodegroup",
781                            help="Node group (name or uuid)",
782                            metavar="<nodegroup>",
783                            default=None, type="string",
784                            completion_suggest=OPT_COMPL_ONE_NODEGROUP)
785
786 SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
787                              metavar="<node>",
788                              completion_suggest=OPT_COMPL_ONE_NODE)
789
790 NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
791                          action="store_false",
792                          help="Don't start the instance after creation")
793
794 SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
795                          action="store_true", default=False,
796                          help="Show command instead of executing it")
797
798 CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
799                          default=False, action="store_true",
800                          help="Instead of performing the migration, try to"
801                          " recover from a failed cleanup. This is safe"
802                          " to run even if the instance is healthy, but it"
803                          " will create extra replication traffic and "
804                          " disrupt briefly the replication (like during the"
805                          " migration")
806
807 STATIC_OPT = cli_option("-s", "--static", dest="static",
808                         action="store_true", default=False,
809                         help="Only show configuration data, not runtime data")
810
811 ALL_OPT = cli_option("--all", dest="show_all",
812                      default=False, action="store_true",
813                      help="Show info on all instances on the cluster."
814                      " This can take a long time to run, use wisely")
815
816 SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
817                            action="store_true", default=False,
818                            help="Interactive OS reinstall, lists available"
819                            " OS templates for selection")
820
821 IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
822                                  action="store_true", default=False,
823                                  help="Remove the instance from the cluster"
824                                  " configuration even if there are failures"
825                                  " during the removal process")
826
827 IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
828                                         dest="ignore_remove_failures",
829                                         action="store_true", default=False,
830                                         help="Remove the instance from the"
831                                         " cluster configuration even if there"
832                                         " are failures during the removal"
833                                         " process")
834
835 REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
836                                  action="store_true", default=False,
837                                  help="Remove the instance from the cluster")
838
839 NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
840                                help="Specifies the new secondary node",
841                                metavar="NODE", default=None,
842                                completion_suggest=OPT_COMPL_ONE_NODE)
843
844 ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
845                             default=False, action="store_true",
846                             help="Replace the disk(s) on the primary"
847                             " node (only for the drbd template)")
848
849 ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
850                               default=False, action="store_true",
851                               help="Replace the disk(s) on the secondary"
852                               " node (only for the drbd template)")
853
854 AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
855                               default=False, action="store_true",
856                               help="Lock all nodes and auto-promote as needed"
857                               " to MC status")
858
859 AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
860                               default=False, action="store_true",
861                               help="Automatically replace faulty disks"
862                               " (only for the drbd template)")
863
864 IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
865                              default=False, action="store_true",
866                              help="Ignore current recorded size"
867                              " (useful for forcing activation when"
868                              " the recorded size is wrong)")
869
870 SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
871                           metavar="<node>",
872                           completion_suggest=OPT_COMPL_ONE_NODE)
873
874 SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
875                          metavar="<dir>")
876
877 SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
878                               help="Specify the secondary ip for the node",
879                               metavar="ADDRESS", default=None)
880
881 READD_OPT = cli_option("--readd", dest="readd",
882                        default=False, action="store_true",
883                        help="Readd old node after replacing it")
884
885 NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
886                                 default=True, action="store_false",
887                                 help="Disable SSH key fingerprint checking")
888
889 NODE_FORCE_JOIN_OPT = cli_option("--force-join", dest="force_join",
890                                  default=False, action="store_true",
891                                  help="Force the joining of a node,"
892                                       " needed when merging clusters")
893
894 MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
895                     type="bool", default=None, metavar=_YORNO,
896                     help="Set the master_candidate flag on the node")
897
898 OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
899                          type="bool", default=None,
900                          help=("Set the offline flag on the node"
901                                " (cluster does not communicate with offline"
902                                " nodes)"))
903
904 DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
905                          type="bool", default=None,
906                          help=("Set the drained flag on the node"
907                                " (excluded from allocation operations)"))
908
909 CAPAB_MASTER_OPT = cli_option("--master-capable", dest="master_capable",
910                     type="bool", default=None, metavar=_YORNO,
911                     help="Set the master_capable flag on the node")
912
913 CAPAB_VM_OPT = cli_option("--vm-capable", dest="vm_capable",
914                     type="bool", default=None, metavar=_YORNO,
915                     help="Set the vm_capable flag on the node")
916
917 ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
918                              type="bool", default=None, metavar=_YORNO,
919                              help="Set the allocatable flag on a volume")
920
921 NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
922                                help="Disable support for lvm based instances"
923                                " (cluster-wide)",
924                                action="store_false", default=True)
925
926 ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
927                             dest="enabled_hypervisors",
928                             help="Comma-separated list of hypervisors",
929                             type="string", default=None)
930
931 NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
932                             type="keyval", default={},
933                             help="NIC parameters")
934
935 CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
936                          dest="candidate_pool_size", type="int",
937                          help="Set the candidate pool size")
938
939 VG_NAME_OPT = cli_option("--vg-name", dest="vg_name",
940                          help=("Enables LVM and specifies the volume group"
941                                " name (cluster-wide) for disk allocation"
942                                " [%s]" % constants.DEFAULT_VG),
943                          metavar="VG", default=None)
944
945 YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
946                           help="Destroy cluster", action="store_true")
947
948 NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
949                           help="Skip node agreement check (dangerous)",
950                           action="store_true", default=False)
951
952 MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
953                             help="Specify the mac prefix for the instance IP"
954                             " addresses, in the format XX:XX:XX",
955                             metavar="PREFIX",
956                             default=None)
957
958 MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
959                                help="Specify the node interface (cluster-wide)"
960                                " on which the master IP address will be added"
961                                " (cluster init default: %s)" %
962                                constants.DEFAULT_BRIDGE,
963                                metavar="NETDEV",
964                                default=None)
965
966 GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
967                                 help="Specify the default directory (cluster-"
968                                 "wide) for storing the file-based disks [%s]" %
969                                 constants.DEFAULT_FILE_STORAGE_DIR,
970                                 metavar="DIR",
971                                 default=constants.DEFAULT_FILE_STORAGE_DIR)
972
973 NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
974                                    help="Don't modify /etc/hosts",
975                                    action="store_false", default=True)
976
977 NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
978                                     help="Don't initialize SSH keys",
979                                     action="store_false", default=True)
980
981 ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
982                              help="Enable parseable error messages",
983                              action="store_true", default=False)
984
985 NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
986                           help="Skip N+1 memory redundancy tests",
987                           action="store_true", default=False)
988
989 REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
990                              help="Type of reboot: soft/hard/full",
991                              default=constants.INSTANCE_REBOOT_HARD,
992                              metavar="<REBOOT>",
993                              choices=list(constants.REBOOT_TYPES))
994
995 IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
996                                     dest="ignore_secondaries",
997                                     default=False, action="store_true",
998                                     help="Ignore errors from secondaries")
999
1000 NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
1001                             action="store_false", default=True,
1002                             help="Don't shutdown the instance (unsafe)")
1003
1004 TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
1005                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
1006                          help="Maximum time to wait")
1007
1008 SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
1009                          dest="shutdown_timeout", type="int",
1010                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
1011                          help="Maximum time to wait for instance shutdown")
1012
1013 INTERVAL_OPT = cli_option("--interval", dest="interval", type="int",
1014                           default=None,
1015                           help=("Number of seconds between repetions of the"
1016                                 " command"))
1017
1018 EARLY_RELEASE_OPT = cli_option("--early-release",
1019                                dest="early_release", default=False,
1020                                action="store_true",
1021                                help="Release the locks on the secondary"
1022                                " node(s) early")
1023
1024 NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
1025                                   dest="new_cluster_cert",
1026                                   default=False, action="store_true",
1027                                   help="Generate a new cluster certificate")
1028
1029 RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
1030                            default=None,
1031                            help="File containing new RAPI certificate")
1032
1033 NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
1034                                default=None, action="store_true",
1035                                help=("Generate a new self-signed RAPI"
1036                                      " certificate"))
1037
1038 NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
1039                                     dest="new_confd_hmac_key",
1040                                     default=False, action="store_true",
1041                                     help=("Create a new HMAC key for %s" %
1042                                           constants.CONFD))
1043
1044 CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
1045                                        dest="cluster_domain_secret",
1046                                        default=None,
1047                                        help=("Load new new cluster domain"
1048                                              " secret from file"))
1049
1050 NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret",
1051                                            dest="new_cluster_domain_secret",
1052                                            default=False, action="store_true",
1053                                            help=("Create a new cluster domain"
1054                                                  " secret"))
1055
1056 USE_REPL_NET_OPT = cli_option("--use-replication-network",
1057                               dest="use_replication_network",
1058                               help="Whether to use the replication network"
1059                               " for talking to the nodes",
1060                               action="store_true", default=False)
1061
1062 MAINTAIN_NODE_HEALTH_OPT = \
1063     cli_option("--maintain-node-health", dest="maintain_node_health",
1064                metavar=_YORNO, default=None, type="bool",
1065                help="Configure the cluster to automatically maintain node"
1066                " health, by shutting down unknown instances, shutting down"
1067                " unknown DRBD devices, etc.")
1068
1069 IDENTIFY_DEFAULTS_OPT = \
1070     cli_option("--identify-defaults", dest="identify_defaults",
1071                default=False, action="store_true",
1072                help="Identify which saved instance parameters are equal to"
1073                " the current cluster defaults and set them as such, instead"
1074                " of marking them as overridden")
1075
1076 UIDPOOL_OPT = cli_option("--uid-pool", default=None,
1077                          action="store", dest="uid_pool",
1078                          help=("A list of user-ids or user-id"
1079                                " ranges separated by commas"))
1080
1081 ADD_UIDS_OPT = cli_option("--add-uids", default=None,
1082                           action="store", dest="add_uids",
1083                           help=("A list of user-ids or user-id"
1084                                 " ranges separated by commas, to be"
1085                                 " added to the user-id pool"))
1086
1087 REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
1088                              action="store", dest="remove_uids",
1089                              help=("A list of user-ids or user-id"
1090                                    " ranges separated by commas, to be"
1091                                    " removed from the user-id pool"))
1092
1093 RESERVED_LVS_OPT = cli_option("--reserved-lvs", default=None,
1094                              action="store", dest="reserved_lvs",
1095                              help=("A comma-separated list of reserved"
1096                                    " logical volumes names, that will be"
1097                                    " ignored by cluster verify"))
1098
1099 ROMAN_OPT = cli_option("--roman",
1100                        dest="roman_integers", default=False,
1101                        action="store_true",
1102                        help="Use roman numbers for positive integers")
1103
1104 DRBD_HELPER_OPT = cli_option("--drbd-usermode-helper", dest="drbd_helper",
1105                              action="store", default=None,
1106                              help="Specifies usermode helper for DRBD")
1107
1108 NODRBD_STORAGE_OPT = cli_option("--no-drbd-storage", dest="drbd_storage",
1109                                 action="store_false", default=True,
1110                                 help="Disable support for DRBD")
1111
1112 PRIMARY_IP_VERSION_OPT = \
1113     cli_option("--primary-ip-version", default=constants.IP4_VERSION,
1114                action="store", dest="primary_ip_version",
1115                metavar="%d|%d" % (constants.IP4_VERSION,
1116                                   constants.IP6_VERSION),
1117                help="Cluster-wide IP version for primary IP")
1118
1119 PRIORITY_OPT = cli_option("--priority", default=None, dest="priority",
1120                           metavar="|".join(name for name, _ in _PRIORITY_NAMES),
1121                           choices=_PRIONAME_TO_VALUE.keys(),
1122                           help="Priority for opcode processing")
1123
1124 HID_OS_OPT = cli_option("--hidden", dest="hidden",
1125                         type="bool", default=None, metavar=_YORNO,
1126                         help="Sets the hidden flag on the OS")
1127
1128 BLK_OS_OPT = cli_option("--blacklisted", dest="blacklisted",
1129                         type="bool", default=None, metavar=_YORNO,
1130                         help="Sets the blacklisted flag on the OS")
1131
1132 PREALLOC_WIPE_DISKS_OPT = cli_option("--prealloc-wipe-disks", default=None,
1133                                      type="bool", metavar=_YORNO,
1134                                      dest="prealloc_wipe_disks",
1135                                      help=("Wipe disks prior to instance"
1136                                            " creation"))
1137
1138 NODE_PARAMS_OPT = cli_option("--node-parameters", dest="ndparams",
1139                              type="keyval", default=None,
1140                              help="Node parameters")
1141
1142 ALLOC_POLICY_OPT = cli_option("--alloc-policy", dest="alloc_policy",
1143                               action="store", metavar="POLICY", default=None,
1144                               help="Allocation policy for the node group")
1145
1146 NODE_POWERED_OPT = cli_option("--node-powered", default=None,
1147                               type="bool", metavar=_YORNO,
1148                               dest="node_powered",
1149                               help="Specify if the SoR for node is powered")
1150
1151
1152 #: Options provided by all commands
1153 COMMON_OPTS = [DEBUG_OPT]
1154
1155 # common options for creating instances. add and import then add their own
1156 # specific ones.
1157 COMMON_CREATE_OPTS = [
1158   BACKEND_OPT,
1159   DISK_OPT,
1160   DISK_TEMPLATE_OPT,
1161   FILESTORE_DIR_OPT,
1162   FILESTORE_DRIVER_OPT,
1163   HYPERVISOR_OPT,
1164   IALLOCATOR_OPT,
1165   NET_OPT,
1166   NODE_PLACEMENT_OPT,
1167   NOIPCHECK_OPT,
1168   NONAMECHECK_OPT,
1169   NONICS_OPT,
1170   NWSYNC_OPT,
1171   OSPARAMS_OPT,
1172   OS_SIZE_OPT,
1173   SUBMIT_OPT,
1174   DRY_RUN_OPT,
1175   PRIORITY_OPT,
1176   ]
1177
1178
1179 def _ParseArgs(argv, commands, aliases):
1180   """Parser for the command line arguments.
1181
1182   This function parses the arguments and returns the function which
1183   must be executed together with its (modified) arguments.
1184
1185   @param argv: the command line
1186   @param commands: dictionary with special contents, see the design
1187       doc for cmdline handling
1188   @param aliases: dictionary with command aliases {'alias': 'target, ...}
1189
1190   """
1191   if len(argv) == 0:
1192     binary = "<command>"
1193   else:
1194     binary = argv[0].split("/")[-1]
1195
1196   if len(argv) > 1 and argv[1] == "--version":
1197     ToStdout("%s (ganeti %s) %s", binary, constants.VCS_VERSION,
1198              constants.RELEASE_VERSION)
1199     # Quit right away. That way we don't have to care about this special
1200     # argument. optparse.py does it the same.
1201     sys.exit(0)
1202
1203   if len(argv) < 2 or not (argv[1] in commands or
1204                            argv[1] in aliases):
1205     # let's do a nice thing
1206     sortedcmds = commands.keys()
1207     sortedcmds.sort()
1208
1209     ToStdout("Usage: %s {command} [options...] [argument...]", binary)
1210     ToStdout("%s <command> --help to see details, or man %s", binary, binary)
1211     ToStdout("")
1212
1213     # compute the max line length for cmd + usage
1214     mlen = max([len(" %s" % cmd) for cmd in commands])
1215     mlen = min(60, mlen) # should not get here...
1216
1217     # and format a nice command list
1218     ToStdout("Commands:")
1219     for cmd in sortedcmds:
1220       cmdstr = " %s" % (cmd,)
1221       help_text = commands[cmd][4]
1222       help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
1223       ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
1224       for line in help_lines:
1225         ToStdout("%-*s   %s", mlen, "", line)
1226
1227     ToStdout("")
1228
1229     return None, None, None
1230
1231   # get command, unalias it, and look it up in commands
1232   cmd = argv.pop(1)
1233   if cmd in aliases:
1234     if cmd in commands:
1235       raise errors.ProgrammerError("Alias '%s' overrides an existing"
1236                                    " command" % cmd)
1237
1238     if aliases[cmd] not in commands:
1239       raise errors.ProgrammerError("Alias '%s' maps to non-existing"
1240                                    " command '%s'" % (cmd, aliases[cmd]))
1241
1242     cmd = aliases[cmd]
1243
1244   func, args_def, parser_opts, usage, description = commands[cmd]
1245   parser = OptionParser(option_list=parser_opts + COMMON_OPTS,
1246                         description=description,
1247                         formatter=TitledHelpFormatter(),
1248                         usage="%%prog %s %s" % (cmd, usage))
1249   parser.disable_interspersed_args()
1250   options, args = parser.parse_args()
1251
1252   if not _CheckArguments(cmd, args_def, args):
1253     return None, None, None
1254
1255   return func, options, args
1256
1257
1258 def _CheckArguments(cmd, args_def, args):
1259   """Verifies the arguments using the argument definition.
1260
1261   Algorithm:
1262
1263     1. Abort with error if values specified by user but none expected.
1264
1265     1. For each argument in definition
1266
1267       1. Keep running count of minimum number of values (min_count)
1268       1. Keep running count of maximum number of values (max_count)
1269       1. If it has an unlimited number of values
1270
1271         1. Abort with error if it's not the last argument in the definition
1272
1273     1. If last argument has limited number of values
1274
1275       1. Abort with error if number of values doesn't match or is too large
1276
1277     1. Abort with error if user didn't pass enough values (min_count)
1278
1279   """
1280   if args and not args_def:
1281     ToStderr("Error: Command %s expects no arguments", cmd)
1282     return False
1283
1284   min_count = None
1285   max_count = None
1286   check_max = None
1287
1288   last_idx = len(args_def) - 1
1289
1290   for idx, arg in enumerate(args_def):
1291     if min_count is None:
1292       min_count = arg.min
1293     elif arg.min is not None:
1294       min_count += arg.min
1295
1296     if max_count is None:
1297       max_count = arg.max
1298     elif arg.max is not None:
1299       max_count += arg.max
1300
1301     if idx == last_idx:
1302       check_max = (arg.max is not None)
1303
1304     elif arg.max is None:
1305       raise errors.ProgrammerError("Only the last argument can have max=None")
1306
1307   if check_max:
1308     # Command with exact number of arguments
1309     if (min_count is not None and max_count is not None and
1310         min_count == max_count and len(args) != min_count):
1311       ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1312       return False
1313
1314     # Command with limited number of arguments
1315     if max_count is not None and len(args) > max_count:
1316       ToStderr("Error: Command %s expects only %d argument(s)",
1317                cmd, max_count)
1318       return False
1319
1320   # Command with some required arguments
1321   if min_count is not None and len(args) < min_count:
1322     ToStderr("Error: Command %s expects at least %d argument(s)",
1323              cmd, min_count)
1324     return False
1325
1326   return True
1327
1328
1329 def SplitNodeOption(value):
1330   """Splits the value of a --node option.
1331
1332   """
1333   if value and ':' in value:
1334     return value.split(':', 1)
1335   else:
1336     return (value, None)
1337
1338
1339 def CalculateOSNames(os_name, os_variants):
1340   """Calculates all the names an OS can be called, according to its variants.
1341
1342   @type os_name: string
1343   @param os_name: base name of the os
1344   @type os_variants: list or None
1345   @param os_variants: list of supported variants
1346   @rtype: list
1347   @return: list of valid names
1348
1349   """
1350   if os_variants:
1351     return ['%s+%s' % (os_name, v) for v in os_variants]
1352   else:
1353     return [os_name]
1354
1355
1356 def ParseFields(selected, default):
1357   """Parses the values of "--field"-like options.
1358
1359   @type selected: string or None
1360   @param selected: User-selected options
1361   @type default: list
1362   @param default: Default fields
1363
1364   """
1365   if selected is None:
1366     return default
1367
1368   if selected.startswith("+"):
1369     return default + selected[1:].split(",")
1370
1371   return selected.split(",")
1372
1373
1374 UsesRPC = rpc.RunWithRPC
1375
1376
1377 def AskUser(text, choices=None):
1378   """Ask the user a question.
1379
1380   @param text: the question to ask
1381
1382   @param choices: list with elements tuples (input_char, return_value,
1383       description); if not given, it will default to: [('y', True,
1384       'Perform the operation'), ('n', False, 'Do no do the operation')];
1385       note that the '?' char is reserved for help
1386
1387   @return: one of the return values from the choices list; if input is
1388       not possible (i.e. not running with a tty, we return the last
1389       entry from the list
1390
1391   """
1392   if choices is None:
1393     choices = [('y', True, 'Perform the operation'),
1394                ('n', False, 'Do not perform the operation')]
1395   if not choices or not isinstance(choices, list):
1396     raise errors.ProgrammerError("Invalid choices argument to AskUser")
1397   for entry in choices:
1398     if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
1399       raise errors.ProgrammerError("Invalid choices element to AskUser")
1400
1401   answer = choices[-1][1]
1402   new_text = []
1403   for line in text.splitlines():
1404     new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
1405   text = "\n".join(new_text)
1406   try:
1407     f = file("/dev/tty", "a+")
1408   except IOError:
1409     return answer
1410   try:
1411     chars = [entry[0] for entry in choices]
1412     chars[-1] = "[%s]" % chars[-1]
1413     chars.append('?')
1414     maps = dict([(entry[0], entry[1]) for entry in choices])
1415     while True:
1416       f.write(text)
1417       f.write('\n')
1418       f.write("/".join(chars))
1419       f.write(": ")
1420       line = f.readline(2).strip().lower()
1421       if line in maps:
1422         answer = maps[line]
1423         break
1424       elif line == '?':
1425         for entry in choices:
1426           f.write(" %s - %s\n" % (entry[0], entry[2]))
1427         f.write("\n")
1428         continue
1429   finally:
1430     f.close()
1431   return answer
1432
1433
1434 class JobSubmittedException(Exception):
1435   """Job was submitted, client should exit.
1436
1437   This exception has one argument, the ID of the job that was
1438   submitted. The handler should print this ID.
1439
1440   This is not an error, just a structured way to exit from clients.
1441
1442   """
1443
1444
1445 def SendJob(ops, cl=None):
1446   """Function to submit an opcode without waiting for the results.
1447
1448   @type ops: list
1449   @param ops: list of opcodes
1450   @type cl: luxi.Client
1451   @param cl: the luxi client to use for communicating with the master;
1452              if None, a new client will be created
1453
1454   """
1455   if cl is None:
1456     cl = GetClient()
1457
1458   job_id = cl.SubmitJob(ops)
1459
1460   return job_id
1461
1462
1463 def GenericPollJob(job_id, cbs, report_cbs):
1464   """Generic job-polling function.
1465
1466   @type job_id: number
1467   @param job_id: Job ID
1468   @type cbs: Instance of L{JobPollCbBase}
1469   @param cbs: Data callbacks
1470   @type report_cbs: Instance of L{JobPollReportCbBase}
1471   @param report_cbs: Reporting callbacks
1472
1473   """
1474   prev_job_info = None
1475   prev_logmsg_serial = None
1476
1477   status = None
1478
1479   while True:
1480     result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
1481                                       prev_logmsg_serial)
1482     if not result:
1483       # job not found, go away!
1484       raise errors.JobLost("Job with id %s lost" % job_id)
1485
1486     if result == constants.JOB_NOTCHANGED:
1487       report_cbs.ReportNotChanged(job_id, status)
1488
1489       # Wait again
1490       continue
1491
1492     # Split result, a tuple of (field values, log entries)
1493     (job_info, log_entries) = result
1494     (status, ) = job_info
1495
1496     if log_entries:
1497       for log_entry in log_entries:
1498         (serial, timestamp, log_type, message) = log_entry
1499         report_cbs.ReportLogMessage(job_id, serial, timestamp,
1500                                     log_type, message)
1501         prev_logmsg_serial = max(prev_logmsg_serial, serial)
1502
1503     # TODO: Handle canceled and archived jobs
1504     elif status in (constants.JOB_STATUS_SUCCESS,
1505                     constants.JOB_STATUS_ERROR,
1506                     constants.JOB_STATUS_CANCELING,
1507                     constants.JOB_STATUS_CANCELED):
1508       break
1509
1510     prev_job_info = job_info
1511
1512   jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
1513   if not jobs:
1514     raise errors.JobLost("Job with id %s lost" % job_id)
1515
1516   status, opstatus, result = jobs[0]
1517
1518   if status == constants.JOB_STATUS_SUCCESS:
1519     return result
1520
1521   if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
1522     raise errors.OpExecError("Job was canceled")
1523
1524   has_ok = False
1525   for idx, (status, msg) in enumerate(zip(opstatus, result)):
1526     if status == constants.OP_STATUS_SUCCESS:
1527       has_ok = True
1528     elif status == constants.OP_STATUS_ERROR:
1529       errors.MaybeRaise(msg)
1530
1531       if has_ok:
1532         raise errors.OpExecError("partial failure (opcode %d): %s" %
1533                                  (idx, msg))
1534
1535       raise errors.OpExecError(str(msg))
1536
1537   # default failure mode
1538   raise errors.OpExecError(result)
1539
1540
1541 class JobPollCbBase:
1542   """Base class for L{GenericPollJob} callbacks.
1543
1544   """
1545   def __init__(self):
1546     """Initializes this class.
1547
1548     """
1549
1550   def WaitForJobChangeOnce(self, job_id, fields,
1551                            prev_job_info, prev_log_serial):
1552     """Waits for changes on a job.
1553
1554     """
1555     raise NotImplementedError()
1556
1557   def QueryJobs(self, job_ids, fields):
1558     """Returns the selected fields for the selected job IDs.
1559
1560     @type job_ids: list of numbers
1561     @param job_ids: Job IDs
1562     @type fields: list of strings
1563     @param fields: Fields
1564
1565     """
1566     raise NotImplementedError()
1567
1568
1569 class JobPollReportCbBase:
1570   """Base class for L{GenericPollJob} reporting callbacks.
1571
1572   """
1573   def __init__(self):
1574     """Initializes this class.
1575
1576     """
1577
1578   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1579     """Handles a log message.
1580
1581     """
1582     raise NotImplementedError()
1583
1584   def ReportNotChanged(self, job_id, status):
1585     """Called for if a job hasn't changed in a while.
1586
1587     @type job_id: number
1588     @param job_id: Job ID
1589     @type status: string or None
1590     @param status: Job status if available
1591
1592     """
1593     raise NotImplementedError()
1594
1595
1596 class _LuxiJobPollCb(JobPollCbBase):
1597   def __init__(self, cl):
1598     """Initializes this class.
1599
1600     """
1601     JobPollCbBase.__init__(self)
1602     self.cl = cl
1603
1604   def WaitForJobChangeOnce(self, job_id, fields,
1605                            prev_job_info, prev_log_serial):
1606     """Waits for changes on a job.
1607
1608     """
1609     return self.cl.WaitForJobChangeOnce(job_id, fields,
1610                                         prev_job_info, prev_log_serial)
1611
1612   def QueryJobs(self, job_ids, fields):
1613     """Returns the selected fields for the selected job IDs.
1614
1615     """
1616     return self.cl.QueryJobs(job_ids, fields)
1617
1618
1619 class FeedbackFnJobPollReportCb(JobPollReportCbBase):
1620   def __init__(self, feedback_fn):
1621     """Initializes this class.
1622
1623     """
1624     JobPollReportCbBase.__init__(self)
1625
1626     self.feedback_fn = feedback_fn
1627
1628     assert callable(feedback_fn)
1629
1630   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1631     """Handles a log message.
1632
1633     """
1634     self.feedback_fn((timestamp, log_type, log_msg))
1635
1636   def ReportNotChanged(self, job_id, status):
1637     """Called if a job hasn't changed in a while.
1638
1639     """
1640     # Ignore
1641
1642
1643 class StdioJobPollReportCb(JobPollReportCbBase):
1644   def __init__(self):
1645     """Initializes this class.
1646
1647     """
1648     JobPollReportCbBase.__init__(self)
1649
1650     self.notified_queued = False
1651     self.notified_waitlock = False
1652
1653   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1654     """Handles a log message.
1655
1656     """
1657     ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
1658              FormatLogMessage(log_type, log_msg))
1659
1660   def ReportNotChanged(self, job_id, status):
1661     """Called if a job hasn't changed in a while.
1662
1663     """
1664     if status is None:
1665       return
1666
1667     if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
1668       ToStderr("Job %s is waiting in queue", job_id)
1669       self.notified_queued = True
1670
1671     elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
1672       ToStderr("Job %s is trying to acquire all necessary locks", job_id)
1673       self.notified_waitlock = True
1674
1675
1676 def FormatLogMessage(log_type, log_msg):
1677   """Formats a job message according to its type.
1678
1679   """
1680   if log_type != constants.ELOG_MESSAGE:
1681     log_msg = str(log_msg)
1682
1683   return utils.SafeEncode(log_msg)
1684
1685
1686 def PollJob(job_id, cl=None, feedback_fn=None, reporter=None):
1687   """Function to poll for the result of a job.
1688
1689   @type job_id: job identified
1690   @param job_id: the job to poll for results
1691   @type cl: luxi.Client
1692   @param cl: the luxi client to use for communicating with the master;
1693              if None, a new client will be created
1694
1695   """
1696   if cl is None:
1697     cl = GetClient()
1698
1699   if reporter is None:
1700     if feedback_fn:
1701       reporter = FeedbackFnJobPollReportCb(feedback_fn)
1702     else:
1703       reporter = StdioJobPollReportCb()
1704   elif feedback_fn:
1705     raise errors.ProgrammerError("Can't specify reporter and feedback function")
1706
1707   return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
1708
1709
1710 def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None, reporter=None):
1711   """Legacy function to submit an opcode.
1712
1713   This is just a simple wrapper over the construction of the processor
1714   instance. It should be extended to better handle feedback and
1715   interaction functions.
1716
1717   """
1718   if cl is None:
1719     cl = GetClient()
1720
1721   SetGenericOpcodeOpts([op], opts)
1722
1723   job_id = SendJob([op], cl=cl)
1724
1725   op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn,
1726                        reporter=reporter)
1727
1728   return op_results[0]
1729
1730
1731 def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
1732   """Wrapper around SubmitOpCode or SendJob.
1733
1734   This function will decide, based on the 'opts' parameter, whether to
1735   submit and wait for the result of the opcode (and return it), or
1736   whether to just send the job and print its identifier. It is used in
1737   order to simplify the implementation of the '--submit' option.
1738
1739   It will also process the opcodes if we're sending the via SendJob
1740   (otherwise SubmitOpCode does it).
1741
1742   """
1743   if opts and opts.submit_only:
1744     job = [op]
1745     SetGenericOpcodeOpts(job, opts)
1746     job_id = SendJob(job, cl=cl)
1747     raise JobSubmittedException(job_id)
1748   else:
1749     return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
1750
1751
1752 def SetGenericOpcodeOpts(opcode_list, options):
1753   """Processor for generic options.
1754
1755   This function updates the given opcodes based on generic command
1756   line options (like debug, dry-run, etc.).
1757
1758   @param opcode_list: list of opcodes
1759   @param options: command line options or None
1760   @return: None (in-place modification)
1761
1762   """
1763   if not options:
1764     return
1765   for op in opcode_list:
1766     op.debug_level = options.debug
1767     if hasattr(options, "dry_run"):
1768       op.dry_run = options.dry_run
1769     if getattr(options, "priority", None) is not None:
1770       op.priority = _PRIONAME_TO_VALUE[options.priority]
1771
1772
1773 def GetClient():
1774   # TODO: Cache object?
1775   try:
1776     client = luxi.Client()
1777   except luxi.NoMasterError:
1778     ss = ssconf.SimpleStore()
1779
1780     # Try to read ssconf file
1781     try:
1782       ss.GetMasterNode()
1783     except errors.ConfigurationError:
1784       raise errors.OpPrereqError("Cluster not initialized or this machine is"
1785                                  " not part of a cluster")
1786
1787     master, myself = ssconf.GetMasterAndMyself(ss=ss)
1788     if master != myself:
1789       raise errors.OpPrereqError("This is not the master node, please connect"
1790                                  " to node '%s' and rerun the command" %
1791                                  master)
1792     raise
1793   return client
1794
1795
1796 def FormatError(err):
1797   """Return a formatted error message for a given error.
1798
1799   This function takes an exception instance and returns a tuple
1800   consisting of two values: first, the recommended exit code, and
1801   second, a string describing the error message (not
1802   newline-terminated).
1803
1804   """
1805   retcode = 1
1806   obuf = StringIO()
1807   msg = str(err)
1808   if isinstance(err, errors.ConfigurationError):
1809     txt = "Corrupt configuration file: %s" % msg
1810     logging.error(txt)
1811     obuf.write(txt + "\n")
1812     obuf.write("Aborting.")
1813     retcode = 2
1814   elif isinstance(err, errors.HooksAbort):
1815     obuf.write("Failure: hooks execution failed:\n")
1816     for node, script, out in err.args[0]:
1817       if out:
1818         obuf.write("  node: %s, script: %s, output: %s\n" %
1819                    (node, script, out))
1820       else:
1821         obuf.write("  node: %s, script: %s (no output)\n" %
1822                    (node, script))
1823   elif isinstance(err, errors.HooksFailure):
1824     obuf.write("Failure: hooks general failure: %s" % msg)
1825   elif isinstance(err, errors.ResolverError):
1826     this_host = netutils.Hostname.GetSysName()
1827     if err.args[0] == this_host:
1828       msg = "Failure: can't resolve my own hostname ('%s')"
1829     else:
1830       msg = "Failure: can't resolve hostname '%s'"
1831     obuf.write(msg % err.args[0])
1832   elif isinstance(err, errors.OpPrereqError):
1833     if len(err.args) == 2:
1834       obuf.write("Failure: prerequisites not met for this"
1835                " operation:\nerror type: %s, error details:\n%s" %
1836                  (err.args[1], err.args[0]))
1837     else:
1838       obuf.write("Failure: prerequisites not met for this"
1839                  " operation:\n%s" % msg)
1840   elif isinstance(err, errors.OpExecError):
1841     obuf.write("Failure: command execution error:\n%s" % msg)
1842   elif isinstance(err, errors.TagError):
1843     obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
1844   elif isinstance(err, errors.JobQueueDrainError):
1845     obuf.write("Failure: the job queue is marked for drain and doesn't"
1846                " accept new requests\n")
1847   elif isinstance(err, errors.JobQueueFull):
1848     obuf.write("Failure: the job queue is full and doesn't accept new"
1849                " job submissions until old jobs are archived\n")
1850   elif isinstance(err, errors.TypeEnforcementError):
1851     obuf.write("Parameter Error: %s" % msg)
1852   elif isinstance(err, errors.ParameterError):
1853     obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
1854   elif isinstance(err, luxi.NoMasterError):
1855     obuf.write("Cannot communicate with the master daemon.\nIs it running"
1856                " and listening for connections?")
1857   elif isinstance(err, luxi.TimeoutError):
1858     obuf.write("Timeout while talking to the master daemon. Jobs might have"
1859                " been submitted and will continue to run even if the call"
1860                " timed out. Useful commands in this situation are \"gnt-job"
1861                " list\", \"gnt-job cancel\" and \"gnt-job watch\". Error:\n")
1862     obuf.write(msg)
1863   elif isinstance(err, luxi.PermissionError):
1864     obuf.write("It seems you don't have permissions to connect to the"
1865                " master daemon.\nPlease retry as a different user.")
1866   elif isinstance(err, luxi.ProtocolError):
1867     obuf.write("Unhandled protocol error while talking to the master daemon:\n"
1868                "%s" % msg)
1869   elif isinstance(err, errors.JobLost):
1870     obuf.write("Error checking job status: %s" % msg)
1871   elif isinstance(err, errors.GenericError):
1872     obuf.write("Unhandled Ganeti error: %s" % msg)
1873   elif isinstance(err, JobSubmittedException):
1874     obuf.write("JobID: %s\n" % err.args[0])
1875     retcode = 0
1876   else:
1877     obuf.write("Unhandled exception: %s" % msg)
1878   return retcode, obuf.getvalue().rstrip('\n')
1879
1880
1881 def GenericMain(commands, override=None, aliases=None):
1882   """Generic main function for all the gnt-* commands.
1883
1884   Arguments:
1885     - commands: a dictionary with a special structure, see the design doc
1886                 for command line handling.
1887     - override: if not None, we expect a dictionary with keys that will
1888                 override command line options; this can be used to pass
1889                 options from the scripts to generic functions
1890     - aliases: dictionary with command aliases {'alias': 'target, ...}
1891
1892   """
1893   # save the program name and the entire command line for later logging
1894   if sys.argv:
1895     binary = os.path.basename(sys.argv[0]) or sys.argv[0]
1896     if len(sys.argv) >= 2:
1897       binary += " " + sys.argv[1]
1898       old_cmdline = " ".join(sys.argv[2:])
1899     else:
1900       old_cmdline = ""
1901   else:
1902     binary = "<unknown program>"
1903     old_cmdline = ""
1904
1905   if aliases is None:
1906     aliases = {}
1907
1908   try:
1909     func, options, args = _ParseArgs(sys.argv, commands, aliases)
1910   except errors.ParameterError, err:
1911     result, err_msg = FormatError(err)
1912     ToStderr(err_msg)
1913     return 1
1914
1915   if func is None: # parse error
1916     return 1
1917
1918   if override is not None:
1919     for key, val in override.iteritems():
1920       setattr(options, key, val)
1921
1922   utils.SetupLogging(constants.LOG_COMMANDS, binary, debug=options.debug,
1923                      stderr_logging=True)
1924
1925   if old_cmdline:
1926     logging.info("run with arguments '%s'", old_cmdline)
1927   else:
1928     logging.info("run with no arguments")
1929
1930   try:
1931     result = func(options, args)
1932   except (errors.GenericError, luxi.ProtocolError,
1933           JobSubmittedException), err:
1934     result, err_msg = FormatError(err)
1935     logging.exception("Error during command processing")
1936     ToStderr(err_msg)
1937   except KeyboardInterrupt:
1938     result = constants.EXIT_FAILURE
1939     ToStderr("Aborted. Note that if the operation created any jobs, they"
1940              " might have been submitted and"
1941              " will continue to run in the background.")
1942
1943   return result
1944
1945
1946 def ParseNicOption(optvalue):
1947   """Parses the value of the --net option(s).
1948
1949   """
1950   try:
1951     nic_max = max(int(nidx[0]) + 1 for nidx in optvalue)
1952   except (TypeError, ValueError), err:
1953     raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
1954
1955   nics = [{}] * nic_max
1956   for nidx, ndict in optvalue:
1957     nidx = int(nidx)
1958
1959     if not isinstance(ndict, dict):
1960       raise errors.OpPrereqError("Invalid nic/%d value: expected dict,"
1961                                  " got %s" % (nidx, ndict))
1962
1963     utils.ForceDictType(ndict, constants.INIC_PARAMS_TYPES)
1964
1965     nics[nidx] = ndict
1966
1967   return nics
1968
1969
1970 def GenericInstanceCreate(mode, opts, args):
1971   """Add an instance to the cluster via either creation or import.
1972
1973   @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
1974   @param opts: the command line options selected by the user
1975   @type args: list
1976   @param args: should contain only one element, the new instance name
1977   @rtype: int
1978   @return: the desired exit code
1979
1980   """
1981   instance = args[0]
1982
1983   (pnode, snode) = SplitNodeOption(opts.node)
1984
1985   hypervisor = None
1986   hvparams = {}
1987   if opts.hypervisor:
1988     hypervisor, hvparams = opts.hypervisor
1989
1990   if opts.nics:
1991     nics = ParseNicOption(opts.nics)
1992   elif opts.no_nics:
1993     # no nics
1994     nics = []
1995   elif mode == constants.INSTANCE_CREATE:
1996     # default of one nic, all auto
1997     nics = [{}]
1998   else:
1999     # mode == import
2000     nics = []
2001
2002   if opts.disk_template == constants.DT_DISKLESS:
2003     if opts.disks or opts.sd_size is not None:
2004       raise errors.OpPrereqError("Diskless instance but disk"
2005                                  " information passed")
2006     disks = []
2007   else:
2008     if (not opts.disks and not opts.sd_size
2009         and mode == constants.INSTANCE_CREATE):
2010       raise errors.OpPrereqError("No disk information specified")
2011     if opts.disks and opts.sd_size is not None:
2012       raise errors.OpPrereqError("Please use either the '--disk' or"
2013                                  " '-s' option")
2014     if opts.sd_size is not None:
2015       opts.disks = [(0, {"size": opts.sd_size})]
2016
2017     if opts.disks:
2018       try:
2019         disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
2020       except ValueError, err:
2021         raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
2022       disks = [{}] * disk_max
2023     else:
2024       disks = []
2025     for didx, ddict in opts.disks:
2026       didx = int(didx)
2027       if not isinstance(ddict, dict):
2028         msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
2029         raise errors.OpPrereqError(msg)
2030       elif "size" in ddict:
2031         if "adopt" in ddict:
2032           raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
2033                                      " (disk %d)" % didx)
2034         try:
2035           ddict["size"] = utils.ParseUnit(ddict["size"])
2036         except ValueError, err:
2037           raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
2038                                      (didx, err))
2039       elif "adopt" in ddict:
2040         if mode == constants.INSTANCE_IMPORT:
2041           raise errors.OpPrereqError("Disk adoption not allowed for instance"
2042                                      " import")
2043         ddict["size"] = 0
2044       else:
2045         raise errors.OpPrereqError("Missing size or adoption source for"
2046                                    " disk %d" % didx)
2047       disks[didx] = ddict
2048
2049   utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
2050   utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
2051
2052   if mode == constants.INSTANCE_CREATE:
2053     start = opts.start
2054     os_type = opts.os
2055     force_variant = opts.force_variant
2056     src_node = None
2057     src_path = None
2058     no_install = opts.no_install
2059     identify_defaults = False
2060   elif mode == constants.INSTANCE_IMPORT:
2061     start = False
2062     os_type = None
2063     force_variant = False
2064     src_node = opts.src_node
2065     src_path = opts.src_dir
2066     no_install = None
2067     identify_defaults = opts.identify_defaults
2068   else:
2069     raise errors.ProgrammerError("Invalid creation mode %s" % mode)
2070
2071   op = opcodes.OpInstanceCreate(instance_name=instance,
2072                                 disks=disks,
2073                                 disk_template=opts.disk_template,
2074                                 nics=nics,
2075                                 pnode=pnode, snode=snode,
2076                                 ip_check=opts.ip_check,
2077                                 name_check=opts.name_check,
2078                                 wait_for_sync=opts.wait_for_sync,
2079                                 file_storage_dir=opts.file_storage_dir,
2080                                 file_driver=opts.file_driver,
2081                                 iallocator=opts.iallocator,
2082                                 hypervisor=hypervisor,
2083                                 hvparams=hvparams,
2084                                 beparams=opts.beparams,
2085                                 osparams=opts.osparams,
2086                                 mode=mode,
2087                                 start=start,
2088                                 os_type=os_type,
2089                                 force_variant=force_variant,
2090                                 src_node=src_node,
2091                                 src_path=src_path,
2092                                 no_install=no_install,
2093                                 identify_defaults=identify_defaults)
2094
2095   SubmitOrSend(op, opts)
2096   return 0
2097
2098
2099 class _RunWhileClusterStoppedHelper:
2100   """Helper class for L{RunWhileClusterStopped} to simplify state management
2101
2102   """
2103   def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
2104     """Initializes this class.
2105
2106     @type feedback_fn: callable
2107     @param feedback_fn: Feedback function
2108     @type cluster_name: string
2109     @param cluster_name: Cluster name
2110     @type master_node: string
2111     @param master_node Master node name
2112     @type online_nodes: list
2113     @param online_nodes: List of names of online nodes
2114
2115     """
2116     self.feedback_fn = feedback_fn
2117     self.cluster_name = cluster_name
2118     self.master_node = master_node
2119     self.online_nodes = online_nodes
2120
2121     self.ssh = ssh.SshRunner(self.cluster_name)
2122
2123     self.nonmaster_nodes = [name for name in online_nodes
2124                             if name != master_node]
2125
2126     assert self.master_node not in self.nonmaster_nodes
2127
2128   def _RunCmd(self, node_name, cmd):
2129     """Runs a command on the local or a remote machine.
2130
2131     @type node_name: string
2132     @param node_name: Machine name
2133     @type cmd: list
2134     @param cmd: Command
2135
2136     """
2137     if node_name is None or node_name == self.master_node:
2138       # No need to use SSH
2139       result = utils.RunCmd(cmd)
2140     else:
2141       result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))
2142
2143     if result.failed:
2144       errmsg = ["Failed to run command %s" % result.cmd]
2145       if node_name:
2146         errmsg.append("on node %s" % node_name)
2147       errmsg.append(": exitcode %s and error %s" %
2148                     (result.exit_code, result.output))
2149       raise errors.OpExecError(" ".join(errmsg))
2150
2151   def Call(self, fn, *args):
2152     """Call function while all daemons are stopped.
2153
2154     @type fn: callable
2155     @param fn: Function to be called
2156
2157     """
2158     # Pause watcher by acquiring an exclusive lock on watcher state file
2159     self.feedback_fn("Blocking watcher")
2160     watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
2161     try:
2162       # TODO: Currently, this just blocks. There's no timeout.
2163       # TODO: Should it be a shared lock?
2164       watcher_block.Exclusive(blocking=True)
2165
2166       # Stop master daemons, so that no new jobs can come in and all running
2167       # ones are finished
2168       self.feedback_fn("Stopping master daemons")
2169       self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
2170       try:
2171         # Stop daemons on all nodes
2172         for node_name in self.online_nodes:
2173           self.feedback_fn("Stopping daemons on %s" % node_name)
2174           self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])
2175
2176         # All daemons are shut down now
2177         try:
2178           return fn(self, *args)
2179         except Exception, err:
2180           _, errmsg = FormatError(err)
2181           logging.exception("Caught exception")
2182           self.feedback_fn(errmsg)
2183           raise
2184       finally:
2185         # Start cluster again, master node last
2186         for node_name in self.nonmaster_nodes + [self.master_node]:
2187           self.feedback_fn("Starting daemons on %s" % node_name)
2188           self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
2189     finally:
2190       # Resume watcher
2191       watcher_block.Close()
2192
2193
2194 def RunWhileClusterStopped(feedback_fn, fn, *args):
2195   """Calls a function while all cluster daemons are stopped.
2196
2197   @type feedback_fn: callable
2198   @param feedback_fn: Feedback function
2199   @type fn: callable
2200   @param fn: Function to be called when daemons are stopped
2201
2202   """
2203   feedback_fn("Gathering cluster information")
2204
2205   # This ensures we're running on the master daemon
2206   cl = GetClient()
2207
2208   (cluster_name, master_node) = \
2209     cl.QueryConfigValues(["cluster_name", "master_node"])
2210
2211   online_nodes = GetOnlineNodes([], cl=cl)
2212
2213   # Don't keep a reference to the client. The master daemon will go away.
2214   del cl
2215
2216   assert master_node in online_nodes
2217
2218   return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
2219                                        online_nodes).Call(fn, *args)
2220
2221
2222 def GenerateTable(headers, fields, separator, data,
2223                   numfields=None, unitfields=None,
2224                   units=None):
2225   """Prints a table with headers and different fields.
2226
2227   @type headers: dict
2228   @param headers: dictionary mapping field names to headers for
2229       the table
2230   @type fields: list
2231   @param fields: the field names corresponding to each row in
2232       the data field
2233   @param separator: the separator to be used; if this is None,
2234       the default 'smart' algorithm is used which computes optimal
2235       field width, otherwise just the separator is used between
2236       each field
2237   @type data: list
2238   @param data: a list of lists, each sublist being one row to be output
2239   @type numfields: list
2240   @param numfields: a list with the fields that hold numeric
2241       values and thus should be right-aligned
2242   @type unitfields: list
2243   @param unitfields: a list with the fields that hold numeric
2244       values that should be formatted with the units field
2245   @type units: string or None
2246   @param units: the units we should use for formatting, or None for
2247       automatic choice (human-readable for non-separator usage, otherwise
2248       megabytes); this is a one-letter string
2249
2250   """
2251   if units is None:
2252     if separator:
2253       units = "m"
2254     else:
2255       units = "h"
2256
2257   if numfields is None:
2258     numfields = []
2259   if unitfields is None:
2260     unitfields = []
2261
2262   numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
2263   unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142
2264
2265   format_fields = []
2266   for field in fields:
2267     if headers and field not in headers:
2268       # TODO: handle better unknown fields (either revert to old
2269       # style of raising exception, or deal more intelligently with
2270       # variable fields)
2271       headers[field] = field
2272     if separator is not None:
2273       format_fields.append("%s")
2274     elif numfields.Matches(field):
2275       format_fields.append("%*s")
2276     else:
2277       format_fields.append("%-*s")
2278
2279   if separator is None:
2280     mlens = [0 for name in fields]
2281     format_str = ' '.join(format_fields)
2282   else:
2283     format_str = separator.replace("%", "%%").join(format_fields)
2284
2285   for row in data:
2286     if row is None:
2287       continue
2288     for idx, val in enumerate(row):
2289       if unitfields.Matches(fields[idx]):
2290         try:
2291           val = int(val)
2292         except (TypeError, ValueError):
2293           pass
2294         else:
2295           val = row[idx] = utils.FormatUnit(val, units)
2296       val = row[idx] = str(val)
2297       if separator is None:
2298         mlens[idx] = max(mlens[idx], len(val))
2299
2300   result = []
2301   if headers:
2302     args = []
2303     for idx, name in enumerate(fields):
2304       hdr = headers[name]
2305       if separator is None:
2306         mlens[idx] = max(mlens[idx], len(hdr))
2307         args.append(mlens[idx])
2308       args.append(hdr)
2309     result.append(format_str % tuple(args))
2310
2311   if separator is None:
2312     assert len(mlens) == len(fields)
2313
2314     if fields and not numfields.Matches(fields[-1]):
2315       mlens[-1] = 0
2316
2317   for line in data:
2318     args = []
2319     if line is None:
2320       line = ['-' for _ in fields]
2321     for idx in range(len(fields)):
2322       if separator is None:
2323         args.append(mlens[idx])
2324       args.append(line[idx])
2325     result.append(format_str % tuple(args))
2326
2327   return result
2328
2329
2330 def _FormatBool(value):
2331   """Formats a boolean value as a string.
2332
2333   """
2334   if value:
2335     return "Y"
2336   return "N"
2337
2338
2339 #: Default formatting for query results; (callback, align right)
2340 _DEFAULT_FORMAT_QUERY = {
2341   constants.QFT_TEXT: (str, False),
2342   constants.QFT_BOOL: (_FormatBool, False),
2343   constants.QFT_NUMBER: (str, True),
2344   constants.QFT_TIMESTAMP: (utils.FormatTime, False),
2345   constants.QFT_OTHER: (str, False),
2346   constants.QFT_UNKNOWN: (str, False),
2347   }
2348
2349
2350 def _GetColumnFormatter(fdef, override, unit):
2351   """Returns formatting function for a field.
2352
2353   @type fdef: L{objects.QueryFieldDefinition}
2354   @type override: dict
2355   @param override: Dictionary for overriding field formatting functions,
2356     indexed by field name, contents like L{_DEFAULT_FORMAT_QUERY}
2357   @type unit: string
2358   @param unit: Unit used for formatting fields of type L{constants.QFT_UNIT}
2359   @rtype: tuple; (callable, bool)
2360   @return: Returns the function to format a value (takes one parameter) and a
2361     boolean for aligning the value on the right-hand side
2362
2363   """
2364   fmt = override.get(fdef.name, None)
2365   if fmt is not None:
2366     return fmt
2367
2368   assert constants.QFT_UNIT not in _DEFAULT_FORMAT_QUERY
2369
2370   if fdef.kind == constants.QFT_UNIT:
2371     # Can't keep this information in the static dictionary
2372     return (lambda value: utils.FormatUnit(value, unit), True)
2373
2374   fmt = _DEFAULT_FORMAT_QUERY.get(fdef.kind, None)
2375   if fmt is not None:
2376     return fmt
2377
2378   raise NotImplementedError("Can't format column type '%s'" % fdef.kind)
2379
2380
2381 class _QueryColumnFormatter:
2382   """Callable class for formatting fields of a query.
2383
2384   """
2385   def __init__(self, fn, status_fn):
2386     """Initializes this class.
2387
2388     @type fn: callable
2389     @param fn: Formatting function
2390     @type status_fn: callable
2391     @param status_fn: Function to report fields' status
2392
2393     """
2394     self._fn = fn
2395     self._status_fn = status_fn
2396
2397   def __call__(self, data):
2398     """Returns a field's string representation.
2399
2400     """
2401     (status, value) = data
2402
2403     # Report status
2404     self._status_fn(status)
2405
2406     if status == constants.RS_NORMAL:
2407       return self._fn(value)
2408
2409     assert value is None, \
2410            "Found value %r for abnormal status %s" % (value, status)
2411
2412     if status == constants.RS_UNKNOWN:
2413       return "(unknown)"
2414
2415     if status == constants.RS_NODATA:
2416       return "(nodata)"
2417
2418     if status == constants.RS_UNAVAIL:
2419       return "(unavail)"
2420
2421     if status == constants.RS_OFFLINE:
2422       return "(offline)"
2423
2424     raise NotImplementedError("Unknown status %s" % status)
2425
2426
2427 def FormatQueryResult(result, unit=None, format_override=None, separator=None,
2428                       header=False):
2429   """Formats data in L{objects.QueryResponse}.
2430
2431   @type result: L{objects.QueryResponse}
2432   @param result: result of query operation
2433   @type unit: string
2434   @param unit: Unit used for formatting fields of type L{constants.QFT_UNIT},
2435     see L{utils.text.FormatUnit}
2436   @type format_override: dict
2437   @param format_override: Dictionary for overriding field formatting functions,
2438     indexed by field name, contents like L{_DEFAULT_FORMAT_QUERY}
2439   @type separator: string or None
2440   @param separator: String used to separate fields
2441   @type header: bool
2442   @param header: Whether to output header row
2443
2444   """
2445   if unit is None:
2446     if separator:
2447       unit = "m"
2448     else:
2449       unit = "h"
2450
2451   if format_override is None:
2452     format_override = {}
2453
2454   stats = dict.fromkeys(constants.RS_ALL, 0)
2455
2456   def _RecordStatus(status):
2457     if status in stats:
2458       stats[status] += 1
2459
2460   columns = []
2461   for fdef in result.fields:
2462     assert fdef.title and fdef.name
2463     (fn, align_right) = _GetColumnFormatter(fdef, format_override, unit)
2464     columns.append(TableColumn(fdef.title,
2465                                _QueryColumnFormatter(fn, _RecordStatus),
2466                                align_right))
2467
2468   table = FormatTable(result.data, columns, header, separator)
2469
2470   # Collect statistics
2471   assert len(stats) == len(constants.RS_ALL)
2472   assert compat.all(count >= 0 for count in stats.values())
2473
2474   # Determine overall status. If there was no data, unknown fields must be
2475   # detected via the field definitions.
2476   if (stats[constants.RS_UNKNOWN] or
2477       (not result.data and _GetUnknownFields(result.fields))):
2478     status = QR_UNKNOWN
2479   elif compat.any(count > 0 for key, count in stats.items()
2480                   if key != constants.RS_NORMAL):
2481     status = QR_INCOMPLETE
2482   else:
2483     status = QR_NORMAL
2484
2485   return (status, table)
2486
2487
2488 def _GetUnknownFields(fdefs):
2489   """Returns list of unknown fields included in C{fdefs}.
2490
2491   @type fdefs: list of L{objects.QueryFieldDefinition}
2492
2493   """
2494   return [fdef for fdef in fdefs
2495           if fdef.kind == constants.QFT_UNKNOWN]
2496
2497
2498 def _WarnUnknownFields(fdefs):
2499   """Prints a warning to stderr if a query included unknown fields.
2500
2501   @type fdefs: list of L{objects.QueryFieldDefinition}
2502
2503   """
2504   unknown = _GetUnknownFields(fdefs)
2505   if unknown:
2506     ToStderr("Warning: Queried for unknown fields %s",
2507              utils.CommaJoin(fdef.name for fdef in unknown))
2508     return True
2509
2510   return False
2511
2512
2513 def GenericList(resource, fields, names, unit, separator, header, cl=None,
2514                 format_override=None):
2515   """Generic implementation for listing all items of a resource.
2516
2517   @param resource: One of L{constants.QR_OP_LUXI}
2518   @type fields: list of strings
2519   @param fields: List of fields to query for
2520   @type names: list of strings
2521   @param names: Names of items to query for
2522   @type unit: string or None
2523   @param unit: Unit used for formatting fields of type L{constants.QFT_UNIT} or
2524     None for automatic choice (human-readable for non-separator usage,
2525     otherwise megabytes); this is a one-letter string
2526   @type separator: string or None
2527   @param separator: String used to separate fields
2528   @type header: bool
2529   @param header: Whether to show header row
2530   @type format_override: dict
2531   @param format_override: Dictionary for overriding field formatting functions,
2532     indexed by field name, contents like L{_DEFAULT_FORMAT_QUERY}
2533
2534   """
2535   if cl is None:
2536     cl = GetClient()
2537
2538   if not names:
2539     names = None
2540
2541   response = cl.Query(resource, fields, qlang.MakeSimpleFilter("name", names))
2542
2543   found_unknown = _WarnUnknownFields(response.fields)
2544
2545   (status, data) = FormatQueryResult(response, unit=unit, separator=separator,
2546                                      header=header,
2547                                      format_override=format_override)
2548
2549   for line in data:
2550     ToStdout(line)
2551
2552   assert ((found_unknown and status == QR_UNKNOWN) or
2553           (not found_unknown and status != QR_UNKNOWN))
2554
2555   if status == QR_UNKNOWN:
2556     return constants.EXIT_UNKNOWN_FIELD
2557
2558   # TODO: Should the list command fail if not all data could be collected?
2559   return constants.EXIT_SUCCESS
2560
2561
2562 def GenericListFields(resource, fields, separator, header, cl=None):
2563   """Generic implementation for listing fields for a resource.
2564
2565   @param resource: One of L{constants.QR_OP_LUXI}
2566   @type fields: list of strings
2567   @param fields: List of fields to query for
2568   @type separator: string or None
2569   @param separator: String used to separate fields
2570   @type header: bool
2571   @param header: Whether to show header row
2572
2573   """
2574   if cl is None:
2575     cl = GetClient()
2576
2577   if not fields:
2578     fields = None
2579
2580   response = cl.QueryFields(resource, fields)
2581
2582   found_unknown = _WarnUnknownFields(response.fields)
2583
2584   columns = [
2585     TableColumn("Name", str, False),
2586     TableColumn("Title", str, False),
2587     # TODO: Add field description to master daemon
2588     ]
2589
2590   rows = [[fdef.name, fdef.title] for fdef in response.fields]
2591
2592   for line in FormatTable(rows, columns, header, separator):
2593     ToStdout(line)
2594
2595   if found_unknown:
2596     return constants.EXIT_UNKNOWN_FIELD
2597
2598   return constants.EXIT_SUCCESS
2599
2600
2601 class TableColumn:
2602   """Describes a column for L{FormatTable}.
2603
2604   """
2605   def __init__(self, title, fn, align_right):
2606     """Initializes this class.
2607
2608     @type title: string
2609     @param title: Column title
2610     @type fn: callable
2611     @param fn: Formatting function
2612     @type align_right: bool
2613     @param align_right: Whether to align values on the right-hand side
2614
2615     """
2616     self.title = title
2617     self.format = fn
2618     self.align_right = align_right
2619
2620
2621 def _GetColFormatString(width, align_right):
2622   """Returns the format string for a field.
2623
2624   """
2625   if align_right:
2626     sign = ""
2627   else:
2628     sign = "-"
2629
2630   return "%%%s%ss" % (sign, width)
2631
2632
2633 def FormatTable(rows, columns, header, separator):
2634   """Formats data as a table.
2635
2636   @type rows: list of lists
2637   @param rows: Row data, one list per row
2638   @type columns: list of L{TableColumn}
2639   @param columns: Column descriptions
2640   @type header: bool
2641   @param header: Whether to show header row
2642   @type separator: string or None
2643   @param separator: String used to separate columns
2644
2645   """
2646   if header:
2647     data = [[col.title for col in columns]]
2648     colwidth = [len(col.title) for col in columns]
2649   else:
2650     data = []
2651     colwidth = [0 for _ in columns]
2652
2653   # Format row data
2654   for row in rows:
2655     assert len(row) == len(columns)
2656
2657     formatted = [col.format(value) for value, col in zip(row, columns)]
2658
2659     if separator is None:
2660       # Update column widths
2661       for idx, (oldwidth, value) in enumerate(zip(colwidth, formatted)):
2662         # Modifying a list's items while iterating is fine
2663         colwidth[idx] = max(oldwidth, len(value))
2664
2665     data.append(formatted)
2666
2667   if separator is not None:
2668     # Return early if a separator is used
2669     return [separator.join(row) for row in data]
2670
2671   if columns and not columns[-1].align_right:
2672     # Avoid unnecessary spaces at end of line
2673     colwidth[-1] = 0
2674
2675   # Build format string
2676   fmt = " ".join([_GetColFormatString(width, col.align_right)
2677                   for col, width in zip(columns, colwidth)])
2678
2679   return [fmt % tuple(row) for row in data]
2680
2681
2682 def FormatTimestamp(ts):
2683   """Formats a given timestamp.
2684
2685   @type ts: timestamp
2686   @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
2687
2688   @rtype: string
2689   @return: a string with the formatted timestamp
2690
2691   """
2692   if not isinstance (ts, (tuple, list)) or len(ts) != 2:
2693     return '?'
2694   sec, usec = ts
2695   return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
2696
2697
2698 def ParseTimespec(value):
2699   """Parse a time specification.
2700
2701   The following suffixed will be recognized:
2702
2703     - s: seconds
2704     - m: minutes
2705     - h: hours
2706     - d: day
2707     - w: weeks
2708
2709   Without any suffix, the value will be taken to be in seconds.
2710
2711   """
2712   value = str(value)
2713   if not value:
2714     raise errors.OpPrereqError("Empty time specification passed")
2715   suffix_map = {
2716     's': 1,
2717     'm': 60,
2718     'h': 3600,
2719     'd': 86400,
2720     'w': 604800,
2721     }
2722   if value[-1] not in suffix_map:
2723     try:
2724       value = int(value)
2725     except (TypeError, ValueError):
2726       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2727   else:
2728     multiplier = suffix_map[value[-1]]
2729     value = value[:-1]
2730     if not value: # no data left after stripping the suffix
2731       raise errors.OpPrereqError("Invalid time specification (only"
2732                                  " suffix passed)")
2733     try:
2734       value = int(value) * multiplier
2735     except (TypeError, ValueError):
2736       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2737   return value
2738
2739
2740 def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
2741                    filter_master=False):
2742   """Returns the names of online nodes.
2743
2744   This function will also log a warning on stderr with the names of
2745   the online nodes.
2746
2747   @param nodes: if not empty, use only this subset of nodes (minus the
2748       offline ones)
2749   @param cl: if not None, luxi client to use
2750   @type nowarn: boolean
2751   @param nowarn: by default, this function will output a note with the
2752       offline nodes that are skipped; if this parameter is True the
2753       note is not displayed
2754   @type secondary_ips: boolean
2755   @param secondary_ips: if True, return the secondary IPs instead of the
2756       names, useful for doing network traffic over the replication interface
2757       (if any)
2758   @type filter_master: boolean
2759   @param filter_master: if True, do not return the master node in the list
2760       (useful in coordination with secondary_ips where we cannot check our
2761       node name against the list)
2762
2763   """
2764   if cl is None:
2765     cl = GetClient()
2766
2767   if secondary_ips:
2768     name_idx = 2
2769   else:
2770     name_idx = 0
2771
2772   if filter_master:
2773     master_node = cl.QueryConfigValues(["master_node"])[0]
2774     filter_fn = lambda x: x != master_node
2775   else:
2776     filter_fn = lambda _: True
2777
2778   result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
2779                          use_locking=False)
2780   offline = [row[0] for row in result if row[1]]
2781   if offline and not nowarn:
2782     ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
2783   return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
2784
2785
2786 def _ToStream(stream, txt, *args):
2787   """Write a message to a stream, bypassing the logging system
2788
2789   @type stream: file object
2790   @param stream: the file to which we should write
2791   @type txt: str
2792   @param txt: the message
2793
2794   """
2795   if args:
2796     args = tuple(args)
2797     stream.write(txt % args)
2798   else:
2799     stream.write(txt)
2800   stream.write('\n')
2801   stream.flush()
2802
2803
2804 def ToStdout(txt, *args):
2805   """Write a message to stdout only, bypassing the logging system
2806
2807   This is just a wrapper over _ToStream.
2808
2809   @type txt: str
2810   @param txt: the message
2811
2812   """
2813   _ToStream(sys.stdout, txt, *args)
2814
2815
2816 def ToStderr(txt, *args):
2817   """Write a message to stderr only, bypassing the logging system
2818
2819   This is just a wrapper over _ToStream.
2820
2821   @type txt: str
2822   @param txt: the message
2823
2824   """
2825   _ToStream(sys.stderr, txt, *args)
2826
2827
2828 class JobExecutor(object):
2829   """Class which manages the submission and execution of multiple jobs.
2830
2831   Note that instances of this class should not be reused between
2832   GetResults() calls.
2833
2834   """
2835   def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
2836     self.queue = []
2837     if cl is None:
2838       cl = GetClient()
2839     self.cl = cl
2840     self.verbose = verbose
2841     self.jobs = []
2842     self.opts = opts
2843     self.feedback_fn = feedback_fn
2844
2845   def QueueJob(self, name, *ops):
2846     """Record a job for later submit.
2847
2848     @type name: string
2849     @param name: a description of the job, will be used in WaitJobSet
2850     """
2851     SetGenericOpcodeOpts(ops, self.opts)
2852     self.queue.append((name, ops))
2853
2854   def SubmitPending(self, each=False):
2855     """Submit all pending jobs.
2856
2857     """
2858     if each:
2859       results = []
2860       for row in self.queue:
2861         # SubmitJob will remove the success status, but raise an exception if
2862         # the submission fails, so we'll notice that anyway.
2863         results.append([True, self.cl.SubmitJob(row[1])])
2864     else:
2865       results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
2866     for (idx, ((status, data), (name, _))) in enumerate(zip(results,
2867                                                             self.queue)):
2868       self.jobs.append((idx, status, data, name))
2869
2870   def _ChooseJob(self):
2871     """Choose a non-waiting/queued job to poll next.
2872
2873     """
2874     assert self.jobs, "_ChooseJob called with empty job list"
2875
2876     result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
2877     assert result
2878
2879     for job_data, status in zip(self.jobs, result):
2880       if (isinstance(status, list) and status and
2881           status[0] in (constants.JOB_STATUS_QUEUED,
2882                         constants.JOB_STATUS_WAITLOCK,
2883                         constants.JOB_STATUS_CANCELING)):
2884         # job is still present and waiting
2885         continue
2886       # good candidate found (either running job or lost job)
2887       self.jobs.remove(job_data)
2888       return job_data
2889
2890     # no job found
2891     return self.jobs.pop(0)
2892
2893   def GetResults(self):
2894     """Wait for and return the results of all jobs.
2895
2896     @rtype: list
2897     @return: list of tuples (success, job results), in the same order
2898         as the submitted jobs; if a job has failed, instead of the result
2899         there will be the error message
2900
2901     """
2902     if not self.jobs:
2903       self.SubmitPending()
2904     results = []
2905     if self.verbose:
2906       ok_jobs = [row[2] for row in self.jobs if row[1]]
2907       if ok_jobs:
2908         ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
2909
2910     # first, remove any non-submitted jobs
2911     self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
2912     for idx, _, jid, name in failures:
2913       ToStderr("Failed to submit job for %s: %s", name, jid)
2914       results.append((idx, False, jid))
2915
2916     while self.jobs:
2917       (idx, _, jid, name) = self._ChooseJob()
2918       ToStdout("Waiting for job %s for %s...", jid, name)
2919       try:
2920         job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
2921         success = True
2922       except errors.JobLost, err:
2923         _, job_result = FormatError(err)
2924         ToStderr("Job %s for %s has been archived, cannot check its result",
2925                  jid, name)
2926         success = False
2927       except (errors.GenericError, luxi.ProtocolError), err:
2928         _, job_result = FormatError(err)
2929         success = False
2930         # the error message will always be shown, verbose or not
2931         ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
2932
2933       results.append((idx, success, job_result))
2934
2935     # sort based on the index, then drop it
2936     results.sort()
2937     results = [i[1:] for i in results]
2938
2939     return results
2940
2941   def WaitOrShow(self, wait):
2942     """Wait for job results or only print the job IDs.
2943
2944     @type wait: boolean
2945     @param wait: whether to wait or not
2946
2947     """
2948     if wait:
2949       return self.GetResults()
2950     else:
2951       if not self.jobs:
2952         self.SubmitPending()
2953       for _, status, result, name in self.jobs:
2954         if status:
2955           ToStdout("%s: %s", result, name)
2956         else:
2957           ToStderr("Failure for %s: %s", name, result)
2958       return [row[1:3] for row in self.jobs]
2959
2960
2961 def FormatParameterDict(buf, param_dict, actual, level=1):
2962   """Formats a parameter dictionary.
2963
2964   @type buf: L{StringIO}
2965   @param buf: the buffer into which to write
2966   @type param_dict: dict
2967   @param param_dict: the own parameters
2968   @type actual: dict
2969   @param actual: the current parameter set (including defaults)
2970   @param level: Level of indent
2971
2972   """
2973   indent = "  " * level
2974   for key in sorted(actual):
2975     val = param_dict.get(key, "default (%s)" % actual[key])
2976     buf.write("%s- %s: %s\n" % (indent, key, val))