Remove mcpu's ReportLocks callback
[ganeti-local] / lib / cli.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module dealing with command line parsing"""
23
24
25 import sys
26 import textwrap
27 import os.path
28 import time
29 import logging
30 from cStringIO import StringIO
31
32 from ganeti import utils
33 from ganeti import errors
34 from ganeti import constants
35 from ganeti import opcodes
36 from ganeti import luxi
37 from ganeti import ssconf
38 from ganeti import rpc
39 from ganeti import ssh
40 from ganeti import compat
41 from ganeti import netutils
42
43 from optparse import (OptionParser, TitledHelpFormatter,
44                       Option, OptionValueError)
45
46
47 __all__ = [
48   # Command line options
49   "ADD_UIDS_OPT",
50   "ALLOCATABLE_OPT",
51   "ALL_OPT",
52   "AUTO_PROMOTE_OPT",
53   "AUTO_REPLACE_OPT",
54   "BACKEND_OPT",
55   "CLEANUP_OPT",
56   "CLUSTER_DOMAIN_SECRET_OPT",
57   "CONFIRM_OPT",
58   "CP_SIZE_OPT",
59   "DEBUG_OPT",
60   "DEBUG_SIMERR_OPT",
61   "DISKIDX_OPT",
62   "DISK_OPT",
63   "DISK_TEMPLATE_OPT",
64   "DRAINED_OPT",
65   "DRY_RUN_OPT",
66   "DRBD_HELPER_OPT",
67   "EARLY_RELEASE_OPT",
68   "ENABLED_HV_OPT",
69   "ERROR_CODES_OPT",
70   "FIELDS_OPT",
71   "FILESTORE_DIR_OPT",
72   "FILESTORE_DRIVER_OPT",
73   "FORCE_OPT",
74   "FORCE_VARIANT_OPT",
75   "GLOBAL_FILEDIR_OPT",
76   "HVLIST_OPT",
77   "HVOPTS_OPT",
78   "HYPERVISOR_OPT",
79   "IALLOCATOR_OPT",
80   "DEFAULT_IALLOCATOR_OPT",
81   "IDENTIFY_DEFAULTS_OPT",
82   "IGNORE_CONSIST_OPT",
83   "IGNORE_FAILURES_OPT",
84   "IGNORE_REMOVE_FAILURES_OPT",
85   "IGNORE_SECONDARIES_OPT",
86   "IGNORE_SIZE_OPT",
87   "INTERVAL_OPT",
88   "MAC_PREFIX_OPT",
89   "MAINTAIN_NODE_HEALTH_OPT",
90   "MASTER_NETDEV_OPT",
91   "MC_OPT",
92   "MIGRATION_MODE_OPT",
93   "NET_OPT",
94   "NEW_CLUSTER_CERT_OPT",
95   "NEW_CLUSTER_DOMAIN_SECRET_OPT",
96   "NEW_CONFD_HMAC_KEY_OPT",
97   "NEW_RAPI_CERT_OPT",
98   "NEW_SECONDARY_OPT",
99   "NIC_PARAMS_OPT",
100   "NODE_LIST_OPT",
101   "NODE_PLACEMENT_OPT",
102   "NODRBD_STORAGE_OPT",
103   "NOHDR_OPT",
104   "NOIPCHECK_OPT",
105   "NO_INSTALL_OPT",
106   "NONAMECHECK_OPT",
107   "NOLVM_STORAGE_OPT",
108   "NOMODIFY_ETCHOSTS_OPT",
109   "NOMODIFY_SSH_SETUP_OPT",
110   "NONICS_OPT",
111   "NONLIVE_OPT",
112   "NONPLUS1_OPT",
113   "NOSHUTDOWN_OPT",
114   "NOSTART_OPT",
115   "NOSSH_KEYCHECK_OPT",
116   "NOVOTING_OPT",
117   "NWSYNC_OPT",
118   "ON_PRIMARY_OPT",
119   "ON_SECONDARY_OPT",
120   "OFFLINE_OPT",
121   "OSPARAMS_OPT",
122   "OS_OPT",
123   "OS_SIZE_OPT",
124   "RAPI_CERT_OPT",
125   "READD_OPT",
126   "REBOOT_TYPE_OPT",
127   "REMOVE_INSTANCE_OPT",
128   "REMOVE_UIDS_OPT",
129   "RESERVED_LVS_OPT",
130   "ROMAN_OPT",
131   "SECONDARY_IP_OPT",
132   "SELECT_OS_OPT",
133   "SEP_OPT",
134   "SHOWCMD_OPT",
135   "SHUTDOWN_TIMEOUT_OPT",
136   "SINGLE_NODE_OPT",
137   "SRC_DIR_OPT",
138   "SRC_NODE_OPT",
139   "SUBMIT_OPT",
140   "STATIC_OPT",
141   "SYNC_OPT",
142   "TAG_SRC_OPT",
143   "TIMEOUT_OPT",
144   "UIDPOOL_OPT",
145   "USEUNITS_OPT",
146   "USE_REPL_NET_OPT",
147   "VERBOSE_OPT",
148   "VG_NAME_OPT",
149   "YES_DOIT_OPT",
150   # Generic functions for CLI programs
151   "GenericMain",
152   "GenericInstanceCreate",
153   "GetClient",
154   "GetOnlineNodes",
155   "JobExecutor",
156   "JobSubmittedException",
157   "ParseTimespec",
158   "RunWhileClusterStopped",
159   "SubmitOpCode",
160   "SubmitOrSend",
161   "UsesRPC",
162   # Formatting functions
163   "ToStderr", "ToStdout",
164   "FormatError",
165   "GenerateTable",
166   "AskUser",
167   "FormatTimestamp",
168   "FormatLogMessage",
169   # Tags functions
170   "ListTags",
171   "AddTags",
172   "RemoveTags",
173   # command line options support infrastructure
174   "ARGS_MANY_INSTANCES",
175   "ARGS_MANY_NODES",
176   "ARGS_NONE",
177   "ARGS_ONE_INSTANCE",
178   "ARGS_ONE_NODE",
179   "ARGS_ONE_OS",
180   "ArgChoice",
181   "ArgCommand",
182   "ArgFile",
183   "ArgHost",
184   "ArgInstance",
185   "ArgJobId",
186   "ArgNode",
187   "ArgOs",
188   "ArgSuggest",
189   "ArgUnknown",
190   "OPT_COMPL_INST_ADD_NODES",
191   "OPT_COMPL_MANY_NODES",
192   "OPT_COMPL_ONE_IALLOCATOR",
193   "OPT_COMPL_ONE_INSTANCE",
194   "OPT_COMPL_ONE_NODE",
195   "OPT_COMPL_ONE_OS",
196   "cli_option",
197   "SplitNodeOption",
198   "CalculateOSNames",
199   "ParseFields",
200   ]
201
202 NO_PREFIX = "no_"
203 UN_PREFIX = "-"
204
205
206 class _Argument:
207   def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
208     self.min = min
209     self.max = max
210
211   def __repr__(self):
212     return ("<%s min=%s max=%s>" %
213             (self.__class__.__name__, self.min, self.max))
214
215
216 class ArgSuggest(_Argument):
217   """Suggesting argument.
218
219   Value can be any of the ones passed to the constructor.
220
221   """
222   # pylint: disable-msg=W0622
223   def __init__(self, min=0, max=None, choices=None):
224     _Argument.__init__(self, min=min, max=max)
225     self.choices = choices
226
227   def __repr__(self):
228     return ("<%s min=%s max=%s choices=%r>" %
229             (self.__class__.__name__, self.min, self.max, self.choices))
230
231
232 class ArgChoice(ArgSuggest):
233   """Choice argument.
234
235   Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
236   but value must be one of the choices.
237
238   """
239
240
241 class ArgUnknown(_Argument):
242   """Unknown argument to program (e.g. determined at runtime).
243
244   """
245
246
247 class ArgInstance(_Argument):
248   """Instances argument.
249
250   """
251
252
253 class ArgNode(_Argument):
254   """Node argument.
255
256   """
257
258 class ArgJobId(_Argument):
259   """Job ID argument.
260
261   """
262
263
264 class ArgFile(_Argument):
265   """File path argument.
266
267   """
268
269
270 class ArgCommand(_Argument):
271   """Command argument.
272
273   """
274
275
276 class ArgHost(_Argument):
277   """Host argument.
278
279   """
280
281
282 class ArgOs(_Argument):
283   """OS argument.
284
285   """
286
287
288 ARGS_NONE = []
289 ARGS_MANY_INSTANCES = [ArgInstance()]
290 ARGS_MANY_NODES = [ArgNode()]
291 ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
292 ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
293 ARGS_ONE_OS = [ArgOs(min=1, max=1)]
294
295
296 def _ExtractTagsObject(opts, args):
297   """Extract the tag type object.
298
299   Note that this function will modify its args parameter.
300
301   """
302   if not hasattr(opts, "tag_type"):
303     raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
304   kind = opts.tag_type
305   if kind == constants.TAG_CLUSTER:
306     retval = kind, kind
307   elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
308     if not args:
309       raise errors.OpPrereqError("no arguments passed to the command")
310     name = args.pop(0)
311     retval = kind, name
312   else:
313     raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
314   return retval
315
316
317 def _ExtendTags(opts, args):
318   """Extend the args if a source file has been given.
319
320   This function will extend the tags with the contents of the file
321   passed in the 'tags_source' attribute of the opts parameter. A file
322   named '-' will be replaced by stdin.
323
324   """
325   fname = opts.tags_source
326   if fname is None:
327     return
328   if fname == "-":
329     new_fh = sys.stdin
330   else:
331     new_fh = open(fname, "r")
332   new_data = []
333   try:
334     # we don't use the nice 'new_data = [line.strip() for line in fh]'
335     # because of python bug 1633941
336     while True:
337       line = new_fh.readline()
338       if not line:
339         break
340       new_data.append(line.strip())
341   finally:
342     new_fh.close()
343   args.extend(new_data)
344
345
346 def ListTags(opts, args):
347   """List the tags on a given object.
348
349   This is a generic implementation that knows how to deal with all
350   three cases of tag objects (cluster, node, instance). The opts
351   argument is expected to contain a tag_type field denoting what
352   object type we work on.
353
354   """
355   kind, name = _ExtractTagsObject(opts, args)
356   cl = GetClient()
357   result = cl.QueryTags(kind, name)
358   result = list(result)
359   result.sort()
360   for tag in result:
361     ToStdout(tag)
362
363
364 def AddTags(opts, args):
365   """Add tags on a given object.
366
367   This is a generic implementation that knows how to deal with all
368   three cases of tag objects (cluster, node, instance). The opts
369   argument is expected to contain a tag_type field denoting what
370   object type we work on.
371
372   """
373   kind, name = _ExtractTagsObject(opts, args)
374   _ExtendTags(opts, args)
375   if not args:
376     raise errors.OpPrereqError("No tags to be added")
377   op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
378   SubmitOpCode(op)
379
380
381 def RemoveTags(opts, args):
382   """Remove tags from a given object.
383
384   This is a generic implementation that knows how to deal with all
385   three cases of tag objects (cluster, node, instance). The opts
386   argument is expected to contain a tag_type field denoting what
387   object type we work on.
388
389   """
390   kind, name = _ExtractTagsObject(opts, args)
391   _ExtendTags(opts, args)
392   if not args:
393     raise errors.OpPrereqError("No tags to be removed")
394   op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
395   SubmitOpCode(op)
396
397
398 def check_unit(option, opt, value): # pylint: disable-msg=W0613
399   """OptParsers custom converter for units.
400
401   """
402   try:
403     return utils.ParseUnit(value)
404   except errors.UnitParseError, err:
405     raise OptionValueError("option %s: %s" % (opt, err))
406
407
408 def _SplitKeyVal(opt, data):
409   """Convert a KeyVal string into a dict.
410
411   This function will convert a key=val[,...] string into a dict. Empty
412   values will be converted specially: keys which have the prefix 'no_'
413   will have the value=False and the prefix stripped, the others will
414   have value=True.
415
416   @type opt: string
417   @param opt: a string holding the option name for which we process the
418       data, used in building error messages
419   @type data: string
420   @param data: a string of the format key=val,key=val,...
421   @rtype: dict
422   @return: {key=val, key=val}
423   @raises errors.ParameterError: if there are duplicate keys
424
425   """
426   kv_dict = {}
427   if data:
428     for elem in utils.UnescapeAndSplit(data, sep=","):
429       if "=" in elem:
430         key, val = elem.split("=", 1)
431       else:
432         if elem.startswith(NO_PREFIX):
433           key, val = elem[len(NO_PREFIX):], False
434         elif elem.startswith(UN_PREFIX):
435           key, val = elem[len(UN_PREFIX):], None
436         else:
437           key, val = elem, True
438       if key in kv_dict:
439         raise errors.ParameterError("Duplicate key '%s' in option %s" %
440                                     (key, opt))
441       kv_dict[key] = val
442   return kv_dict
443
444
445 def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
446   """Custom parser for ident:key=val,key=val options.
447
448   This will store the parsed values as a tuple (ident, {key: val}). As such,
449   multiple uses of this option via action=append is possible.
450
451   """
452   if ":" not in value:
453     ident, rest = value, ''
454   else:
455     ident, rest = value.split(":", 1)
456
457   if ident.startswith(NO_PREFIX):
458     if rest:
459       msg = "Cannot pass options when removing parameter groups: %s" % value
460       raise errors.ParameterError(msg)
461     retval = (ident[len(NO_PREFIX):], False)
462   elif ident.startswith(UN_PREFIX):
463     if rest:
464       msg = "Cannot pass options when removing parameter groups: %s" % value
465       raise errors.ParameterError(msg)
466     retval = (ident[len(UN_PREFIX):], None)
467   else:
468     kv_dict = _SplitKeyVal(opt, rest)
469     retval = (ident, kv_dict)
470   return retval
471
472
473 def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
474   """Custom parser class for key=val,key=val options.
475
476   This will store the parsed values as a dict {key: val}.
477
478   """
479   return _SplitKeyVal(opt, value)
480
481
482 def check_bool(option, opt, value): # pylint: disable-msg=W0613
483   """Custom parser for yes/no options.
484
485   This will store the parsed value as either True or False.
486
487   """
488   value = value.lower()
489   if value == constants.VALUE_FALSE or value == "no":
490     return False
491   elif value == constants.VALUE_TRUE or value == "yes":
492     return True
493   else:
494     raise errors.ParameterError("Invalid boolean value '%s'" % value)
495
496
497 # completion_suggestion is normally a list. Using numeric values not evaluating
498 # to False for dynamic completion.
499 (OPT_COMPL_MANY_NODES,
500  OPT_COMPL_ONE_NODE,
501  OPT_COMPL_ONE_INSTANCE,
502  OPT_COMPL_ONE_OS,
503  OPT_COMPL_ONE_IALLOCATOR,
504  OPT_COMPL_INST_ADD_NODES) = range(100, 106)
505
506 OPT_COMPL_ALL = frozenset([
507   OPT_COMPL_MANY_NODES,
508   OPT_COMPL_ONE_NODE,
509   OPT_COMPL_ONE_INSTANCE,
510   OPT_COMPL_ONE_OS,
511   OPT_COMPL_ONE_IALLOCATOR,
512   OPT_COMPL_INST_ADD_NODES,
513   ])
514
515
516 class CliOption(Option):
517   """Custom option class for optparse.
518
519   """
520   ATTRS = Option.ATTRS + [
521     "completion_suggest",
522     ]
523   TYPES = Option.TYPES + (
524     "identkeyval",
525     "keyval",
526     "unit",
527     "bool",
528     )
529   TYPE_CHECKER = Option.TYPE_CHECKER.copy()
530   TYPE_CHECKER["identkeyval"] = check_ident_key_val
531   TYPE_CHECKER["keyval"] = check_key_val
532   TYPE_CHECKER["unit"] = check_unit
533   TYPE_CHECKER["bool"] = check_bool
534
535
536 # optparse.py sets make_option, so we do it for our own option class, too
537 cli_option = CliOption
538
539
540 _YORNO = "yes|no"
541
542 DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
543                        help="Increase debugging level")
544
545 NOHDR_OPT = cli_option("--no-headers", default=False,
546                        action="store_true", dest="no_headers",
547                        help="Don't display column headers")
548
549 SEP_OPT = cli_option("--separator", default=None,
550                      action="store", dest="separator",
551                      help=("Separator between output fields"
552                            " (defaults to one space)"))
553
554 USEUNITS_OPT = cli_option("--units", default=None,
555                           dest="units", choices=('h', 'm', 'g', 't'),
556                           help="Specify units for output (one of hmgt)")
557
558 FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
559                         type="string", metavar="FIELDS",
560                         help="Comma separated list of output fields")
561
562 FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
563                        default=False, help="Force the operation")
564
565 CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
566                          default=False, help="Do not require confirmation")
567
568 TAG_SRC_OPT = cli_option("--from", dest="tags_source",
569                          default=None, help="File with tag names")
570
571 SUBMIT_OPT = cli_option("--submit", dest="submit_only",
572                         default=False, action="store_true",
573                         help=("Submit the job and return the job ID, but"
574                               " don't wait for the job to finish"))
575
576 SYNC_OPT = cli_option("--sync", dest="do_locking",
577                       default=False, action="store_true",
578                       help=("Grab locks while doing the queries"
579                             " in order to ensure more consistent results"))
580
581 DRY_RUN_OPT = cli_option("--dry-run", default=False,
582                          action="store_true",
583                          help=("Do not execute the operation, just run the"
584                                " check steps and verify it it could be"
585                                " executed"))
586
587 VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
588                          action="store_true",
589                          help="Increase the verbosity of the operation")
590
591 DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
592                               action="store_true", dest="simulate_errors",
593                               help="Debugging option that makes the operation"
594                               " treat most runtime checks as failed")
595
596 NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
597                         default=True, action="store_false",
598                         help="Don't wait for sync (DANGEROUS!)")
599
600 DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
601                                help="Custom disk setup (diskless, file,"
602                                " plain or drbd)",
603                                default=None, metavar="TEMPL",
604                                choices=list(constants.DISK_TEMPLATES))
605
606 NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
607                         help="Do not create any network cards for"
608                         " the instance")
609
610 FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
611                                help="Relative path under default cluster-wide"
612                                " file storage dir to store file-based disks",
613                                default=None, metavar="<DIR>")
614
615 FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
616                                   help="Driver to use for image files",
617                                   default="loop", metavar="<DRIVER>",
618                                   choices=list(constants.FILE_DRIVER))
619
620 IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
621                             help="Select nodes for the instance automatically"
622                             " using the <NAME> iallocator plugin",
623                             default=None, type="string",
624                             completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
625
626 DEFAULT_IALLOCATOR_OPT = cli_option("-I", "--default-iallocator",
627                             metavar="<NAME>",
628                             help="Set the default instance allocator plugin",
629                             default=None, type="string",
630                             completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
631
632 OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
633                     metavar="<os>",
634                     completion_suggest=OPT_COMPL_ONE_OS)
635
636 OSPARAMS_OPT = cli_option("-O", "--os-parameters", dest="osparams",
637                          type="keyval", default={},
638                          help="OS parameters")
639
640 FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
641                                action="store_true", default=False,
642                                help="Force an unknown variant")
643
644 NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
645                             action="store_true", default=False,
646                             help="Do not install the OS (will"
647                             " enable no-start)")
648
649 BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
650                          type="keyval", default={},
651                          help="Backend parameters")
652
653 HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
654                          default={}, dest="hvparams",
655                          help="Hypervisor parameters")
656
657 HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
658                             help="Hypervisor and hypervisor options, in the"
659                             " format hypervisor:option=value,option=value,...",
660                             default=None, type="identkeyval")
661
662 HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
663                         help="Hypervisor and hypervisor options, in the"
664                         " format hypervisor:option=value,option=value,...",
665                         default=[], action="append", type="identkeyval")
666
667 NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
668                            action="store_false",
669                            help="Don't check that the instance's IP"
670                            " is alive")
671
672 NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
673                              default=True, action="store_false",
674                              help="Don't check that the instance's name"
675                              " is resolvable")
676
677 NET_OPT = cli_option("--net",
678                      help="NIC parameters", default=[],
679                      dest="nics", action="append", type="identkeyval")
680
681 DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
682                       dest="disks", action="append", type="identkeyval")
683
684 DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
685                          help="Comma-separated list of disks"
686                          " indices to act on (e.g. 0,2) (optional,"
687                          " defaults to all disks)")
688
689 OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
690                          help="Enforces a single-disk configuration using the"
691                          " given disk size, in MiB unless a suffix is used",
692                          default=None, type="unit", metavar="<size>")
693
694 IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
695                                 dest="ignore_consistency",
696                                 action="store_true", default=False,
697                                 help="Ignore the consistency of the disks on"
698                                 " the secondary")
699
700 NONLIVE_OPT = cli_option("--non-live", dest="live",
701                          default=True, action="store_false",
702                          help="Do a non-live migration (this usually means"
703                          " freeze the instance, save the state, transfer and"
704                          " only then resume running on the secondary node)")
705
706 MIGRATION_MODE_OPT = cli_option("--migration-mode", dest="migration_mode",
707                                 default=None,
708                                 choices=list(constants.HT_MIGRATION_MODES),
709                                 help="Override default migration mode (choose"
710                                 " either live or non-live")
711
712 NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
713                                 help="Target node and optional secondary node",
714                                 metavar="<pnode>[:<snode>]",
715                                 completion_suggest=OPT_COMPL_INST_ADD_NODES)
716
717 NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
718                            action="append", metavar="<node>",
719                            help="Use only this node (can be used multiple"
720                            " times, if not given defaults to all nodes)",
721                            completion_suggest=OPT_COMPL_ONE_NODE)
722
723 SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
724                              metavar="<node>",
725                              completion_suggest=OPT_COMPL_ONE_NODE)
726
727 NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
728                          action="store_false",
729                          help="Don't start the instance after creation")
730
731 SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
732                          action="store_true", default=False,
733                          help="Show command instead of executing it")
734
735 CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
736                          default=False, action="store_true",
737                          help="Instead of performing the migration, try to"
738                          " recover from a failed cleanup. This is safe"
739                          " to run even if the instance is healthy, but it"
740                          " will create extra replication traffic and "
741                          " disrupt briefly the replication (like during the"
742                          " migration")
743
744 STATIC_OPT = cli_option("-s", "--static", dest="static",
745                         action="store_true", default=False,
746                         help="Only show configuration data, not runtime data")
747
748 ALL_OPT = cli_option("--all", dest="show_all",
749                      default=False, action="store_true",
750                      help="Show info on all instances on the cluster."
751                      " This can take a long time to run, use wisely")
752
753 SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
754                            action="store_true", default=False,
755                            help="Interactive OS reinstall, lists available"
756                            " OS templates for selection")
757
758 IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
759                                  action="store_true", default=False,
760                                  help="Remove the instance from the cluster"
761                                  " configuration even if there are failures"
762                                  " during the removal process")
763
764 IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
765                                         dest="ignore_remove_failures",
766                                         action="store_true", default=False,
767                                         help="Remove the instance from the"
768                                         " cluster configuration even if there"
769                                         " are failures during the removal"
770                                         " process")
771
772 REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
773                                  action="store_true", default=False,
774                                  help="Remove the instance from the cluster")
775
776 NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
777                                help="Specifies the new secondary node",
778                                metavar="NODE", default=None,
779                                completion_suggest=OPT_COMPL_ONE_NODE)
780
781 ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
782                             default=False, action="store_true",
783                             help="Replace the disk(s) on the primary"
784                             " node (only for the drbd template)")
785
786 ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
787                               default=False, action="store_true",
788                               help="Replace the disk(s) on the secondary"
789                               " node (only for the drbd template)")
790
791 AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
792                               default=False, action="store_true",
793                               help="Lock all nodes and auto-promote as needed"
794                               " to MC status")
795
796 AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
797                               default=False, action="store_true",
798                               help="Automatically replace faulty disks"
799                               " (only for the drbd template)")
800
801 IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
802                              default=False, action="store_true",
803                              help="Ignore current recorded size"
804                              " (useful for forcing activation when"
805                              " the recorded size is wrong)")
806
807 SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
808                           metavar="<node>",
809                           completion_suggest=OPT_COMPL_ONE_NODE)
810
811 SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
812                          metavar="<dir>")
813
814 SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
815                               help="Specify the secondary ip for the node",
816                               metavar="ADDRESS", default=None)
817
818 READD_OPT = cli_option("--readd", dest="readd",
819                        default=False, action="store_true",
820                        help="Readd old node after replacing it")
821
822 NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
823                                 default=True, action="store_false",
824                                 help="Disable SSH key fingerprint checking")
825
826
827 MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
828                     type="bool", default=None, metavar=_YORNO,
829                     help="Set the master_candidate flag on the node")
830
831 OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
832                          type="bool", default=None,
833                          help="Set the offline flag on the node")
834
835 DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
836                          type="bool", default=None,
837                          help="Set the drained flag on the node")
838
839 ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
840                              type="bool", default=None, metavar=_YORNO,
841                              help="Set the allocatable flag on a volume")
842
843 NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
844                                help="Disable support for lvm based instances"
845                                " (cluster-wide)",
846                                action="store_false", default=True)
847
848 ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
849                             dest="enabled_hypervisors",
850                             help="Comma-separated list of hypervisors",
851                             type="string", default=None)
852
853 NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
854                             type="keyval", default={},
855                             help="NIC parameters")
856
857 CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
858                          dest="candidate_pool_size", type="int",
859                          help="Set the candidate pool size")
860
861 VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
862                          help="Enables LVM and specifies the volume group"
863                          " name (cluster-wide) for disk allocation [xenvg]",
864                          metavar="VG", default=None)
865
866 YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
867                           help="Destroy cluster", action="store_true")
868
869 NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
870                           help="Skip node agreement check (dangerous)",
871                           action="store_true", default=False)
872
873 MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
874                             help="Specify the mac prefix for the instance IP"
875                             " addresses, in the format XX:XX:XX",
876                             metavar="PREFIX",
877                             default=None)
878
879 MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
880                                help="Specify the node interface (cluster-wide)"
881                                " on which the master IP address will be added "
882                                " [%s]" % constants.DEFAULT_BRIDGE,
883                                metavar="NETDEV",
884                                default=constants.DEFAULT_BRIDGE)
885
886 GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
887                                 help="Specify the default directory (cluster-"
888                                 "wide) for storing the file-based disks [%s]" %
889                                 constants.DEFAULT_FILE_STORAGE_DIR,
890                                 metavar="DIR",
891                                 default=constants.DEFAULT_FILE_STORAGE_DIR)
892
893 NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
894                                    help="Don't modify /etc/hosts",
895                                    action="store_false", default=True)
896
897 NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
898                                     help="Don't initialize SSH keys",
899                                     action="store_false", default=True)
900
901 ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
902                              help="Enable parseable error messages",
903                              action="store_true", default=False)
904
905 NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
906                           help="Skip N+1 memory redundancy tests",
907                           action="store_true", default=False)
908
909 REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
910                              help="Type of reboot: soft/hard/full",
911                              default=constants.INSTANCE_REBOOT_HARD,
912                              metavar="<REBOOT>",
913                              choices=list(constants.REBOOT_TYPES))
914
915 IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
916                                     dest="ignore_secondaries",
917                                     default=False, action="store_true",
918                                     help="Ignore errors from secondaries")
919
920 NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
921                             action="store_false", default=True,
922                             help="Don't shutdown the instance (unsafe)")
923
924 TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
925                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
926                          help="Maximum time to wait")
927
928 SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
929                          dest="shutdown_timeout", type="int",
930                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
931                          help="Maximum time to wait for instance shutdown")
932
933 INTERVAL_OPT = cli_option("--interval", dest="interval", type="int",
934                           default=None,
935                           help=("Number of seconds between repetions of the"
936                                 " command"))
937
938 EARLY_RELEASE_OPT = cli_option("--early-release",
939                                dest="early_release", default=False,
940                                action="store_true",
941                                help="Release the locks on the secondary"
942                                " node(s) early")
943
944 NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
945                                   dest="new_cluster_cert",
946                                   default=False, action="store_true",
947                                   help="Generate a new cluster certificate")
948
949 RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
950                            default=None,
951                            help="File containing new RAPI certificate")
952
953 NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
954                                default=None, action="store_true",
955                                help=("Generate a new self-signed RAPI"
956                                      " certificate"))
957
958 NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
959                                     dest="new_confd_hmac_key",
960                                     default=False, action="store_true",
961                                     help=("Create a new HMAC key for %s" %
962                                           constants.CONFD))
963
964 CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
965                                        dest="cluster_domain_secret",
966                                        default=None,
967                                        help=("Load new new cluster domain"
968                                              " secret from file"))
969
970 NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret",
971                                            dest="new_cluster_domain_secret",
972                                            default=False, action="store_true",
973                                            help=("Create a new cluster domain"
974                                                  " secret"))
975
976 USE_REPL_NET_OPT = cli_option("--use-replication-network",
977                               dest="use_replication_network",
978                               help="Whether to use the replication network"
979                               " for talking to the nodes",
980                               action="store_true", default=False)
981
982 MAINTAIN_NODE_HEALTH_OPT = \
983     cli_option("--maintain-node-health", dest="maintain_node_health",
984                metavar=_YORNO, default=None, type="bool",
985                help="Configure the cluster to automatically maintain node"
986                " health, by shutting down unknown instances, shutting down"
987                " unknown DRBD devices, etc.")
988
989 IDENTIFY_DEFAULTS_OPT = \
990     cli_option("--identify-defaults", dest="identify_defaults",
991                default=False, action="store_true",
992                help="Identify which saved instance parameters are equal to"
993                " the current cluster defaults and set them as such, instead"
994                " of marking them as overridden")
995
996 UIDPOOL_OPT = cli_option("--uid-pool", default=None,
997                          action="store", dest="uid_pool",
998                          help=("A list of user-ids or user-id"
999                                " ranges separated by commas"))
1000
1001 ADD_UIDS_OPT = cli_option("--add-uids", default=None,
1002                           action="store", dest="add_uids",
1003                           help=("A list of user-ids or user-id"
1004                                 " ranges separated by commas, to be"
1005                                 " added to the user-id pool"))
1006
1007 REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
1008                              action="store", dest="remove_uids",
1009                              help=("A list of user-ids or user-id"
1010                                    " ranges separated by commas, to be"
1011                                    " removed from the user-id pool"))
1012
1013 RESERVED_LVS_OPT = cli_option("--reserved-lvs", default=None,
1014                              action="store", dest="reserved_lvs",
1015                              help=("A comma-separated list of reserved"
1016                                    " logical volumes names, that will be"
1017                                    " ignored by cluster verify"))
1018
1019 ROMAN_OPT = cli_option("--roman",
1020                        dest="roman_integers", default=False,
1021                        action="store_true",
1022                        help="Use roman numbers for positive integers")
1023
1024 DRBD_HELPER_OPT = cli_option("--drbd-usermode-helper", dest="drbd_helper",
1025                              action="store", default=None,
1026                              help="Specifies usermode helper for DRBD")
1027
1028 NODRBD_STORAGE_OPT = cli_option("--no-drbd-storage", dest="drbd_storage",
1029                                 action="store_false", default=True,
1030                                 help="Disable support for DRBD")
1031
1032 #: Options provided by all commands
1033 COMMON_OPTS = [DEBUG_OPT]
1034
1035
1036 def _ParseArgs(argv, commands, aliases):
1037   """Parser for the command line arguments.
1038
1039   This function parses the arguments and returns the function which
1040   must be executed together with its (modified) arguments.
1041
1042   @param argv: the command line
1043   @param commands: dictionary with special contents, see the design
1044       doc for cmdline handling
1045   @param aliases: dictionary with command aliases {'alias': 'target, ...}
1046
1047   """
1048   if len(argv) == 0:
1049     binary = "<command>"
1050   else:
1051     binary = argv[0].split("/")[-1]
1052
1053   if len(argv) > 1 and argv[1] == "--version":
1054     ToStdout("%s (ganeti %s) %s", binary, constants.VCS_VERSION,
1055              constants.RELEASE_VERSION)
1056     # Quit right away. That way we don't have to care about this special
1057     # argument. optparse.py does it the same.
1058     sys.exit(0)
1059
1060   if len(argv) < 2 or not (argv[1] in commands or
1061                            argv[1] in aliases):
1062     # let's do a nice thing
1063     sortedcmds = commands.keys()
1064     sortedcmds.sort()
1065
1066     ToStdout("Usage: %s {command} [options...] [argument...]", binary)
1067     ToStdout("%s <command> --help to see details, or man %s", binary, binary)
1068     ToStdout("")
1069
1070     # compute the max line length for cmd + usage
1071     mlen = max([len(" %s" % cmd) for cmd in commands])
1072     mlen = min(60, mlen) # should not get here...
1073
1074     # and format a nice command list
1075     ToStdout("Commands:")
1076     for cmd in sortedcmds:
1077       cmdstr = " %s" % (cmd,)
1078       help_text = commands[cmd][4]
1079       help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
1080       ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
1081       for line in help_lines:
1082         ToStdout("%-*s   %s", mlen, "", line)
1083
1084     ToStdout("")
1085
1086     return None, None, None
1087
1088   # get command, unalias it, and look it up in commands
1089   cmd = argv.pop(1)
1090   if cmd in aliases:
1091     if cmd in commands:
1092       raise errors.ProgrammerError("Alias '%s' overrides an existing"
1093                                    " command" % cmd)
1094
1095     if aliases[cmd] not in commands:
1096       raise errors.ProgrammerError("Alias '%s' maps to non-existing"
1097                                    " command '%s'" % (cmd, aliases[cmd]))
1098
1099     cmd = aliases[cmd]
1100
1101   func, args_def, parser_opts, usage, description = commands[cmd]
1102   parser = OptionParser(option_list=parser_opts + COMMON_OPTS,
1103                         description=description,
1104                         formatter=TitledHelpFormatter(),
1105                         usage="%%prog %s %s" % (cmd, usage))
1106   parser.disable_interspersed_args()
1107   options, args = parser.parse_args()
1108
1109   if not _CheckArguments(cmd, args_def, args):
1110     return None, None, None
1111
1112   return func, options, args
1113
1114
1115 def _CheckArguments(cmd, args_def, args):
1116   """Verifies the arguments using the argument definition.
1117
1118   Algorithm:
1119
1120     1. Abort with error if values specified by user but none expected.
1121
1122     1. For each argument in definition
1123
1124       1. Keep running count of minimum number of values (min_count)
1125       1. Keep running count of maximum number of values (max_count)
1126       1. If it has an unlimited number of values
1127
1128         1. Abort with error if it's not the last argument in the definition
1129
1130     1. If last argument has limited number of values
1131
1132       1. Abort with error if number of values doesn't match or is too large
1133
1134     1. Abort with error if user didn't pass enough values (min_count)
1135
1136   """
1137   if args and not args_def:
1138     ToStderr("Error: Command %s expects no arguments", cmd)
1139     return False
1140
1141   min_count = None
1142   max_count = None
1143   check_max = None
1144
1145   last_idx = len(args_def) - 1
1146
1147   for idx, arg in enumerate(args_def):
1148     if min_count is None:
1149       min_count = arg.min
1150     elif arg.min is not None:
1151       min_count += arg.min
1152
1153     if max_count is None:
1154       max_count = arg.max
1155     elif arg.max is not None:
1156       max_count += arg.max
1157
1158     if idx == last_idx:
1159       check_max = (arg.max is not None)
1160
1161     elif arg.max is None:
1162       raise errors.ProgrammerError("Only the last argument can have max=None")
1163
1164   if check_max:
1165     # Command with exact number of arguments
1166     if (min_count is not None and max_count is not None and
1167         min_count == max_count and len(args) != min_count):
1168       ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1169       return False
1170
1171     # Command with limited number of arguments
1172     if max_count is not None and len(args) > max_count:
1173       ToStderr("Error: Command %s expects only %d argument(s)",
1174                cmd, max_count)
1175       return False
1176
1177   # Command with some required arguments
1178   if min_count is not None and len(args) < min_count:
1179     ToStderr("Error: Command %s expects at least %d argument(s)",
1180              cmd, min_count)
1181     return False
1182
1183   return True
1184
1185
1186 def SplitNodeOption(value):
1187   """Splits the value of a --node option.
1188
1189   """
1190   if value and ':' in value:
1191     return value.split(':', 1)
1192   else:
1193     return (value, None)
1194
1195
1196 def CalculateOSNames(os_name, os_variants):
1197   """Calculates all the names an OS can be called, according to its variants.
1198
1199   @type os_name: string
1200   @param os_name: base name of the os
1201   @type os_variants: list or None
1202   @param os_variants: list of supported variants
1203   @rtype: list
1204   @return: list of valid names
1205
1206   """
1207   if os_variants:
1208     return ['%s+%s' % (os_name, v) for v in os_variants]
1209   else:
1210     return [os_name]
1211
1212
1213 def ParseFields(selected, default):
1214   """Parses the values of "--field"-like options.
1215
1216   @type selected: string or None
1217   @param selected: User-selected options
1218   @type default: list
1219   @param default: Default fields
1220
1221   """
1222   if selected is None:
1223     return default
1224
1225   if selected.startswith("+"):
1226     return default + selected[1:].split(",")
1227
1228   return selected.split(",")
1229
1230
1231 UsesRPC = rpc.RunWithRPC
1232
1233
1234 def AskUser(text, choices=None):
1235   """Ask the user a question.
1236
1237   @param text: the question to ask
1238
1239   @param choices: list with elements tuples (input_char, return_value,
1240       description); if not given, it will default to: [('y', True,
1241       'Perform the operation'), ('n', False, 'Do no do the operation')];
1242       note that the '?' char is reserved for help
1243
1244   @return: one of the return values from the choices list; if input is
1245       not possible (i.e. not running with a tty, we return the last
1246       entry from the list
1247
1248   """
1249   if choices is None:
1250     choices = [('y', True, 'Perform the operation'),
1251                ('n', False, 'Do not perform the operation')]
1252   if not choices or not isinstance(choices, list):
1253     raise errors.ProgrammerError("Invalid choices argument to AskUser")
1254   for entry in choices:
1255     if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
1256       raise errors.ProgrammerError("Invalid choices element to AskUser")
1257
1258   answer = choices[-1][1]
1259   new_text = []
1260   for line in text.splitlines():
1261     new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
1262   text = "\n".join(new_text)
1263   try:
1264     f = file("/dev/tty", "a+")
1265   except IOError:
1266     return answer
1267   try:
1268     chars = [entry[0] for entry in choices]
1269     chars[-1] = "[%s]" % chars[-1]
1270     chars.append('?')
1271     maps = dict([(entry[0], entry[1]) for entry in choices])
1272     while True:
1273       f.write(text)
1274       f.write('\n')
1275       f.write("/".join(chars))
1276       f.write(": ")
1277       line = f.readline(2).strip().lower()
1278       if line in maps:
1279         answer = maps[line]
1280         break
1281       elif line == '?':
1282         for entry in choices:
1283           f.write(" %s - %s\n" % (entry[0], entry[2]))
1284         f.write("\n")
1285         continue
1286   finally:
1287     f.close()
1288   return answer
1289
1290
1291 class JobSubmittedException(Exception):
1292   """Job was submitted, client should exit.
1293
1294   This exception has one argument, the ID of the job that was
1295   submitted. The handler should print this ID.
1296
1297   This is not an error, just a structured way to exit from clients.
1298
1299   """
1300
1301
1302 def SendJob(ops, cl=None):
1303   """Function to submit an opcode without waiting for the results.
1304
1305   @type ops: list
1306   @param ops: list of opcodes
1307   @type cl: luxi.Client
1308   @param cl: the luxi client to use for communicating with the master;
1309              if None, a new client will be created
1310
1311   """
1312   if cl is None:
1313     cl = GetClient()
1314
1315   job_id = cl.SubmitJob(ops)
1316
1317   return job_id
1318
1319
1320 def GenericPollJob(job_id, cbs, report_cbs):
1321   """Generic job-polling function.
1322
1323   @type job_id: number
1324   @param job_id: Job ID
1325   @type cbs: Instance of L{JobPollCbBase}
1326   @param cbs: Data callbacks
1327   @type report_cbs: Instance of L{JobPollReportCbBase}
1328   @param report_cbs: Reporting callbacks
1329
1330   """
1331   prev_job_info = None
1332   prev_logmsg_serial = None
1333
1334   status = None
1335
1336   while True:
1337     result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
1338                                       prev_logmsg_serial)
1339     if not result:
1340       # job not found, go away!
1341       raise errors.JobLost("Job with id %s lost" % job_id)
1342
1343     if result == constants.JOB_NOTCHANGED:
1344       report_cbs.ReportNotChanged(job_id, status)
1345
1346       # Wait again
1347       continue
1348
1349     # Split result, a tuple of (field values, log entries)
1350     (job_info, log_entries) = result
1351     (status, ) = job_info
1352
1353     if log_entries:
1354       for log_entry in log_entries:
1355         (serial, timestamp, log_type, message) = log_entry
1356         report_cbs.ReportLogMessage(job_id, serial, timestamp,
1357                                     log_type, message)
1358         prev_logmsg_serial = max(prev_logmsg_serial, serial)
1359
1360     # TODO: Handle canceled and archived jobs
1361     elif status in (constants.JOB_STATUS_SUCCESS,
1362                     constants.JOB_STATUS_ERROR,
1363                     constants.JOB_STATUS_CANCELING,
1364                     constants.JOB_STATUS_CANCELED):
1365       break
1366
1367     prev_job_info = job_info
1368
1369   jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
1370   if not jobs:
1371     raise errors.JobLost("Job with id %s lost" % job_id)
1372
1373   status, opstatus, result = jobs[0]
1374
1375   if status == constants.JOB_STATUS_SUCCESS:
1376     return result
1377
1378   if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
1379     raise errors.OpExecError("Job was canceled")
1380
1381   has_ok = False
1382   for idx, (status, msg) in enumerate(zip(opstatus, result)):
1383     if status == constants.OP_STATUS_SUCCESS:
1384       has_ok = True
1385     elif status == constants.OP_STATUS_ERROR:
1386       errors.MaybeRaise(msg)
1387
1388       if has_ok:
1389         raise errors.OpExecError("partial failure (opcode %d): %s" %
1390                                  (idx, msg))
1391
1392       raise errors.OpExecError(str(msg))
1393
1394   # default failure mode
1395   raise errors.OpExecError(result)
1396
1397
1398 class JobPollCbBase:
1399   """Base class for L{GenericPollJob} callbacks.
1400
1401   """
1402   def __init__(self):
1403     """Initializes this class.
1404
1405     """
1406
1407   def WaitForJobChangeOnce(self, job_id, fields,
1408                            prev_job_info, prev_log_serial):
1409     """Waits for changes on a job.
1410
1411     """
1412     raise NotImplementedError()
1413
1414   def QueryJobs(self, job_ids, fields):
1415     """Returns the selected fields for the selected job IDs.
1416
1417     @type job_ids: list of numbers
1418     @param job_ids: Job IDs
1419     @type fields: list of strings
1420     @param fields: Fields
1421
1422     """
1423     raise NotImplementedError()
1424
1425
1426 class JobPollReportCbBase:
1427   """Base class for L{GenericPollJob} reporting callbacks.
1428
1429   """
1430   def __init__(self):
1431     """Initializes this class.
1432
1433     """
1434
1435   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1436     """Handles a log message.
1437
1438     """
1439     raise NotImplementedError()
1440
1441   def ReportNotChanged(self, job_id, status):
1442     """Called for if a job hasn't changed in a while.
1443
1444     @type job_id: number
1445     @param job_id: Job ID
1446     @type status: string or None
1447     @param status: Job status if available
1448
1449     """
1450     raise NotImplementedError()
1451
1452
1453 class _LuxiJobPollCb(JobPollCbBase):
1454   def __init__(self, cl):
1455     """Initializes this class.
1456
1457     """
1458     JobPollCbBase.__init__(self)
1459     self.cl = cl
1460
1461   def WaitForJobChangeOnce(self, job_id, fields,
1462                            prev_job_info, prev_log_serial):
1463     """Waits for changes on a job.
1464
1465     """
1466     return self.cl.WaitForJobChangeOnce(job_id, fields,
1467                                         prev_job_info, prev_log_serial)
1468
1469   def QueryJobs(self, job_ids, fields):
1470     """Returns the selected fields for the selected job IDs.
1471
1472     """
1473     return self.cl.QueryJobs(job_ids, fields)
1474
1475
1476 class FeedbackFnJobPollReportCb(JobPollReportCbBase):
1477   def __init__(self, feedback_fn):
1478     """Initializes this class.
1479
1480     """
1481     JobPollReportCbBase.__init__(self)
1482
1483     self.feedback_fn = feedback_fn
1484
1485     assert callable(feedback_fn)
1486
1487   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1488     """Handles a log message.
1489
1490     """
1491     self.feedback_fn((timestamp, log_type, log_msg))
1492
1493   def ReportNotChanged(self, job_id, status):
1494     """Called if a job hasn't changed in a while.
1495
1496     """
1497     # Ignore
1498
1499
1500 class StdioJobPollReportCb(JobPollReportCbBase):
1501   def __init__(self):
1502     """Initializes this class.
1503
1504     """
1505     JobPollReportCbBase.__init__(self)
1506
1507     self.notified_queued = False
1508     self.notified_waitlock = False
1509
1510   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1511     """Handles a log message.
1512
1513     """
1514     ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
1515              FormatLogMessage(log_type, log_msg))
1516
1517   def ReportNotChanged(self, job_id, status):
1518     """Called if a job hasn't changed in a while.
1519
1520     """
1521     if status is None:
1522       return
1523
1524     if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
1525       ToStderr("Job %s is waiting in queue", job_id)
1526       self.notified_queued = True
1527
1528     elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
1529       ToStderr("Job %s is trying to acquire all necessary locks", job_id)
1530       self.notified_waitlock = True
1531
1532
1533 def FormatLogMessage(log_type, log_msg):
1534   """Formats a job message according to its type.
1535
1536   """
1537   if log_type != constants.ELOG_MESSAGE:
1538     log_msg = str(log_msg)
1539
1540   return utils.SafeEncode(log_msg)
1541
1542
1543 def PollJob(job_id, cl=None, feedback_fn=None, reporter=None):
1544   """Function to poll for the result of a job.
1545
1546   @type job_id: job identified
1547   @param job_id: the job to poll for results
1548   @type cl: luxi.Client
1549   @param cl: the luxi client to use for communicating with the master;
1550              if None, a new client will be created
1551
1552   """
1553   if cl is None:
1554     cl = GetClient()
1555
1556   if reporter is None:
1557     if feedback_fn:
1558       reporter = FeedbackFnJobPollReportCb(feedback_fn)
1559     else:
1560       reporter = StdioJobPollReportCb()
1561   elif feedback_fn:
1562     raise errors.ProgrammerError("Can't specify reporter and feedback function")
1563
1564   return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
1565
1566
1567 def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None, reporter=None):
1568   """Legacy function to submit an opcode.
1569
1570   This is just a simple wrapper over the construction of the processor
1571   instance. It should be extended to better handle feedback and
1572   interaction functions.
1573
1574   """
1575   if cl is None:
1576     cl = GetClient()
1577
1578   SetGenericOpcodeOpts([op], opts)
1579
1580   job_id = SendJob([op], cl=cl)
1581
1582   op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn,
1583                        reporter=reporter)
1584
1585   return op_results[0]
1586
1587
1588 def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
1589   """Wrapper around SubmitOpCode or SendJob.
1590
1591   This function will decide, based on the 'opts' parameter, whether to
1592   submit and wait for the result of the opcode (and return it), or
1593   whether to just send the job and print its identifier. It is used in
1594   order to simplify the implementation of the '--submit' option.
1595
1596   It will also process the opcodes if we're sending the via SendJob
1597   (otherwise SubmitOpCode does it).
1598
1599   """
1600   if opts and opts.submit_only:
1601     job = [op]
1602     SetGenericOpcodeOpts(job, opts)
1603     job_id = SendJob(job, cl=cl)
1604     raise JobSubmittedException(job_id)
1605   else:
1606     return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
1607
1608
1609 def SetGenericOpcodeOpts(opcode_list, options):
1610   """Processor for generic options.
1611
1612   This function updates the given opcodes based on generic command
1613   line options (like debug, dry-run, etc.).
1614
1615   @param opcode_list: list of opcodes
1616   @param options: command line options or None
1617   @return: None (in-place modification)
1618
1619   """
1620   if not options:
1621     return
1622   for op in opcode_list:
1623     if hasattr(options, "dry_run"):
1624       op.dry_run = options.dry_run
1625     op.debug_level = options.debug
1626
1627
1628 def GetClient():
1629   # TODO: Cache object?
1630   try:
1631     client = luxi.Client()
1632   except luxi.NoMasterError:
1633     ss = ssconf.SimpleStore()
1634
1635     # Try to read ssconf file
1636     try:
1637       ss.GetMasterNode()
1638     except errors.ConfigurationError:
1639       raise errors.OpPrereqError("Cluster not initialized or this machine is"
1640                                  " not part of a cluster")
1641
1642     master, myself = ssconf.GetMasterAndMyself(ss=ss)
1643     if master != myself:
1644       raise errors.OpPrereqError("This is not the master node, please connect"
1645                                  " to node '%s' and rerun the command" %
1646                                  master)
1647     raise
1648   return client
1649
1650
1651 def FormatError(err):
1652   """Return a formatted error message for a given error.
1653
1654   This function takes an exception instance and returns a tuple
1655   consisting of two values: first, the recommended exit code, and
1656   second, a string describing the error message (not
1657   newline-terminated).
1658
1659   """
1660   retcode = 1
1661   obuf = StringIO()
1662   msg = str(err)
1663   if isinstance(err, errors.ConfigurationError):
1664     txt = "Corrupt configuration file: %s" % msg
1665     logging.error(txt)
1666     obuf.write(txt + "\n")
1667     obuf.write("Aborting.")
1668     retcode = 2
1669   elif isinstance(err, errors.HooksAbort):
1670     obuf.write("Failure: hooks execution failed:\n")
1671     for node, script, out in err.args[0]:
1672       if out:
1673         obuf.write("  node: %s, script: %s, output: %s\n" %
1674                    (node, script, out))
1675       else:
1676         obuf.write("  node: %s, script: %s (no output)\n" %
1677                    (node, script))
1678   elif isinstance(err, errors.HooksFailure):
1679     obuf.write("Failure: hooks general failure: %s" % msg)
1680   elif isinstance(err, errors.ResolverError):
1681     this_host = netutils.HostInfo.SysName()
1682     if err.args[0] == this_host:
1683       msg = "Failure: can't resolve my own hostname ('%s')"
1684     else:
1685       msg = "Failure: can't resolve hostname '%s'"
1686     obuf.write(msg % err.args[0])
1687   elif isinstance(err, errors.OpPrereqError):
1688     if len(err.args) == 2:
1689       obuf.write("Failure: prerequisites not met for this"
1690                " operation:\nerror type: %s, error details:\n%s" %
1691                  (err.args[1], err.args[0]))
1692     else:
1693       obuf.write("Failure: prerequisites not met for this"
1694                  " operation:\n%s" % msg)
1695   elif isinstance(err, errors.OpExecError):
1696     obuf.write("Failure: command execution error:\n%s" % msg)
1697   elif isinstance(err, errors.TagError):
1698     obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
1699   elif isinstance(err, errors.JobQueueDrainError):
1700     obuf.write("Failure: the job queue is marked for drain and doesn't"
1701                " accept new requests\n")
1702   elif isinstance(err, errors.JobQueueFull):
1703     obuf.write("Failure: the job queue is full and doesn't accept new"
1704                " job submissions until old jobs are archived\n")
1705   elif isinstance(err, errors.TypeEnforcementError):
1706     obuf.write("Parameter Error: %s" % msg)
1707   elif isinstance(err, errors.ParameterError):
1708     obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
1709   elif isinstance(err, luxi.NoMasterError):
1710     obuf.write("Cannot communicate with the master daemon.\nIs it running"
1711                " and listening for connections?")
1712   elif isinstance(err, luxi.TimeoutError):
1713     obuf.write("Timeout while talking to the master daemon. Error:\n"
1714                "%s" % msg)
1715   elif isinstance(err, luxi.PermissionError):
1716     obuf.write("It seems you don't have permissions to connect to the"
1717                " master daemon.\nPlease retry as a different user.")
1718   elif isinstance(err, luxi.ProtocolError):
1719     obuf.write("Unhandled protocol error while talking to the master daemon:\n"
1720                "%s" % msg)
1721   elif isinstance(err, errors.JobLost):
1722     obuf.write("Error checking job status: %s" % msg)
1723   elif isinstance(err, errors.GenericError):
1724     obuf.write("Unhandled Ganeti error: %s" % msg)
1725   elif isinstance(err, JobSubmittedException):
1726     obuf.write("JobID: %s\n" % err.args[0])
1727     retcode = 0
1728   else:
1729     obuf.write("Unhandled exception: %s" % msg)
1730   return retcode, obuf.getvalue().rstrip('\n')
1731
1732
1733 def GenericMain(commands, override=None, aliases=None):
1734   """Generic main function for all the gnt-* commands.
1735
1736   Arguments:
1737     - commands: a dictionary with a special structure, see the design doc
1738                 for command line handling.
1739     - override: if not None, we expect a dictionary with keys that will
1740                 override command line options; this can be used to pass
1741                 options from the scripts to generic functions
1742     - aliases: dictionary with command aliases {'alias': 'target, ...}
1743
1744   """
1745   # save the program name and the entire command line for later logging
1746   if sys.argv:
1747     binary = os.path.basename(sys.argv[0]) or sys.argv[0]
1748     if len(sys.argv) >= 2:
1749       binary += " " + sys.argv[1]
1750       old_cmdline = " ".join(sys.argv[2:])
1751     else:
1752       old_cmdline = ""
1753   else:
1754     binary = "<unknown program>"
1755     old_cmdline = ""
1756
1757   if aliases is None:
1758     aliases = {}
1759
1760   try:
1761     func, options, args = _ParseArgs(sys.argv, commands, aliases)
1762   except errors.ParameterError, err:
1763     result, err_msg = FormatError(err)
1764     ToStderr(err_msg)
1765     return 1
1766
1767   if func is None: # parse error
1768     return 1
1769
1770   if override is not None:
1771     for key, val in override.iteritems():
1772       setattr(options, key, val)
1773
1774   utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
1775                      stderr_logging=True, program=binary)
1776
1777   if old_cmdline:
1778     logging.info("run with arguments '%s'", old_cmdline)
1779   else:
1780     logging.info("run with no arguments")
1781
1782   try:
1783     result = func(options, args)
1784   except (errors.GenericError, luxi.ProtocolError,
1785           JobSubmittedException), err:
1786     result, err_msg = FormatError(err)
1787     logging.exception("Error during command processing")
1788     ToStderr(err_msg)
1789
1790   return result
1791
1792
1793 def GenericInstanceCreate(mode, opts, args):
1794   """Add an instance to the cluster via either creation or import.
1795
1796   @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
1797   @param opts: the command line options selected by the user
1798   @type args: list
1799   @param args: should contain only one element, the new instance name
1800   @rtype: int
1801   @return: the desired exit code
1802
1803   """
1804   instance = args[0]
1805
1806   (pnode, snode) = SplitNodeOption(opts.node)
1807
1808   hypervisor = None
1809   hvparams = {}
1810   if opts.hypervisor:
1811     hypervisor, hvparams = opts.hypervisor
1812
1813   if opts.nics:
1814     try:
1815       nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
1816     except ValueError, err:
1817       raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
1818     nics = [{}] * nic_max
1819     for nidx, ndict in opts.nics:
1820       nidx = int(nidx)
1821       if not isinstance(ndict, dict):
1822         msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
1823         raise errors.OpPrereqError(msg)
1824       nics[nidx] = ndict
1825   elif opts.no_nics:
1826     # no nics
1827     nics = []
1828   elif mode == constants.INSTANCE_CREATE:
1829     # default of one nic, all auto
1830     nics = [{}]
1831   else:
1832     # mode == import
1833     nics = []
1834
1835   if opts.disk_template == constants.DT_DISKLESS:
1836     if opts.disks or opts.sd_size is not None:
1837       raise errors.OpPrereqError("Diskless instance but disk"
1838                                  " information passed")
1839     disks = []
1840   else:
1841     if (not opts.disks and not opts.sd_size
1842         and mode == constants.INSTANCE_CREATE):
1843       raise errors.OpPrereqError("No disk information specified")
1844     if opts.disks and opts.sd_size is not None:
1845       raise errors.OpPrereqError("Please use either the '--disk' or"
1846                                  " '-s' option")
1847     if opts.sd_size is not None:
1848       opts.disks = [(0, {"size": opts.sd_size})]
1849
1850     if opts.disks:
1851       try:
1852         disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
1853       except ValueError, err:
1854         raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
1855       disks = [{}] * disk_max
1856     else:
1857       disks = []
1858     for didx, ddict in opts.disks:
1859       didx = int(didx)
1860       if not isinstance(ddict, dict):
1861         msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
1862         raise errors.OpPrereqError(msg)
1863       elif "size" in ddict:
1864         if "adopt" in ddict:
1865           raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
1866                                      " (disk %d)" % didx)
1867         try:
1868           ddict["size"] = utils.ParseUnit(ddict["size"])
1869         except ValueError, err:
1870           raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
1871                                      (didx, err))
1872       elif "adopt" in ddict:
1873         if mode == constants.INSTANCE_IMPORT:
1874           raise errors.OpPrereqError("Disk adoption not allowed for instance"
1875                                      " import")
1876         ddict["size"] = 0
1877       else:
1878         raise errors.OpPrereqError("Missing size or adoption source for"
1879                                    " disk %d" % didx)
1880       disks[didx] = ddict
1881
1882   utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
1883   utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
1884
1885   if mode == constants.INSTANCE_CREATE:
1886     start = opts.start
1887     os_type = opts.os
1888     force_variant = opts.force_variant
1889     src_node = None
1890     src_path = None
1891     no_install = opts.no_install
1892     identify_defaults = False
1893   elif mode == constants.INSTANCE_IMPORT:
1894     start = False
1895     os_type = None
1896     force_variant = False
1897     src_node = opts.src_node
1898     src_path = opts.src_dir
1899     no_install = None
1900     identify_defaults = opts.identify_defaults
1901   else:
1902     raise errors.ProgrammerError("Invalid creation mode %s" % mode)
1903
1904   op = opcodes.OpCreateInstance(instance_name=instance,
1905                                 disks=disks,
1906                                 disk_template=opts.disk_template,
1907                                 nics=nics,
1908                                 pnode=pnode, snode=snode,
1909                                 ip_check=opts.ip_check,
1910                                 name_check=opts.name_check,
1911                                 wait_for_sync=opts.wait_for_sync,
1912                                 file_storage_dir=opts.file_storage_dir,
1913                                 file_driver=opts.file_driver,
1914                                 iallocator=opts.iallocator,
1915                                 hypervisor=hypervisor,
1916                                 hvparams=hvparams,
1917                                 beparams=opts.beparams,
1918                                 osparams=opts.osparams,
1919                                 mode=mode,
1920                                 start=start,
1921                                 os_type=os_type,
1922                                 force_variant=force_variant,
1923                                 src_node=src_node,
1924                                 src_path=src_path,
1925                                 no_install=no_install,
1926                                 identify_defaults=identify_defaults)
1927
1928   SubmitOrSend(op, opts)
1929   return 0
1930
1931
1932 class _RunWhileClusterStoppedHelper:
1933   """Helper class for L{RunWhileClusterStopped} to simplify state management
1934
1935   """
1936   def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
1937     """Initializes this class.
1938
1939     @type feedback_fn: callable
1940     @param feedback_fn: Feedback function
1941     @type cluster_name: string
1942     @param cluster_name: Cluster name
1943     @type master_node: string
1944     @param master_node Master node name
1945     @type online_nodes: list
1946     @param online_nodes: List of names of online nodes
1947
1948     """
1949     self.feedback_fn = feedback_fn
1950     self.cluster_name = cluster_name
1951     self.master_node = master_node
1952     self.online_nodes = online_nodes
1953
1954     self.ssh = ssh.SshRunner(self.cluster_name)
1955
1956     self.nonmaster_nodes = [name for name in online_nodes
1957                             if name != master_node]
1958
1959     assert self.master_node not in self.nonmaster_nodes
1960
1961   def _RunCmd(self, node_name, cmd):
1962     """Runs a command on the local or a remote machine.
1963
1964     @type node_name: string
1965     @param node_name: Machine name
1966     @type cmd: list
1967     @param cmd: Command
1968
1969     """
1970     if node_name is None or node_name == self.master_node:
1971       # No need to use SSH
1972       result = utils.RunCmd(cmd)
1973     else:
1974       result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))
1975
1976     if result.failed:
1977       errmsg = ["Failed to run command %s" % result.cmd]
1978       if node_name:
1979         errmsg.append("on node %s" % node_name)
1980       errmsg.append(": exitcode %s and error %s" %
1981                     (result.exit_code, result.output))
1982       raise errors.OpExecError(" ".join(errmsg))
1983
1984   def Call(self, fn, *args):
1985     """Call function while all daemons are stopped.
1986
1987     @type fn: callable
1988     @param fn: Function to be called
1989
1990     """
1991     # Pause watcher by acquiring an exclusive lock on watcher state file
1992     self.feedback_fn("Blocking watcher")
1993     watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
1994     try:
1995       # TODO: Currently, this just blocks. There's no timeout.
1996       # TODO: Should it be a shared lock?
1997       watcher_block.Exclusive(blocking=True)
1998
1999       # Stop master daemons, so that no new jobs can come in and all running
2000       # ones are finished
2001       self.feedback_fn("Stopping master daemons")
2002       self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
2003       try:
2004         # Stop daemons on all nodes
2005         for node_name in self.online_nodes:
2006           self.feedback_fn("Stopping daemons on %s" % node_name)
2007           self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])
2008
2009         # All daemons are shut down now
2010         try:
2011           return fn(self, *args)
2012         except Exception, err:
2013           _, errmsg = FormatError(err)
2014           logging.exception("Caught exception")
2015           self.feedback_fn(errmsg)
2016           raise
2017       finally:
2018         # Start cluster again, master node last
2019         for node_name in self.nonmaster_nodes + [self.master_node]:
2020           self.feedback_fn("Starting daemons on %s" % node_name)
2021           self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
2022     finally:
2023       # Resume watcher
2024       watcher_block.Close()
2025
2026
2027 def RunWhileClusterStopped(feedback_fn, fn, *args):
2028   """Calls a function while all cluster daemons are stopped.
2029
2030   @type feedback_fn: callable
2031   @param feedback_fn: Feedback function
2032   @type fn: callable
2033   @param fn: Function to be called when daemons are stopped
2034
2035   """
2036   feedback_fn("Gathering cluster information")
2037
2038   # This ensures we're running on the master daemon
2039   cl = GetClient()
2040
2041   (cluster_name, master_node) = \
2042     cl.QueryConfigValues(["cluster_name", "master_node"])
2043
2044   online_nodes = GetOnlineNodes([], cl=cl)
2045
2046   # Don't keep a reference to the client. The master daemon will go away.
2047   del cl
2048
2049   assert master_node in online_nodes
2050
2051   return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
2052                                        online_nodes).Call(fn, *args)
2053
2054
2055 def GenerateTable(headers, fields, separator, data,
2056                   numfields=None, unitfields=None,
2057                   units=None):
2058   """Prints a table with headers and different fields.
2059
2060   @type headers: dict
2061   @param headers: dictionary mapping field names to headers for
2062       the table
2063   @type fields: list
2064   @param fields: the field names corresponding to each row in
2065       the data field
2066   @param separator: the separator to be used; if this is None,
2067       the default 'smart' algorithm is used which computes optimal
2068       field width, otherwise just the separator is used between
2069       each field
2070   @type data: list
2071   @param data: a list of lists, each sublist being one row to be output
2072   @type numfields: list
2073   @param numfields: a list with the fields that hold numeric
2074       values and thus should be right-aligned
2075   @type unitfields: list
2076   @param unitfields: a list with the fields that hold numeric
2077       values that should be formatted with the units field
2078   @type units: string or None
2079   @param units: the units we should use for formatting, or None for
2080       automatic choice (human-readable for non-separator usage, otherwise
2081       megabytes); this is a one-letter string
2082
2083   """
2084   if units is None:
2085     if separator:
2086       units = "m"
2087     else:
2088       units = "h"
2089
2090   if numfields is None:
2091     numfields = []
2092   if unitfields is None:
2093     unitfields = []
2094
2095   numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
2096   unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142
2097
2098   format_fields = []
2099   for field in fields:
2100     if headers and field not in headers:
2101       # TODO: handle better unknown fields (either revert to old
2102       # style of raising exception, or deal more intelligently with
2103       # variable fields)
2104       headers[field] = field
2105     if separator is not None:
2106       format_fields.append("%s")
2107     elif numfields.Matches(field):
2108       format_fields.append("%*s")
2109     else:
2110       format_fields.append("%-*s")
2111
2112   if separator is None:
2113     mlens = [0 for name in fields]
2114     format_str = ' '.join(format_fields)
2115   else:
2116     format_str = separator.replace("%", "%%").join(format_fields)
2117
2118   for row in data:
2119     if row is None:
2120       continue
2121     for idx, val in enumerate(row):
2122       if unitfields.Matches(fields[idx]):
2123         try:
2124           val = int(val)
2125         except (TypeError, ValueError):
2126           pass
2127         else:
2128           val = row[idx] = utils.FormatUnit(val, units)
2129       val = row[idx] = str(val)
2130       if separator is None:
2131         mlens[idx] = max(mlens[idx], len(val))
2132
2133   result = []
2134   if headers:
2135     args = []
2136     for idx, name in enumerate(fields):
2137       hdr = headers[name]
2138       if separator is None:
2139         mlens[idx] = max(mlens[idx], len(hdr))
2140         args.append(mlens[idx])
2141       args.append(hdr)
2142     result.append(format_str % tuple(args))
2143
2144   if separator is None:
2145     assert len(mlens) == len(fields)
2146
2147     if fields and not numfields.Matches(fields[-1]):
2148       mlens[-1] = 0
2149
2150   for line in data:
2151     args = []
2152     if line is None:
2153       line = ['-' for _ in fields]
2154     for idx in range(len(fields)):
2155       if separator is None:
2156         args.append(mlens[idx])
2157       args.append(line[idx])
2158     result.append(format_str % tuple(args))
2159
2160   return result
2161
2162
2163 def FormatTimestamp(ts):
2164   """Formats a given timestamp.
2165
2166   @type ts: timestamp
2167   @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
2168
2169   @rtype: string
2170   @return: a string with the formatted timestamp
2171
2172   """
2173   if not isinstance (ts, (tuple, list)) or len(ts) != 2:
2174     return '?'
2175   sec, usec = ts
2176   return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
2177
2178
2179 def ParseTimespec(value):
2180   """Parse a time specification.
2181
2182   The following suffixed will be recognized:
2183
2184     - s: seconds
2185     - m: minutes
2186     - h: hours
2187     - d: day
2188     - w: weeks
2189
2190   Without any suffix, the value will be taken to be in seconds.
2191
2192   """
2193   value = str(value)
2194   if not value:
2195     raise errors.OpPrereqError("Empty time specification passed")
2196   suffix_map = {
2197     's': 1,
2198     'm': 60,
2199     'h': 3600,
2200     'd': 86400,
2201     'w': 604800,
2202     }
2203   if value[-1] not in suffix_map:
2204     try:
2205       value = int(value)
2206     except (TypeError, ValueError):
2207       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2208   else:
2209     multiplier = suffix_map[value[-1]]
2210     value = value[:-1]
2211     if not value: # no data left after stripping the suffix
2212       raise errors.OpPrereqError("Invalid time specification (only"
2213                                  " suffix passed)")
2214     try:
2215       value = int(value) * multiplier
2216     except (TypeError, ValueError):
2217       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2218   return value
2219
2220
2221 def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
2222                    filter_master=False):
2223   """Returns the names of online nodes.
2224
2225   This function will also log a warning on stderr with the names of
2226   the online nodes.
2227
2228   @param nodes: if not empty, use only this subset of nodes (minus the
2229       offline ones)
2230   @param cl: if not None, luxi client to use
2231   @type nowarn: boolean
2232   @param nowarn: by default, this function will output a note with the
2233       offline nodes that are skipped; if this parameter is True the
2234       note is not displayed
2235   @type secondary_ips: boolean
2236   @param secondary_ips: if True, return the secondary IPs instead of the
2237       names, useful for doing network traffic over the replication interface
2238       (if any)
2239   @type filter_master: boolean
2240   @param filter_master: if True, do not return the master node in the list
2241       (useful in coordination with secondary_ips where we cannot check our
2242       node name against the list)
2243
2244   """
2245   if cl is None:
2246     cl = GetClient()
2247
2248   if secondary_ips:
2249     name_idx = 2
2250   else:
2251     name_idx = 0
2252
2253   if filter_master:
2254     master_node = cl.QueryConfigValues(["master_node"])[0]
2255     filter_fn = lambda x: x != master_node
2256   else:
2257     filter_fn = lambda _: True
2258
2259   result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
2260                          use_locking=False)
2261   offline = [row[0] for row in result if row[1]]
2262   if offline and not nowarn:
2263     ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
2264   return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
2265
2266
2267 def _ToStream(stream, txt, *args):
2268   """Write a message to a stream, bypassing the logging system
2269
2270   @type stream: file object
2271   @param stream: the file to which we should write
2272   @type txt: str
2273   @param txt: the message
2274
2275   """
2276   if args:
2277     args = tuple(args)
2278     stream.write(txt % args)
2279   else:
2280     stream.write(txt)
2281   stream.write('\n')
2282   stream.flush()
2283
2284
2285 def ToStdout(txt, *args):
2286   """Write a message to stdout only, bypassing the logging system
2287
2288   This is just a wrapper over _ToStream.
2289
2290   @type txt: str
2291   @param txt: the message
2292
2293   """
2294   _ToStream(sys.stdout, txt, *args)
2295
2296
2297 def ToStderr(txt, *args):
2298   """Write a message to stderr only, bypassing the logging system
2299
2300   This is just a wrapper over _ToStream.
2301
2302   @type txt: str
2303   @param txt: the message
2304
2305   """
2306   _ToStream(sys.stderr, txt, *args)
2307
2308
2309 class JobExecutor(object):
2310   """Class which manages the submission and execution of multiple jobs.
2311
2312   Note that instances of this class should not be reused between
2313   GetResults() calls.
2314
2315   """
2316   def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
2317     self.queue = []
2318     if cl is None:
2319       cl = GetClient()
2320     self.cl = cl
2321     self.verbose = verbose
2322     self.jobs = []
2323     self.opts = opts
2324     self.feedback_fn = feedback_fn
2325
2326   def QueueJob(self, name, *ops):
2327     """Record a job for later submit.
2328
2329     @type name: string
2330     @param name: a description of the job, will be used in WaitJobSet
2331     """
2332     SetGenericOpcodeOpts(ops, self.opts)
2333     self.queue.append((name, ops))
2334
2335   def SubmitPending(self, each=False):
2336     """Submit all pending jobs.
2337
2338     """
2339     if each:
2340       results = []
2341       for row in self.queue:
2342         # SubmitJob will remove the success status, but raise an exception if
2343         # the submission fails, so we'll notice that anyway.
2344         results.append([True, self.cl.SubmitJob(row[1])])
2345     else:
2346       results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
2347     for (idx, ((status, data), (name, _))) in enumerate(zip(results,
2348                                                             self.queue)):
2349       self.jobs.append((idx, status, data, name))
2350
2351   def _ChooseJob(self):
2352     """Choose a non-waiting/queued job to poll next.
2353
2354     """
2355     assert self.jobs, "_ChooseJob called with empty job list"
2356
2357     result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
2358     assert result
2359
2360     for job_data, status in zip(self.jobs, result):
2361       if (isinstance(status, list) and status and
2362           status[0] in (constants.JOB_STATUS_QUEUED,
2363                         constants.JOB_STATUS_WAITLOCK,
2364                         constants.JOB_STATUS_CANCELING)):
2365         # job is still present and waiting
2366         continue
2367       # good candidate found (either running job or lost job)
2368       self.jobs.remove(job_data)
2369       return job_data
2370
2371     # no job found
2372     return self.jobs.pop(0)
2373
2374   def GetResults(self):
2375     """Wait for and return the results of all jobs.
2376
2377     @rtype: list
2378     @return: list of tuples (success, job results), in the same order
2379         as the submitted jobs; if a job has failed, instead of the result
2380         there will be the error message
2381
2382     """
2383     if not self.jobs:
2384       self.SubmitPending()
2385     results = []
2386     if self.verbose:
2387       ok_jobs = [row[2] for row in self.jobs if row[1]]
2388       if ok_jobs:
2389         ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
2390
2391     # first, remove any non-submitted jobs
2392     self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
2393     for idx, _, jid, name in failures:
2394       ToStderr("Failed to submit job for %s: %s", name, jid)
2395       results.append((idx, False, jid))
2396
2397     while self.jobs:
2398       (idx, _, jid, name) = self._ChooseJob()
2399       ToStdout("Waiting for job %s for %s...", jid, name)
2400       try:
2401         job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
2402         success = True
2403       except errors.JobLost, err:
2404         _, job_result = FormatError(err)
2405         ToStderr("Job %s for %s has been archived, cannot check its result",
2406                  jid, name)
2407         success = False
2408       except (errors.GenericError, luxi.ProtocolError), err:
2409         _, job_result = FormatError(err)
2410         success = False
2411         # the error message will always be shown, verbose or not
2412         ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
2413
2414       results.append((idx, success, job_result))
2415
2416     # sort based on the index, then drop it
2417     results.sort()
2418     results = [i[1:] for i in results]
2419
2420     return results
2421
2422   def WaitOrShow(self, wait):
2423     """Wait for job results or only print the job IDs.
2424
2425     @type wait: boolean
2426     @param wait: whether to wait or not
2427
2428     """
2429     if wait:
2430       return self.GetResults()
2431     else:
2432       if not self.jobs:
2433         self.SubmitPending()
2434       for _, status, result, name in self.jobs:
2435         if status:
2436           ToStdout("%s: %s", result, name)
2437         else:
2438           ToStderr("Failure for %s: %s", name, result)
2439       return [row[1:3] for row in self.jobs]