Merge branch 'stable-2.2' into devel-2.2
[ganeti-local] / lib / cli.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module dealing with command line parsing"""
23
24
25 import sys
26 import textwrap
27 import os.path
28 import time
29 import logging
30 from cStringIO import StringIO
31
32 from ganeti import utils
33 from ganeti import errors
34 from ganeti import constants
35 from ganeti import opcodes
36 from ganeti import luxi
37 from ganeti import ssconf
38 from ganeti import rpc
39 from ganeti import ssh
40 from ganeti import compat
41 from ganeti import netutils
42
43 from optparse import (OptionParser, TitledHelpFormatter,
44                       Option, OptionValueError)
45
46
47 __all__ = [
48   # Command line options
49   "ADD_UIDS_OPT",
50   "ALLOCATABLE_OPT",
51   "ALL_OPT",
52   "AUTO_PROMOTE_OPT",
53   "AUTO_REPLACE_OPT",
54   "BACKEND_OPT",
55   "BLK_OS_OPT",
56   "CLEANUP_OPT",
57   "CLUSTER_DOMAIN_SECRET_OPT",
58   "CONFIRM_OPT",
59   "CP_SIZE_OPT",
60   "DEBUG_OPT",
61   "DEBUG_SIMERR_OPT",
62   "DISKIDX_OPT",
63   "DISK_OPT",
64   "DISK_TEMPLATE_OPT",
65   "DRAINED_OPT",
66   "DRY_RUN_OPT",
67   "DRBD_HELPER_OPT",
68   "EARLY_RELEASE_OPT",
69   "ENABLED_HV_OPT",
70   "ERROR_CODES_OPT",
71   "FIELDS_OPT",
72   "FILESTORE_DIR_OPT",
73   "FILESTORE_DRIVER_OPT",
74   "FORCE_OPT",
75   "FORCE_VARIANT_OPT",
76   "GLOBAL_FILEDIR_OPT",
77   "HID_OS_OPT",
78   "HVLIST_OPT",
79   "HVOPTS_OPT",
80   "HYPERVISOR_OPT",
81   "IALLOCATOR_OPT",
82   "DEFAULT_IALLOCATOR_OPT",
83   "IDENTIFY_DEFAULTS_OPT",
84   "IGNORE_CONSIST_OPT",
85   "IGNORE_FAILURES_OPT",
86   "IGNORE_REMOVE_FAILURES_OPT",
87   "IGNORE_SECONDARIES_OPT",
88   "IGNORE_SIZE_OPT",
89   "INTERVAL_OPT",
90   "MAC_PREFIX_OPT",
91   "MAINTAIN_NODE_HEALTH_OPT",
92   "MASTER_NETDEV_OPT",
93   "MC_OPT",
94   "MIGRATION_MODE_OPT",
95   "NET_OPT",
96   "NEW_CLUSTER_CERT_OPT",
97   "NEW_CLUSTER_DOMAIN_SECRET_OPT",
98   "NEW_CONFD_HMAC_KEY_OPT",
99   "NEW_RAPI_CERT_OPT",
100   "NEW_SECONDARY_OPT",
101   "NIC_PARAMS_OPT",
102   "NODE_LIST_OPT",
103   "NODE_PLACEMENT_OPT",
104   "NODRBD_STORAGE_OPT",
105   "NOHDR_OPT",
106   "NOIPCHECK_OPT",
107   "NO_INSTALL_OPT",
108   "NONAMECHECK_OPT",
109   "NOLVM_STORAGE_OPT",
110   "NOMODIFY_ETCHOSTS_OPT",
111   "NOMODIFY_SSH_SETUP_OPT",
112   "NONICS_OPT",
113   "NONLIVE_OPT",
114   "NONPLUS1_OPT",
115   "NOSHUTDOWN_OPT",
116   "NOSTART_OPT",
117   "NOSSH_KEYCHECK_OPT",
118   "NOVOTING_OPT",
119   "NWSYNC_OPT",
120   "ON_PRIMARY_OPT",
121   "ON_SECONDARY_OPT",
122   "OFFLINE_OPT",
123   "OSPARAMS_OPT",
124   "OS_OPT",
125   "OS_SIZE_OPT",
126   "RAPI_CERT_OPT",
127   "READD_OPT",
128   "REBOOT_TYPE_OPT",
129   "REMOVE_INSTANCE_OPT",
130   "REMOVE_UIDS_OPT",
131   "RESERVED_LVS_OPT",
132   "ROMAN_OPT",
133   "SECONDARY_IP_OPT",
134   "SELECT_OS_OPT",
135   "SEP_OPT",
136   "SHOWCMD_OPT",
137   "SHUTDOWN_TIMEOUT_OPT",
138   "SINGLE_NODE_OPT",
139   "SRC_DIR_OPT",
140   "SRC_NODE_OPT",
141   "SUBMIT_OPT",
142   "STATIC_OPT",
143   "SYNC_OPT",
144   "TAG_SRC_OPT",
145   "TIMEOUT_OPT",
146   "UIDPOOL_OPT",
147   "USEUNITS_OPT",
148   "USE_REPL_NET_OPT",
149   "VERBOSE_OPT",
150   "VG_NAME_OPT",
151   "YES_DOIT_OPT",
152   # Generic functions for CLI programs
153   "GenericMain",
154   "GenericInstanceCreate",
155   "GetClient",
156   "GetOnlineNodes",
157   "JobExecutor",
158   "JobSubmittedException",
159   "ParseTimespec",
160   "RunWhileClusterStopped",
161   "SubmitOpCode",
162   "SubmitOrSend",
163   "UsesRPC",
164   # Formatting functions
165   "ToStderr", "ToStdout",
166   "FormatError",
167   "GenerateTable",
168   "AskUser",
169   "FormatTimestamp",
170   "FormatLogMessage",
171   # Tags functions
172   "ListTags",
173   "AddTags",
174   "RemoveTags",
175   # command line options support infrastructure
176   "ARGS_MANY_INSTANCES",
177   "ARGS_MANY_NODES",
178   "ARGS_NONE",
179   "ARGS_ONE_INSTANCE",
180   "ARGS_ONE_NODE",
181   "ARGS_ONE_OS",
182   "ArgChoice",
183   "ArgCommand",
184   "ArgFile",
185   "ArgHost",
186   "ArgInstance",
187   "ArgJobId",
188   "ArgNode",
189   "ArgOs",
190   "ArgSuggest",
191   "ArgUnknown",
192   "OPT_COMPL_INST_ADD_NODES",
193   "OPT_COMPL_MANY_NODES",
194   "OPT_COMPL_ONE_IALLOCATOR",
195   "OPT_COMPL_ONE_INSTANCE",
196   "OPT_COMPL_ONE_NODE",
197   "OPT_COMPL_ONE_OS",
198   "cli_option",
199   "SplitNodeOption",
200   "CalculateOSNames",
201   "ParseFields",
202   ]
203
204 NO_PREFIX = "no_"
205 UN_PREFIX = "-"
206
207
208 class _Argument:
209   def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
210     self.min = min
211     self.max = max
212
213   def __repr__(self):
214     return ("<%s min=%s max=%s>" %
215             (self.__class__.__name__, self.min, self.max))
216
217
218 class ArgSuggest(_Argument):
219   """Suggesting argument.
220
221   Value can be any of the ones passed to the constructor.
222
223   """
224   # pylint: disable-msg=W0622
225   def __init__(self, min=0, max=None, choices=None):
226     _Argument.__init__(self, min=min, max=max)
227     self.choices = choices
228
229   def __repr__(self):
230     return ("<%s min=%s max=%s choices=%r>" %
231             (self.__class__.__name__, self.min, self.max, self.choices))
232
233
234 class ArgChoice(ArgSuggest):
235   """Choice argument.
236
237   Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
238   but value must be one of the choices.
239
240   """
241
242
243 class ArgUnknown(_Argument):
244   """Unknown argument to program (e.g. determined at runtime).
245
246   """
247
248
249 class ArgInstance(_Argument):
250   """Instances argument.
251
252   """
253
254
255 class ArgNode(_Argument):
256   """Node argument.
257
258   """
259
260 class ArgJobId(_Argument):
261   """Job ID argument.
262
263   """
264
265
266 class ArgFile(_Argument):
267   """File path argument.
268
269   """
270
271
272 class ArgCommand(_Argument):
273   """Command argument.
274
275   """
276
277
278 class ArgHost(_Argument):
279   """Host argument.
280
281   """
282
283
284 class ArgOs(_Argument):
285   """OS argument.
286
287   """
288
289
290 ARGS_NONE = []
291 ARGS_MANY_INSTANCES = [ArgInstance()]
292 ARGS_MANY_NODES = [ArgNode()]
293 ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
294 ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
295 ARGS_ONE_OS = [ArgOs(min=1, max=1)]
296
297
298 def _ExtractTagsObject(opts, args):
299   """Extract the tag type object.
300
301   Note that this function will modify its args parameter.
302
303   """
304   if not hasattr(opts, "tag_type"):
305     raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
306   kind = opts.tag_type
307   if kind == constants.TAG_CLUSTER:
308     retval = kind, kind
309   elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
310     if not args:
311       raise errors.OpPrereqError("no arguments passed to the command")
312     name = args.pop(0)
313     retval = kind, name
314   else:
315     raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
316   return retval
317
318
319 def _ExtendTags(opts, args):
320   """Extend the args if a source file has been given.
321
322   This function will extend the tags with the contents of the file
323   passed in the 'tags_source' attribute of the opts parameter. A file
324   named '-' will be replaced by stdin.
325
326   """
327   fname = opts.tags_source
328   if fname is None:
329     return
330   if fname == "-":
331     new_fh = sys.stdin
332   else:
333     new_fh = open(fname, "r")
334   new_data = []
335   try:
336     # we don't use the nice 'new_data = [line.strip() for line in fh]'
337     # because of python bug 1633941
338     while True:
339       line = new_fh.readline()
340       if not line:
341         break
342       new_data.append(line.strip())
343   finally:
344     new_fh.close()
345   args.extend(new_data)
346
347
348 def ListTags(opts, args):
349   """List the tags on a given object.
350
351   This is a generic implementation that knows how to deal with all
352   three cases of tag objects (cluster, node, instance). The opts
353   argument is expected to contain a tag_type field denoting what
354   object type we work on.
355
356   """
357   kind, name = _ExtractTagsObject(opts, args)
358   cl = GetClient()
359   result = cl.QueryTags(kind, name)
360   result = list(result)
361   result.sort()
362   for tag in result:
363     ToStdout(tag)
364
365
366 def AddTags(opts, args):
367   """Add tags on a given object.
368
369   This is a generic implementation that knows how to deal with all
370   three cases of tag objects (cluster, node, instance). The opts
371   argument is expected to contain a tag_type field denoting what
372   object type we work on.
373
374   """
375   kind, name = _ExtractTagsObject(opts, args)
376   _ExtendTags(opts, args)
377   if not args:
378     raise errors.OpPrereqError("No tags to be added")
379   op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
380   SubmitOpCode(op, opts=opts)
381
382
383 def RemoveTags(opts, args):
384   """Remove tags from a given object.
385
386   This is a generic implementation that knows how to deal with all
387   three cases of tag objects (cluster, node, instance). The opts
388   argument is expected to contain a tag_type field denoting what
389   object type we work on.
390
391   """
392   kind, name = _ExtractTagsObject(opts, args)
393   _ExtendTags(opts, args)
394   if not args:
395     raise errors.OpPrereqError("No tags to be removed")
396   op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
397   SubmitOpCode(op, opts=opts)
398
399
400 def check_unit(option, opt, value): # pylint: disable-msg=W0613
401   """OptParsers custom converter for units.
402
403   """
404   try:
405     return utils.ParseUnit(value)
406   except errors.UnitParseError, err:
407     raise OptionValueError("option %s: %s" % (opt, err))
408
409
410 def _SplitKeyVal(opt, data):
411   """Convert a KeyVal string into a dict.
412
413   This function will convert a key=val[,...] string into a dict. Empty
414   values will be converted specially: keys which have the prefix 'no_'
415   will have the value=False and the prefix stripped, the others will
416   have value=True.
417
418   @type opt: string
419   @param opt: a string holding the option name for which we process the
420       data, used in building error messages
421   @type data: string
422   @param data: a string of the format key=val,key=val,...
423   @rtype: dict
424   @return: {key=val, key=val}
425   @raises errors.ParameterError: if there are duplicate keys
426
427   """
428   kv_dict = {}
429   if data:
430     for elem in utils.UnescapeAndSplit(data, sep=","):
431       if "=" in elem:
432         key, val = elem.split("=", 1)
433       else:
434         if elem.startswith(NO_PREFIX):
435           key, val = elem[len(NO_PREFIX):], False
436         elif elem.startswith(UN_PREFIX):
437           key, val = elem[len(UN_PREFIX):], None
438         else:
439           key, val = elem, True
440       if key in kv_dict:
441         raise errors.ParameterError("Duplicate key '%s' in option %s" %
442                                     (key, opt))
443       kv_dict[key] = val
444   return kv_dict
445
446
447 def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
448   """Custom parser for ident:key=val,key=val options.
449
450   This will store the parsed values as a tuple (ident, {key: val}). As such,
451   multiple uses of this option via action=append is possible.
452
453   """
454   if ":" not in value:
455     ident, rest = value, ''
456   else:
457     ident, rest = value.split(":", 1)
458
459   if ident.startswith(NO_PREFIX):
460     if rest:
461       msg = "Cannot pass options when removing parameter groups: %s" % value
462       raise errors.ParameterError(msg)
463     retval = (ident[len(NO_PREFIX):], False)
464   elif ident.startswith(UN_PREFIX):
465     if rest:
466       msg = "Cannot pass options when removing parameter groups: %s" % value
467       raise errors.ParameterError(msg)
468     retval = (ident[len(UN_PREFIX):], None)
469   else:
470     kv_dict = _SplitKeyVal(opt, rest)
471     retval = (ident, kv_dict)
472   return retval
473
474
475 def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
476   """Custom parser class for key=val,key=val options.
477
478   This will store the parsed values as a dict {key: val}.
479
480   """
481   return _SplitKeyVal(opt, value)
482
483
484 def check_bool(option, opt, value): # pylint: disable-msg=W0613
485   """Custom parser for yes/no options.
486
487   This will store the parsed value as either True or False.
488
489   """
490   value = value.lower()
491   if value == constants.VALUE_FALSE or value == "no":
492     return False
493   elif value == constants.VALUE_TRUE or value == "yes":
494     return True
495   else:
496     raise errors.ParameterError("Invalid boolean value '%s'" % value)
497
498
499 # completion_suggestion is normally a list. Using numeric values not evaluating
500 # to False for dynamic completion.
501 (OPT_COMPL_MANY_NODES,
502  OPT_COMPL_ONE_NODE,
503  OPT_COMPL_ONE_INSTANCE,
504  OPT_COMPL_ONE_OS,
505  OPT_COMPL_ONE_IALLOCATOR,
506  OPT_COMPL_INST_ADD_NODES) = range(100, 106)
507
508 OPT_COMPL_ALL = frozenset([
509   OPT_COMPL_MANY_NODES,
510   OPT_COMPL_ONE_NODE,
511   OPT_COMPL_ONE_INSTANCE,
512   OPT_COMPL_ONE_OS,
513   OPT_COMPL_ONE_IALLOCATOR,
514   OPT_COMPL_INST_ADD_NODES,
515   ])
516
517
518 class CliOption(Option):
519   """Custom option class for optparse.
520
521   """
522   ATTRS = Option.ATTRS + [
523     "completion_suggest",
524     ]
525   TYPES = Option.TYPES + (
526     "identkeyval",
527     "keyval",
528     "unit",
529     "bool",
530     )
531   TYPE_CHECKER = Option.TYPE_CHECKER.copy()
532   TYPE_CHECKER["identkeyval"] = check_ident_key_val
533   TYPE_CHECKER["keyval"] = check_key_val
534   TYPE_CHECKER["unit"] = check_unit
535   TYPE_CHECKER["bool"] = check_bool
536
537
538 # optparse.py sets make_option, so we do it for our own option class, too
539 cli_option = CliOption
540
541
542 _YORNO = "yes|no"
543
544 DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
545                        help="Increase debugging level")
546
547 NOHDR_OPT = cli_option("--no-headers", default=False,
548                        action="store_true", dest="no_headers",
549                        help="Don't display column headers")
550
551 SEP_OPT = cli_option("--separator", default=None,
552                      action="store", dest="separator",
553                      help=("Separator between output fields"
554                            " (defaults to one space)"))
555
556 USEUNITS_OPT = cli_option("--units", default=None,
557                           dest="units", choices=('h', 'm', 'g', 't'),
558                           help="Specify units for output (one of hmgt)")
559
560 FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
561                         type="string", metavar="FIELDS",
562                         help="Comma separated list of output fields")
563
564 FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
565                        default=False, help="Force the operation")
566
567 CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
568                          default=False, help="Do not require confirmation")
569
570 TAG_SRC_OPT = cli_option("--from", dest="tags_source",
571                          default=None, help="File with tag names")
572
573 SUBMIT_OPT = cli_option("--submit", dest="submit_only",
574                         default=False, action="store_true",
575                         help=("Submit the job and return the job ID, but"
576                               " don't wait for the job to finish"))
577
578 SYNC_OPT = cli_option("--sync", dest="do_locking",
579                       default=False, action="store_true",
580                       help=("Grab locks while doing the queries"
581                             " in order to ensure more consistent results"))
582
583 DRY_RUN_OPT = cli_option("--dry-run", default=False,
584                          action="store_true",
585                          help=("Do not execute the operation, just run the"
586                                " check steps and verify it it could be"
587                                " executed"))
588
589 VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
590                          action="store_true",
591                          help="Increase the verbosity of the operation")
592
593 DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
594                               action="store_true", dest="simulate_errors",
595                               help="Debugging option that makes the operation"
596                               " treat most runtime checks as failed")
597
598 NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
599                         default=True, action="store_false",
600                         help="Don't wait for sync (DANGEROUS!)")
601
602 DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
603                                help="Custom disk setup (diskless, file,"
604                                " plain or drbd)",
605                                default=None, metavar="TEMPL",
606                                choices=list(constants.DISK_TEMPLATES))
607
608 NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
609                         help="Do not create any network cards for"
610                         " the instance")
611
612 FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
613                                help="Relative path under default cluster-wide"
614                                " file storage dir to store file-based disks",
615                                default=None, metavar="<DIR>")
616
617 FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
618                                   help="Driver to use for image files",
619                                   default="loop", metavar="<DRIVER>",
620                                   choices=list(constants.FILE_DRIVER))
621
622 IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
623                             help="Select nodes for the instance automatically"
624                             " using the <NAME> iallocator plugin",
625                             default=None, type="string",
626                             completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
627
628 DEFAULT_IALLOCATOR_OPT = cli_option("-I", "--default-iallocator",
629                             metavar="<NAME>",
630                             help="Set the default instance allocator plugin",
631                             default=None, type="string",
632                             completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
633
634 OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
635                     metavar="<os>",
636                     completion_suggest=OPT_COMPL_ONE_OS)
637
638 OSPARAMS_OPT = cli_option("-O", "--os-parameters", dest="osparams",
639                          type="keyval", default={},
640                          help="OS parameters")
641
642 FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
643                                action="store_true", default=False,
644                                help="Force an unknown variant")
645
646 NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
647                             action="store_true", default=False,
648                             help="Do not install the OS (will"
649                             " enable no-start)")
650
651 BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
652                          type="keyval", default={},
653                          help="Backend parameters")
654
655 HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
656                          default={}, dest="hvparams",
657                          help="Hypervisor parameters")
658
659 HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
660                             help="Hypervisor and hypervisor options, in the"
661                             " format hypervisor:option=value,option=value,...",
662                             default=None, type="identkeyval")
663
664 HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
665                         help="Hypervisor and hypervisor options, in the"
666                         " format hypervisor:option=value,option=value,...",
667                         default=[], action="append", type="identkeyval")
668
669 NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
670                            action="store_false",
671                            help="Don't check that the instance's IP"
672                            " is alive")
673
674 NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
675                              default=True, action="store_false",
676                              help="Don't check that the instance's name"
677                              " is resolvable")
678
679 NET_OPT = cli_option("--net",
680                      help="NIC parameters", default=[],
681                      dest="nics", action="append", type="identkeyval")
682
683 DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
684                       dest="disks", action="append", type="identkeyval")
685
686 DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
687                          help="Comma-separated list of disks"
688                          " indices to act on (e.g. 0,2) (optional,"
689                          " defaults to all disks)")
690
691 OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
692                          help="Enforces a single-disk configuration using the"
693                          " given disk size, in MiB unless a suffix is used",
694                          default=None, type="unit", metavar="<size>")
695
696 IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
697                                 dest="ignore_consistency",
698                                 action="store_true", default=False,
699                                 help="Ignore the consistency of the disks on"
700                                 " the secondary")
701
702 NONLIVE_OPT = cli_option("--non-live", dest="live",
703                          default=True, action="store_false",
704                          help="Do a non-live migration (this usually means"
705                          " freeze the instance, save the state, transfer and"
706                          " only then resume running on the secondary node)")
707
708 MIGRATION_MODE_OPT = cli_option("--migration-mode", dest="migration_mode",
709                                 default=None,
710                                 choices=list(constants.HT_MIGRATION_MODES),
711                                 help="Override default migration mode (choose"
712                                 " either live or non-live")
713
714 NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
715                                 help="Target node and optional secondary node",
716                                 metavar="<pnode>[:<snode>]",
717                                 completion_suggest=OPT_COMPL_INST_ADD_NODES)
718
719 NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
720                            action="append", metavar="<node>",
721                            help="Use only this node (can be used multiple"
722                            " times, if not given defaults to all nodes)",
723                            completion_suggest=OPT_COMPL_ONE_NODE)
724
725 SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
726                              metavar="<node>",
727                              completion_suggest=OPT_COMPL_ONE_NODE)
728
729 NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
730                          action="store_false",
731                          help="Don't start the instance after creation")
732
733 SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
734                          action="store_true", default=False,
735                          help="Show command instead of executing it")
736
737 CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
738                          default=False, action="store_true",
739                          help="Instead of performing the migration, try to"
740                          " recover from a failed cleanup. This is safe"
741                          " to run even if the instance is healthy, but it"
742                          " will create extra replication traffic and "
743                          " disrupt briefly the replication (like during the"
744                          " migration")
745
746 STATIC_OPT = cli_option("-s", "--static", dest="static",
747                         action="store_true", default=False,
748                         help="Only show configuration data, not runtime data")
749
750 ALL_OPT = cli_option("--all", dest="show_all",
751                      default=False, action="store_true",
752                      help="Show info on all instances on the cluster."
753                      " This can take a long time to run, use wisely")
754
755 SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
756                            action="store_true", default=False,
757                            help="Interactive OS reinstall, lists available"
758                            " OS templates for selection")
759
760 IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
761                                  action="store_true", default=False,
762                                  help="Remove the instance from the cluster"
763                                  " configuration even if there are failures"
764                                  " during the removal process")
765
766 IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
767                                         dest="ignore_remove_failures",
768                                         action="store_true", default=False,
769                                         help="Remove the instance from the"
770                                         " cluster configuration even if there"
771                                         " are failures during the removal"
772                                         " process")
773
774 REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
775                                  action="store_true", default=False,
776                                  help="Remove the instance from the cluster")
777
778 NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
779                                help="Specifies the new secondary node",
780                                metavar="NODE", default=None,
781                                completion_suggest=OPT_COMPL_ONE_NODE)
782
783 ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
784                             default=False, action="store_true",
785                             help="Replace the disk(s) on the primary"
786                             " node (only for the drbd template)")
787
788 ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
789                               default=False, action="store_true",
790                               help="Replace the disk(s) on the secondary"
791                               " node (only for the drbd template)")
792
793 AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
794                               default=False, action="store_true",
795                               help="Lock all nodes and auto-promote as needed"
796                               " to MC status")
797
798 AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
799                               default=False, action="store_true",
800                               help="Automatically replace faulty disks"
801                               " (only for the drbd template)")
802
803 IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
804                              default=False, action="store_true",
805                              help="Ignore current recorded size"
806                              " (useful for forcing activation when"
807                              " the recorded size is wrong)")
808
809 SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
810                           metavar="<node>",
811                           completion_suggest=OPT_COMPL_ONE_NODE)
812
813 SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
814                          metavar="<dir>")
815
816 SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
817                               help="Specify the secondary ip for the node",
818                               metavar="ADDRESS", default=None)
819
820 READD_OPT = cli_option("--readd", dest="readd",
821                        default=False, action="store_true",
822                        help="Readd old node after replacing it")
823
824 NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
825                                 default=True, action="store_false",
826                                 help="Disable SSH key fingerprint checking")
827
828
829 MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
830                     type="bool", default=None, metavar=_YORNO,
831                     help="Set the master_candidate flag on the node")
832
833 OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
834                          type="bool", default=None,
835                          help="Set the offline flag on the node")
836
837 DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
838                          type="bool", default=None,
839                          help="Set the drained flag on the node")
840
841 ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
842                              type="bool", default=None, metavar=_YORNO,
843                              help="Set the allocatable flag on a volume")
844
845 NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
846                                help="Disable support for lvm based instances"
847                                " (cluster-wide)",
848                                action="store_false", default=True)
849
850 ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
851                             dest="enabled_hypervisors",
852                             help="Comma-separated list of hypervisors",
853                             type="string", default=None)
854
855 NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
856                             type="keyval", default={},
857                             help="NIC parameters")
858
859 CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
860                          dest="candidate_pool_size", type="int",
861                          help="Set the candidate pool size")
862
863 VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
864                          help="Enables LVM and specifies the volume group"
865                          " name (cluster-wide) for disk allocation [xenvg]",
866                          metavar="VG", default=None)
867
868 YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
869                           help="Destroy cluster", action="store_true")
870
871 NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
872                           help="Skip node agreement check (dangerous)",
873                           action="store_true", default=False)
874
875 MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
876                             help="Specify the mac prefix for the instance IP"
877                             " addresses, in the format XX:XX:XX",
878                             metavar="PREFIX",
879                             default=None)
880
881 MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
882                                help="Specify the node interface (cluster-wide)"
883                                " on which the master IP address will be added "
884                                " [%s]" % constants.DEFAULT_BRIDGE,
885                                metavar="NETDEV",
886                                default=constants.DEFAULT_BRIDGE)
887
888 GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
889                                 help="Specify the default directory (cluster-"
890                                 "wide) for storing the file-based disks [%s]" %
891                                 constants.DEFAULT_FILE_STORAGE_DIR,
892                                 metavar="DIR",
893                                 default=constants.DEFAULT_FILE_STORAGE_DIR)
894
895 NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
896                                    help="Don't modify /etc/hosts",
897                                    action="store_false", default=True)
898
899 NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
900                                     help="Don't initialize SSH keys",
901                                     action="store_false", default=True)
902
903 ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
904                              help="Enable parseable error messages",
905                              action="store_true", default=False)
906
907 NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
908                           help="Skip N+1 memory redundancy tests",
909                           action="store_true", default=False)
910
911 REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
912                              help="Type of reboot: soft/hard/full",
913                              default=constants.INSTANCE_REBOOT_HARD,
914                              metavar="<REBOOT>",
915                              choices=list(constants.REBOOT_TYPES))
916
917 IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
918                                     dest="ignore_secondaries",
919                                     default=False, action="store_true",
920                                     help="Ignore errors from secondaries")
921
922 NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
923                             action="store_false", default=True,
924                             help="Don't shutdown the instance (unsafe)")
925
926 TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
927                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
928                          help="Maximum time to wait")
929
930 SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
931                          dest="shutdown_timeout", type="int",
932                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
933                          help="Maximum time to wait for instance shutdown")
934
935 INTERVAL_OPT = cli_option("--interval", dest="interval", type="int",
936                           default=None,
937                           help=("Number of seconds between repetions of the"
938                                 " command"))
939
940 EARLY_RELEASE_OPT = cli_option("--early-release",
941                                dest="early_release", default=False,
942                                action="store_true",
943                                help="Release the locks on the secondary"
944                                " node(s) early")
945
946 NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
947                                   dest="new_cluster_cert",
948                                   default=False, action="store_true",
949                                   help="Generate a new cluster certificate")
950
951 RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
952                            default=None,
953                            help="File containing new RAPI certificate")
954
955 NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
956                                default=None, action="store_true",
957                                help=("Generate a new self-signed RAPI"
958                                      " certificate"))
959
960 NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
961                                     dest="new_confd_hmac_key",
962                                     default=False, action="store_true",
963                                     help=("Create a new HMAC key for %s" %
964                                           constants.CONFD))
965
966 CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
967                                        dest="cluster_domain_secret",
968                                        default=None,
969                                        help=("Load new new cluster domain"
970                                              " secret from file"))
971
972 NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret",
973                                            dest="new_cluster_domain_secret",
974                                            default=False, action="store_true",
975                                            help=("Create a new cluster domain"
976                                                  " secret"))
977
978 USE_REPL_NET_OPT = cli_option("--use-replication-network",
979                               dest="use_replication_network",
980                               help="Whether to use the replication network"
981                               " for talking to the nodes",
982                               action="store_true", default=False)
983
984 MAINTAIN_NODE_HEALTH_OPT = \
985     cli_option("--maintain-node-health", dest="maintain_node_health",
986                metavar=_YORNO, default=None, type="bool",
987                help="Configure the cluster to automatically maintain node"
988                " health, by shutting down unknown instances, shutting down"
989                " unknown DRBD devices, etc.")
990
991 IDENTIFY_DEFAULTS_OPT = \
992     cli_option("--identify-defaults", dest="identify_defaults",
993                default=False, action="store_true",
994                help="Identify which saved instance parameters are equal to"
995                " the current cluster defaults and set them as such, instead"
996                " of marking them as overridden")
997
998 UIDPOOL_OPT = cli_option("--uid-pool", default=None,
999                          action="store", dest="uid_pool",
1000                          help=("A list of user-ids or user-id"
1001                                " ranges separated by commas"))
1002
1003 ADD_UIDS_OPT = cli_option("--add-uids", default=None,
1004                           action="store", dest="add_uids",
1005                           help=("A list of user-ids or user-id"
1006                                 " ranges separated by commas, to be"
1007                                 " added to the user-id pool"))
1008
1009 REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
1010                              action="store", dest="remove_uids",
1011                              help=("A list of user-ids or user-id"
1012                                    " ranges separated by commas, to be"
1013                                    " removed from the user-id pool"))
1014
1015 RESERVED_LVS_OPT = cli_option("--reserved-lvs", default=None,
1016                              action="store", dest="reserved_lvs",
1017                              help=("A comma-separated list of reserved"
1018                                    " logical volumes names, that will be"
1019                                    " ignored by cluster verify"))
1020
1021 ROMAN_OPT = cli_option("--roman",
1022                        dest="roman_integers", default=False,
1023                        action="store_true",
1024                        help="Use roman numbers for positive integers")
1025
1026 DRBD_HELPER_OPT = cli_option("--drbd-usermode-helper", dest="drbd_helper",
1027                              action="store", default=None,
1028                              help="Specifies usermode helper for DRBD")
1029
1030 NODRBD_STORAGE_OPT = cli_option("--no-drbd-storage", dest="drbd_storage",
1031                                 action="store_false", default=True,
1032                                 help="Disable support for DRBD")
1033
1034 HID_OS_OPT = cli_option("--hidden", dest="hidden",
1035                         type="bool", default=None, metavar=_YORNO,
1036                         help="Sets the hidden flag on the OS")
1037
1038 BLK_OS_OPT = cli_option("--blacklisted", dest="blacklisted",
1039                         type="bool", default=None, metavar=_YORNO,
1040                         help="Sets the blacklisted flag on the OS")
1041
1042
1043 #: Options provided by all commands
1044 COMMON_OPTS = [DEBUG_OPT]
1045
1046
1047 def _ParseArgs(argv, commands, aliases):
1048   """Parser for the command line arguments.
1049
1050   This function parses the arguments and returns the function which
1051   must be executed together with its (modified) arguments.
1052
1053   @param argv: the command line
1054   @param commands: dictionary with special contents, see the design
1055       doc for cmdline handling
1056   @param aliases: dictionary with command aliases {'alias': 'target, ...}
1057
1058   """
1059   if len(argv) == 0:
1060     binary = "<command>"
1061   else:
1062     binary = argv[0].split("/")[-1]
1063
1064   if len(argv) > 1 and argv[1] == "--version":
1065     ToStdout("%s (ganeti %s) %s", binary, constants.VCS_VERSION,
1066              constants.RELEASE_VERSION)
1067     # Quit right away. That way we don't have to care about this special
1068     # argument. optparse.py does it the same.
1069     sys.exit(0)
1070
1071   if len(argv) < 2 or not (argv[1] in commands or
1072                            argv[1] in aliases):
1073     # let's do a nice thing
1074     sortedcmds = commands.keys()
1075     sortedcmds.sort()
1076
1077     ToStdout("Usage: %s {command} [options...] [argument...]", binary)
1078     ToStdout("%s <command> --help to see details, or man %s", binary, binary)
1079     ToStdout("")
1080
1081     # compute the max line length for cmd + usage
1082     mlen = max([len(" %s" % cmd) for cmd in commands])
1083     mlen = min(60, mlen) # should not get here...
1084
1085     # and format a nice command list
1086     ToStdout("Commands:")
1087     for cmd in sortedcmds:
1088       cmdstr = " %s" % (cmd,)
1089       help_text = commands[cmd][4]
1090       help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
1091       ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
1092       for line in help_lines:
1093         ToStdout("%-*s   %s", mlen, "", line)
1094
1095     ToStdout("")
1096
1097     return None, None, None
1098
1099   # get command, unalias it, and look it up in commands
1100   cmd = argv.pop(1)
1101   if cmd in aliases:
1102     if cmd in commands:
1103       raise errors.ProgrammerError("Alias '%s' overrides an existing"
1104                                    " command" % cmd)
1105
1106     if aliases[cmd] not in commands:
1107       raise errors.ProgrammerError("Alias '%s' maps to non-existing"
1108                                    " command '%s'" % (cmd, aliases[cmd]))
1109
1110     cmd = aliases[cmd]
1111
1112   func, args_def, parser_opts, usage, description = commands[cmd]
1113   parser = OptionParser(option_list=parser_opts + COMMON_OPTS,
1114                         description=description,
1115                         formatter=TitledHelpFormatter(),
1116                         usage="%%prog %s %s" % (cmd, usage))
1117   parser.disable_interspersed_args()
1118   options, args = parser.parse_args()
1119
1120   if not _CheckArguments(cmd, args_def, args):
1121     return None, None, None
1122
1123   return func, options, args
1124
1125
1126 def _CheckArguments(cmd, args_def, args):
1127   """Verifies the arguments using the argument definition.
1128
1129   Algorithm:
1130
1131     1. Abort with error if values specified by user but none expected.
1132
1133     1. For each argument in definition
1134
1135       1. Keep running count of minimum number of values (min_count)
1136       1. Keep running count of maximum number of values (max_count)
1137       1. If it has an unlimited number of values
1138
1139         1. Abort with error if it's not the last argument in the definition
1140
1141     1. If last argument has limited number of values
1142
1143       1. Abort with error if number of values doesn't match or is too large
1144
1145     1. Abort with error if user didn't pass enough values (min_count)
1146
1147   """
1148   if args and not args_def:
1149     ToStderr("Error: Command %s expects no arguments", cmd)
1150     return False
1151
1152   min_count = None
1153   max_count = None
1154   check_max = None
1155
1156   last_idx = len(args_def) - 1
1157
1158   for idx, arg in enumerate(args_def):
1159     if min_count is None:
1160       min_count = arg.min
1161     elif arg.min is not None:
1162       min_count += arg.min
1163
1164     if max_count is None:
1165       max_count = arg.max
1166     elif arg.max is not None:
1167       max_count += arg.max
1168
1169     if idx == last_idx:
1170       check_max = (arg.max is not None)
1171
1172     elif arg.max is None:
1173       raise errors.ProgrammerError("Only the last argument can have max=None")
1174
1175   if check_max:
1176     # Command with exact number of arguments
1177     if (min_count is not None and max_count is not None and
1178         min_count == max_count and len(args) != min_count):
1179       ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1180       return False
1181
1182     # Command with limited number of arguments
1183     if max_count is not None and len(args) > max_count:
1184       ToStderr("Error: Command %s expects only %d argument(s)",
1185                cmd, max_count)
1186       return False
1187
1188   # Command with some required arguments
1189   if min_count is not None and len(args) < min_count:
1190     ToStderr("Error: Command %s expects at least %d argument(s)",
1191              cmd, min_count)
1192     return False
1193
1194   return True
1195
1196
1197 def SplitNodeOption(value):
1198   """Splits the value of a --node option.
1199
1200   """
1201   if value and ':' in value:
1202     return value.split(':', 1)
1203   else:
1204     return (value, None)
1205
1206
1207 def CalculateOSNames(os_name, os_variants):
1208   """Calculates all the names an OS can be called, according to its variants.
1209
1210   @type os_name: string
1211   @param os_name: base name of the os
1212   @type os_variants: list or None
1213   @param os_variants: list of supported variants
1214   @rtype: list
1215   @return: list of valid names
1216
1217   """
1218   if os_variants:
1219     return ['%s+%s' % (os_name, v) for v in os_variants]
1220   else:
1221     return [os_name]
1222
1223
1224 def ParseFields(selected, default):
1225   """Parses the values of "--field"-like options.
1226
1227   @type selected: string or None
1228   @param selected: User-selected options
1229   @type default: list
1230   @param default: Default fields
1231
1232   """
1233   if selected is None:
1234     return default
1235
1236   if selected.startswith("+"):
1237     return default + selected[1:].split(",")
1238
1239   return selected.split(",")
1240
1241
1242 UsesRPC = rpc.RunWithRPC
1243
1244
1245 def AskUser(text, choices=None):
1246   """Ask the user a question.
1247
1248   @param text: the question to ask
1249
1250   @param choices: list with elements tuples (input_char, return_value,
1251       description); if not given, it will default to: [('y', True,
1252       'Perform the operation'), ('n', False, 'Do no do the operation')];
1253       note that the '?' char is reserved for help
1254
1255   @return: one of the return values from the choices list; if input is
1256       not possible (i.e. not running with a tty, we return the last
1257       entry from the list
1258
1259   """
1260   if choices is None:
1261     choices = [('y', True, 'Perform the operation'),
1262                ('n', False, 'Do not perform the operation')]
1263   if not choices or not isinstance(choices, list):
1264     raise errors.ProgrammerError("Invalid choices argument to AskUser")
1265   for entry in choices:
1266     if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
1267       raise errors.ProgrammerError("Invalid choices element to AskUser")
1268
1269   answer = choices[-1][1]
1270   new_text = []
1271   for line in text.splitlines():
1272     new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
1273   text = "\n".join(new_text)
1274   try:
1275     f = file("/dev/tty", "a+")
1276   except IOError:
1277     return answer
1278   try:
1279     chars = [entry[0] for entry in choices]
1280     chars[-1] = "[%s]" % chars[-1]
1281     chars.append('?')
1282     maps = dict([(entry[0], entry[1]) for entry in choices])
1283     while True:
1284       f.write(text)
1285       f.write('\n')
1286       f.write("/".join(chars))
1287       f.write(": ")
1288       line = f.readline(2).strip().lower()
1289       if line in maps:
1290         answer = maps[line]
1291         break
1292       elif line == '?':
1293         for entry in choices:
1294           f.write(" %s - %s\n" % (entry[0], entry[2]))
1295         f.write("\n")
1296         continue
1297   finally:
1298     f.close()
1299   return answer
1300
1301
1302 class JobSubmittedException(Exception):
1303   """Job was submitted, client should exit.
1304
1305   This exception has one argument, the ID of the job that was
1306   submitted. The handler should print this ID.
1307
1308   This is not an error, just a structured way to exit from clients.
1309
1310   """
1311
1312
1313 def SendJob(ops, cl=None):
1314   """Function to submit an opcode without waiting for the results.
1315
1316   @type ops: list
1317   @param ops: list of opcodes
1318   @type cl: luxi.Client
1319   @param cl: the luxi client to use for communicating with the master;
1320              if None, a new client will be created
1321
1322   """
1323   if cl is None:
1324     cl = GetClient()
1325
1326   job_id = cl.SubmitJob(ops)
1327
1328   return job_id
1329
1330
1331 def GenericPollJob(job_id, cbs, report_cbs):
1332   """Generic job-polling function.
1333
1334   @type job_id: number
1335   @param job_id: Job ID
1336   @type cbs: Instance of L{JobPollCbBase}
1337   @param cbs: Data callbacks
1338   @type report_cbs: Instance of L{JobPollReportCbBase}
1339   @param report_cbs: Reporting callbacks
1340
1341   """
1342   prev_job_info = None
1343   prev_logmsg_serial = None
1344
1345   status = None
1346
1347   while True:
1348     result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
1349                                       prev_logmsg_serial)
1350     if not result:
1351       # job not found, go away!
1352       raise errors.JobLost("Job with id %s lost" % job_id)
1353
1354     if result == constants.JOB_NOTCHANGED:
1355       report_cbs.ReportNotChanged(job_id, status)
1356
1357       # Wait again
1358       continue
1359
1360     # Split result, a tuple of (field values, log entries)
1361     (job_info, log_entries) = result
1362     (status, ) = job_info
1363
1364     if log_entries:
1365       for log_entry in log_entries:
1366         (serial, timestamp, log_type, message) = log_entry
1367         report_cbs.ReportLogMessage(job_id, serial, timestamp,
1368                                     log_type, message)
1369         prev_logmsg_serial = max(prev_logmsg_serial, serial)
1370
1371     # TODO: Handle canceled and archived jobs
1372     elif status in (constants.JOB_STATUS_SUCCESS,
1373                     constants.JOB_STATUS_ERROR,
1374                     constants.JOB_STATUS_CANCELING,
1375                     constants.JOB_STATUS_CANCELED):
1376       break
1377
1378     prev_job_info = job_info
1379
1380   jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
1381   if not jobs:
1382     raise errors.JobLost("Job with id %s lost" % job_id)
1383
1384   status, opstatus, result = jobs[0]
1385
1386   if status == constants.JOB_STATUS_SUCCESS:
1387     return result
1388
1389   if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
1390     raise errors.OpExecError("Job was canceled")
1391
1392   has_ok = False
1393   for idx, (status, msg) in enumerate(zip(opstatus, result)):
1394     if status == constants.OP_STATUS_SUCCESS:
1395       has_ok = True
1396     elif status == constants.OP_STATUS_ERROR:
1397       errors.MaybeRaise(msg)
1398
1399       if has_ok:
1400         raise errors.OpExecError("partial failure (opcode %d): %s" %
1401                                  (idx, msg))
1402
1403       raise errors.OpExecError(str(msg))
1404
1405   # default failure mode
1406   raise errors.OpExecError(result)
1407
1408
1409 class JobPollCbBase:
1410   """Base class for L{GenericPollJob} callbacks.
1411
1412   """
1413   def __init__(self):
1414     """Initializes this class.
1415
1416     """
1417
1418   def WaitForJobChangeOnce(self, job_id, fields,
1419                            prev_job_info, prev_log_serial):
1420     """Waits for changes on a job.
1421
1422     """
1423     raise NotImplementedError()
1424
1425   def QueryJobs(self, job_ids, fields):
1426     """Returns the selected fields for the selected job IDs.
1427
1428     @type job_ids: list of numbers
1429     @param job_ids: Job IDs
1430     @type fields: list of strings
1431     @param fields: Fields
1432
1433     """
1434     raise NotImplementedError()
1435
1436
1437 class JobPollReportCbBase:
1438   """Base class for L{GenericPollJob} reporting callbacks.
1439
1440   """
1441   def __init__(self):
1442     """Initializes this class.
1443
1444     """
1445
1446   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1447     """Handles a log message.
1448
1449     """
1450     raise NotImplementedError()
1451
1452   def ReportNotChanged(self, job_id, status):
1453     """Called for if a job hasn't changed in a while.
1454
1455     @type job_id: number
1456     @param job_id: Job ID
1457     @type status: string or None
1458     @param status: Job status if available
1459
1460     """
1461     raise NotImplementedError()
1462
1463
1464 class _LuxiJobPollCb(JobPollCbBase):
1465   def __init__(self, cl):
1466     """Initializes this class.
1467
1468     """
1469     JobPollCbBase.__init__(self)
1470     self.cl = cl
1471
1472   def WaitForJobChangeOnce(self, job_id, fields,
1473                            prev_job_info, prev_log_serial):
1474     """Waits for changes on a job.
1475
1476     """
1477     return self.cl.WaitForJobChangeOnce(job_id, fields,
1478                                         prev_job_info, prev_log_serial)
1479
1480   def QueryJobs(self, job_ids, fields):
1481     """Returns the selected fields for the selected job IDs.
1482
1483     """
1484     return self.cl.QueryJobs(job_ids, fields)
1485
1486
1487 class FeedbackFnJobPollReportCb(JobPollReportCbBase):
1488   def __init__(self, feedback_fn):
1489     """Initializes this class.
1490
1491     """
1492     JobPollReportCbBase.__init__(self)
1493
1494     self.feedback_fn = feedback_fn
1495
1496     assert callable(feedback_fn)
1497
1498   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1499     """Handles a log message.
1500
1501     """
1502     self.feedback_fn((timestamp, log_type, log_msg))
1503
1504   def ReportNotChanged(self, job_id, status):
1505     """Called if a job hasn't changed in a while.
1506
1507     """
1508     # Ignore
1509
1510
1511 class StdioJobPollReportCb(JobPollReportCbBase):
1512   def __init__(self):
1513     """Initializes this class.
1514
1515     """
1516     JobPollReportCbBase.__init__(self)
1517
1518     self.notified_queued = False
1519     self.notified_waitlock = False
1520
1521   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1522     """Handles a log message.
1523
1524     """
1525     ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
1526              FormatLogMessage(log_type, log_msg))
1527
1528   def ReportNotChanged(self, job_id, status):
1529     """Called if a job hasn't changed in a while.
1530
1531     """
1532     if status is None:
1533       return
1534
1535     if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
1536       ToStderr("Job %s is waiting in queue", job_id)
1537       self.notified_queued = True
1538
1539     elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
1540       ToStderr("Job %s is trying to acquire all necessary locks", job_id)
1541       self.notified_waitlock = True
1542
1543
1544 def FormatLogMessage(log_type, log_msg):
1545   """Formats a job message according to its type.
1546
1547   """
1548   if log_type != constants.ELOG_MESSAGE:
1549     log_msg = str(log_msg)
1550
1551   return utils.SafeEncode(log_msg)
1552
1553
1554 def PollJob(job_id, cl=None, feedback_fn=None, reporter=None):
1555   """Function to poll for the result of a job.
1556
1557   @type job_id: job identified
1558   @param job_id: the job to poll for results
1559   @type cl: luxi.Client
1560   @param cl: the luxi client to use for communicating with the master;
1561              if None, a new client will be created
1562
1563   """
1564   if cl is None:
1565     cl = GetClient()
1566
1567   if reporter is None:
1568     if feedback_fn:
1569       reporter = FeedbackFnJobPollReportCb(feedback_fn)
1570     else:
1571       reporter = StdioJobPollReportCb()
1572   elif feedback_fn:
1573     raise errors.ProgrammerError("Can't specify reporter and feedback function")
1574
1575   return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
1576
1577
1578 def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None, reporter=None):
1579   """Legacy function to submit an opcode.
1580
1581   This is just a simple wrapper over the construction of the processor
1582   instance. It should be extended to better handle feedback and
1583   interaction functions.
1584
1585   """
1586   if cl is None:
1587     cl = GetClient()
1588
1589   SetGenericOpcodeOpts([op], opts)
1590
1591   job_id = SendJob([op], cl=cl)
1592
1593   op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn,
1594                        reporter=reporter)
1595
1596   return op_results[0]
1597
1598
1599 def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
1600   """Wrapper around SubmitOpCode or SendJob.
1601
1602   This function will decide, based on the 'opts' parameter, whether to
1603   submit and wait for the result of the opcode (and return it), or
1604   whether to just send the job and print its identifier. It is used in
1605   order to simplify the implementation of the '--submit' option.
1606
1607   It will also process the opcodes if we're sending the via SendJob
1608   (otherwise SubmitOpCode does it).
1609
1610   """
1611   if opts and opts.submit_only:
1612     job = [op]
1613     SetGenericOpcodeOpts(job, opts)
1614     job_id = SendJob(job, cl=cl)
1615     raise JobSubmittedException(job_id)
1616   else:
1617     return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
1618
1619
1620 def SetGenericOpcodeOpts(opcode_list, options):
1621   """Processor for generic options.
1622
1623   This function updates the given opcodes based on generic command
1624   line options (like debug, dry-run, etc.).
1625
1626   @param opcode_list: list of opcodes
1627   @param options: command line options or None
1628   @return: None (in-place modification)
1629
1630   """
1631   if not options:
1632     return
1633   for op in opcode_list:
1634     if hasattr(options, "dry_run"):
1635       op.dry_run = options.dry_run
1636     op.debug_level = options.debug
1637
1638
1639 def GetClient():
1640   # TODO: Cache object?
1641   try:
1642     client = luxi.Client()
1643   except luxi.NoMasterError:
1644     ss = ssconf.SimpleStore()
1645
1646     # Try to read ssconf file
1647     try:
1648       ss.GetMasterNode()
1649     except errors.ConfigurationError:
1650       raise errors.OpPrereqError("Cluster not initialized or this machine is"
1651                                  " not part of a cluster")
1652
1653     master, myself = ssconf.GetMasterAndMyself(ss=ss)
1654     if master != myself:
1655       raise errors.OpPrereqError("This is not the master node, please connect"
1656                                  " to node '%s' and rerun the command" %
1657                                  master)
1658     raise
1659   return client
1660
1661
1662 def FormatError(err):
1663   """Return a formatted error message for a given error.
1664
1665   This function takes an exception instance and returns a tuple
1666   consisting of two values: first, the recommended exit code, and
1667   second, a string describing the error message (not
1668   newline-terminated).
1669
1670   """
1671   retcode = 1
1672   obuf = StringIO()
1673   msg = str(err)
1674   if isinstance(err, errors.ConfigurationError):
1675     txt = "Corrupt configuration file: %s" % msg
1676     logging.error(txt)
1677     obuf.write(txt + "\n")
1678     obuf.write("Aborting.")
1679     retcode = 2
1680   elif isinstance(err, errors.HooksAbort):
1681     obuf.write("Failure: hooks execution failed:\n")
1682     for node, script, out in err.args[0]:
1683       if out:
1684         obuf.write("  node: %s, script: %s, output: %s\n" %
1685                    (node, script, out))
1686       else:
1687         obuf.write("  node: %s, script: %s (no output)\n" %
1688                    (node, script))
1689   elif isinstance(err, errors.HooksFailure):
1690     obuf.write("Failure: hooks general failure: %s" % msg)
1691   elif isinstance(err, errors.ResolverError):
1692     this_host = netutils.HostInfo.SysName()
1693     if err.args[0] == this_host:
1694       msg = "Failure: can't resolve my own hostname ('%s')"
1695     else:
1696       msg = "Failure: can't resolve hostname '%s'"
1697     obuf.write(msg % err.args[0])
1698   elif isinstance(err, errors.OpPrereqError):
1699     if len(err.args) == 2:
1700       obuf.write("Failure: prerequisites not met for this"
1701                " operation:\nerror type: %s, error details:\n%s" %
1702                  (err.args[1], err.args[0]))
1703     else:
1704       obuf.write("Failure: prerequisites not met for this"
1705                  " operation:\n%s" % msg)
1706   elif isinstance(err, errors.OpExecError):
1707     obuf.write("Failure: command execution error:\n%s" % msg)
1708   elif isinstance(err, errors.TagError):
1709     obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
1710   elif isinstance(err, errors.JobQueueDrainError):
1711     obuf.write("Failure: the job queue is marked for drain and doesn't"
1712                " accept new requests\n")
1713   elif isinstance(err, errors.JobQueueFull):
1714     obuf.write("Failure: the job queue is full and doesn't accept new"
1715                " job submissions until old jobs are archived\n")
1716   elif isinstance(err, errors.TypeEnforcementError):
1717     obuf.write("Parameter Error: %s" % msg)
1718   elif isinstance(err, errors.ParameterError):
1719     obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
1720   elif isinstance(err, luxi.NoMasterError):
1721     obuf.write("Cannot communicate with the master daemon.\nIs it running"
1722                " and listening for connections?")
1723   elif isinstance(err, luxi.TimeoutError):
1724     obuf.write("Timeout while talking to the master daemon. Error:\n"
1725                "%s" % msg)
1726   elif isinstance(err, luxi.PermissionError):
1727     obuf.write("It seems you don't have permissions to connect to the"
1728                " master daemon.\nPlease retry as a different user.")
1729   elif isinstance(err, luxi.ProtocolError):
1730     obuf.write("Unhandled protocol error while talking to the master daemon:\n"
1731                "%s" % msg)
1732   elif isinstance(err, errors.JobLost):
1733     obuf.write("Error checking job status: %s" % msg)
1734   elif isinstance(err, errors.GenericError):
1735     obuf.write("Unhandled Ganeti error: %s" % msg)
1736   elif isinstance(err, JobSubmittedException):
1737     obuf.write("JobID: %s\n" % err.args[0])
1738     retcode = 0
1739   else:
1740     obuf.write("Unhandled exception: %s" % msg)
1741   return retcode, obuf.getvalue().rstrip('\n')
1742
1743
1744 def GenericMain(commands, override=None, aliases=None):
1745   """Generic main function for all the gnt-* commands.
1746
1747   Arguments:
1748     - commands: a dictionary with a special structure, see the design doc
1749                 for command line handling.
1750     - override: if not None, we expect a dictionary with keys that will
1751                 override command line options; this can be used to pass
1752                 options from the scripts to generic functions
1753     - aliases: dictionary with command aliases {'alias': 'target, ...}
1754
1755   """
1756   # save the program name and the entire command line for later logging
1757   if sys.argv:
1758     binary = os.path.basename(sys.argv[0]) or sys.argv[0]
1759     if len(sys.argv) >= 2:
1760       binary += " " + sys.argv[1]
1761       old_cmdline = " ".join(sys.argv[2:])
1762     else:
1763       old_cmdline = ""
1764   else:
1765     binary = "<unknown program>"
1766     old_cmdline = ""
1767
1768   if aliases is None:
1769     aliases = {}
1770
1771   try:
1772     func, options, args = _ParseArgs(sys.argv, commands, aliases)
1773   except errors.ParameterError, err:
1774     result, err_msg = FormatError(err)
1775     ToStderr(err_msg)
1776     return 1
1777
1778   if func is None: # parse error
1779     return 1
1780
1781   if override is not None:
1782     for key, val in override.iteritems():
1783       setattr(options, key, val)
1784
1785   utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
1786                      stderr_logging=True, program=binary)
1787
1788   if old_cmdline:
1789     logging.info("run with arguments '%s'", old_cmdline)
1790   else:
1791     logging.info("run with no arguments")
1792
1793   try:
1794     result = func(options, args)
1795   except (errors.GenericError, luxi.ProtocolError,
1796           JobSubmittedException), err:
1797     result, err_msg = FormatError(err)
1798     logging.exception("Error during command processing")
1799     ToStderr(err_msg)
1800
1801   return result
1802
1803
1804 def GenericInstanceCreate(mode, opts, args):
1805   """Add an instance to the cluster via either creation or import.
1806
1807   @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
1808   @param opts: the command line options selected by the user
1809   @type args: list
1810   @param args: should contain only one element, the new instance name
1811   @rtype: int
1812   @return: the desired exit code
1813
1814   """
1815   instance = args[0]
1816
1817   (pnode, snode) = SplitNodeOption(opts.node)
1818
1819   hypervisor = None
1820   hvparams = {}
1821   if opts.hypervisor:
1822     hypervisor, hvparams = opts.hypervisor
1823
1824   if opts.nics:
1825     try:
1826       nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
1827     except ValueError, err:
1828       raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
1829     nics = [{}] * nic_max
1830     for nidx, ndict in opts.nics:
1831       nidx = int(nidx)
1832       if not isinstance(ndict, dict):
1833         msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
1834         raise errors.OpPrereqError(msg)
1835       nics[nidx] = ndict
1836   elif opts.no_nics:
1837     # no nics
1838     nics = []
1839   elif mode == constants.INSTANCE_CREATE:
1840     # default of one nic, all auto
1841     nics = [{}]
1842   else:
1843     # mode == import
1844     nics = []
1845
1846   if opts.disk_template == constants.DT_DISKLESS:
1847     if opts.disks or opts.sd_size is not None:
1848       raise errors.OpPrereqError("Diskless instance but disk"
1849                                  " information passed")
1850     disks = []
1851   else:
1852     if (not opts.disks and not opts.sd_size
1853         and mode == constants.INSTANCE_CREATE):
1854       raise errors.OpPrereqError("No disk information specified")
1855     if opts.disks and opts.sd_size is not None:
1856       raise errors.OpPrereqError("Please use either the '--disk' or"
1857                                  " '-s' option")
1858     if opts.sd_size is not None:
1859       opts.disks = [(0, {"size": opts.sd_size})]
1860
1861     if opts.disks:
1862       try:
1863         disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
1864       except ValueError, err:
1865         raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
1866       disks = [{}] * disk_max
1867     else:
1868       disks = []
1869     for didx, ddict in opts.disks:
1870       didx = int(didx)
1871       if not isinstance(ddict, dict):
1872         msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
1873         raise errors.OpPrereqError(msg)
1874       elif "size" in ddict:
1875         if "adopt" in ddict:
1876           raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
1877                                      " (disk %d)" % didx)
1878         try:
1879           ddict["size"] = utils.ParseUnit(ddict["size"])
1880         except ValueError, err:
1881           raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
1882                                      (didx, err))
1883       elif "adopt" in ddict:
1884         if mode == constants.INSTANCE_IMPORT:
1885           raise errors.OpPrereqError("Disk adoption not allowed for instance"
1886                                      " import")
1887         ddict["size"] = 0
1888       else:
1889         raise errors.OpPrereqError("Missing size or adoption source for"
1890                                    " disk %d" % didx)
1891       disks[didx] = ddict
1892
1893   utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
1894   utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
1895
1896   if mode == constants.INSTANCE_CREATE:
1897     start = opts.start
1898     os_type = opts.os
1899     force_variant = opts.force_variant
1900     src_node = None
1901     src_path = None
1902     no_install = opts.no_install
1903     identify_defaults = False
1904   elif mode == constants.INSTANCE_IMPORT:
1905     start = False
1906     os_type = None
1907     force_variant = False
1908     src_node = opts.src_node
1909     src_path = opts.src_dir
1910     no_install = None
1911     identify_defaults = opts.identify_defaults
1912   else:
1913     raise errors.ProgrammerError("Invalid creation mode %s" % mode)
1914
1915   op = opcodes.OpCreateInstance(instance_name=instance,
1916                                 disks=disks,
1917                                 disk_template=opts.disk_template,
1918                                 nics=nics,
1919                                 pnode=pnode, snode=snode,
1920                                 ip_check=opts.ip_check,
1921                                 name_check=opts.name_check,
1922                                 wait_for_sync=opts.wait_for_sync,
1923                                 file_storage_dir=opts.file_storage_dir,
1924                                 file_driver=opts.file_driver,
1925                                 iallocator=opts.iallocator,
1926                                 hypervisor=hypervisor,
1927                                 hvparams=hvparams,
1928                                 beparams=opts.beparams,
1929                                 osparams=opts.osparams,
1930                                 mode=mode,
1931                                 start=start,
1932                                 os_type=os_type,
1933                                 force_variant=force_variant,
1934                                 src_node=src_node,
1935                                 src_path=src_path,
1936                                 no_install=no_install,
1937                                 identify_defaults=identify_defaults)
1938
1939   SubmitOrSend(op, opts)
1940   return 0
1941
1942
1943 class _RunWhileClusterStoppedHelper:
1944   """Helper class for L{RunWhileClusterStopped} to simplify state management
1945
1946   """
1947   def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
1948     """Initializes this class.
1949
1950     @type feedback_fn: callable
1951     @param feedback_fn: Feedback function
1952     @type cluster_name: string
1953     @param cluster_name: Cluster name
1954     @type master_node: string
1955     @param master_node Master node name
1956     @type online_nodes: list
1957     @param online_nodes: List of names of online nodes
1958
1959     """
1960     self.feedback_fn = feedback_fn
1961     self.cluster_name = cluster_name
1962     self.master_node = master_node
1963     self.online_nodes = online_nodes
1964
1965     self.ssh = ssh.SshRunner(self.cluster_name)
1966
1967     self.nonmaster_nodes = [name for name in online_nodes
1968                             if name != master_node]
1969
1970     assert self.master_node not in self.nonmaster_nodes
1971
1972   def _RunCmd(self, node_name, cmd):
1973     """Runs a command on the local or a remote machine.
1974
1975     @type node_name: string
1976     @param node_name: Machine name
1977     @type cmd: list
1978     @param cmd: Command
1979
1980     """
1981     if node_name is None or node_name == self.master_node:
1982       # No need to use SSH
1983       result = utils.RunCmd(cmd)
1984     else:
1985       result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))
1986
1987     if result.failed:
1988       errmsg = ["Failed to run command %s" % result.cmd]
1989       if node_name:
1990         errmsg.append("on node %s" % node_name)
1991       errmsg.append(": exitcode %s and error %s" %
1992                     (result.exit_code, result.output))
1993       raise errors.OpExecError(" ".join(errmsg))
1994
1995   def Call(self, fn, *args):
1996     """Call function while all daemons are stopped.
1997
1998     @type fn: callable
1999     @param fn: Function to be called
2000
2001     """
2002     # Pause watcher by acquiring an exclusive lock on watcher state file
2003     self.feedback_fn("Blocking watcher")
2004     watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
2005     try:
2006       # TODO: Currently, this just blocks. There's no timeout.
2007       # TODO: Should it be a shared lock?
2008       watcher_block.Exclusive(blocking=True)
2009
2010       # Stop master daemons, so that no new jobs can come in and all running
2011       # ones are finished
2012       self.feedback_fn("Stopping master daemons")
2013       self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
2014       try:
2015         # Stop daemons on all nodes
2016         for node_name in self.online_nodes:
2017           self.feedback_fn("Stopping daemons on %s" % node_name)
2018           self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])
2019
2020         # All daemons are shut down now
2021         try:
2022           return fn(self, *args)
2023         except Exception, err:
2024           _, errmsg = FormatError(err)
2025           logging.exception("Caught exception")
2026           self.feedback_fn(errmsg)
2027           raise
2028       finally:
2029         # Start cluster again, master node last
2030         for node_name in self.nonmaster_nodes + [self.master_node]:
2031           self.feedback_fn("Starting daemons on %s" % node_name)
2032           self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
2033     finally:
2034       # Resume watcher
2035       watcher_block.Close()
2036
2037
2038 def RunWhileClusterStopped(feedback_fn, fn, *args):
2039   """Calls a function while all cluster daemons are stopped.
2040
2041   @type feedback_fn: callable
2042   @param feedback_fn: Feedback function
2043   @type fn: callable
2044   @param fn: Function to be called when daemons are stopped
2045
2046   """
2047   feedback_fn("Gathering cluster information")
2048
2049   # This ensures we're running on the master daemon
2050   cl = GetClient()
2051
2052   (cluster_name, master_node) = \
2053     cl.QueryConfigValues(["cluster_name", "master_node"])
2054
2055   online_nodes = GetOnlineNodes([], cl=cl)
2056
2057   # Don't keep a reference to the client. The master daemon will go away.
2058   del cl
2059
2060   assert master_node in online_nodes
2061
2062   return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
2063                                        online_nodes).Call(fn, *args)
2064
2065
2066 def GenerateTable(headers, fields, separator, data,
2067                   numfields=None, unitfields=None,
2068                   units=None):
2069   """Prints a table with headers and different fields.
2070
2071   @type headers: dict
2072   @param headers: dictionary mapping field names to headers for
2073       the table
2074   @type fields: list
2075   @param fields: the field names corresponding to each row in
2076       the data field
2077   @param separator: the separator to be used; if this is None,
2078       the default 'smart' algorithm is used which computes optimal
2079       field width, otherwise just the separator is used between
2080       each field
2081   @type data: list
2082   @param data: a list of lists, each sublist being one row to be output
2083   @type numfields: list
2084   @param numfields: a list with the fields that hold numeric
2085       values and thus should be right-aligned
2086   @type unitfields: list
2087   @param unitfields: a list with the fields that hold numeric
2088       values that should be formatted with the units field
2089   @type units: string or None
2090   @param units: the units we should use for formatting, or None for
2091       automatic choice (human-readable for non-separator usage, otherwise
2092       megabytes); this is a one-letter string
2093
2094   """
2095   if units is None:
2096     if separator:
2097       units = "m"
2098     else:
2099       units = "h"
2100
2101   if numfields is None:
2102     numfields = []
2103   if unitfields is None:
2104     unitfields = []
2105
2106   numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
2107   unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142
2108
2109   format_fields = []
2110   for field in fields:
2111     if headers and field not in headers:
2112       # TODO: handle better unknown fields (either revert to old
2113       # style of raising exception, or deal more intelligently with
2114       # variable fields)
2115       headers[field] = field
2116     if separator is not None:
2117       format_fields.append("%s")
2118     elif numfields.Matches(field):
2119       format_fields.append("%*s")
2120     else:
2121       format_fields.append("%-*s")
2122
2123   if separator is None:
2124     mlens = [0 for name in fields]
2125     format_str = ' '.join(format_fields)
2126   else:
2127     format_str = separator.replace("%", "%%").join(format_fields)
2128
2129   for row in data:
2130     if row is None:
2131       continue
2132     for idx, val in enumerate(row):
2133       if unitfields.Matches(fields[idx]):
2134         try:
2135           val = int(val)
2136         except (TypeError, ValueError):
2137           pass
2138         else:
2139           val = row[idx] = utils.FormatUnit(val, units)
2140       val = row[idx] = str(val)
2141       if separator is None:
2142         mlens[idx] = max(mlens[idx], len(val))
2143
2144   result = []
2145   if headers:
2146     args = []
2147     for idx, name in enumerate(fields):
2148       hdr = headers[name]
2149       if separator is None:
2150         mlens[idx] = max(mlens[idx], len(hdr))
2151         args.append(mlens[idx])
2152       args.append(hdr)
2153     result.append(format_str % tuple(args))
2154
2155   if separator is None:
2156     assert len(mlens) == len(fields)
2157
2158     if fields and not numfields.Matches(fields[-1]):
2159       mlens[-1] = 0
2160
2161   for line in data:
2162     args = []
2163     if line is None:
2164       line = ['-' for _ in fields]
2165     for idx in range(len(fields)):
2166       if separator is None:
2167         args.append(mlens[idx])
2168       args.append(line[idx])
2169     result.append(format_str % tuple(args))
2170
2171   return result
2172
2173
2174 def FormatTimestamp(ts):
2175   """Formats a given timestamp.
2176
2177   @type ts: timestamp
2178   @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
2179
2180   @rtype: string
2181   @return: a string with the formatted timestamp
2182
2183   """
2184   if not isinstance (ts, (tuple, list)) or len(ts) != 2:
2185     return '?'
2186   sec, usec = ts
2187   return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
2188
2189
2190 def ParseTimespec(value):
2191   """Parse a time specification.
2192
2193   The following suffixed will be recognized:
2194
2195     - s: seconds
2196     - m: minutes
2197     - h: hours
2198     - d: day
2199     - w: weeks
2200
2201   Without any suffix, the value will be taken to be in seconds.
2202
2203   """
2204   value = str(value)
2205   if not value:
2206     raise errors.OpPrereqError("Empty time specification passed")
2207   suffix_map = {
2208     's': 1,
2209     'm': 60,
2210     'h': 3600,
2211     'd': 86400,
2212     'w': 604800,
2213     }
2214   if value[-1] not in suffix_map:
2215     try:
2216       value = int(value)
2217     except (TypeError, ValueError):
2218       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2219   else:
2220     multiplier = suffix_map[value[-1]]
2221     value = value[:-1]
2222     if not value: # no data left after stripping the suffix
2223       raise errors.OpPrereqError("Invalid time specification (only"
2224                                  " suffix passed)")
2225     try:
2226       value = int(value) * multiplier
2227     except (TypeError, ValueError):
2228       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2229   return value
2230
2231
2232 def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
2233                    filter_master=False):
2234   """Returns the names of online nodes.
2235
2236   This function will also log a warning on stderr with the names of
2237   the online nodes.
2238
2239   @param nodes: if not empty, use only this subset of nodes (minus the
2240       offline ones)
2241   @param cl: if not None, luxi client to use
2242   @type nowarn: boolean
2243   @param nowarn: by default, this function will output a note with the
2244       offline nodes that are skipped; if this parameter is True the
2245       note is not displayed
2246   @type secondary_ips: boolean
2247   @param secondary_ips: if True, return the secondary IPs instead of the
2248       names, useful for doing network traffic over the replication interface
2249       (if any)
2250   @type filter_master: boolean
2251   @param filter_master: if True, do not return the master node in the list
2252       (useful in coordination with secondary_ips where we cannot check our
2253       node name against the list)
2254
2255   """
2256   if cl is None:
2257     cl = GetClient()
2258
2259   if secondary_ips:
2260     name_idx = 2
2261   else:
2262     name_idx = 0
2263
2264   if filter_master:
2265     master_node = cl.QueryConfigValues(["master_node"])[0]
2266     filter_fn = lambda x: x != master_node
2267   else:
2268     filter_fn = lambda _: True
2269
2270   result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
2271                          use_locking=False)
2272   offline = [row[0] for row in result if row[1]]
2273   if offline and not nowarn:
2274     ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
2275   return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
2276
2277
2278 def _ToStream(stream, txt, *args):
2279   """Write a message to a stream, bypassing the logging system
2280
2281   @type stream: file object
2282   @param stream: the file to which we should write
2283   @type txt: str
2284   @param txt: the message
2285
2286   """
2287   if args:
2288     args = tuple(args)
2289     stream.write(txt % args)
2290   else:
2291     stream.write(txt)
2292   stream.write('\n')
2293   stream.flush()
2294
2295
2296 def ToStdout(txt, *args):
2297   """Write a message to stdout only, bypassing the logging system
2298
2299   This is just a wrapper over _ToStream.
2300
2301   @type txt: str
2302   @param txt: the message
2303
2304   """
2305   _ToStream(sys.stdout, txt, *args)
2306
2307
2308 def ToStderr(txt, *args):
2309   """Write a message to stderr only, bypassing the logging system
2310
2311   This is just a wrapper over _ToStream.
2312
2313   @type txt: str
2314   @param txt: the message
2315
2316   """
2317   _ToStream(sys.stderr, txt, *args)
2318
2319
2320 class JobExecutor(object):
2321   """Class which manages the submission and execution of multiple jobs.
2322
2323   Note that instances of this class should not be reused between
2324   GetResults() calls.
2325
2326   """
2327   def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
2328     self.queue = []
2329     if cl is None:
2330       cl = GetClient()
2331     self.cl = cl
2332     self.verbose = verbose
2333     self.jobs = []
2334     self.opts = opts
2335     self.feedback_fn = feedback_fn
2336
2337   def QueueJob(self, name, *ops):
2338     """Record a job for later submit.
2339
2340     @type name: string
2341     @param name: a description of the job, will be used in WaitJobSet
2342     """
2343     SetGenericOpcodeOpts(ops, self.opts)
2344     self.queue.append((name, ops))
2345
2346   def SubmitPending(self, each=False):
2347     """Submit all pending jobs.
2348
2349     """
2350     if each:
2351       results = []
2352       for row in self.queue:
2353         # SubmitJob will remove the success status, but raise an exception if
2354         # the submission fails, so we'll notice that anyway.
2355         results.append([True, self.cl.SubmitJob(row[1])])
2356     else:
2357       results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
2358     for (idx, ((status, data), (name, _))) in enumerate(zip(results,
2359                                                             self.queue)):
2360       self.jobs.append((idx, status, data, name))
2361
2362   def _ChooseJob(self):
2363     """Choose a non-waiting/queued job to poll next.
2364
2365     """
2366     assert self.jobs, "_ChooseJob called with empty job list"
2367
2368     result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
2369     assert result
2370
2371     for job_data, status in zip(self.jobs, result):
2372       if (isinstance(status, list) and status and
2373           status[0] in (constants.JOB_STATUS_QUEUED,
2374                         constants.JOB_STATUS_WAITLOCK,
2375                         constants.JOB_STATUS_CANCELING)):
2376         # job is still present and waiting
2377         continue
2378       # good candidate found (either running job or lost job)
2379       self.jobs.remove(job_data)
2380       return job_data
2381
2382     # no job found
2383     return self.jobs.pop(0)
2384
2385   def GetResults(self):
2386     """Wait for and return the results of all jobs.
2387
2388     @rtype: list
2389     @return: list of tuples (success, job results), in the same order
2390         as the submitted jobs; if a job has failed, instead of the result
2391         there will be the error message
2392
2393     """
2394     if not self.jobs:
2395       self.SubmitPending()
2396     results = []
2397     if self.verbose:
2398       ok_jobs = [row[2] for row in self.jobs if row[1]]
2399       if ok_jobs:
2400         ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
2401
2402     # first, remove any non-submitted jobs
2403     self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
2404     for idx, _, jid, name in failures:
2405       ToStderr("Failed to submit job for %s: %s", name, jid)
2406       results.append((idx, False, jid))
2407
2408     while self.jobs:
2409       (idx, _, jid, name) = self._ChooseJob()
2410       ToStdout("Waiting for job %s for %s...", jid, name)
2411       try:
2412         job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
2413         success = True
2414       except errors.JobLost, err:
2415         _, job_result = FormatError(err)
2416         ToStderr("Job %s for %s has been archived, cannot check its result",
2417                  jid, name)
2418         success = False
2419       except (errors.GenericError, luxi.ProtocolError), err:
2420         _, job_result = FormatError(err)
2421         success = False
2422         # the error message will always be shown, verbose or not
2423         ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
2424
2425       results.append((idx, success, job_result))
2426
2427     # sort based on the index, then drop it
2428     results.sort()
2429     results = [i[1:] for i in results]
2430
2431     return results
2432
2433   def WaitOrShow(self, wait):
2434     """Wait for job results or only print the job IDs.
2435
2436     @type wait: boolean
2437     @param wait: whether to wait or not
2438
2439     """
2440     if wait:
2441       return self.GetResults()
2442     else:
2443       if not self.jobs:
2444         self.SubmitPending()
2445       for _, status, result, name in self.jobs:
2446         if status:
2447           ToStdout("%s: %s", result, name)
2448         else:
2449           ToStderr("Failure for %s: %s", name, result)
2450       return [row[1:3] for row in self.jobs]