jqueue/gnt-job: Add job priority fields for display
[ganeti-local] / lib / cli.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module dealing with command line parsing"""
23
24
25 import sys
26 import textwrap
27 import os.path
28 import time
29 import logging
30 from cStringIO import StringIO
31
32 from ganeti import utils
33 from ganeti import errors
34 from ganeti import constants
35 from ganeti import opcodes
36 from ganeti import luxi
37 from ganeti import ssconf
38 from ganeti import rpc
39 from ganeti import ssh
40 from ganeti import compat
41 from ganeti import netutils
42
43 from optparse import (OptionParser, TitledHelpFormatter,
44                       Option, OptionValueError)
45
46
47 __all__ = [
48   # Command line options
49   "ADD_UIDS_OPT",
50   "ALLOCATABLE_OPT",
51   "ALL_OPT",
52   "AUTO_PROMOTE_OPT",
53   "AUTO_REPLACE_OPT",
54   "BACKEND_OPT",
55   "BLK_OS_OPT",
56   "CLEANUP_OPT",
57   "CLUSTER_DOMAIN_SECRET_OPT",
58   "CONFIRM_OPT",
59   "CP_SIZE_OPT",
60   "DEBUG_OPT",
61   "DEBUG_SIMERR_OPT",
62   "DISKIDX_OPT",
63   "DISK_OPT",
64   "DISK_TEMPLATE_OPT",
65   "DRAINED_OPT",
66   "DRY_RUN_OPT",
67   "DRBD_HELPER_OPT",
68   "EARLY_RELEASE_OPT",
69   "ENABLED_HV_OPT",
70   "ERROR_CODES_OPT",
71   "FIELDS_OPT",
72   "FILESTORE_DIR_OPT",
73   "FILESTORE_DRIVER_OPT",
74   "FORCE_OPT",
75   "FORCE_VARIANT_OPT",
76   "GLOBAL_FILEDIR_OPT",
77   "HID_OS_OPT",
78   "HVLIST_OPT",
79   "HVOPTS_OPT",
80   "HYPERVISOR_OPT",
81   "IALLOCATOR_OPT",
82   "DEFAULT_IALLOCATOR_OPT",
83   "IDENTIFY_DEFAULTS_OPT",
84   "IGNORE_CONSIST_OPT",
85   "IGNORE_FAILURES_OPT",
86   "IGNORE_REMOVE_FAILURES_OPT",
87   "IGNORE_SECONDARIES_OPT",
88   "IGNORE_SIZE_OPT",
89   "INTERVAL_OPT",
90   "MAC_PREFIX_OPT",
91   "MAINTAIN_NODE_HEALTH_OPT",
92   "MASTER_NETDEV_OPT",
93   "MC_OPT",
94   "MIGRATION_MODE_OPT",
95   "NET_OPT",
96   "NEW_CLUSTER_CERT_OPT",
97   "NEW_CLUSTER_DOMAIN_SECRET_OPT",
98   "NEW_CONFD_HMAC_KEY_OPT",
99   "NEW_RAPI_CERT_OPT",
100   "NEW_SECONDARY_OPT",
101   "NIC_PARAMS_OPT",
102   "NODE_LIST_OPT",
103   "NODE_PLACEMENT_OPT",
104   "NODEGROUP_OPT",
105   "NODRBD_STORAGE_OPT",
106   "NOHDR_OPT",
107   "NOIPCHECK_OPT",
108   "NO_INSTALL_OPT",
109   "NONAMECHECK_OPT",
110   "NOLVM_STORAGE_OPT",
111   "NOMODIFY_ETCHOSTS_OPT",
112   "NOMODIFY_SSH_SETUP_OPT",
113   "NONICS_OPT",
114   "NONLIVE_OPT",
115   "NONPLUS1_OPT",
116   "NOSHUTDOWN_OPT",
117   "NOSTART_OPT",
118   "NOSSH_KEYCHECK_OPT",
119   "NOVOTING_OPT",
120   "NWSYNC_OPT",
121   "ON_PRIMARY_OPT",
122   "ON_SECONDARY_OPT",
123   "OFFLINE_OPT",
124   "OSPARAMS_OPT",
125   "OS_OPT",
126   "OS_SIZE_OPT",
127   "PRIMARY_IP_VERSION_OPT",
128   "PRIORITY_OPT",
129   "RAPI_CERT_OPT",
130   "READD_OPT",
131   "REBOOT_TYPE_OPT",
132   "REMOVE_INSTANCE_OPT",
133   "REMOVE_UIDS_OPT",
134   "RESERVED_LVS_OPT",
135   "ROMAN_OPT",
136   "SECONDARY_IP_OPT",
137   "SELECT_OS_OPT",
138   "SEP_OPT",
139   "SHOWCMD_OPT",
140   "SHUTDOWN_TIMEOUT_OPT",
141   "SINGLE_NODE_OPT",
142   "SRC_DIR_OPT",
143   "SRC_NODE_OPT",
144   "SUBMIT_OPT",
145   "STATIC_OPT",
146   "SYNC_OPT",
147   "TAG_SRC_OPT",
148   "TIMEOUT_OPT",
149   "UIDPOOL_OPT",
150   "USEUNITS_OPT",
151   "USE_REPL_NET_OPT",
152   "VERBOSE_OPT",
153   "VG_NAME_OPT",
154   "YES_DOIT_OPT",
155   # Generic functions for CLI programs
156   "GenericMain",
157   "GenericInstanceCreate",
158   "GetClient",
159   "GetOnlineNodes",
160   "JobExecutor",
161   "JobSubmittedException",
162   "ParseTimespec",
163   "RunWhileClusterStopped",
164   "SubmitOpCode",
165   "SubmitOrSend",
166   "UsesRPC",
167   # Formatting functions
168   "ToStderr", "ToStdout",
169   "FormatError",
170   "GenerateTable",
171   "AskUser",
172   "FormatTimestamp",
173   "FormatLogMessage",
174   # Tags functions
175   "ListTags",
176   "AddTags",
177   "RemoveTags",
178   # command line options support infrastructure
179   "ARGS_MANY_INSTANCES",
180   "ARGS_MANY_NODES",
181   "ARGS_NONE",
182   "ARGS_ONE_INSTANCE",
183   "ARGS_ONE_NODE",
184   "ARGS_ONE_OS",
185   "ArgChoice",
186   "ArgCommand",
187   "ArgFile",
188   "ArgHost",
189   "ArgInstance",
190   "ArgJobId",
191   "ArgNode",
192   "ArgOs",
193   "ArgSuggest",
194   "ArgUnknown",
195   "OPT_COMPL_INST_ADD_NODES",
196   "OPT_COMPL_MANY_NODES",
197   "OPT_COMPL_ONE_IALLOCATOR",
198   "OPT_COMPL_ONE_INSTANCE",
199   "OPT_COMPL_ONE_NODE",
200   "OPT_COMPL_ONE_NODEGROUP",
201   "OPT_COMPL_ONE_OS",
202   "cli_option",
203   "SplitNodeOption",
204   "CalculateOSNames",
205   "ParseFields",
206   ]
207
208 NO_PREFIX = "no_"
209 UN_PREFIX = "-"
210
211 #: Priorities (sorted)
212 _PRIORITY_NAMES = [
213   ("low", constants.OP_PRIO_LOW),
214   ("normal", constants.OP_PRIO_NORMAL),
215   ("high", constants.OP_PRIO_HIGH),
216   ]
217
218 #: Priority dictionary for easier lookup
219 # TODO: Replace this and _PRIORITY_NAMES with a single sorted dictionary once
220 # we migrate to Python 2.6
221 _PRIONAME_TO_VALUE = dict(_PRIORITY_NAMES)
222
223
224 class _Argument:
225   def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
226     self.min = min
227     self.max = max
228
229   def __repr__(self):
230     return ("<%s min=%s max=%s>" %
231             (self.__class__.__name__, self.min, self.max))
232
233
234 class ArgSuggest(_Argument):
235   """Suggesting argument.
236
237   Value can be any of the ones passed to the constructor.
238
239   """
240   # pylint: disable-msg=W0622
241   def __init__(self, min=0, max=None, choices=None):
242     _Argument.__init__(self, min=min, max=max)
243     self.choices = choices
244
245   def __repr__(self):
246     return ("<%s min=%s max=%s choices=%r>" %
247             (self.__class__.__name__, self.min, self.max, self.choices))
248
249
250 class ArgChoice(ArgSuggest):
251   """Choice argument.
252
253   Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
254   but value must be one of the choices.
255
256   """
257
258
259 class ArgUnknown(_Argument):
260   """Unknown argument to program (e.g. determined at runtime).
261
262   """
263
264
265 class ArgInstance(_Argument):
266   """Instances argument.
267
268   """
269
270
271 class ArgNode(_Argument):
272   """Node argument.
273
274   """
275
276 class ArgJobId(_Argument):
277   """Job ID argument.
278
279   """
280
281
282 class ArgFile(_Argument):
283   """File path argument.
284
285   """
286
287
288 class ArgCommand(_Argument):
289   """Command argument.
290
291   """
292
293
294 class ArgHost(_Argument):
295   """Host argument.
296
297   """
298
299
300 class ArgOs(_Argument):
301   """OS argument.
302
303   """
304
305
306 ARGS_NONE = []
307 ARGS_MANY_INSTANCES = [ArgInstance()]
308 ARGS_MANY_NODES = [ArgNode()]
309 ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
310 ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
311 ARGS_ONE_OS = [ArgOs(min=1, max=1)]
312
313
314 def _ExtractTagsObject(opts, args):
315   """Extract the tag type object.
316
317   Note that this function will modify its args parameter.
318
319   """
320   if not hasattr(opts, "tag_type"):
321     raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
322   kind = opts.tag_type
323   if kind == constants.TAG_CLUSTER:
324     retval = kind, kind
325   elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
326     if not args:
327       raise errors.OpPrereqError("no arguments passed to the command")
328     name = args.pop(0)
329     retval = kind, name
330   else:
331     raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
332   return retval
333
334
335 def _ExtendTags(opts, args):
336   """Extend the args if a source file has been given.
337
338   This function will extend the tags with the contents of the file
339   passed in the 'tags_source' attribute of the opts parameter. A file
340   named '-' will be replaced by stdin.
341
342   """
343   fname = opts.tags_source
344   if fname is None:
345     return
346   if fname == "-":
347     new_fh = sys.stdin
348   else:
349     new_fh = open(fname, "r")
350   new_data = []
351   try:
352     # we don't use the nice 'new_data = [line.strip() for line in fh]'
353     # because of python bug 1633941
354     while True:
355       line = new_fh.readline()
356       if not line:
357         break
358       new_data.append(line.strip())
359   finally:
360     new_fh.close()
361   args.extend(new_data)
362
363
364 def ListTags(opts, args):
365   """List the tags on a given object.
366
367   This is a generic implementation that knows how to deal with all
368   three cases of tag objects (cluster, node, instance). The opts
369   argument is expected to contain a tag_type field denoting what
370   object type we work on.
371
372   """
373   kind, name = _ExtractTagsObject(opts, args)
374   cl = GetClient()
375   result = cl.QueryTags(kind, name)
376   result = list(result)
377   result.sort()
378   for tag in result:
379     ToStdout(tag)
380
381
382 def AddTags(opts, args):
383   """Add tags on a given object.
384
385   This is a generic implementation that knows how to deal with all
386   three cases of tag objects (cluster, node, instance). The opts
387   argument is expected to contain a tag_type field denoting what
388   object type we work on.
389
390   """
391   kind, name = _ExtractTagsObject(opts, args)
392   _ExtendTags(opts, args)
393   if not args:
394     raise errors.OpPrereqError("No tags to be added")
395   op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
396   SubmitOpCode(op, opts=opts)
397
398
399 def RemoveTags(opts, args):
400   """Remove tags from a given object.
401
402   This is a generic implementation that knows how to deal with all
403   three cases of tag objects (cluster, node, instance). The opts
404   argument is expected to contain a tag_type field denoting what
405   object type we work on.
406
407   """
408   kind, name = _ExtractTagsObject(opts, args)
409   _ExtendTags(opts, args)
410   if not args:
411     raise errors.OpPrereqError("No tags to be removed")
412   op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
413   SubmitOpCode(op, opts=opts)
414
415
416 def check_unit(option, opt, value): # pylint: disable-msg=W0613
417   """OptParsers custom converter for units.
418
419   """
420   try:
421     return utils.ParseUnit(value)
422   except errors.UnitParseError, err:
423     raise OptionValueError("option %s: %s" % (opt, err))
424
425
426 def _SplitKeyVal(opt, data):
427   """Convert a KeyVal string into a dict.
428
429   This function will convert a key=val[,...] string into a dict. Empty
430   values will be converted specially: keys which have the prefix 'no_'
431   will have the value=False and the prefix stripped, the others will
432   have value=True.
433
434   @type opt: string
435   @param opt: a string holding the option name for which we process the
436       data, used in building error messages
437   @type data: string
438   @param data: a string of the format key=val,key=val,...
439   @rtype: dict
440   @return: {key=val, key=val}
441   @raises errors.ParameterError: if there are duplicate keys
442
443   """
444   kv_dict = {}
445   if data:
446     for elem in utils.UnescapeAndSplit(data, sep=","):
447       if "=" in elem:
448         key, val = elem.split("=", 1)
449       else:
450         if elem.startswith(NO_PREFIX):
451           key, val = elem[len(NO_PREFIX):], False
452         elif elem.startswith(UN_PREFIX):
453           key, val = elem[len(UN_PREFIX):], None
454         else:
455           key, val = elem, True
456       if key in kv_dict:
457         raise errors.ParameterError("Duplicate key '%s' in option %s" %
458                                     (key, opt))
459       kv_dict[key] = val
460   return kv_dict
461
462
463 def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
464   """Custom parser for ident:key=val,key=val options.
465
466   This will store the parsed values as a tuple (ident, {key: val}). As such,
467   multiple uses of this option via action=append is possible.
468
469   """
470   if ":" not in value:
471     ident, rest = value, ''
472   else:
473     ident, rest = value.split(":", 1)
474
475   if ident.startswith(NO_PREFIX):
476     if rest:
477       msg = "Cannot pass options when removing parameter groups: %s" % value
478       raise errors.ParameterError(msg)
479     retval = (ident[len(NO_PREFIX):], False)
480   elif ident.startswith(UN_PREFIX):
481     if rest:
482       msg = "Cannot pass options when removing parameter groups: %s" % value
483       raise errors.ParameterError(msg)
484     retval = (ident[len(UN_PREFIX):], None)
485   else:
486     kv_dict = _SplitKeyVal(opt, rest)
487     retval = (ident, kv_dict)
488   return retval
489
490
491 def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
492   """Custom parser class for key=val,key=val options.
493
494   This will store the parsed values as a dict {key: val}.
495
496   """
497   return _SplitKeyVal(opt, value)
498
499
500 def check_bool(option, opt, value): # pylint: disable-msg=W0613
501   """Custom parser for yes/no options.
502
503   This will store the parsed value as either True or False.
504
505   """
506   value = value.lower()
507   if value == constants.VALUE_FALSE or value == "no":
508     return False
509   elif value == constants.VALUE_TRUE or value == "yes":
510     return True
511   else:
512     raise errors.ParameterError("Invalid boolean value '%s'" % value)
513
514
515 # completion_suggestion is normally a list. Using numeric values not evaluating
516 # to False for dynamic completion.
517 (OPT_COMPL_MANY_NODES,
518  OPT_COMPL_ONE_NODE,
519  OPT_COMPL_ONE_INSTANCE,
520  OPT_COMPL_ONE_OS,
521  OPT_COMPL_ONE_IALLOCATOR,
522  OPT_COMPL_INST_ADD_NODES,
523  OPT_COMPL_ONE_NODEGROUP) = range(100, 107)
524
525 OPT_COMPL_ALL = frozenset([
526   OPT_COMPL_MANY_NODES,
527   OPT_COMPL_ONE_NODE,
528   OPT_COMPL_ONE_INSTANCE,
529   OPT_COMPL_ONE_OS,
530   OPT_COMPL_ONE_IALLOCATOR,
531   OPT_COMPL_INST_ADD_NODES,
532   OPT_COMPL_ONE_NODEGROUP,
533   ])
534
535
536 class CliOption(Option):
537   """Custom option class for optparse.
538
539   """
540   ATTRS = Option.ATTRS + [
541     "completion_suggest",
542     ]
543   TYPES = Option.TYPES + (
544     "identkeyval",
545     "keyval",
546     "unit",
547     "bool",
548     )
549   TYPE_CHECKER = Option.TYPE_CHECKER.copy()
550   TYPE_CHECKER["identkeyval"] = check_ident_key_val
551   TYPE_CHECKER["keyval"] = check_key_val
552   TYPE_CHECKER["unit"] = check_unit
553   TYPE_CHECKER["bool"] = check_bool
554
555
556 # optparse.py sets make_option, so we do it for our own option class, too
557 cli_option = CliOption
558
559
560 _YORNO = "yes|no"
561
562 DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
563                        help="Increase debugging level")
564
565 NOHDR_OPT = cli_option("--no-headers", default=False,
566                        action="store_true", dest="no_headers",
567                        help="Don't display column headers")
568
569 SEP_OPT = cli_option("--separator", default=None,
570                      action="store", dest="separator",
571                      help=("Separator between output fields"
572                            " (defaults to one space)"))
573
574 USEUNITS_OPT = cli_option("--units", default=None,
575                           dest="units", choices=('h', 'm', 'g', 't'),
576                           help="Specify units for output (one of hmgt)")
577
578 FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
579                         type="string", metavar="FIELDS",
580                         help="Comma separated list of output fields")
581
582 FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
583                        default=False, help="Force the operation")
584
585 CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
586                          default=False, help="Do not require confirmation")
587
588 TAG_SRC_OPT = cli_option("--from", dest="tags_source",
589                          default=None, help="File with tag names")
590
591 SUBMIT_OPT = cli_option("--submit", dest="submit_only",
592                         default=False, action="store_true",
593                         help=("Submit the job and return the job ID, but"
594                               " don't wait for the job to finish"))
595
596 SYNC_OPT = cli_option("--sync", dest="do_locking",
597                       default=False, action="store_true",
598                       help=("Grab locks while doing the queries"
599                             " in order to ensure more consistent results"))
600
601 DRY_RUN_OPT = cli_option("--dry-run", default=False,
602                          action="store_true",
603                          help=("Do not execute the operation, just run the"
604                                " check steps and verify it it could be"
605                                " executed"))
606
607 VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
608                          action="store_true",
609                          help="Increase the verbosity of the operation")
610
611 DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
612                               action="store_true", dest="simulate_errors",
613                               help="Debugging option that makes the operation"
614                               " treat most runtime checks as failed")
615
616 NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
617                         default=True, action="store_false",
618                         help="Don't wait for sync (DANGEROUS!)")
619
620 DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
621                                help="Custom disk setup (diskless, file,"
622                                " plain or drbd)",
623                                default=None, metavar="TEMPL",
624                                choices=list(constants.DISK_TEMPLATES))
625
626 NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
627                         help="Do not create any network cards for"
628                         " the instance")
629
630 FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
631                                help="Relative path under default cluster-wide"
632                                " file storage dir to store file-based disks",
633                                default=None, metavar="<DIR>")
634
635 FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
636                                   help="Driver to use for image files",
637                                   default="loop", metavar="<DRIVER>",
638                                   choices=list(constants.FILE_DRIVER))
639
640 IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
641                             help="Select nodes for the instance automatically"
642                             " using the <NAME> iallocator plugin",
643                             default=None, type="string",
644                             completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
645
646 DEFAULT_IALLOCATOR_OPT = cli_option("-I", "--default-iallocator",
647                             metavar="<NAME>",
648                             help="Set the default instance allocator plugin",
649                             default=None, type="string",
650                             completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
651
652 OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
653                     metavar="<os>",
654                     completion_suggest=OPT_COMPL_ONE_OS)
655
656 OSPARAMS_OPT = cli_option("-O", "--os-parameters", dest="osparams",
657                          type="keyval", default={},
658                          help="OS parameters")
659
660 FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
661                                action="store_true", default=False,
662                                help="Force an unknown variant")
663
664 NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
665                             action="store_true", default=False,
666                             help="Do not install the OS (will"
667                             " enable no-start)")
668
669 BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
670                          type="keyval", default={},
671                          help="Backend parameters")
672
673 HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
674                          default={}, dest="hvparams",
675                          help="Hypervisor parameters")
676
677 HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
678                             help="Hypervisor and hypervisor options, in the"
679                             " format hypervisor:option=value,option=value,...",
680                             default=None, type="identkeyval")
681
682 HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
683                         help="Hypervisor and hypervisor options, in the"
684                         " format hypervisor:option=value,option=value,...",
685                         default=[], action="append", type="identkeyval")
686
687 NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
688                            action="store_false",
689                            help="Don't check that the instance's IP"
690                            " is alive")
691
692 NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
693                              default=True, action="store_false",
694                              help="Don't check that the instance's name"
695                              " is resolvable")
696
697 NET_OPT = cli_option("--net",
698                      help="NIC parameters", default=[],
699                      dest="nics", action="append", type="identkeyval")
700
701 DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
702                       dest="disks", action="append", type="identkeyval")
703
704 DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
705                          help="Comma-separated list of disks"
706                          " indices to act on (e.g. 0,2) (optional,"
707                          " defaults to all disks)")
708
709 OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
710                          help="Enforces a single-disk configuration using the"
711                          " given disk size, in MiB unless a suffix is used",
712                          default=None, type="unit", metavar="<size>")
713
714 IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
715                                 dest="ignore_consistency",
716                                 action="store_true", default=False,
717                                 help="Ignore the consistency of the disks on"
718                                 " the secondary")
719
720 NONLIVE_OPT = cli_option("--non-live", dest="live",
721                          default=True, action="store_false",
722                          help="Do a non-live migration (this usually means"
723                          " freeze the instance, save the state, transfer and"
724                          " only then resume running on the secondary node)")
725
726 MIGRATION_MODE_OPT = cli_option("--migration-mode", dest="migration_mode",
727                                 default=None,
728                                 choices=list(constants.HT_MIGRATION_MODES),
729                                 help="Override default migration mode (choose"
730                                 " either live or non-live")
731
732 NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
733                                 help="Target node and optional secondary node",
734                                 metavar="<pnode>[:<snode>]",
735                                 completion_suggest=OPT_COMPL_INST_ADD_NODES)
736
737 NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
738                            action="append", metavar="<node>",
739                            help="Use only this node (can be used multiple"
740                            " times, if not given defaults to all nodes)",
741                            completion_suggest=OPT_COMPL_ONE_NODE)
742
743 NODEGROUP_OPT = cli_option("-g", "--nodegroup",
744                            dest="nodegroup",
745                            help="Node group (name or uuid)",
746                            metavar="<nodegroup>",
747                            default=None, type="string",
748                            completion_suggest=OPT_COMPL_ONE_NODEGROUP)
749
750 SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
751                              metavar="<node>",
752                              completion_suggest=OPT_COMPL_ONE_NODE)
753
754 NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
755                          action="store_false",
756                          help="Don't start the instance after creation")
757
758 SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
759                          action="store_true", default=False,
760                          help="Show command instead of executing it")
761
762 CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
763                          default=False, action="store_true",
764                          help="Instead of performing the migration, try to"
765                          " recover from a failed cleanup. This is safe"
766                          " to run even if the instance is healthy, but it"
767                          " will create extra replication traffic and "
768                          " disrupt briefly the replication (like during the"
769                          " migration")
770
771 STATIC_OPT = cli_option("-s", "--static", dest="static",
772                         action="store_true", default=False,
773                         help="Only show configuration data, not runtime data")
774
775 ALL_OPT = cli_option("--all", dest="show_all",
776                      default=False, action="store_true",
777                      help="Show info on all instances on the cluster."
778                      " This can take a long time to run, use wisely")
779
780 SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
781                            action="store_true", default=False,
782                            help="Interactive OS reinstall, lists available"
783                            " OS templates for selection")
784
785 IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
786                                  action="store_true", default=False,
787                                  help="Remove the instance from the cluster"
788                                  " configuration even if there are failures"
789                                  " during the removal process")
790
791 IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
792                                         dest="ignore_remove_failures",
793                                         action="store_true", default=False,
794                                         help="Remove the instance from the"
795                                         " cluster configuration even if there"
796                                         " are failures during the removal"
797                                         " process")
798
799 REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
800                                  action="store_true", default=False,
801                                  help="Remove the instance from the cluster")
802
803 NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
804                                help="Specifies the new secondary node",
805                                metavar="NODE", default=None,
806                                completion_suggest=OPT_COMPL_ONE_NODE)
807
808 ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
809                             default=False, action="store_true",
810                             help="Replace the disk(s) on the primary"
811                             " node (only for the drbd template)")
812
813 ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
814                               default=False, action="store_true",
815                               help="Replace the disk(s) on the secondary"
816                               " node (only for the drbd template)")
817
818 AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
819                               default=False, action="store_true",
820                               help="Lock all nodes and auto-promote as needed"
821                               " to MC status")
822
823 AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
824                               default=False, action="store_true",
825                               help="Automatically replace faulty disks"
826                               " (only for the drbd template)")
827
828 IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
829                              default=False, action="store_true",
830                              help="Ignore current recorded size"
831                              " (useful for forcing activation when"
832                              " the recorded size is wrong)")
833
834 SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
835                           metavar="<node>",
836                           completion_suggest=OPT_COMPL_ONE_NODE)
837
838 SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
839                          metavar="<dir>")
840
841 SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
842                               help="Specify the secondary ip for the node",
843                               metavar="ADDRESS", default=None)
844
845 READD_OPT = cli_option("--readd", dest="readd",
846                        default=False, action="store_true",
847                        help="Readd old node after replacing it")
848
849 NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
850                                 default=True, action="store_false",
851                                 help="Disable SSH key fingerprint checking")
852
853
854 MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
855                     type="bool", default=None, metavar=_YORNO,
856                     help="Set the master_candidate flag on the node")
857
858 OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
859                          type="bool", default=None,
860                          help="Set the offline flag on the node")
861
862 DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
863                          type="bool", default=None,
864                          help="Set the drained flag on the node")
865
866 ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
867                              type="bool", default=None, metavar=_YORNO,
868                              help="Set the allocatable flag on a volume")
869
870 NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
871                                help="Disable support for lvm based instances"
872                                " (cluster-wide)",
873                                action="store_false", default=True)
874
875 ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
876                             dest="enabled_hypervisors",
877                             help="Comma-separated list of hypervisors",
878                             type="string", default=None)
879
880 NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
881                             type="keyval", default={},
882                             help="NIC parameters")
883
884 CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
885                          dest="candidate_pool_size", type="int",
886                          help="Set the candidate pool size")
887
888 VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
889                          help="Enables LVM and specifies the volume group"
890                          " name (cluster-wide) for disk allocation [xenvg]",
891                          metavar="VG", default=None)
892
893 YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
894                           help="Destroy cluster", action="store_true")
895
896 NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
897                           help="Skip node agreement check (dangerous)",
898                           action="store_true", default=False)
899
900 MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
901                             help="Specify the mac prefix for the instance IP"
902                             " addresses, in the format XX:XX:XX",
903                             metavar="PREFIX",
904                             default=None)
905
906 MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
907                                help="Specify the node interface (cluster-wide)"
908                                " on which the master IP address will be added "
909                                " [%s]" % constants.DEFAULT_BRIDGE,
910                                metavar="NETDEV",
911                                default=constants.DEFAULT_BRIDGE)
912
913 GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
914                                 help="Specify the default directory (cluster-"
915                                 "wide) for storing the file-based disks [%s]" %
916                                 constants.DEFAULT_FILE_STORAGE_DIR,
917                                 metavar="DIR",
918                                 default=constants.DEFAULT_FILE_STORAGE_DIR)
919
920 NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
921                                    help="Don't modify /etc/hosts",
922                                    action="store_false", default=True)
923
924 NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
925                                     help="Don't initialize SSH keys",
926                                     action="store_false", default=True)
927
928 ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
929                              help="Enable parseable error messages",
930                              action="store_true", default=False)
931
932 NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
933                           help="Skip N+1 memory redundancy tests",
934                           action="store_true", default=False)
935
936 REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
937                              help="Type of reboot: soft/hard/full",
938                              default=constants.INSTANCE_REBOOT_HARD,
939                              metavar="<REBOOT>",
940                              choices=list(constants.REBOOT_TYPES))
941
942 IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
943                                     dest="ignore_secondaries",
944                                     default=False, action="store_true",
945                                     help="Ignore errors from secondaries")
946
947 NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
948                             action="store_false", default=True,
949                             help="Don't shutdown the instance (unsafe)")
950
951 TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
952                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
953                          help="Maximum time to wait")
954
955 SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
956                          dest="shutdown_timeout", type="int",
957                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
958                          help="Maximum time to wait for instance shutdown")
959
960 INTERVAL_OPT = cli_option("--interval", dest="interval", type="int",
961                           default=None,
962                           help=("Number of seconds between repetions of the"
963                                 " command"))
964
965 EARLY_RELEASE_OPT = cli_option("--early-release",
966                                dest="early_release", default=False,
967                                action="store_true",
968                                help="Release the locks on the secondary"
969                                " node(s) early")
970
971 NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
972                                   dest="new_cluster_cert",
973                                   default=False, action="store_true",
974                                   help="Generate a new cluster certificate")
975
976 RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
977                            default=None,
978                            help="File containing new RAPI certificate")
979
980 NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
981                                default=None, action="store_true",
982                                help=("Generate a new self-signed RAPI"
983                                      " certificate"))
984
985 NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
986                                     dest="new_confd_hmac_key",
987                                     default=False, action="store_true",
988                                     help=("Create a new HMAC key for %s" %
989                                           constants.CONFD))
990
991 CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
992                                        dest="cluster_domain_secret",
993                                        default=None,
994                                        help=("Load new new cluster domain"
995                                              " secret from file"))
996
997 NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret",
998                                            dest="new_cluster_domain_secret",
999                                            default=False, action="store_true",
1000                                            help=("Create a new cluster domain"
1001                                                  " secret"))
1002
1003 USE_REPL_NET_OPT = cli_option("--use-replication-network",
1004                               dest="use_replication_network",
1005                               help="Whether to use the replication network"
1006                               " for talking to the nodes",
1007                               action="store_true", default=False)
1008
1009 MAINTAIN_NODE_HEALTH_OPT = \
1010     cli_option("--maintain-node-health", dest="maintain_node_health",
1011                metavar=_YORNO, default=None, type="bool",
1012                help="Configure the cluster to automatically maintain node"
1013                " health, by shutting down unknown instances, shutting down"
1014                " unknown DRBD devices, etc.")
1015
1016 IDENTIFY_DEFAULTS_OPT = \
1017     cli_option("--identify-defaults", dest="identify_defaults",
1018                default=False, action="store_true",
1019                help="Identify which saved instance parameters are equal to"
1020                " the current cluster defaults and set them as such, instead"
1021                " of marking them as overridden")
1022
1023 UIDPOOL_OPT = cli_option("--uid-pool", default=None,
1024                          action="store", dest="uid_pool",
1025                          help=("A list of user-ids or user-id"
1026                                " ranges separated by commas"))
1027
1028 ADD_UIDS_OPT = cli_option("--add-uids", default=None,
1029                           action="store", dest="add_uids",
1030                           help=("A list of user-ids or user-id"
1031                                 " ranges separated by commas, to be"
1032                                 " added to the user-id pool"))
1033
1034 REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
1035                              action="store", dest="remove_uids",
1036                              help=("A list of user-ids or user-id"
1037                                    " ranges separated by commas, to be"
1038                                    " removed from the user-id pool"))
1039
1040 RESERVED_LVS_OPT = cli_option("--reserved-lvs", default=None,
1041                              action="store", dest="reserved_lvs",
1042                              help=("A comma-separated list of reserved"
1043                                    " logical volumes names, that will be"
1044                                    " ignored by cluster verify"))
1045
1046 ROMAN_OPT = cli_option("--roman",
1047                        dest="roman_integers", default=False,
1048                        action="store_true",
1049                        help="Use roman numbers for positive integers")
1050
1051 DRBD_HELPER_OPT = cli_option("--drbd-usermode-helper", dest="drbd_helper",
1052                              action="store", default=None,
1053                              help="Specifies usermode helper for DRBD")
1054
1055 NODRBD_STORAGE_OPT = cli_option("--no-drbd-storage", dest="drbd_storage",
1056                                 action="store_false", default=True,
1057                                 help="Disable support for DRBD")
1058
1059 PRIMARY_IP_VERSION_OPT = \
1060     cli_option("--primary-ip-version", default=constants.IP4_VERSION,
1061                action="store", dest="primary_ip_version",
1062                metavar="%d|%d" % (constants.IP4_VERSION,
1063                                   constants.IP6_VERSION),
1064                help="Cluster-wide IP version for primary IP")
1065
1066 PRIORITY_OPT = cli_option("--priority", default=None, dest="priority",
1067                           metavar="|".join(name for name, _ in _PRIORITY_NAMES),
1068                           choices=_PRIONAME_TO_VALUE.keys(),
1069                           help="Priority for opcode processing")
1070
1071 HID_OS_OPT = cli_option("--hidden", dest="hidden",
1072                         type="bool", default=None, metavar=_YORNO,
1073                         help="Sets the hidden flag on the OS")
1074
1075 BLK_OS_OPT = cli_option("--blacklisted", dest="blacklisted",
1076                         type="bool", default=None, metavar=_YORNO,
1077                         help="Sets the blacklisted flag on the OS")
1078
1079
1080 #: Options provided by all commands
1081 COMMON_OPTS = [DEBUG_OPT]
1082
1083
1084 def _ParseArgs(argv, commands, aliases):
1085   """Parser for the command line arguments.
1086
1087   This function parses the arguments and returns the function which
1088   must be executed together with its (modified) arguments.
1089
1090   @param argv: the command line
1091   @param commands: dictionary with special contents, see the design
1092       doc for cmdline handling
1093   @param aliases: dictionary with command aliases {'alias': 'target, ...}
1094
1095   """
1096   if len(argv) == 0:
1097     binary = "<command>"
1098   else:
1099     binary = argv[0].split("/")[-1]
1100
1101   if len(argv) > 1 and argv[1] == "--version":
1102     ToStdout("%s (ganeti %s) %s", binary, constants.VCS_VERSION,
1103              constants.RELEASE_VERSION)
1104     # Quit right away. That way we don't have to care about this special
1105     # argument. optparse.py does it the same.
1106     sys.exit(0)
1107
1108   if len(argv) < 2 or not (argv[1] in commands or
1109                            argv[1] in aliases):
1110     # let's do a nice thing
1111     sortedcmds = commands.keys()
1112     sortedcmds.sort()
1113
1114     ToStdout("Usage: %s {command} [options...] [argument...]", binary)
1115     ToStdout("%s <command> --help to see details, or man %s", binary, binary)
1116     ToStdout("")
1117
1118     # compute the max line length for cmd + usage
1119     mlen = max([len(" %s" % cmd) for cmd in commands])
1120     mlen = min(60, mlen) # should not get here...
1121
1122     # and format a nice command list
1123     ToStdout("Commands:")
1124     for cmd in sortedcmds:
1125       cmdstr = " %s" % (cmd,)
1126       help_text = commands[cmd][4]
1127       help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
1128       ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
1129       for line in help_lines:
1130         ToStdout("%-*s   %s", mlen, "", line)
1131
1132     ToStdout("")
1133
1134     return None, None, None
1135
1136   # get command, unalias it, and look it up in commands
1137   cmd = argv.pop(1)
1138   if cmd in aliases:
1139     if cmd in commands:
1140       raise errors.ProgrammerError("Alias '%s' overrides an existing"
1141                                    " command" % cmd)
1142
1143     if aliases[cmd] not in commands:
1144       raise errors.ProgrammerError("Alias '%s' maps to non-existing"
1145                                    " command '%s'" % (cmd, aliases[cmd]))
1146
1147     cmd = aliases[cmd]
1148
1149   func, args_def, parser_opts, usage, description = commands[cmd]
1150   parser = OptionParser(option_list=parser_opts + COMMON_OPTS,
1151                         description=description,
1152                         formatter=TitledHelpFormatter(),
1153                         usage="%%prog %s %s" % (cmd, usage))
1154   parser.disable_interspersed_args()
1155   options, args = parser.parse_args()
1156
1157   if not _CheckArguments(cmd, args_def, args):
1158     return None, None, None
1159
1160   return func, options, args
1161
1162
1163 def _CheckArguments(cmd, args_def, args):
1164   """Verifies the arguments using the argument definition.
1165
1166   Algorithm:
1167
1168     1. Abort with error if values specified by user but none expected.
1169
1170     1. For each argument in definition
1171
1172       1. Keep running count of minimum number of values (min_count)
1173       1. Keep running count of maximum number of values (max_count)
1174       1. If it has an unlimited number of values
1175
1176         1. Abort with error if it's not the last argument in the definition
1177
1178     1. If last argument has limited number of values
1179
1180       1. Abort with error if number of values doesn't match or is too large
1181
1182     1. Abort with error if user didn't pass enough values (min_count)
1183
1184   """
1185   if args and not args_def:
1186     ToStderr("Error: Command %s expects no arguments", cmd)
1187     return False
1188
1189   min_count = None
1190   max_count = None
1191   check_max = None
1192
1193   last_idx = len(args_def) - 1
1194
1195   for idx, arg in enumerate(args_def):
1196     if min_count is None:
1197       min_count = arg.min
1198     elif arg.min is not None:
1199       min_count += arg.min
1200
1201     if max_count is None:
1202       max_count = arg.max
1203     elif arg.max is not None:
1204       max_count += arg.max
1205
1206     if idx == last_idx:
1207       check_max = (arg.max is not None)
1208
1209     elif arg.max is None:
1210       raise errors.ProgrammerError("Only the last argument can have max=None")
1211
1212   if check_max:
1213     # Command with exact number of arguments
1214     if (min_count is not None and max_count is not None and
1215         min_count == max_count and len(args) != min_count):
1216       ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1217       return False
1218
1219     # Command with limited number of arguments
1220     if max_count is not None and len(args) > max_count:
1221       ToStderr("Error: Command %s expects only %d argument(s)",
1222                cmd, max_count)
1223       return False
1224
1225   # Command with some required arguments
1226   if min_count is not None and len(args) < min_count:
1227     ToStderr("Error: Command %s expects at least %d argument(s)",
1228              cmd, min_count)
1229     return False
1230
1231   return True
1232
1233
1234 def SplitNodeOption(value):
1235   """Splits the value of a --node option.
1236
1237   """
1238   if value and ':' in value:
1239     return value.split(':', 1)
1240   else:
1241     return (value, None)
1242
1243
1244 def CalculateOSNames(os_name, os_variants):
1245   """Calculates all the names an OS can be called, according to its variants.
1246
1247   @type os_name: string
1248   @param os_name: base name of the os
1249   @type os_variants: list or None
1250   @param os_variants: list of supported variants
1251   @rtype: list
1252   @return: list of valid names
1253
1254   """
1255   if os_variants:
1256     return ['%s+%s' % (os_name, v) for v in os_variants]
1257   else:
1258     return [os_name]
1259
1260
1261 def ParseFields(selected, default):
1262   """Parses the values of "--field"-like options.
1263
1264   @type selected: string or None
1265   @param selected: User-selected options
1266   @type default: list
1267   @param default: Default fields
1268
1269   """
1270   if selected is None:
1271     return default
1272
1273   if selected.startswith("+"):
1274     return default + selected[1:].split(",")
1275
1276   return selected.split(",")
1277
1278
1279 UsesRPC = rpc.RunWithRPC
1280
1281
1282 def AskUser(text, choices=None):
1283   """Ask the user a question.
1284
1285   @param text: the question to ask
1286
1287   @param choices: list with elements tuples (input_char, return_value,
1288       description); if not given, it will default to: [('y', True,
1289       'Perform the operation'), ('n', False, 'Do no do the operation')];
1290       note that the '?' char is reserved for help
1291
1292   @return: one of the return values from the choices list; if input is
1293       not possible (i.e. not running with a tty, we return the last
1294       entry from the list
1295
1296   """
1297   if choices is None:
1298     choices = [('y', True, 'Perform the operation'),
1299                ('n', False, 'Do not perform the operation')]
1300   if not choices or not isinstance(choices, list):
1301     raise errors.ProgrammerError("Invalid choices argument to AskUser")
1302   for entry in choices:
1303     if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
1304       raise errors.ProgrammerError("Invalid choices element to AskUser")
1305
1306   answer = choices[-1][1]
1307   new_text = []
1308   for line in text.splitlines():
1309     new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
1310   text = "\n".join(new_text)
1311   try:
1312     f = file("/dev/tty", "a+")
1313   except IOError:
1314     return answer
1315   try:
1316     chars = [entry[0] for entry in choices]
1317     chars[-1] = "[%s]" % chars[-1]
1318     chars.append('?')
1319     maps = dict([(entry[0], entry[1]) for entry in choices])
1320     while True:
1321       f.write(text)
1322       f.write('\n')
1323       f.write("/".join(chars))
1324       f.write(": ")
1325       line = f.readline(2).strip().lower()
1326       if line in maps:
1327         answer = maps[line]
1328         break
1329       elif line == '?':
1330         for entry in choices:
1331           f.write(" %s - %s\n" % (entry[0], entry[2]))
1332         f.write("\n")
1333         continue
1334   finally:
1335     f.close()
1336   return answer
1337
1338
1339 class JobSubmittedException(Exception):
1340   """Job was submitted, client should exit.
1341
1342   This exception has one argument, the ID of the job that was
1343   submitted. The handler should print this ID.
1344
1345   This is not an error, just a structured way to exit from clients.
1346
1347   """
1348
1349
1350 def SendJob(ops, cl=None):
1351   """Function to submit an opcode without waiting for the results.
1352
1353   @type ops: list
1354   @param ops: list of opcodes
1355   @type cl: luxi.Client
1356   @param cl: the luxi client to use for communicating with the master;
1357              if None, a new client will be created
1358
1359   """
1360   if cl is None:
1361     cl = GetClient()
1362
1363   job_id = cl.SubmitJob(ops)
1364
1365   return job_id
1366
1367
1368 def GenericPollJob(job_id, cbs, report_cbs):
1369   """Generic job-polling function.
1370
1371   @type job_id: number
1372   @param job_id: Job ID
1373   @type cbs: Instance of L{JobPollCbBase}
1374   @param cbs: Data callbacks
1375   @type report_cbs: Instance of L{JobPollReportCbBase}
1376   @param report_cbs: Reporting callbacks
1377
1378   """
1379   prev_job_info = None
1380   prev_logmsg_serial = None
1381
1382   status = None
1383
1384   while True:
1385     result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
1386                                       prev_logmsg_serial)
1387     if not result:
1388       # job not found, go away!
1389       raise errors.JobLost("Job with id %s lost" % job_id)
1390
1391     if result == constants.JOB_NOTCHANGED:
1392       report_cbs.ReportNotChanged(job_id, status)
1393
1394       # Wait again
1395       continue
1396
1397     # Split result, a tuple of (field values, log entries)
1398     (job_info, log_entries) = result
1399     (status, ) = job_info
1400
1401     if log_entries:
1402       for log_entry in log_entries:
1403         (serial, timestamp, log_type, message) = log_entry
1404         report_cbs.ReportLogMessage(job_id, serial, timestamp,
1405                                     log_type, message)
1406         prev_logmsg_serial = max(prev_logmsg_serial, serial)
1407
1408     # TODO: Handle canceled and archived jobs
1409     elif status in (constants.JOB_STATUS_SUCCESS,
1410                     constants.JOB_STATUS_ERROR,
1411                     constants.JOB_STATUS_CANCELING,
1412                     constants.JOB_STATUS_CANCELED):
1413       break
1414
1415     prev_job_info = job_info
1416
1417   jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
1418   if not jobs:
1419     raise errors.JobLost("Job with id %s lost" % job_id)
1420
1421   status, opstatus, result = jobs[0]
1422
1423   if status == constants.JOB_STATUS_SUCCESS:
1424     return result
1425
1426   if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
1427     raise errors.OpExecError("Job was canceled")
1428
1429   has_ok = False
1430   for idx, (status, msg) in enumerate(zip(opstatus, result)):
1431     if status == constants.OP_STATUS_SUCCESS:
1432       has_ok = True
1433     elif status == constants.OP_STATUS_ERROR:
1434       errors.MaybeRaise(msg)
1435
1436       if has_ok:
1437         raise errors.OpExecError("partial failure (opcode %d): %s" %
1438                                  (idx, msg))
1439
1440       raise errors.OpExecError(str(msg))
1441
1442   # default failure mode
1443   raise errors.OpExecError(result)
1444
1445
1446 class JobPollCbBase:
1447   """Base class for L{GenericPollJob} callbacks.
1448
1449   """
1450   def __init__(self):
1451     """Initializes this class.
1452
1453     """
1454
1455   def WaitForJobChangeOnce(self, job_id, fields,
1456                            prev_job_info, prev_log_serial):
1457     """Waits for changes on a job.
1458
1459     """
1460     raise NotImplementedError()
1461
1462   def QueryJobs(self, job_ids, fields):
1463     """Returns the selected fields for the selected job IDs.
1464
1465     @type job_ids: list of numbers
1466     @param job_ids: Job IDs
1467     @type fields: list of strings
1468     @param fields: Fields
1469
1470     """
1471     raise NotImplementedError()
1472
1473
1474 class JobPollReportCbBase:
1475   """Base class for L{GenericPollJob} reporting callbacks.
1476
1477   """
1478   def __init__(self):
1479     """Initializes this class.
1480
1481     """
1482
1483   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1484     """Handles a log message.
1485
1486     """
1487     raise NotImplementedError()
1488
1489   def ReportNotChanged(self, job_id, status):
1490     """Called for if a job hasn't changed in a while.
1491
1492     @type job_id: number
1493     @param job_id: Job ID
1494     @type status: string or None
1495     @param status: Job status if available
1496
1497     """
1498     raise NotImplementedError()
1499
1500
1501 class _LuxiJobPollCb(JobPollCbBase):
1502   def __init__(self, cl):
1503     """Initializes this class.
1504
1505     """
1506     JobPollCbBase.__init__(self)
1507     self.cl = cl
1508
1509   def WaitForJobChangeOnce(self, job_id, fields,
1510                            prev_job_info, prev_log_serial):
1511     """Waits for changes on a job.
1512
1513     """
1514     return self.cl.WaitForJobChangeOnce(job_id, fields,
1515                                         prev_job_info, prev_log_serial)
1516
1517   def QueryJobs(self, job_ids, fields):
1518     """Returns the selected fields for the selected job IDs.
1519
1520     """
1521     return self.cl.QueryJobs(job_ids, fields)
1522
1523
1524 class FeedbackFnJobPollReportCb(JobPollReportCbBase):
1525   def __init__(self, feedback_fn):
1526     """Initializes this class.
1527
1528     """
1529     JobPollReportCbBase.__init__(self)
1530
1531     self.feedback_fn = feedback_fn
1532
1533     assert callable(feedback_fn)
1534
1535   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1536     """Handles a log message.
1537
1538     """
1539     self.feedback_fn((timestamp, log_type, log_msg))
1540
1541   def ReportNotChanged(self, job_id, status):
1542     """Called if a job hasn't changed in a while.
1543
1544     """
1545     # Ignore
1546
1547
1548 class StdioJobPollReportCb(JobPollReportCbBase):
1549   def __init__(self):
1550     """Initializes this class.
1551
1552     """
1553     JobPollReportCbBase.__init__(self)
1554
1555     self.notified_queued = False
1556     self.notified_waitlock = False
1557
1558   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1559     """Handles a log message.
1560
1561     """
1562     ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
1563              FormatLogMessage(log_type, log_msg))
1564
1565   def ReportNotChanged(self, job_id, status):
1566     """Called if a job hasn't changed in a while.
1567
1568     """
1569     if status is None:
1570       return
1571
1572     if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
1573       ToStderr("Job %s is waiting in queue", job_id)
1574       self.notified_queued = True
1575
1576     elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
1577       ToStderr("Job %s is trying to acquire all necessary locks", job_id)
1578       self.notified_waitlock = True
1579
1580
1581 def FormatLogMessage(log_type, log_msg):
1582   """Formats a job message according to its type.
1583
1584   """
1585   if log_type != constants.ELOG_MESSAGE:
1586     log_msg = str(log_msg)
1587
1588   return utils.SafeEncode(log_msg)
1589
1590
1591 def PollJob(job_id, cl=None, feedback_fn=None, reporter=None):
1592   """Function to poll for the result of a job.
1593
1594   @type job_id: job identified
1595   @param job_id: the job to poll for results
1596   @type cl: luxi.Client
1597   @param cl: the luxi client to use for communicating with the master;
1598              if None, a new client will be created
1599
1600   """
1601   if cl is None:
1602     cl = GetClient()
1603
1604   if reporter is None:
1605     if feedback_fn:
1606       reporter = FeedbackFnJobPollReportCb(feedback_fn)
1607     else:
1608       reporter = StdioJobPollReportCb()
1609   elif feedback_fn:
1610     raise errors.ProgrammerError("Can't specify reporter and feedback function")
1611
1612   return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
1613
1614
1615 def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None, reporter=None):
1616   """Legacy function to submit an opcode.
1617
1618   This is just a simple wrapper over the construction of the processor
1619   instance. It should be extended to better handle feedback and
1620   interaction functions.
1621
1622   """
1623   if cl is None:
1624     cl = GetClient()
1625
1626   SetGenericOpcodeOpts([op], opts)
1627
1628   job_id = SendJob([op], cl=cl)
1629
1630   op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn,
1631                        reporter=reporter)
1632
1633   return op_results[0]
1634
1635
1636 def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
1637   """Wrapper around SubmitOpCode or SendJob.
1638
1639   This function will decide, based on the 'opts' parameter, whether to
1640   submit and wait for the result of the opcode (and return it), or
1641   whether to just send the job and print its identifier. It is used in
1642   order to simplify the implementation of the '--submit' option.
1643
1644   It will also process the opcodes if we're sending the via SendJob
1645   (otherwise SubmitOpCode does it).
1646
1647   """
1648   if opts and opts.submit_only:
1649     job = [op]
1650     SetGenericOpcodeOpts(job, opts)
1651     job_id = SendJob(job, cl=cl)
1652     raise JobSubmittedException(job_id)
1653   else:
1654     return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
1655
1656
1657 def SetGenericOpcodeOpts(opcode_list, options):
1658   """Processor for generic options.
1659
1660   This function updates the given opcodes based on generic command
1661   line options (like debug, dry-run, etc.).
1662
1663   @param opcode_list: list of opcodes
1664   @param options: command line options or None
1665   @return: None (in-place modification)
1666
1667   """
1668   if not options:
1669     return
1670   for op in opcode_list:
1671     op.debug_level = options.debug
1672     if hasattr(options, "dry_run"):
1673       op.dry_run = options.dry_run
1674     if getattr(options, "priority", None) is not None:
1675       op.priority = _PRIONAME_TO_VALUE[options.priority]
1676
1677
1678 def GetClient():
1679   # TODO: Cache object?
1680   try:
1681     client = luxi.Client()
1682   except luxi.NoMasterError:
1683     ss = ssconf.SimpleStore()
1684
1685     # Try to read ssconf file
1686     try:
1687       ss.GetMasterNode()
1688     except errors.ConfigurationError:
1689       raise errors.OpPrereqError("Cluster not initialized or this machine is"
1690                                  " not part of a cluster")
1691
1692     master, myself = ssconf.GetMasterAndMyself(ss=ss)
1693     if master != myself:
1694       raise errors.OpPrereqError("This is not the master node, please connect"
1695                                  " to node '%s' and rerun the command" %
1696                                  master)
1697     raise
1698   return client
1699
1700
1701 def FormatError(err):
1702   """Return a formatted error message for a given error.
1703
1704   This function takes an exception instance and returns a tuple
1705   consisting of two values: first, the recommended exit code, and
1706   second, a string describing the error message (not
1707   newline-terminated).
1708
1709   """
1710   retcode = 1
1711   obuf = StringIO()
1712   msg = str(err)
1713   if isinstance(err, errors.ConfigurationError):
1714     txt = "Corrupt configuration file: %s" % msg
1715     logging.error(txt)
1716     obuf.write(txt + "\n")
1717     obuf.write("Aborting.")
1718     retcode = 2
1719   elif isinstance(err, errors.HooksAbort):
1720     obuf.write("Failure: hooks execution failed:\n")
1721     for node, script, out in err.args[0]:
1722       if out:
1723         obuf.write("  node: %s, script: %s, output: %s\n" %
1724                    (node, script, out))
1725       else:
1726         obuf.write("  node: %s, script: %s (no output)\n" %
1727                    (node, script))
1728   elif isinstance(err, errors.HooksFailure):
1729     obuf.write("Failure: hooks general failure: %s" % msg)
1730   elif isinstance(err, errors.ResolverError):
1731     this_host = netutils.Hostname.GetSysName()
1732     if err.args[0] == this_host:
1733       msg = "Failure: can't resolve my own hostname ('%s')"
1734     else:
1735       msg = "Failure: can't resolve hostname '%s'"
1736     obuf.write(msg % err.args[0])
1737   elif isinstance(err, errors.OpPrereqError):
1738     if len(err.args) == 2:
1739       obuf.write("Failure: prerequisites not met for this"
1740                " operation:\nerror type: %s, error details:\n%s" %
1741                  (err.args[1], err.args[0]))
1742     else:
1743       obuf.write("Failure: prerequisites not met for this"
1744                  " operation:\n%s" % msg)
1745   elif isinstance(err, errors.OpExecError):
1746     obuf.write("Failure: command execution error:\n%s" % msg)
1747   elif isinstance(err, errors.TagError):
1748     obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
1749   elif isinstance(err, errors.JobQueueDrainError):
1750     obuf.write("Failure: the job queue is marked for drain and doesn't"
1751                " accept new requests\n")
1752   elif isinstance(err, errors.JobQueueFull):
1753     obuf.write("Failure: the job queue is full and doesn't accept new"
1754                " job submissions until old jobs are archived\n")
1755   elif isinstance(err, errors.TypeEnforcementError):
1756     obuf.write("Parameter Error: %s" % msg)
1757   elif isinstance(err, errors.ParameterError):
1758     obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
1759   elif isinstance(err, luxi.NoMasterError):
1760     obuf.write("Cannot communicate with the master daemon.\nIs it running"
1761                " and listening for connections?")
1762   elif isinstance(err, luxi.TimeoutError):
1763     obuf.write("Timeout while talking to the master daemon. Error:\n"
1764                "%s" % msg)
1765   elif isinstance(err, luxi.PermissionError):
1766     obuf.write("It seems you don't have permissions to connect to the"
1767                " master daemon.\nPlease retry as a different user.")
1768   elif isinstance(err, luxi.ProtocolError):
1769     obuf.write("Unhandled protocol error while talking to the master daemon:\n"
1770                "%s" % msg)
1771   elif isinstance(err, errors.JobLost):
1772     obuf.write("Error checking job status: %s" % msg)
1773   elif isinstance(err, errors.GenericError):
1774     obuf.write("Unhandled Ganeti error: %s" % msg)
1775   elif isinstance(err, JobSubmittedException):
1776     obuf.write("JobID: %s\n" % err.args[0])
1777     retcode = 0
1778   else:
1779     obuf.write("Unhandled exception: %s" % msg)
1780   return retcode, obuf.getvalue().rstrip('\n')
1781
1782
1783 def GenericMain(commands, override=None, aliases=None):
1784   """Generic main function for all the gnt-* commands.
1785
1786   Arguments:
1787     - commands: a dictionary with a special structure, see the design doc
1788                 for command line handling.
1789     - override: if not None, we expect a dictionary with keys that will
1790                 override command line options; this can be used to pass
1791                 options from the scripts to generic functions
1792     - aliases: dictionary with command aliases {'alias': 'target, ...}
1793
1794   """
1795   # save the program name and the entire command line for later logging
1796   if sys.argv:
1797     binary = os.path.basename(sys.argv[0]) or sys.argv[0]
1798     if len(sys.argv) >= 2:
1799       binary += " " + sys.argv[1]
1800       old_cmdline = " ".join(sys.argv[2:])
1801     else:
1802       old_cmdline = ""
1803   else:
1804     binary = "<unknown program>"
1805     old_cmdline = ""
1806
1807   if aliases is None:
1808     aliases = {}
1809
1810   try:
1811     func, options, args = _ParseArgs(sys.argv, commands, aliases)
1812   except errors.ParameterError, err:
1813     result, err_msg = FormatError(err)
1814     ToStderr(err_msg)
1815     return 1
1816
1817   if func is None: # parse error
1818     return 1
1819
1820   if override is not None:
1821     for key, val in override.iteritems():
1822       setattr(options, key, val)
1823
1824   utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
1825                      stderr_logging=True, program=binary)
1826
1827   if old_cmdline:
1828     logging.info("run with arguments '%s'", old_cmdline)
1829   else:
1830     logging.info("run with no arguments")
1831
1832   try:
1833     result = func(options, args)
1834   except (errors.GenericError, luxi.ProtocolError,
1835           JobSubmittedException), err:
1836     result, err_msg = FormatError(err)
1837     logging.exception("Error during command processing")
1838     ToStderr(err_msg)
1839
1840   return result
1841
1842
1843 def ParseNicOption(optvalue):
1844   """Parses the value of the --net option(s).
1845
1846   """
1847   try:
1848     nic_max = max(int(nidx[0]) + 1 for nidx in optvalue)
1849   except (TypeError, ValueError), err:
1850     raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
1851
1852   nics = [{}] * nic_max
1853   for nidx, ndict in optvalue:
1854     nidx = int(nidx)
1855
1856     if not isinstance(ndict, dict):
1857       raise errors.OpPrereqError("Invalid nic/%d value: expected dict,"
1858                                  " got %s" % (nidx, ndict))
1859
1860     utils.ForceDictType(ndict, constants.INIC_PARAMS_TYPES)
1861
1862     nics[nidx] = ndict
1863
1864   return nics
1865
1866
1867 def GenericInstanceCreate(mode, opts, args):
1868   """Add an instance to the cluster via either creation or import.
1869
1870   @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
1871   @param opts: the command line options selected by the user
1872   @type args: list
1873   @param args: should contain only one element, the new instance name
1874   @rtype: int
1875   @return: the desired exit code
1876
1877   """
1878   instance = args[0]
1879
1880   (pnode, snode) = SplitNodeOption(opts.node)
1881
1882   hypervisor = None
1883   hvparams = {}
1884   if opts.hypervisor:
1885     hypervisor, hvparams = opts.hypervisor
1886
1887   if opts.nics:
1888     nics = ParseNicOption(opts.nics)
1889   elif opts.no_nics:
1890     # no nics
1891     nics = []
1892   elif mode == constants.INSTANCE_CREATE:
1893     # default of one nic, all auto
1894     nics = [{}]
1895   else:
1896     # mode == import
1897     nics = []
1898
1899   if opts.disk_template == constants.DT_DISKLESS:
1900     if opts.disks or opts.sd_size is not None:
1901       raise errors.OpPrereqError("Diskless instance but disk"
1902                                  " information passed")
1903     disks = []
1904   else:
1905     if (not opts.disks and not opts.sd_size
1906         and mode == constants.INSTANCE_CREATE):
1907       raise errors.OpPrereqError("No disk information specified")
1908     if opts.disks and opts.sd_size is not None:
1909       raise errors.OpPrereqError("Please use either the '--disk' or"
1910                                  " '-s' option")
1911     if opts.sd_size is not None:
1912       opts.disks = [(0, {"size": opts.sd_size})]
1913
1914     if opts.disks:
1915       try:
1916         disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
1917       except ValueError, err:
1918         raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
1919       disks = [{}] * disk_max
1920     else:
1921       disks = []
1922     for didx, ddict in opts.disks:
1923       didx = int(didx)
1924       if not isinstance(ddict, dict):
1925         msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
1926         raise errors.OpPrereqError(msg)
1927       elif "size" in ddict:
1928         if "adopt" in ddict:
1929           raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
1930                                      " (disk %d)" % didx)
1931         try:
1932           ddict["size"] = utils.ParseUnit(ddict["size"])
1933         except ValueError, err:
1934           raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
1935                                      (didx, err))
1936       elif "adopt" in ddict:
1937         if mode == constants.INSTANCE_IMPORT:
1938           raise errors.OpPrereqError("Disk adoption not allowed for instance"
1939                                      " import")
1940         ddict["size"] = 0
1941       else:
1942         raise errors.OpPrereqError("Missing size or adoption source for"
1943                                    " disk %d" % didx)
1944       disks[didx] = ddict
1945
1946   utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
1947   utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
1948
1949   if mode == constants.INSTANCE_CREATE:
1950     start = opts.start
1951     os_type = opts.os
1952     force_variant = opts.force_variant
1953     src_node = None
1954     src_path = None
1955     no_install = opts.no_install
1956     identify_defaults = False
1957   elif mode == constants.INSTANCE_IMPORT:
1958     start = False
1959     os_type = None
1960     force_variant = False
1961     src_node = opts.src_node
1962     src_path = opts.src_dir
1963     no_install = None
1964     identify_defaults = opts.identify_defaults
1965   else:
1966     raise errors.ProgrammerError("Invalid creation mode %s" % mode)
1967
1968   op = opcodes.OpCreateInstance(instance_name=instance,
1969                                 disks=disks,
1970                                 disk_template=opts.disk_template,
1971                                 nics=nics,
1972                                 pnode=pnode, snode=snode,
1973                                 ip_check=opts.ip_check,
1974                                 name_check=opts.name_check,
1975                                 wait_for_sync=opts.wait_for_sync,
1976                                 file_storage_dir=opts.file_storage_dir,
1977                                 file_driver=opts.file_driver,
1978                                 iallocator=opts.iallocator,
1979                                 hypervisor=hypervisor,
1980                                 hvparams=hvparams,
1981                                 beparams=opts.beparams,
1982                                 osparams=opts.osparams,
1983                                 mode=mode,
1984                                 start=start,
1985                                 os_type=os_type,
1986                                 force_variant=force_variant,
1987                                 src_node=src_node,
1988                                 src_path=src_path,
1989                                 no_install=no_install,
1990                                 identify_defaults=identify_defaults)
1991
1992   SubmitOrSend(op, opts)
1993   return 0
1994
1995
1996 class _RunWhileClusterStoppedHelper:
1997   """Helper class for L{RunWhileClusterStopped} to simplify state management
1998
1999   """
2000   def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
2001     """Initializes this class.
2002
2003     @type feedback_fn: callable
2004     @param feedback_fn: Feedback function
2005     @type cluster_name: string
2006     @param cluster_name: Cluster name
2007     @type master_node: string
2008     @param master_node Master node name
2009     @type online_nodes: list
2010     @param online_nodes: List of names of online nodes
2011
2012     """
2013     self.feedback_fn = feedback_fn
2014     self.cluster_name = cluster_name
2015     self.master_node = master_node
2016     self.online_nodes = online_nodes
2017
2018     self.ssh = ssh.SshRunner(self.cluster_name)
2019
2020     self.nonmaster_nodes = [name for name in online_nodes
2021                             if name != master_node]
2022
2023     assert self.master_node not in self.nonmaster_nodes
2024
2025   def _RunCmd(self, node_name, cmd):
2026     """Runs a command on the local or a remote machine.
2027
2028     @type node_name: string
2029     @param node_name: Machine name
2030     @type cmd: list
2031     @param cmd: Command
2032
2033     """
2034     if node_name is None or node_name == self.master_node:
2035       # No need to use SSH
2036       result = utils.RunCmd(cmd)
2037     else:
2038       result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))
2039
2040     if result.failed:
2041       errmsg = ["Failed to run command %s" % result.cmd]
2042       if node_name:
2043         errmsg.append("on node %s" % node_name)
2044       errmsg.append(": exitcode %s and error %s" %
2045                     (result.exit_code, result.output))
2046       raise errors.OpExecError(" ".join(errmsg))
2047
2048   def Call(self, fn, *args):
2049     """Call function while all daemons are stopped.
2050
2051     @type fn: callable
2052     @param fn: Function to be called
2053
2054     """
2055     # Pause watcher by acquiring an exclusive lock on watcher state file
2056     self.feedback_fn("Blocking watcher")
2057     watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
2058     try:
2059       # TODO: Currently, this just blocks. There's no timeout.
2060       # TODO: Should it be a shared lock?
2061       watcher_block.Exclusive(blocking=True)
2062
2063       # Stop master daemons, so that no new jobs can come in and all running
2064       # ones are finished
2065       self.feedback_fn("Stopping master daemons")
2066       self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
2067       try:
2068         # Stop daemons on all nodes
2069         for node_name in self.online_nodes:
2070           self.feedback_fn("Stopping daemons on %s" % node_name)
2071           self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])
2072
2073         # All daemons are shut down now
2074         try:
2075           return fn(self, *args)
2076         except Exception, err:
2077           _, errmsg = FormatError(err)
2078           logging.exception("Caught exception")
2079           self.feedback_fn(errmsg)
2080           raise
2081       finally:
2082         # Start cluster again, master node last
2083         for node_name in self.nonmaster_nodes + [self.master_node]:
2084           self.feedback_fn("Starting daemons on %s" % node_name)
2085           self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
2086     finally:
2087       # Resume watcher
2088       watcher_block.Close()
2089
2090
2091 def RunWhileClusterStopped(feedback_fn, fn, *args):
2092   """Calls a function while all cluster daemons are stopped.
2093
2094   @type feedback_fn: callable
2095   @param feedback_fn: Feedback function
2096   @type fn: callable
2097   @param fn: Function to be called when daemons are stopped
2098
2099   """
2100   feedback_fn("Gathering cluster information")
2101
2102   # This ensures we're running on the master daemon
2103   cl = GetClient()
2104
2105   (cluster_name, master_node) = \
2106     cl.QueryConfigValues(["cluster_name", "master_node"])
2107
2108   online_nodes = GetOnlineNodes([], cl=cl)
2109
2110   # Don't keep a reference to the client. The master daemon will go away.
2111   del cl
2112
2113   assert master_node in online_nodes
2114
2115   return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
2116                                        online_nodes).Call(fn, *args)
2117
2118
2119 def GenerateTable(headers, fields, separator, data,
2120                   numfields=None, unitfields=None,
2121                   units=None):
2122   """Prints a table with headers and different fields.
2123
2124   @type headers: dict
2125   @param headers: dictionary mapping field names to headers for
2126       the table
2127   @type fields: list
2128   @param fields: the field names corresponding to each row in
2129       the data field
2130   @param separator: the separator to be used; if this is None,
2131       the default 'smart' algorithm is used which computes optimal
2132       field width, otherwise just the separator is used between
2133       each field
2134   @type data: list
2135   @param data: a list of lists, each sublist being one row to be output
2136   @type numfields: list
2137   @param numfields: a list with the fields that hold numeric
2138       values and thus should be right-aligned
2139   @type unitfields: list
2140   @param unitfields: a list with the fields that hold numeric
2141       values that should be formatted with the units field
2142   @type units: string or None
2143   @param units: the units we should use for formatting, or None for
2144       automatic choice (human-readable for non-separator usage, otherwise
2145       megabytes); this is a one-letter string
2146
2147   """
2148   if units is None:
2149     if separator:
2150       units = "m"
2151     else:
2152       units = "h"
2153
2154   if numfields is None:
2155     numfields = []
2156   if unitfields is None:
2157     unitfields = []
2158
2159   numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
2160   unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142
2161
2162   format_fields = []
2163   for field in fields:
2164     if headers and field not in headers:
2165       # TODO: handle better unknown fields (either revert to old
2166       # style of raising exception, or deal more intelligently with
2167       # variable fields)
2168       headers[field] = field
2169     if separator is not None:
2170       format_fields.append("%s")
2171     elif numfields.Matches(field):
2172       format_fields.append("%*s")
2173     else:
2174       format_fields.append("%-*s")
2175
2176   if separator is None:
2177     mlens = [0 for name in fields]
2178     format_str = ' '.join(format_fields)
2179   else:
2180     format_str = separator.replace("%", "%%").join(format_fields)
2181
2182   for row in data:
2183     if row is None:
2184       continue
2185     for idx, val in enumerate(row):
2186       if unitfields.Matches(fields[idx]):
2187         try:
2188           val = int(val)
2189         except (TypeError, ValueError):
2190           pass
2191         else:
2192           val = row[idx] = utils.FormatUnit(val, units)
2193       val = row[idx] = str(val)
2194       if separator is None:
2195         mlens[idx] = max(mlens[idx], len(val))
2196
2197   result = []
2198   if headers:
2199     args = []
2200     for idx, name in enumerate(fields):
2201       hdr = headers[name]
2202       if separator is None:
2203         mlens[idx] = max(mlens[idx], len(hdr))
2204         args.append(mlens[idx])
2205       args.append(hdr)
2206     result.append(format_str % tuple(args))
2207
2208   if separator is None:
2209     assert len(mlens) == len(fields)
2210
2211     if fields and not numfields.Matches(fields[-1]):
2212       mlens[-1] = 0
2213
2214   for line in data:
2215     args = []
2216     if line is None:
2217       line = ['-' for _ in fields]
2218     for idx in range(len(fields)):
2219       if separator is None:
2220         args.append(mlens[idx])
2221       args.append(line[idx])
2222     result.append(format_str % tuple(args))
2223
2224   return result
2225
2226
2227 def FormatTimestamp(ts):
2228   """Formats a given timestamp.
2229
2230   @type ts: timestamp
2231   @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
2232
2233   @rtype: string
2234   @return: a string with the formatted timestamp
2235
2236   """
2237   if not isinstance (ts, (tuple, list)) or len(ts) != 2:
2238     return '?'
2239   sec, usec = ts
2240   return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
2241
2242
2243 def ParseTimespec(value):
2244   """Parse a time specification.
2245
2246   The following suffixed will be recognized:
2247
2248     - s: seconds
2249     - m: minutes
2250     - h: hours
2251     - d: day
2252     - w: weeks
2253
2254   Without any suffix, the value will be taken to be in seconds.
2255
2256   """
2257   value = str(value)
2258   if not value:
2259     raise errors.OpPrereqError("Empty time specification passed")
2260   suffix_map = {
2261     's': 1,
2262     'm': 60,
2263     'h': 3600,
2264     'd': 86400,
2265     'w': 604800,
2266     }
2267   if value[-1] not in suffix_map:
2268     try:
2269       value = int(value)
2270     except (TypeError, ValueError):
2271       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2272   else:
2273     multiplier = suffix_map[value[-1]]
2274     value = value[:-1]
2275     if not value: # no data left after stripping the suffix
2276       raise errors.OpPrereqError("Invalid time specification (only"
2277                                  " suffix passed)")
2278     try:
2279       value = int(value) * multiplier
2280     except (TypeError, ValueError):
2281       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2282   return value
2283
2284
2285 def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
2286                    filter_master=False):
2287   """Returns the names of online nodes.
2288
2289   This function will also log a warning on stderr with the names of
2290   the online nodes.
2291
2292   @param nodes: if not empty, use only this subset of nodes (minus the
2293       offline ones)
2294   @param cl: if not None, luxi client to use
2295   @type nowarn: boolean
2296   @param nowarn: by default, this function will output a note with the
2297       offline nodes that are skipped; if this parameter is True the
2298       note is not displayed
2299   @type secondary_ips: boolean
2300   @param secondary_ips: if True, return the secondary IPs instead of the
2301       names, useful for doing network traffic over the replication interface
2302       (if any)
2303   @type filter_master: boolean
2304   @param filter_master: if True, do not return the master node in the list
2305       (useful in coordination with secondary_ips where we cannot check our
2306       node name against the list)
2307
2308   """
2309   if cl is None:
2310     cl = GetClient()
2311
2312   if secondary_ips:
2313     name_idx = 2
2314   else:
2315     name_idx = 0
2316
2317   if filter_master:
2318     master_node = cl.QueryConfigValues(["master_node"])[0]
2319     filter_fn = lambda x: x != master_node
2320   else:
2321     filter_fn = lambda _: True
2322
2323   result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
2324                          use_locking=False)
2325   offline = [row[0] for row in result if row[1]]
2326   if offline and not nowarn:
2327     ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
2328   return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
2329
2330
2331 def _ToStream(stream, txt, *args):
2332   """Write a message to a stream, bypassing the logging system
2333
2334   @type stream: file object
2335   @param stream: the file to which we should write
2336   @type txt: str
2337   @param txt: the message
2338
2339   """
2340   if args:
2341     args = tuple(args)
2342     stream.write(txt % args)
2343   else:
2344     stream.write(txt)
2345   stream.write('\n')
2346   stream.flush()
2347
2348
2349 def ToStdout(txt, *args):
2350   """Write a message to stdout only, bypassing the logging system
2351
2352   This is just a wrapper over _ToStream.
2353
2354   @type txt: str
2355   @param txt: the message
2356
2357   """
2358   _ToStream(sys.stdout, txt, *args)
2359
2360
2361 def ToStderr(txt, *args):
2362   """Write a message to stderr only, bypassing the logging system
2363
2364   This is just a wrapper over _ToStream.
2365
2366   @type txt: str
2367   @param txt: the message
2368
2369   """
2370   _ToStream(sys.stderr, txt, *args)
2371
2372
2373 class JobExecutor(object):
2374   """Class which manages the submission and execution of multiple jobs.
2375
2376   Note that instances of this class should not be reused between
2377   GetResults() calls.
2378
2379   """
2380   def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
2381     self.queue = []
2382     if cl is None:
2383       cl = GetClient()
2384     self.cl = cl
2385     self.verbose = verbose
2386     self.jobs = []
2387     self.opts = opts
2388     self.feedback_fn = feedback_fn
2389
2390   def QueueJob(self, name, *ops):
2391     """Record a job for later submit.
2392
2393     @type name: string
2394     @param name: a description of the job, will be used in WaitJobSet
2395     """
2396     SetGenericOpcodeOpts(ops, self.opts)
2397     self.queue.append((name, ops))
2398
2399   def SubmitPending(self, each=False):
2400     """Submit all pending jobs.
2401
2402     """
2403     if each:
2404       results = []
2405       for row in self.queue:
2406         # SubmitJob will remove the success status, but raise an exception if
2407         # the submission fails, so we'll notice that anyway.
2408         results.append([True, self.cl.SubmitJob(row[1])])
2409     else:
2410       results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
2411     for (idx, ((status, data), (name, _))) in enumerate(zip(results,
2412                                                             self.queue)):
2413       self.jobs.append((idx, status, data, name))
2414
2415   def _ChooseJob(self):
2416     """Choose a non-waiting/queued job to poll next.
2417
2418     """
2419     assert self.jobs, "_ChooseJob called with empty job list"
2420
2421     result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
2422     assert result
2423
2424     for job_data, status in zip(self.jobs, result):
2425       if (isinstance(status, list) and status and
2426           status[0] in (constants.JOB_STATUS_QUEUED,
2427                         constants.JOB_STATUS_WAITLOCK,
2428                         constants.JOB_STATUS_CANCELING)):
2429         # job is still present and waiting
2430         continue
2431       # good candidate found (either running job or lost job)
2432       self.jobs.remove(job_data)
2433       return job_data
2434
2435     # no job found
2436     return self.jobs.pop(0)
2437
2438   def GetResults(self):
2439     """Wait for and return the results of all jobs.
2440
2441     @rtype: list
2442     @return: list of tuples (success, job results), in the same order
2443         as the submitted jobs; if a job has failed, instead of the result
2444         there will be the error message
2445
2446     """
2447     if not self.jobs:
2448       self.SubmitPending()
2449     results = []
2450     if self.verbose:
2451       ok_jobs = [row[2] for row in self.jobs if row[1]]
2452       if ok_jobs:
2453         ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
2454
2455     # first, remove any non-submitted jobs
2456     self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
2457     for idx, _, jid, name in failures:
2458       ToStderr("Failed to submit job for %s: %s", name, jid)
2459       results.append((idx, False, jid))
2460
2461     while self.jobs:
2462       (idx, _, jid, name) = self._ChooseJob()
2463       ToStdout("Waiting for job %s for %s...", jid, name)
2464       try:
2465         job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
2466         success = True
2467       except errors.JobLost, err:
2468         _, job_result = FormatError(err)
2469         ToStderr("Job %s for %s has been archived, cannot check its result",
2470                  jid, name)
2471         success = False
2472       except (errors.GenericError, luxi.ProtocolError), err:
2473         _, job_result = FormatError(err)
2474         success = False
2475         # the error message will always be shown, verbose or not
2476         ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
2477
2478       results.append((idx, success, job_result))
2479
2480     # sort based on the index, then drop it
2481     results.sort()
2482     results = [i[1:] for i in results]
2483
2484     return results
2485
2486   def WaitOrShow(self, wait):
2487     """Wait for job results or only print the job IDs.
2488
2489     @type wait: boolean
2490     @param wait: whether to wait or not
2491
2492     """
2493     if wait:
2494       return self.GetResults()
2495     else:
2496       if not self.jobs:
2497         self.SubmitPending()
2498       for _, status, result, name in self.jobs:
2499         if status:
2500           ToStdout("%s: %s", result, name)
2501         else:
2502           ToStderr("Failure for %s: %s", name, result)
2503       return [row[1:3] for row in self.jobs]