Add nodegroup option to AddNode
[ganeti-local] / lib / cli.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module dealing with command line parsing"""
23
24
25 import sys
26 import textwrap
27 import os.path
28 import time
29 import logging
30 from cStringIO import StringIO
31
32 from ganeti import utils
33 from ganeti import errors
34 from ganeti import constants
35 from ganeti import opcodes
36 from ganeti import luxi
37 from ganeti import ssconf
38 from ganeti import rpc
39 from ganeti import ssh
40 from ganeti import compat
41 from ganeti import netutils
42
43 from optparse import (OptionParser, TitledHelpFormatter,
44                       Option, OptionValueError)
45
46
47 __all__ = [
48   # Command line options
49   "ADD_UIDS_OPT",
50   "ALLOCATABLE_OPT",
51   "ALL_OPT",
52   "AUTO_PROMOTE_OPT",
53   "AUTO_REPLACE_OPT",
54   "BACKEND_OPT",
55   "CLEANUP_OPT",
56   "CLUSTER_DOMAIN_SECRET_OPT",
57   "CONFIRM_OPT",
58   "CP_SIZE_OPT",
59   "DEBUG_OPT",
60   "DEBUG_SIMERR_OPT",
61   "DISKIDX_OPT",
62   "DISK_OPT",
63   "DISK_TEMPLATE_OPT",
64   "DRAINED_OPT",
65   "DRY_RUN_OPT",
66   "DRBD_HELPER_OPT",
67   "EARLY_RELEASE_OPT",
68   "ENABLED_HV_OPT",
69   "ERROR_CODES_OPT",
70   "FIELDS_OPT",
71   "FILESTORE_DIR_OPT",
72   "FILESTORE_DRIVER_OPT",
73   "FORCE_OPT",
74   "FORCE_VARIANT_OPT",
75   "GLOBAL_FILEDIR_OPT",
76   "HVLIST_OPT",
77   "HVOPTS_OPT",
78   "HYPERVISOR_OPT",
79   "IALLOCATOR_OPT",
80   "DEFAULT_IALLOCATOR_OPT",
81   "IDENTIFY_DEFAULTS_OPT",
82   "IGNORE_CONSIST_OPT",
83   "IGNORE_FAILURES_OPT",
84   "IGNORE_REMOVE_FAILURES_OPT",
85   "IGNORE_SECONDARIES_OPT",
86   "IGNORE_SIZE_OPT",
87   "INTERVAL_OPT",
88   "MAC_PREFIX_OPT",
89   "MAINTAIN_NODE_HEALTH_OPT",
90   "MASTER_NETDEV_OPT",
91   "MC_OPT",
92   "MIGRATION_MODE_OPT",
93   "NET_OPT",
94   "NEW_CLUSTER_CERT_OPT",
95   "NEW_CLUSTER_DOMAIN_SECRET_OPT",
96   "NEW_CONFD_HMAC_KEY_OPT",
97   "NEW_RAPI_CERT_OPT",
98   "NEW_SECONDARY_OPT",
99   "NIC_PARAMS_OPT",
100   "NODE_LIST_OPT",
101   "NODE_PLACEMENT_OPT",
102   "NODEGROUP_OPT",
103   "NODRBD_STORAGE_OPT",
104   "NOHDR_OPT",
105   "NOIPCHECK_OPT",
106   "NO_INSTALL_OPT",
107   "NONAMECHECK_OPT",
108   "NOLVM_STORAGE_OPT",
109   "NOMODIFY_ETCHOSTS_OPT",
110   "NOMODIFY_SSH_SETUP_OPT",
111   "NONICS_OPT",
112   "NONLIVE_OPT",
113   "NONPLUS1_OPT",
114   "NOSHUTDOWN_OPT",
115   "NOSTART_OPT",
116   "NOSSH_KEYCHECK_OPT",
117   "NOVOTING_OPT",
118   "NWSYNC_OPT",
119   "ON_PRIMARY_OPT",
120   "ON_SECONDARY_OPT",
121   "OFFLINE_OPT",
122   "OSPARAMS_OPT",
123   "OS_OPT",
124   "OS_SIZE_OPT",
125   "PRIMARY_IP_VERSION_OPT",
126   "PRIORITY_OPT",
127   "RAPI_CERT_OPT",
128   "READD_OPT",
129   "REBOOT_TYPE_OPT",
130   "REMOVE_INSTANCE_OPT",
131   "REMOVE_UIDS_OPT",
132   "RESERVED_LVS_OPT",
133   "ROMAN_OPT",
134   "SECONDARY_IP_OPT",
135   "SELECT_OS_OPT",
136   "SEP_OPT",
137   "SHOWCMD_OPT",
138   "SHUTDOWN_TIMEOUT_OPT",
139   "SINGLE_NODE_OPT",
140   "SRC_DIR_OPT",
141   "SRC_NODE_OPT",
142   "SUBMIT_OPT",
143   "STATIC_OPT",
144   "SYNC_OPT",
145   "TAG_SRC_OPT",
146   "TIMEOUT_OPT",
147   "UIDPOOL_OPT",
148   "USEUNITS_OPT",
149   "USE_REPL_NET_OPT",
150   "VERBOSE_OPT",
151   "VG_NAME_OPT",
152   "YES_DOIT_OPT",
153   # Generic functions for CLI programs
154   "GenericMain",
155   "GenericInstanceCreate",
156   "GetClient",
157   "GetOnlineNodes",
158   "JobExecutor",
159   "JobSubmittedException",
160   "ParseTimespec",
161   "RunWhileClusterStopped",
162   "SubmitOpCode",
163   "SubmitOrSend",
164   "UsesRPC",
165   # Formatting functions
166   "ToStderr", "ToStdout",
167   "FormatError",
168   "GenerateTable",
169   "AskUser",
170   "FormatTimestamp",
171   "FormatLogMessage",
172   # Tags functions
173   "ListTags",
174   "AddTags",
175   "RemoveTags",
176   # command line options support infrastructure
177   "ARGS_MANY_INSTANCES",
178   "ARGS_MANY_NODES",
179   "ARGS_NONE",
180   "ARGS_ONE_INSTANCE",
181   "ARGS_ONE_NODE",
182   "ARGS_ONE_OS",
183   "ArgChoice",
184   "ArgCommand",
185   "ArgFile",
186   "ArgHost",
187   "ArgInstance",
188   "ArgJobId",
189   "ArgNode",
190   "ArgOs",
191   "ArgSuggest",
192   "ArgUnknown",
193   "OPT_COMPL_INST_ADD_NODES",
194   "OPT_COMPL_MANY_NODES",
195   "OPT_COMPL_ONE_IALLOCATOR",
196   "OPT_COMPL_ONE_INSTANCE",
197   "OPT_COMPL_ONE_NODE",
198   "OPT_COMPL_ONE_NODEGROUP",
199   "OPT_COMPL_ONE_OS",
200   "cli_option",
201   "SplitNodeOption",
202   "CalculateOSNames",
203   "ParseFields",
204   ]
205
206 NO_PREFIX = "no_"
207 UN_PREFIX = "-"
208
209 #: Priorities (sorted)
210 _PRIORITY_NAMES = [
211   ("low", constants.OP_PRIO_LOW),
212   ("normal", constants.OP_PRIO_NORMAL),
213   ("high", constants.OP_PRIO_HIGH),
214   ]
215
216 #: Priority dictionary for easier lookup
217 # TODO: Replace this and _PRIORITY_NAMES with a single sorted dictionary once
218 # we migrate to Python 2.6
219 _PRIONAME_TO_VALUE = dict(_PRIORITY_NAMES)
220
221
222 class _Argument:
223   def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
224     self.min = min
225     self.max = max
226
227   def __repr__(self):
228     return ("<%s min=%s max=%s>" %
229             (self.__class__.__name__, self.min, self.max))
230
231
232 class ArgSuggest(_Argument):
233   """Suggesting argument.
234
235   Value can be any of the ones passed to the constructor.
236
237   """
238   # pylint: disable-msg=W0622
239   def __init__(self, min=0, max=None, choices=None):
240     _Argument.__init__(self, min=min, max=max)
241     self.choices = choices
242
243   def __repr__(self):
244     return ("<%s min=%s max=%s choices=%r>" %
245             (self.__class__.__name__, self.min, self.max, self.choices))
246
247
248 class ArgChoice(ArgSuggest):
249   """Choice argument.
250
251   Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
252   but value must be one of the choices.
253
254   """
255
256
257 class ArgUnknown(_Argument):
258   """Unknown argument to program (e.g. determined at runtime).
259
260   """
261
262
263 class ArgInstance(_Argument):
264   """Instances argument.
265
266   """
267
268
269 class ArgNode(_Argument):
270   """Node argument.
271
272   """
273
274 class ArgJobId(_Argument):
275   """Job ID argument.
276
277   """
278
279
280 class ArgFile(_Argument):
281   """File path argument.
282
283   """
284
285
286 class ArgCommand(_Argument):
287   """Command argument.
288
289   """
290
291
292 class ArgHost(_Argument):
293   """Host argument.
294
295   """
296
297
298 class ArgOs(_Argument):
299   """OS argument.
300
301   """
302
303
304 ARGS_NONE = []
305 ARGS_MANY_INSTANCES = [ArgInstance()]
306 ARGS_MANY_NODES = [ArgNode()]
307 ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
308 ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
309 ARGS_ONE_OS = [ArgOs(min=1, max=1)]
310
311
312 def _ExtractTagsObject(opts, args):
313   """Extract the tag type object.
314
315   Note that this function will modify its args parameter.
316
317   """
318   if not hasattr(opts, "tag_type"):
319     raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
320   kind = opts.tag_type
321   if kind == constants.TAG_CLUSTER:
322     retval = kind, kind
323   elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
324     if not args:
325       raise errors.OpPrereqError("no arguments passed to the command")
326     name = args.pop(0)
327     retval = kind, name
328   else:
329     raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
330   return retval
331
332
333 def _ExtendTags(opts, args):
334   """Extend the args if a source file has been given.
335
336   This function will extend the tags with the contents of the file
337   passed in the 'tags_source' attribute of the opts parameter. A file
338   named '-' will be replaced by stdin.
339
340   """
341   fname = opts.tags_source
342   if fname is None:
343     return
344   if fname == "-":
345     new_fh = sys.stdin
346   else:
347     new_fh = open(fname, "r")
348   new_data = []
349   try:
350     # we don't use the nice 'new_data = [line.strip() for line in fh]'
351     # because of python bug 1633941
352     while True:
353       line = new_fh.readline()
354       if not line:
355         break
356       new_data.append(line.strip())
357   finally:
358     new_fh.close()
359   args.extend(new_data)
360
361
362 def ListTags(opts, args):
363   """List the tags on a given object.
364
365   This is a generic implementation that knows how to deal with all
366   three cases of tag objects (cluster, node, instance). The opts
367   argument is expected to contain a tag_type field denoting what
368   object type we work on.
369
370   """
371   kind, name = _ExtractTagsObject(opts, args)
372   cl = GetClient()
373   result = cl.QueryTags(kind, name)
374   result = list(result)
375   result.sort()
376   for tag in result:
377     ToStdout(tag)
378
379
380 def AddTags(opts, args):
381   """Add tags on a given object.
382
383   This is a generic implementation that knows how to deal with all
384   three cases of tag objects (cluster, node, instance). The opts
385   argument is expected to contain a tag_type field denoting what
386   object type we work on.
387
388   """
389   kind, name = _ExtractTagsObject(opts, args)
390   _ExtendTags(opts, args)
391   if not args:
392     raise errors.OpPrereqError("No tags to be added")
393   op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
394   SubmitOpCode(op)
395
396
397 def RemoveTags(opts, args):
398   """Remove tags from a given object.
399
400   This is a generic implementation that knows how to deal with all
401   three cases of tag objects (cluster, node, instance). The opts
402   argument is expected to contain a tag_type field denoting what
403   object type we work on.
404
405   """
406   kind, name = _ExtractTagsObject(opts, args)
407   _ExtendTags(opts, args)
408   if not args:
409     raise errors.OpPrereqError("No tags to be removed")
410   op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
411   SubmitOpCode(op)
412
413
414 def check_unit(option, opt, value): # pylint: disable-msg=W0613
415   """OptParsers custom converter for units.
416
417   """
418   try:
419     return utils.ParseUnit(value)
420   except errors.UnitParseError, err:
421     raise OptionValueError("option %s: %s" % (opt, err))
422
423
424 def _SplitKeyVal(opt, data):
425   """Convert a KeyVal string into a dict.
426
427   This function will convert a key=val[,...] string into a dict. Empty
428   values will be converted specially: keys which have the prefix 'no_'
429   will have the value=False and the prefix stripped, the others will
430   have value=True.
431
432   @type opt: string
433   @param opt: a string holding the option name for which we process the
434       data, used in building error messages
435   @type data: string
436   @param data: a string of the format key=val,key=val,...
437   @rtype: dict
438   @return: {key=val, key=val}
439   @raises errors.ParameterError: if there are duplicate keys
440
441   """
442   kv_dict = {}
443   if data:
444     for elem in utils.UnescapeAndSplit(data, sep=","):
445       if "=" in elem:
446         key, val = elem.split("=", 1)
447       else:
448         if elem.startswith(NO_PREFIX):
449           key, val = elem[len(NO_PREFIX):], False
450         elif elem.startswith(UN_PREFIX):
451           key, val = elem[len(UN_PREFIX):], None
452         else:
453           key, val = elem, True
454       if key in kv_dict:
455         raise errors.ParameterError("Duplicate key '%s' in option %s" %
456                                     (key, opt))
457       kv_dict[key] = val
458   return kv_dict
459
460
461 def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
462   """Custom parser for ident:key=val,key=val options.
463
464   This will store the parsed values as a tuple (ident, {key: val}). As such,
465   multiple uses of this option via action=append is possible.
466
467   """
468   if ":" not in value:
469     ident, rest = value, ''
470   else:
471     ident, rest = value.split(":", 1)
472
473   if ident.startswith(NO_PREFIX):
474     if rest:
475       msg = "Cannot pass options when removing parameter groups: %s" % value
476       raise errors.ParameterError(msg)
477     retval = (ident[len(NO_PREFIX):], False)
478   elif ident.startswith(UN_PREFIX):
479     if rest:
480       msg = "Cannot pass options when removing parameter groups: %s" % value
481       raise errors.ParameterError(msg)
482     retval = (ident[len(UN_PREFIX):], None)
483   else:
484     kv_dict = _SplitKeyVal(opt, rest)
485     retval = (ident, kv_dict)
486   return retval
487
488
489 def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
490   """Custom parser class for key=val,key=val options.
491
492   This will store the parsed values as a dict {key: val}.
493
494   """
495   return _SplitKeyVal(opt, value)
496
497
498 def check_bool(option, opt, value): # pylint: disable-msg=W0613
499   """Custom parser for yes/no options.
500
501   This will store the parsed value as either True or False.
502
503   """
504   value = value.lower()
505   if value == constants.VALUE_FALSE or value == "no":
506     return False
507   elif value == constants.VALUE_TRUE or value == "yes":
508     return True
509   else:
510     raise errors.ParameterError("Invalid boolean value '%s'" % value)
511
512
513 # completion_suggestion is normally a list. Using numeric values not evaluating
514 # to False for dynamic completion.
515 (OPT_COMPL_MANY_NODES,
516  OPT_COMPL_ONE_NODE,
517  OPT_COMPL_ONE_INSTANCE,
518  OPT_COMPL_ONE_OS,
519  OPT_COMPL_ONE_IALLOCATOR,
520  OPT_COMPL_INST_ADD_NODES,
521  OPT_COMPL_ONE_NODEGROUP) = range(100, 107)
522
523 OPT_COMPL_ALL = frozenset([
524   OPT_COMPL_MANY_NODES,
525   OPT_COMPL_ONE_NODE,
526   OPT_COMPL_ONE_INSTANCE,
527   OPT_COMPL_ONE_OS,
528   OPT_COMPL_ONE_IALLOCATOR,
529   OPT_COMPL_INST_ADD_NODES,
530   OPT_COMPL_ONE_NODEGROUP,
531   ])
532
533
534 class CliOption(Option):
535   """Custom option class for optparse.
536
537   """
538   ATTRS = Option.ATTRS + [
539     "completion_suggest",
540     ]
541   TYPES = Option.TYPES + (
542     "identkeyval",
543     "keyval",
544     "unit",
545     "bool",
546     )
547   TYPE_CHECKER = Option.TYPE_CHECKER.copy()
548   TYPE_CHECKER["identkeyval"] = check_ident_key_val
549   TYPE_CHECKER["keyval"] = check_key_val
550   TYPE_CHECKER["unit"] = check_unit
551   TYPE_CHECKER["bool"] = check_bool
552
553
554 # optparse.py sets make_option, so we do it for our own option class, too
555 cli_option = CliOption
556
557
558 _YORNO = "yes|no"
559
560 DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
561                        help="Increase debugging level")
562
563 NOHDR_OPT = cli_option("--no-headers", default=False,
564                        action="store_true", dest="no_headers",
565                        help="Don't display column headers")
566
567 SEP_OPT = cli_option("--separator", default=None,
568                      action="store", dest="separator",
569                      help=("Separator between output fields"
570                            " (defaults to one space)"))
571
572 USEUNITS_OPT = cli_option("--units", default=None,
573                           dest="units", choices=('h', 'm', 'g', 't'),
574                           help="Specify units for output (one of hmgt)")
575
576 FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
577                         type="string", metavar="FIELDS",
578                         help="Comma separated list of output fields")
579
580 FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
581                        default=False, help="Force the operation")
582
583 CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
584                          default=False, help="Do not require confirmation")
585
586 TAG_SRC_OPT = cli_option("--from", dest="tags_source",
587                          default=None, help="File with tag names")
588
589 SUBMIT_OPT = cli_option("--submit", dest="submit_only",
590                         default=False, action="store_true",
591                         help=("Submit the job and return the job ID, but"
592                               " don't wait for the job to finish"))
593
594 SYNC_OPT = cli_option("--sync", dest="do_locking",
595                       default=False, action="store_true",
596                       help=("Grab locks while doing the queries"
597                             " in order to ensure more consistent results"))
598
599 DRY_RUN_OPT = cli_option("--dry-run", default=False,
600                          action="store_true",
601                          help=("Do not execute the operation, just run the"
602                                " check steps and verify it it could be"
603                                " executed"))
604
605 VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
606                          action="store_true",
607                          help="Increase the verbosity of the operation")
608
609 DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
610                               action="store_true", dest="simulate_errors",
611                               help="Debugging option that makes the operation"
612                               " treat most runtime checks as failed")
613
614 NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
615                         default=True, action="store_false",
616                         help="Don't wait for sync (DANGEROUS!)")
617
618 DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
619                                help="Custom disk setup (diskless, file,"
620                                " plain or drbd)",
621                                default=None, metavar="TEMPL",
622                                choices=list(constants.DISK_TEMPLATES))
623
624 NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
625                         help="Do not create any network cards for"
626                         " the instance")
627
628 FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
629                                help="Relative path under default cluster-wide"
630                                " file storage dir to store file-based disks",
631                                default=None, metavar="<DIR>")
632
633 FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
634                                   help="Driver to use for image files",
635                                   default="loop", metavar="<DRIVER>",
636                                   choices=list(constants.FILE_DRIVER))
637
638 IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
639                             help="Select nodes for the instance automatically"
640                             " using the <NAME> iallocator plugin",
641                             default=None, type="string",
642                             completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
643
644 DEFAULT_IALLOCATOR_OPT = cli_option("-I", "--default-iallocator",
645                             metavar="<NAME>",
646                             help="Set the default instance allocator plugin",
647                             default=None, type="string",
648                             completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
649
650 OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
651                     metavar="<os>",
652                     completion_suggest=OPT_COMPL_ONE_OS)
653
654 OSPARAMS_OPT = cli_option("-O", "--os-parameters", dest="osparams",
655                          type="keyval", default={},
656                          help="OS parameters")
657
658 FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
659                                action="store_true", default=False,
660                                help="Force an unknown variant")
661
662 NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
663                             action="store_true", default=False,
664                             help="Do not install the OS (will"
665                             " enable no-start)")
666
667 BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
668                          type="keyval", default={},
669                          help="Backend parameters")
670
671 HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
672                          default={}, dest="hvparams",
673                          help="Hypervisor parameters")
674
675 HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
676                             help="Hypervisor and hypervisor options, in the"
677                             " format hypervisor:option=value,option=value,...",
678                             default=None, type="identkeyval")
679
680 HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
681                         help="Hypervisor and hypervisor options, in the"
682                         " format hypervisor:option=value,option=value,...",
683                         default=[], action="append", type="identkeyval")
684
685 NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
686                            action="store_false",
687                            help="Don't check that the instance's IP"
688                            " is alive")
689
690 NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
691                              default=True, action="store_false",
692                              help="Don't check that the instance's name"
693                              " is resolvable")
694
695 NET_OPT = cli_option("--net",
696                      help="NIC parameters", default=[],
697                      dest="nics", action="append", type="identkeyval")
698
699 DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
700                       dest="disks", action="append", type="identkeyval")
701
702 DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
703                          help="Comma-separated list of disks"
704                          " indices to act on (e.g. 0,2) (optional,"
705                          " defaults to all disks)")
706
707 OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
708                          help="Enforces a single-disk configuration using the"
709                          " given disk size, in MiB unless a suffix is used",
710                          default=None, type="unit", metavar="<size>")
711
712 IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
713                                 dest="ignore_consistency",
714                                 action="store_true", default=False,
715                                 help="Ignore the consistency of the disks on"
716                                 " the secondary")
717
718 NONLIVE_OPT = cli_option("--non-live", dest="live",
719                          default=True, action="store_false",
720                          help="Do a non-live migration (this usually means"
721                          " freeze the instance, save the state, transfer and"
722                          " only then resume running on the secondary node)")
723
724 MIGRATION_MODE_OPT = cli_option("--migration-mode", dest="migration_mode",
725                                 default=None,
726                                 choices=list(constants.HT_MIGRATION_MODES),
727                                 help="Override default migration mode (choose"
728                                 " either live or non-live")
729
730 NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
731                                 help="Target node and optional secondary node",
732                                 metavar="<pnode>[:<snode>]",
733                                 completion_suggest=OPT_COMPL_INST_ADD_NODES)
734
735 NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
736                            action="append", metavar="<node>",
737                            help="Use only this node (can be used multiple"
738                            " times, if not given defaults to all nodes)",
739                            completion_suggest=OPT_COMPL_ONE_NODE)
740
741 NODEGROUP_OPT = cli_option("-g", "--nodegroup",
742                            dest="nodegroup",
743                            help="Node group (name or uuid)",
744                            metavar="<nodegroup>",
745                            default=None, type="string",
746                            completion_suggest=OPT_COMPL_ONE_NODEGROUP)
747
748 SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
749                              metavar="<node>",
750                              completion_suggest=OPT_COMPL_ONE_NODE)
751
752 NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
753                          action="store_false",
754                          help="Don't start the instance after creation")
755
756 SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
757                          action="store_true", default=False,
758                          help="Show command instead of executing it")
759
760 CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
761                          default=False, action="store_true",
762                          help="Instead of performing the migration, try to"
763                          " recover from a failed cleanup. This is safe"
764                          " to run even if the instance is healthy, but it"
765                          " will create extra replication traffic and "
766                          " disrupt briefly the replication (like during the"
767                          " migration")
768
769 STATIC_OPT = cli_option("-s", "--static", dest="static",
770                         action="store_true", default=False,
771                         help="Only show configuration data, not runtime data")
772
773 ALL_OPT = cli_option("--all", dest="show_all",
774                      default=False, action="store_true",
775                      help="Show info on all instances on the cluster."
776                      " This can take a long time to run, use wisely")
777
778 SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
779                            action="store_true", default=False,
780                            help="Interactive OS reinstall, lists available"
781                            " OS templates for selection")
782
783 IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
784                                  action="store_true", default=False,
785                                  help="Remove the instance from the cluster"
786                                  " configuration even if there are failures"
787                                  " during the removal process")
788
789 IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
790                                         dest="ignore_remove_failures",
791                                         action="store_true", default=False,
792                                         help="Remove the instance from the"
793                                         " cluster configuration even if there"
794                                         " are failures during the removal"
795                                         " process")
796
797 REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
798                                  action="store_true", default=False,
799                                  help="Remove the instance from the cluster")
800
801 NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
802                                help="Specifies the new secondary node",
803                                metavar="NODE", default=None,
804                                completion_suggest=OPT_COMPL_ONE_NODE)
805
806 ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
807                             default=False, action="store_true",
808                             help="Replace the disk(s) on the primary"
809                             " node (only for the drbd template)")
810
811 ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
812                               default=False, action="store_true",
813                               help="Replace the disk(s) on the secondary"
814                               " node (only for the drbd template)")
815
816 AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
817                               default=False, action="store_true",
818                               help="Lock all nodes and auto-promote as needed"
819                               " to MC status")
820
821 AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
822                               default=False, action="store_true",
823                               help="Automatically replace faulty disks"
824                               " (only for the drbd template)")
825
826 IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
827                              default=False, action="store_true",
828                              help="Ignore current recorded size"
829                              " (useful for forcing activation when"
830                              " the recorded size is wrong)")
831
832 SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
833                           metavar="<node>",
834                           completion_suggest=OPT_COMPL_ONE_NODE)
835
836 SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
837                          metavar="<dir>")
838
839 SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
840                               help="Specify the secondary ip for the node",
841                               metavar="ADDRESS", default=None)
842
843 READD_OPT = cli_option("--readd", dest="readd",
844                        default=False, action="store_true",
845                        help="Readd old node after replacing it")
846
847 NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
848                                 default=True, action="store_false",
849                                 help="Disable SSH key fingerprint checking")
850
851
852 MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
853                     type="bool", default=None, metavar=_YORNO,
854                     help="Set the master_candidate flag on the node")
855
856 OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
857                          type="bool", default=None,
858                          help="Set the offline flag on the node")
859
860 DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
861                          type="bool", default=None,
862                          help="Set the drained flag on the node")
863
864 ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
865                              type="bool", default=None, metavar=_YORNO,
866                              help="Set the allocatable flag on a volume")
867
868 NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
869                                help="Disable support for lvm based instances"
870                                " (cluster-wide)",
871                                action="store_false", default=True)
872
873 ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
874                             dest="enabled_hypervisors",
875                             help="Comma-separated list of hypervisors",
876                             type="string", default=None)
877
878 NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
879                             type="keyval", default={},
880                             help="NIC parameters")
881
882 CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
883                          dest="candidate_pool_size", type="int",
884                          help="Set the candidate pool size")
885
886 VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
887                          help="Enables LVM and specifies the volume group"
888                          " name (cluster-wide) for disk allocation [xenvg]",
889                          metavar="VG", default=None)
890
891 YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
892                           help="Destroy cluster", action="store_true")
893
894 NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
895                           help="Skip node agreement check (dangerous)",
896                           action="store_true", default=False)
897
898 MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
899                             help="Specify the mac prefix for the instance IP"
900                             " addresses, in the format XX:XX:XX",
901                             metavar="PREFIX",
902                             default=None)
903
904 MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
905                                help="Specify the node interface (cluster-wide)"
906                                " on which the master IP address will be added "
907                                " [%s]" % constants.DEFAULT_BRIDGE,
908                                metavar="NETDEV",
909                                default=constants.DEFAULT_BRIDGE)
910
911 GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
912                                 help="Specify the default directory (cluster-"
913                                 "wide) for storing the file-based disks [%s]" %
914                                 constants.DEFAULT_FILE_STORAGE_DIR,
915                                 metavar="DIR",
916                                 default=constants.DEFAULT_FILE_STORAGE_DIR)
917
918 NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
919                                    help="Don't modify /etc/hosts",
920                                    action="store_false", default=True)
921
922 NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
923                                     help="Don't initialize SSH keys",
924                                     action="store_false", default=True)
925
926 ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
927                              help="Enable parseable error messages",
928                              action="store_true", default=False)
929
930 NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
931                           help="Skip N+1 memory redundancy tests",
932                           action="store_true", default=False)
933
934 REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
935                              help="Type of reboot: soft/hard/full",
936                              default=constants.INSTANCE_REBOOT_HARD,
937                              metavar="<REBOOT>",
938                              choices=list(constants.REBOOT_TYPES))
939
940 IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
941                                     dest="ignore_secondaries",
942                                     default=False, action="store_true",
943                                     help="Ignore errors from secondaries")
944
945 NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
946                             action="store_false", default=True,
947                             help="Don't shutdown the instance (unsafe)")
948
949 TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
950                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
951                          help="Maximum time to wait")
952
953 SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
954                          dest="shutdown_timeout", type="int",
955                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
956                          help="Maximum time to wait for instance shutdown")
957
958 INTERVAL_OPT = cli_option("--interval", dest="interval", type="int",
959                           default=None,
960                           help=("Number of seconds between repetions of the"
961                                 " command"))
962
963 EARLY_RELEASE_OPT = cli_option("--early-release",
964                                dest="early_release", default=False,
965                                action="store_true",
966                                help="Release the locks on the secondary"
967                                " node(s) early")
968
969 NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
970                                   dest="new_cluster_cert",
971                                   default=False, action="store_true",
972                                   help="Generate a new cluster certificate")
973
974 RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
975                            default=None,
976                            help="File containing new RAPI certificate")
977
978 NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
979                                default=None, action="store_true",
980                                help=("Generate a new self-signed RAPI"
981                                      " certificate"))
982
983 NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
984                                     dest="new_confd_hmac_key",
985                                     default=False, action="store_true",
986                                     help=("Create a new HMAC key for %s" %
987                                           constants.CONFD))
988
989 CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
990                                        dest="cluster_domain_secret",
991                                        default=None,
992                                        help=("Load new new cluster domain"
993                                              " secret from file"))
994
995 NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret",
996                                            dest="new_cluster_domain_secret",
997                                            default=False, action="store_true",
998                                            help=("Create a new cluster domain"
999                                                  " secret"))
1000
1001 USE_REPL_NET_OPT = cli_option("--use-replication-network",
1002                               dest="use_replication_network",
1003                               help="Whether to use the replication network"
1004                               " for talking to the nodes",
1005                               action="store_true", default=False)
1006
1007 MAINTAIN_NODE_HEALTH_OPT = \
1008     cli_option("--maintain-node-health", dest="maintain_node_health",
1009                metavar=_YORNO, default=None, type="bool",
1010                help="Configure the cluster to automatically maintain node"
1011                " health, by shutting down unknown instances, shutting down"
1012                " unknown DRBD devices, etc.")
1013
1014 IDENTIFY_DEFAULTS_OPT = \
1015     cli_option("--identify-defaults", dest="identify_defaults",
1016                default=False, action="store_true",
1017                help="Identify which saved instance parameters are equal to"
1018                " the current cluster defaults and set them as such, instead"
1019                " of marking them as overridden")
1020
1021 UIDPOOL_OPT = cli_option("--uid-pool", default=None,
1022                          action="store", dest="uid_pool",
1023                          help=("A list of user-ids or user-id"
1024                                " ranges separated by commas"))
1025
1026 ADD_UIDS_OPT = cli_option("--add-uids", default=None,
1027                           action="store", dest="add_uids",
1028                           help=("A list of user-ids or user-id"
1029                                 " ranges separated by commas, to be"
1030                                 " added to the user-id pool"))
1031
1032 REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
1033                              action="store", dest="remove_uids",
1034                              help=("A list of user-ids or user-id"
1035                                    " ranges separated by commas, to be"
1036                                    " removed from the user-id pool"))
1037
1038 RESERVED_LVS_OPT = cli_option("--reserved-lvs", default=None,
1039                              action="store", dest="reserved_lvs",
1040                              help=("A comma-separated list of reserved"
1041                                    " logical volumes names, that will be"
1042                                    " ignored by cluster verify"))
1043
1044 ROMAN_OPT = cli_option("--roman",
1045                        dest="roman_integers", default=False,
1046                        action="store_true",
1047                        help="Use roman numbers for positive integers")
1048
1049 DRBD_HELPER_OPT = cli_option("--drbd-usermode-helper", dest="drbd_helper",
1050                              action="store", default=None,
1051                              help="Specifies usermode helper for DRBD")
1052
1053 NODRBD_STORAGE_OPT = cli_option("--no-drbd-storage", dest="drbd_storage",
1054                                 action="store_false", default=True,
1055                                 help="Disable support for DRBD")
1056
1057 PRIMARY_IP_VERSION_OPT = \
1058     cli_option("--primary-ip-version", default=constants.IP4_VERSION,
1059                action="store", dest="primary_ip_version",
1060                metavar="%d|%d" % (constants.IP4_VERSION,
1061                                   constants.IP6_VERSION),
1062                help="Cluster-wide IP version for primary IP")
1063
1064 PRIORITY_OPT = cli_option("--priority", default=None, dest="priority",
1065                           metavar="|".join(name for name, _ in _PRIORITY_NAMES),
1066                           choices=_PRIONAME_TO_VALUE.keys(),
1067                           help="Priority for opcode(s) processing")
1068
1069 #: Options provided by all commands
1070 COMMON_OPTS = [DEBUG_OPT]
1071
1072
1073 def _ParseArgs(argv, commands, aliases):
1074   """Parser for the command line arguments.
1075
1076   This function parses the arguments and returns the function which
1077   must be executed together with its (modified) arguments.
1078
1079   @param argv: the command line
1080   @param commands: dictionary with special contents, see the design
1081       doc for cmdline handling
1082   @param aliases: dictionary with command aliases {'alias': 'target, ...}
1083
1084   """
1085   if len(argv) == 0:
1086     binary = "<command>"
1087   else:
1088     binary = argv[0].split("/")[-1]
1089
1090   if len(argv) > 1 and argv[1] == "--version":
1091     ToStdout("%s (ganeti %s) %s", binary, constants.VCS_VERSION,
1092              constants.RELEASE_VERSION)
1093     # Quit right away. That way we don't have to care about this special
1094     # argument. optparse.py does it the same.
1095     sys.exit(0)
1096
1097   if len(argv) < 2 or not (argv[1] in commands or
1098                            argv[1] in aliases):
1099     # let's do a nice thing
1100     sortedcmds = commands.keys()
1101     sortedcmds.sort()
1102
1103     ToStdout("Usage: %s {command} [options...] [argument...]", binary)
1104     ToStdout("%s <command> --help to see details, or man %s", binary, binary)
1105     ToStdout("")
1106
1107     # compute the max line length for cmd + usage
1108     mlen = max([len(" %s" % cmd) for cmd in commands])
1109     mlen = min(60, mlen) # should not get here...
1110
1111     # and format a nice command list
1112     ToStdout("Commands:")
1113     for cmd in sortedcmds:
1114       cmdstr = " %s" % (cmd,)
1115       help_text = commands[cmd][4]
1116       help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
1117       ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
1118       for line in help_lines:
1119         ToStdout("%-*s   %s", mlen, "", line)
1120
1121     ToStdout("")
1122
1123     return None, None, None
1124
1125   # get command, unalias it, and look it up in commands
1126   cmd = argv.pop(1)
1127   if cmd in aliases:
1128     if cmd in commands:
1129       raise errors.ProgrammerError("Alias '%s' overrides an existing"
1130                                    " command" % cmd)
1131
1132     if aliases[cmd] not in commands:
1133       raise errors.ProgrammerError("Alias '%s' maps to non-existing"
1134                                    " command '%s'" % (cmd, aliases[cmd]))
1135
1136     cmd = aliases[cmd]
1137
1138   func, args_def, parser_opts, usage, description = commands[cmd]
1139   parser = OptionParser(option_list=parser_opts + COMMON_OPTS,
1140                         description=description,
1141                         formatter=TitledHelpFormatter(),
1142                         usage="%%prog %s %s" % (cmd, usage))
1143   parser.disable_interspersed_args()
1144   options, args = parser.parse_args()
1145
1146   if not _CheckArguments(cmd, args_def, args):
1147     return None, None, None
1148
1149   return func, options, args
1150
1151
1152 def _CheckArguments(cmd, args_def, args):
1153   """Verifies the arguments using the argument definition.
1154
1155   Algorithm:
1156
1157     1. Abort with error if values specified by user but none expected.
1158
1159     1. For each argument in definition
1160
1161       1. Keep running count of minimum number of values (min_count)
1162       1. Keep running count of maximum number of values (max_count)
1163       1. If it has an unlimited number of values
1164
1165         1. Abort with error if it's not the last argument in the definition
1166
1167     1. If last argument has limited number of values
1168
1169       1. Abort with error if number of values doesn't match or is too large
1170
1171     1. Abort with error if user didn't pass enough values (min_count)
1172
1173   """
1174   if args and not args_def:
1175     ToStderr("Error: Command %s expects no arguments", cmd)
1176     return False
1177
1178   min_count = None
1179   max_count = None
1180   check_max = None
1181
1182   last_idx = len(args_def) - 1
1183
1184   for idx, arg in enumerate(args_def):
1185     if min_count is None:
1186       min_count = arg.min
1187     elif arg.min is not None:
1188       min_count += arg.min
1189
1190     if max_count is None:
1191       max_count = arg.max
1192     elif arg.max is not None:
1193       max_count += arg.max
1194
1195     if idx == last_idx:
1196       check_max = (arg.max is not None)
1197
1198     elif arg.max is None:
1199       raise errors.ProgrammerError("Only the last argument can have max=None")
1200
1201   if check_max:
1202     # Command with exact number of arguments
1203     if (min_count is not None and max_count is not None and
1204         min_count == max_count and len(args) != min_count):
1205       ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1206       return False
1207
1208     # Command with limited number of arguments
1209     if max_count is not None and len(args) > max_count:
1210       ToStderr("Error: Command %s expects only %d argument(s)",
1211                cmd, max_count)
1212       return False
1213
1214   # Command with some required arguments
1215   if min_count is not None and len(args) < min_count:
1216     ToStderr("Error: Command %s expects at least %d argument(s)",
1217              cmd, min_count)
1218     return False
1219
1220   return True
1221
1222
1223 def SplitNodeOption(value):
1224   """Splits the value of a --node option.
1225
1226   """
1227   if value and ':' in value:
1228     return value.split(':', 1)
1229   else:
1230     return (value, None)
1231
1232
1233 def CalculateOSNames(os_name, os_variants):
1234   """Calculates all the names an OS can be called, according to its variants.
1235
1236   @type os_name: string
1237   @param os_name: base name of the os
1238   @type os_variants: list or None
1239   @param os_variants: list of supported variants
1240   @rtype: list
1241   @return: list of valid names
1242
1243   """
1244   if os_variants:
1245     return ['%s+%s' % (os_name, v) for v in os_variants]
1246   else:
1247     return [os_name]
1248
1249
1250 def ParseFields(selected, default):
1251   """Parses the values of "--field"-like options.
1252
1253   @type selected: string or None
1254   @param selected: User-selected options
1255   @type default: list
1256   @param default: Default fields
1257
1258   """
1259   if selected is None:
1260     return default
1261
1262   if selected.startswith("+"):
1263     return default + selected[1:].split(",")
1264
1265   return selected.split(",")
1266
1267
1268 UsesRPC = rpc.RunWithRPC
1269
1270
1271 def AskUser(text, choices=None):
1272   """Ask the user a question.
1273
1274   @param text: the question to ask
1275
1276   @param choices: list with elements tuples (input_char, return_value,
1277       description); if not given, it will default to: [('y', True,
1278       'Perform the operation'), ('n', False, 'Do no do the operation')];
1279       note that the '?' char is reserved for help
1280
1281   @return: one of the return values from the choices list; if input is
1282       not possible (i.e. not running with a tty, we return the last
1283       entry from the list
1284
1285   """
1286   if choices is None:
1287     choices = [('y', True, 'Perform the operation'),
1288                ('n', False, 'Do not perform the operation')]
1289   if not choices or not isinstance(choices, list):
1290     raise errors.ProgrammerError("Invalid choices argument to AskUser")
1291   for entry in choices:
1292     if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
1293       raise errors.ProgrammerError("Invalid choices element to AskUser")
1294
1295   answer = choices[-1][1]
1296   new_text = []
1297   for line in text.splitlines():
1298     new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
1299   text = "\n".join(new_text)
1300   try:
1301     f = file("/dev/tty", "a+")
1302   except IOError:
1303     return answer
1304   try:
1305     chars = [entry[0] for entry in choices]
1306     chars[-1] = "[%s]" % chars[-1]
1307     chars.append('?')
1308     maps = dict([(entry[0], entry[1]) for entry in choices])
1309     while True:
1310       f.write(text)
1311       f.write('\n')
1312       f.write("/".join(chars))
1313       f.write(": ")
1314       line = f.readline(2).strip().lower()
1315       if line in maps:
1316         answer = maps[line]
1317         break
1318       elif line == '?':
1319         for entry in choices:
1320           f.write(" %s - %s\n" % (entry[0], entry[2]))
1321         f.write("\n")
1322         continue
1323   finally:
1324     f.close()
1325   return answer
1326
1327
1328 class JobSubmittedException(Exception):
1329   """Job was submitted, client should exit.
1330
1331   This exception has one argument, the ID of the job that was
1332   submitted. The handler should print this ID.
1333
1334   This is not an error, just a structured way to exit from clients.
1335
1336   """
1337
1338
1339 def SendJob(ops, cl=None):
1340   """Function to submit an opcode without waiting for the results.
1341
1342   @type ops: list
1343   @param ops: list of opcodes
1344   @type cl: luxi.Client
1345   @param cl: the luxi client to use for communicating with the master;
1346              if None, a new client will be created
1347
1348   """
1349   if cl is None:
1350     cl = GetClient()
1351
1352   job_id = cl.SubmitJob(ops)
1353
1354   return job_id
1355
1356
1357 def GenericPollJob(job_id, cbs, report_cbs):
1358   """Generic job-polling function.
1359
1360   @type job_id: number
1361   @param job_id: Job ID
1362   @type cbs: Instance of L{JobPollCbBase}
1363   @param cbs: Data callbacks
1364   @type report_cbs: Instance of L{JobPollReportCbBase}
1365   @param report_cbs: Reporting callbacks
1366
1367   """
1368   prev_job_info = None
1369   prev_logmsg_serial = None
1370
1371   status = None
1372
1373   while True:
1374     result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
1375                                       prev_logmsg_serial)
1376     if not result:
1377       # job not found, go away!
1378       raise errors.JobLost("Job with id %s lost" % job_id)
1379
1380     if result == constants.JOB_NOTCHANGED:
1381       report_cbs.ReportNotChanged(job_id, status)
1382
1383       # Wait again
1384       continue
1385
1386     # Split result, a tuple of (field values, log entries)
1387     (job_info, log_entries) = result
1388     (status, ) = job_info
1389
1390     if log_entries:
1391       for log_entry in log_entries:
1392         (serial, timestamp, log_type, message) = log_entry
1393         report_cbs.ReportLogMessage(job_id, serial, timestamp,
1394                                     log_type, message)
1395         prev_logmsg_serial = max(prev_logmsg_serial, serial)
1396
1397     # TODO: Handle canceled and archived jobs
1398     elif status in (constants.JOB_STATUS_SUCCESS,
1399                     constants.JOB_STATUS_ERROR,
1400                     constants.JOB_STATUS_CANCELING,
1401                     constants.JOB_STATUS_CANCELED):
1402       break
1403
1404     prev_job_info = job_info
1405
1406   jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
1407   if not jobs:
1408     raise errors.JobLost("Job with id %s lost" % job_id)
1409
1410   status, opstatus, result = jobs[0]
1411
1412   if status == constants.JOB_STATUS_SUCCESS:
1413     return result
1414
1415   if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
1416     raise errors.OpExecError("Job was canceled")
1417
1418   has_ok = False
1419   for idx, (status, msg) in enumerate(zip(opstatus, result)):
1420     if status == constants.OP_STATUS_SUCCESS:
1421       has_ok = True
1422     elif status == constants.OP_STATUS_ERROR:
1423       errors.MaybeRaise(msg)
1424
1425       if has_ok:
1426         raise errors.OpExecError("partial failure (opcode %d): %s" %
1427                                  (idx, msg))
1428
1429       raise errors.OpExecError(str(msg))
1430
1431   # default failure mode
1432   raise errors.OpExecError(result)
1433
1434
1435 class JobPollCbBase:
1436   """Base class for L{GenericPollJob} callbacks.
1437
1438   """
1439   def __init__(self):
1440     """Initializes this class.
1441
1442     """
1443
1444   def WaitForJobChangeOnce(self, job_id, fields,
1445                            prev_job_info, prev_log_serial):
1446     """Waits for changes on a job.
1447
1448     """
1449     raise NotImplementedError()
1450
1451   def QueryJobs(self, job_ids, fields):
1452     """Returns the selected fields for the selected job IDs.
1453
1454     @type job_ids: list of numbers
1455     @param job_ids: Job IDs
1456     @type fields: list of strings
1457     @param fields: Fields
1458
1459     """
1460     raise NotImplementedError()
1461
1462
1463 class JobPollReportCbBase:
1464   """Base class for L{GenericPollJob} reporting callbacks.
1465
1466   """
1467   def __init__(self):
1468     """Initializes this class.
1469
1470     """
1471
1472   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1473     """Handles a log message.
1474
1475     """
1476     raise NotImplementedError()
1477
1478   def ReportNotChanged(self, job_id, status):
1479     """Called for if a job hasn't changed in a while.
1480
1481     @type job_id: number
1482     @param job_id: Job ID
1483     @type status: string or None
1484     @param status: Job status if available
1485
1486     """
1487     raise NotImplementedError()
1488
1489
1490 class _LuxiJobPollCb(JobPollCbBase):
1491   def __init__(self, cl):
1492     """Initializes this class.
1493
1494     """
1495     JobPollCbBase.__init__(self)
1496     self.cl = cl
1497
1498   def WaitForJobChangeOnce(self, job_id, fields,
1499                            prev_job_info, prev_log_serial):
1500     """Waits for changes on a job.
1501
1502     """
1503     return self.cl.WaitForJobChangeOnce(job_id, fields,
1504                                         prev_job_info, prev_log_serial)
1505
1506   def QueryJobs(self, job_ids, fields):
1507     """Returns the selected fields for the selected job IDs.
1508
1509     """
1510     return self.cl.QueryJobs(job_ids, fields)
1511
1512
1513 class FeedbackFnJobPollReportCb(JobPollReportCbBase):
1514   def __init__(self, feedback_fn):
1515     """Initializes this class.
1516
1517     """
1518     JobPollReportCbBase.__init__(self)
1519
1520     self.feedback_fn = feedback_fn
1521
1522     assert callable(feedback_fn)
1523
1524   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1525     """Handles a log message.
1526
1527     """
1528     self.feedback_fn((timestamp, log_type, log_msg))
1529
1530   def ReportNotChanged(self, job_id, status):
1531     """Called if a job hasn't changed in a while.
1532
1533     """
1534     # Ignore
1535
1536
1537 class StdioJobPollReportCb(JobPollReportCbBase):
1538   def __init__(self):
1539     """Initializes this class.
1540
1541     """
1542     JobPollReportCbBase.__init__(self)
1543
1544     self.notified_queued = False
1545     self.notified_waitlock = False
1546
1547   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1548     """Handles a log message.
1549
1550     """
1551     ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
1552              FormatLogMessage(log_type, log_msg))
1553
1554   def ReportNotChanged(self, job_id, status):
1555     """Called if a job hasn't changed in a while.
1556
1557     """
1558     if status is None:
1559       return
1560
1561     if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
1562       ToStderr("Job %s is waiting in queue", job_id)
1563       self.notified_queued = True
1564
1565     elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
1566       ToStderr("Job %s is trying to acquire all necessary locks", job_id)
1567       self.notified_waitlock = True
1568
1569
1570 def FormatLogMessage(log_type, log_msg):
1571   """Formats a job message according to its type.
1572
1573   """
1574   if log_type != constants.ELOG_MESSAGE:
1575     log_msg = str(log_msg)
1576
1577   return utils.SafeEncode(log_msg)
1578
1579
1580 def PollJob(job_id, cl=None, feedback_fn=None, reporter=None):
1581   """Function to poll for the result of a job.
1582
1583   @type job_id: job identified
1584   @param job_id: the job to poll for results
1585   @type cl: luxi.Client
1586   @param cl: the luxi client to use for communicating with the master;
1587              if None, a new client will be created
1588
1589   """
1590   if cl is None:
1591     cl = GetClient()
1592
1593   if reporter is None:
1594     if feedback_fn:
1595       reporter = FeedbackFnJobPollReportCb(feedback_fn)
1596     else:
1597       reporter = StdioJobPollReportCb()
1598   elif feedback_fn:
1599     raise errors.ProgrammerError("Can't specify reporter and feedback function")
1600
1601   return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
1602
1603
1604 def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None, reporter=None):
1605   """Legacy function to submit an opcode.
1606
1607   This is just a simple wrapper over the construction of the processor
1608   instance. It should be extended to better handle feedback and
1609   interaction functions.
1610
1611   """
1612   if cl is None:
1613     cl = GetClient()
1614
1615   SetGenericOpcodeOpts([op], opts)
1616
1617   job_id = SendJob([op], cl=cl)
1618
1619   op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn,
1620                        reporter=reporter)
1621
1622   return op_results[0]
1623
1624
1625 def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
1626   """Wrapper around SubmitOpCode or SendJob.
1627
1628   This function will decide, based on the 'opts' parameter, whether to
1629   submit and wait for the result of the opcode (and return it), or
1630   whether to just send the job and print its identifier. It is used in
1631   order to simplify the implementation of the '--submit' option.
1632
1633   It will also process the opcodes if we're sending the via SendJob
1634   (otherwise SubmitOpCode does it).
1635
1636   """
1637   if opts and opts.submit_only:
1638     job = [op]
1639     SetGenericOpcodeOpts(job, opts)
1640     job_id = SendJob(job, cl=cl)
1641     raise JobSubmittedException(job_id)
1642   else:
1643     return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
1644
1645
1646 def SetGenericOpcodeOpts(opcode_list, options):
1647   """Processor for generic options.
1648
1649   This function updates the given opcodes based on generic command
1650   line options (like debug, dry-run, etc.).
1651
1652   @param opcode_list: list of opcodes
1653   @param options: command line options or None
1654   @return: None (in-place modification)
1655
1656   """
1657   if not options:
1658     return
1659   for op in opcode_list:
1660     if hasattr(options, "dry_run"):
1661       op.dry_run = options.dry_run
1662     op.debug_level = options.debug
1663
1664
1665 def GetClient():
1666   # TODO: Cache object?
1667   try:
1668     client = luxi.Client()
1669   except luxi.NoMasterError:
1670     ss = ssconf.SimpleStore()
1671
1672     # Try to read ssconf file
1673     try:
1674       ss.GetMasterNode()
1675     except errors.ConfigurationError:
1676       raise errors.OpPrereqError("Cluster not initialized or this machine is"
1677                                  " not part of a cluster")
1678
1679     master, myself = ssconf.GetMasterAndMyself(ss=ss)
1680     if master != myself:
1681       raise errors.OpPrereqError("This is not the master node, please connect"
1682                                  " to node '%s' and rerun the command" %
1683                                  master)
1684     raise
1685   return client
1686
1687
1688 def FormatError(err):
1689   """Return a formatted error message for a given error.
1690
1691   This function takes an exception instance and returns a tuple
1692   consisting of two values: first, the recommended exit code, and
1693   second, a string describing the error message (not
1694   newline-terminated).
1695
1696   """
1697   retcode = 1
1698   obuf = StringIO()
1699   msg = str(err)
1700   if isinstance(err, errors.ConfigurationError):
1701     txt = "Corrupt configuration file: %s" % msg
1702     logging.error(txt)
1703     obuf.write(txt + "\n")
1704     obuf.write("Aborting.")
1705     retcode = 2
1706   elif isinstance(err, errors.HooksAbort):
1707     obuf.write("Failure: hooks execution failed:\n")
1708     for node, script, out in err.args[0]:
1709       if out:
1710         obuf.write("  node: %s, script: %s, output: %s\n" %
1711                    (node, script, out))
1712       else:
1713         obuf.write("  node: %s, script: %s (no output)\n" %
1714                    (node, script))
1715   elif isinstance(err, errors.HooksFailure):
1716     obuf.write("Failure: hooks general failure: %s" % msg)
1717   elif isinstance(err, errors.ResolverError):
1718     this_host = netutils.Hostname.GetSysName()
1719     if err.args[0] == this_host:
1720       msg = "Failure: can't resolve my own hostname ('%s')"
1721     else:
1722       msg = "Failure: can't resolve hostname '%s'"
1723     obuf.write(msg % err.args[0])
1724   elif isinstance(err, errors.OpPrereqError):
1725     if len(err.args) == 2:
1726       obuf.write("Failure: prerequisites not met for this"
1727                " operation:\nerror type: %s, error details:\n%s" %
1728                  (err.args[1], err.args[0]))
1729     else:
1730       obuf.write("Failure: prerequisites not met for this"
1731                  " operation:\n%s" % msg)
1732   elif isinstance(err, errors.OpExecError):
1733     obuf.write("Failure: command execution error:\n%s" % msg)
1734   elif isinstance(err, errors.TagError):
1735     obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
1736   elif isinstance(err, errors.JobQueueDrainError):
1737     obuf.write("Failure: the job queue is marked for drain and doesn't"
1738                " accept new requests\n")
1739   elif isinstance(err, errors.JobQueueFull):
1740     obuf.write("Failure: the job queue is full and doesn't accept new"
1741                " job submissions until old jobs are archived\n")
1742   elif isinstance(err, errors.TypeEnforcementError):
1743     obuf.write("Parameter Error: %s" % msg)
1744   elif isinstance(err, errors.ParameterError):
1745     obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
1746   elif isinstance(err, luxi.NoMasterError):
1747     obuf.write("Cannot communicate with the master daemon.\nIs it running"
1748                " and listening for connections?")
1749   elif isinstance(err, luxi.TimeoutError):
1750     obuf.write("Timeout while talking to the master daemon. Error:\n"
1751                "%s" % msg)
1752   elif isinstance(err, luxi.PermissionError):
1753     obuf.write("It seems you don't have permissions to connect to the"
1754                " master daemon.\nPlease retry as a different user.")
1755   elif isinstance(err, luxi.ProtocolError):
1756     obuf.write("Unhandled protocol error while talking to the master daemon:\n"
1757                "%s" % msg)
1758   elif isinstance(err, errors.JobLost):
1759     obuf.write("Error checking job status: %s" % msg)
1760   elif isinstance(err, errors.GenericError):
1761     obuf.write("Unhandled Ganeti error: %s" % msg)
1762   elif isinstance(err, JobSubmittedException):
1763     obuf.write("JobID: %s\n" % err.args[0])
1764     retcode = 0
1765   else:
1766     obuf.write("Unhandled exception: %s" % msg)
1767   return retcode, obuf.getvalue().rstrip('\n')
1768
1769
1770 def GenericMain(commands, override=None, aliases=None):
1771   """Generic main function for all the gnt-* commands.
1772
1773   Arguments:
1774     - commands: a dictionary with a special structure, see the design doc
1775                 for command line handling.
1776     - override: if not None, we expect a dictionary with keys that will
1777                 override command line options; this can be used to pass
1778                 options from the scripts to generic functions
1779     - aliases: dictionary with command aliases {'alias': 'target, ...}
1780
1781   """
1782   # save the program name and the entire command line for later logging
1783   if sys.argv:
1784     binary = os.path.basename(sys.argv[0]) or sys.argv[0]
1785     if len(sys.argv) >= 2:
1786       binary += " " + sys.argv[1]
1787       old_cmdline = " ".join(sys.argv[2:])
1788     else:
1789       old_cmdline = ""
1790   else:
1791     binary = "<unknown program>"
1792     old_cmdline = ""
1793
1794   if aliases is None:
1795     aliases = {}
1796
1797   try:
1798     func, options, args = _ParseArgs(sys.argv, commands, aliases)
1799   except errors.ParameterError, err:
1800     result, err_msg = FormatError(err)
1801     ToStderr(err_msg)
1802     return 1
1803
1804   if func is None: # parse error
1805     return 1
1806
1807   if override is not None:
1808     for key, val in override.iteritems():
1809       setattr(options, key, val)
1810
1811   utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
1812                      stderr_logging=True, program=binary)
1813
1814   if old_cmdline:
1815     logging.info("run with arguments '%s'", old_cmdline)
1816   else:
1817     logging.info("run with no arguments")
1818
1819   try:
1820     result = func(options, args)
1821   except (errors.GenericError, luxi.ProtocolError,
1822           JobSubmittedException), err:
1823     result, err_msg = FormatError(err)
1824     logging.exception("Error during command processing")
1825     ToStderr(err_msg)
1826
1827   return result
1828
1829
1830 def GenericInstanceCreate(mode, opts, args):
1831   """Add an instance to the cluster via either creation or import.
1832
1833   @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
1834   @param opts: the command line options selected by the user
1835   @type args: list
1836   @param args: should contain only one element, the new instance name
1837   @rtype: int
1838   @return: the desired exit code
1839
1840   """
1841   instance = args[0]
1842
1843   (pnode, snode) = SplitNodeOption(opts.node)
1844
1845   hypervisor = None
1846   hvparams = {}
1847   if opts.hypervisor:
1848     hypervisor, hvparams = opts.hypervisor
1849
1850   if opts.nics:
1851     try:
1852       nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
1853     except ValueError, err:
1854       raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
1855     nics = [{}] * nic_max
1856     for nidx, ndict in opts.nics:
1857       nidx = int(nidx)
1858       if not isinstance(ndict, dict):
1859         msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
1860         raise errors.OpPrereqError(msg)
1861       nics[nidx] = ndict
1862   elif opts.no_nics:
1863     # no nics
1864     nics = []
1865   elif mode == constants.INSTANCE_CREATE:
1866     # default of one nic, all auto
1867     nics = [{}]
1868   else:
1869     # mode == import
1870     nics = []
1871
1872   if opts.disk_template == constants.DT_DISKLESS:
1873     if opts.disks or opts.sd_size is not None:
1874       raise errors.OpPrereqError("Diskless instance but disk"
1875                                  " information passed")
1876     disks = []
1877   else:
1878     if (not opts.disks and not opts.sd_size
1879         and mode == constants.INSTANCE_CREATE):
1880       raise errors.OpPrereqError("No disk information specified")
1881     if opts.disks and opts.sd_size is not None:
1882       raise errors.OpPrereqError("Please use either the '--disk' or"
1883                                  " '-s' option")
1884     if opts.sd_size is not None:
1885       opts.disks = [(0, {"size": opts.sd_size})]
1886
1887     if opts.disks:
1888       try:
1889         disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
1890       except ValueError, err:
1891         raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
1892       disks = [{}] * disk_max
1893     else:
1894       disks = []
1895     for didx, ddict in opts.disks:
1896       didx = int(didx)
1897       if not isinstance(ddict, dict):
1898         msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
1899         raise errors.OpPrereqError(msg)
1900       elif "size" in ddict:
1901         if "adopt" in ddict:
1902           raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
1903                                      " (disk %d)" % didx)
1904         try:
1905           ddict["size"] = utils.ParseUnit(ddict["size"])
1906         except ValueError, err:
1907           raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
1908                                      (didx, err))
1909       elif "adopt" in ddict:
1910         if mode == constants.INSTANCE_IMPORT:
1911           raise errors.OpPrereqError("Disk adoption not allowed for instance"
1912                                      " import")
1913         ddict["size"] = 0
1914       else:
1915         raise errors.OpPrereqError("Missing size or adoption source for"
1916                                    " disk %d" % didx)
1917       disks[didx] = ddict
1918
1919   utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
1920   utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
1921
1922   if mode == constants.INSTANCE_CREATE:
1923     start = opts.start
1924     os_type = opts.os
1925     force_variant = opts.force_variant
1926     src_node = None
1927     src_path = None
1928     no_install = opts.no_install
1929     identify_defaults = False
1930   elif mode == constants.INSTANCE_IMPORT:
1931     start = False
1932     os_type = None
1933     force_variant = False
1934     src_node = opts.src_node
1935     src_path = opts.src_dir
1936     no_install = None
1937     identify_defaults = opts.identify_defaults
1938   else:
1939     raise errors.ProgrammerError("Invalid creation mode %s" % mode)
1940
1941   op = opcodes.OpCreateInstance(instance_name=instance,
1942                                 disks=disks,
1943                                 disk_template=opts.disk_template,
1944                                 nics=nics,
1945                                 pnode=pnode, snode=snode,
1946                                 ip_check=opts.ip_check,
1947                                 name_check=opts.name_check,
1948                                 wait_for_sync=opts.wait_for_sync,
1949                                 file_storage_dir=opts.file_storage_dir,
1950                                 file_driver=opts.file_driver,
1951                                 iallocator=opts.iallocator,
1952                                 hypervisor=hypervisor,
1953                                 hvparams=hvparams,
1954                                 beparams=opts.beparams,
1955                                 osparams=opts.osparams,
1956                                 mode=mode,
1957                                 start=start,
1958                                 os_type=os_type,
1959                                 force_variant=force_variant,
1960                                 src_node=src_node,
1961                                 src_path=src_path,
1962                                 no_install=no_install,
1963                                 identify_defaults=identify_defaults)
1964
1965   SubmitOrSend(op, opts)
1966   return 0
1967
1968
1969 class _RunWhileClusterStoppedHelper:
1970   """Helper class for L{RunWhileClusterStopped} to simplify state management
1971
1972   """
1973   def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
1974     """Initializes this class.
1975
1976     @type feedback_fn: callable
1977     @param feedback_fn: Feedback function
1978     @type cluster_name: string
1979     @param cluster_name: Cluster name
1980     @type master_node: string
1981     @param master_node Master node name
1982     @type online_nodes: list
1983     @param online_nodes: List of names of online nodes
1984
1985     """
1986     self.feedback_fn = feedback_fn
1987     self.cluster_name = cluster_name
1988     self.master_node = master_node
1989     self.online_nodes = online_nodes
1990
1991     self.ssh = ssh.SshRunner(self.cluster_name)
1992
1993     self.nonmaster_nodes = [name for name in online_nodes
1994                             if name != master_node]
1995
1996     assert self.master_node not in self.nonmaster_nodes
1997
1998   def _RunCmd(self, node_name, cmd):
1999     """Runs a command on the local or a remote machine.
2000
2001     @type node_name: string
2002     @param node_name: Machine name
2003     @type cmd: list
2004     @param cmd: Command
2005
2006     """
2007     if node_name is None or node_name == self.master_node:
2008       # No need to use SSH
2009       result = utils.RunCmd(cmd)
2010     else:
2011       result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))
2012
2013     if result.failed:
2014       errmsg = ["Failed to run command %s" % result.cmd]
2015       if node_name:
2016         errmsg.append("on node %s" % node_name)
2017       errmsg.append(": exitcode %s and error %s" %
2018                     (result.exit_code, result.output))
2019       raise errors.OpExecError(" ".join(errmsg))
2020
2021   def Call(self, fn, *args):
2022     """Call function while all daemons are stopped.
2023
2024     @type fn: callable
2025     @param fn: Function to be called
2026
2027     """
2028     # Pause watcher by acquiring an exclusive lock on watcher state file
2029     self.feedback_fn("Blocking watcher")
2030     watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
2031     try:
2032       # TODO: Currently, this just blocks. There's no timeout.
2033       # TODO: Should it be a shared lock?
2034       watcher_block.Exclusive(blocking=True)
2035
2036       # Stop master daemons, so that no new jobs can come in and all running
2037       # ones are finished
2038       self.feedback_fn("Stopping master daemons")
2039       self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
2040       try:
2041         # Stop daemons on all nodes
2042         for node_name in self.online_nodes:
2043           self.feedback_fn("Stopping daemons on %s" % node_name)
2044           self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])
2045
2046         # All daemons are shut down now
2047         try:
2048           return fn(self, *args)
2049         except Exception, err:
2050           _, errmsg = FormatError(err)
2051           logging.exception("Caught exception")
2052           self.feedback_fn(errmsg)
2053           raise
2054       finally:
2055         # Start cluster again, master node last
2056         for node_name in self.nonmaster_nodes + [self.master_node]:
2057           self.feedback_fn("Starting daemons on %s" % node_name)
2058           self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
2059     finally:
2060       # Resume watcher
2061       watcher_block.Close()
2062
2063
2064 def RunWhileClusterStopped(feedback_fn, fn, *args):
2065   """Calls a function while all cluster daemons are stopped.
2066
2067   @type feedback_fn: callable
2068   @param feedback_fn: Feedback function
2069   @type fn: callable
2070   @param fn: Function to be called when daemons are stopped
2071
2072   """
2073   feedback_fn("Gathering cluster information")
2074
2075   # This ensures we're running on the master daemon
2076   cl = GetClient()
2077
2078   (cluster_name, master_node) = \
2079     cl.QueryConfigValues(["cluster_name", "master_node"])
2080
2081   online_nodes = GetOnlineNodes([], cl=cl)
2082
2083   # Don't keep a reference to the client. The master daemon will go away.
2084   del cl
2085
2086   assert master_node in online_nodes
2087
2088   return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
2089                                        online_nodes).Call(fn, *args)
2090
2091
2092 def GenerateTable(headers, fields, separator, data,
2093                   numfields=None, unitfields=None,
2094                   units=None):
2095   """Prints a table with headers and different fields.
2096
2097   @type headers: dict
2098   @param headers: dictionary mapping field names to headers for
2099       the table
2100   @type fields: list
2101   @param fields: the field names corresponding to each row in
2102       the data field
2103   @param separator: the separator to be used; if this is None,
2104       the default 'smart' algorithm is used which computes optimal
2105       field width, otherwise just the separator is used between
2106       each field
2107   @type data: list
2108   @param data: a list of lists, each sublist being one row to be output
2109   @type numfields: list
2110   @param numfields: a list with the fields that hold numeric
2111       values and thus should be right-aligned
2112   @type unitfields: list
2113   @param unitfields: a list with the fields that hold numeric
2114       values that should be formatted with the units field
2115   @type units: string or None
2116   @param units: the units we should use for formatting, or None for
2117       automatic choice (human-readable for non-separator usage, otherwise
2118       megabytes); this is a one-letter string
2119
2120   """
2121   if units is None:
2122     if separator:
2123       units = "m"
2124     else:
2125       units = "h"
2126
2127   if numfields is None:
2128     numfields = []
2129   if unitfields is None:
2130     unitfields = []
2131
2132   numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
2133   unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142
2134
2135   format_fields = []
2136   for field in fields:
2137     if headers and field not in headers:
2138       # TODO: handle better unknown fields (either revert to old
2139       # style of raising exception, or deal more intelligently with
2140       # variable fields)
2141       headers[field] = field
2142     if separator is not None:
2143       format_fields.append("%s")
2144     elif numfields.Matches(field):
2145       format_fields.append("%*s")
2146     else:
2147       format_fields.append("%-*s")
2148
2149   if separator is None:
2150     mlens = [0 for name in fields]
2151     format_str = ' '.join(format_fields)
2152   else:
2153     format_str = separator.replace("%", "%%").join(format_fields)
2154
2155   for row in data:
2156     if row is None:
2157       continue
2158     for idx, val in enumerate(row):
2159       if unitfields.Matches(fields[idx]):
2160         try:
2161           val = int(val)
2162         except (TypeError, ValueError):
2163           pass
2164         else:
2165           val = row[idx] = utils.FormatUnit(val, units)
2166       val = row[idx] = str(val)
2167       if separator is None:
2168         mlens[idx] = max(mlens[idx], len(val))
2169
2170   result = []
2171   if headers:
2172     args = []
2173     for idx, name in enumerate(fields):
2174       hdr = headers[name]
2175       if separator is None:
2176         mlens[idx] = max(mlens[idx], len(hdr))
2177         args.append(mlens[idx])
2178       args.append(hdr)
2179     result.append(format_str % tuple(args))
2180
2181   if separator is None:
2182     assert len(mlens) == len(fields)
2183
2184     if fields and not numfields.Matches(fields[-1]):
2185       mlens[-1] = 0
2186
2187   for line in data:
2188     args = []
2189     if line is None:
2190       line = ['-' for _ in fields]
2191     for idx in range(len(fields)):
2192       if separator is None:
2193         args.append(mlens[idx])
2194       args.append(line[idx])
2195     result.append(format_str % tuple(args))
2196
2197   return result
2198
2199
2200 def FormatTimestamp(ts):
2201   """Formats a given timestamp.
2202
2203   @type ts: timestamp
2204   @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
2205
2206   @rtype: string
2207   @return: a string with the formatted timestamp
2208
2209   """
2210   if not isinstance (ts, (tuple, list)) or len(ts) != 2:
2211     return '?'
2212   sec, usec = ts
2213   return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
2214
2215
2216 def ParseTimespec(value):
2217   """Parse a time specification.
2218
2219   The following suffixed will be recognized:
2220
2221     - s: seconds
2222     - m: minutes
2223     - h: hours
2224     - d: day
2225     - w: weeks
2226
2227   Without any suffix, the value will be taken to be in seconds.
2228
2229   """
2230   value = str(value)
2231   if not value:
2232     raise errors.OpPrereqError("Empty time specification passed")
2233   suffix_map = {
2234     's': 1,
2235     'm': 60,
2236     'h': 3600,
2237     'd': 86400,
2238     'w': 604800,
2239     }
2240   if value[-1] not in suffix_map:
2241     try:
2242       value = int(value)
2243     except (TypeError, ValueError):
2244       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2245   else:
2246     multiplier = suffix_map[value[-1]]
2247     value = value[:-1]
2248     if not value: # no data left after stripping the suffix
2249       raise errors.OpPrereqError("Invalid time specification (only"
2250                                  " suffix passed)")
2251     try:
2252       value = int(value) * multiplier
2253     except (TypeError, ValueError):
2254       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2255   return value
2256
2257
2258 def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
2259                    filter_master=False):
2260   """Returns the names of online nodes.
2261
2262   This function will also log a warning on stderr with the names of
2263   the online nodes.
2264
2265   @param nodes: if not empty, use only this subset of nodes (minus the
2266       offline ones)
2267   @param cl: if not None, luxi client to use
2268   @type nowarn: boolean
2269   @param nowarn: by default, this function will output a note with the
2270       offline nodes that are skipped; if this parameter is True the
2271       note is not displayed
2272   @type secondary_ips: boolean
2273   @param secondary_ips: if True, return the secondary IPs instead of the
2274       names, useful for doing network traffic over the replication interface
2275       (if any)
2276   @type filter_master: boolean
2277   @param filter_master: if True, do not return the master node in the list
2278       (useful in coordination with secondary_ips where we cannot check our
2279       node name against the list)
2280
2281   """
2282   if cl is None:
2283     cl = GetClient()
2284
2285   if secondary_ips:
2286     name_idx = 2
2287   else:
2288     name_idx = 0
2289
2290   if filter_master:
2291     master_node = cl.QueryConfigValues(["master_node"])[0]
2292     filter_fn = lambda x: x != master_node
2293   else:
2294     filter_fn = lambda _: True
2295
2296   result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
2297                          use_locking=False)
2298   offline = [row[0] for row in result if row[1]]
2299   if offline and not nowarn:
2300     ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
2301   return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
2302
2303
2304 def _ToStream(stream, txt, *args):
2305   """Write a message to a stream, bypassing the logging system
2306
2307   @type stream: file object
2308   @param stream: the file to which we should write
2309   @type txt: str
2310   @param txt: the message
2311
2312   """
2313   if args:
2314     args = tuple(args)
2315     stream.write(txt % args)
2316   else:
2317     stream.write(txt)
2318   stream.write('\n')
2319   stream.flush()
2320
2321
2322 def ToStdout(txt, *args):
2323   """Write a message to stdout only, bypassing the logging system
2324
2325   This is just a wrapper over _ToStream.
2326
2327   @type txt: str
2328   @param txt: the message
2329
2330   """
2331   _ToStream(sys.stdout, txt, *args)
2332
2333
2334 def ToStderr(txt, *args):
2335   """Write a message to stderr only, bypassing the logging system
2336
2337   This is just a wrapper over _ToStream.
2338
2339   @type txt: str
2340   @param txt: the message
2341
2342   """
2343   _ToStream(sys.stderr, txt, *args)
2344
2345
2346 class JobExecutor(object):
2347   """Class which manages the submission and execution of multiple jobs.
2348
2349   Note that instances of this class should not be reused between
2350   GetResults() calls.
2351
2352   """
2353   def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
2354     self.queue = []
2355     if cl is None:
2356       cl = GetClient()
2357     self.cl = cl
2358     self.verbose = verbose
2359     self.jobs = []
2360     self.opts = opts
2361     self.feedback_fn = feedback_fn
2362
2363   def QueueJob(self, name, *ops):
2364     """Record a job for later submit.
2365
2366     @type name: string
2367     @param name: a description of the job, will be used in WaitJobSet
2368     """
2369     SetGenericOpcodeOpts(ops, self.opts)
2370     self.queue.append((name, ops))
2371
2372   def SubmitPending(self, each=False):
2373     """Submit all pending jobs.
2374
2375     """
2376     if each:
2377       results = []
2378       for row in self.queue:
2379         # SubmitJob will remove the success status, but raise an exception if
2380         # the submission fails, so we'll notice that anyway.
2381         results.append([True, self.cl.SubmitJob(row[1])])
2382     else:
2383       results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
2384     for (idx, ((status, data), (name, _))) in enumerate(zip(results,
2385                                                             self.queue)):
2386       self.jobs.append((idx, status, data, name))
2387
2388   def _ChooseJob(self):
2389     """Choose a non-waiting/queued job to poll next.
2390
2391     """
2392     assert self.jobs, "_ChooseJob called with empty job list"
2393
2394     result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
2395     assert result
2396
2397     for job_data, status in zip(self.jobs, result):
2398       if (isinstance(status, list) and status and
2399           status[0] in (constants.JOB_STATUS_QUEUED,
2400                         constants.JOB_STATUS_WAITLOCK,
2401                         constants.JOB_STATUS_CANCELING)):
2402         # job is still present and waiting
2403         continue
2404       # good candidate found (either running job or lost job)
2405       self.jobs.remove(job_data)
2406       return job_data
2407
2408     # no job found
2409     return self.jobs.pop(0)
2410
2411   def GetResults(self):
2412     """Wait for and return the results of all jobs.
2413
2414     @rtype: list
2415     @return: list of tuples (success, job results), in the same order
2416         as the submitted jobs; if a job has failed, instead of the result
2417         there will be the error message
2418
2419     """
2420     if not self.jobs:
2421       self.SubmitPending()
2422     results = []
2423     if self.verbose:
2424       ok_jobs = [row[2] for row in self.jobs if row[1]]
2425       if ok_jobs:
2426         ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
2427
2428     # first, remove any non-submitted jobs
2429     self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
2430     for idx, _, jid, name in failures:
2431       ToStderr("Failed to submit job for %s: %s", name, jid)
2432       results.append((idx, False, jid))
2433
2434     while self.jobs:
2435       (idx, _, jid, name) = self._ChooseJob()
2436       ToStdout("Waiting for job %s for %s...", jid, name)
2437       try:
2438         job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
2439         success = True
2440       except errors.JobLost, err:
2441         _, job_result = FormatError(err)
2442         ToStderr("Job %s for %s has been archived, cannot check its result",
2443                  jid, name)
2444         success = False
2445       except (errors.GenericError, luxi.ProtocolError), err:
2446         _, job_result = FormatError(err)
2447         success = False
2448         # the error message will always be shown, verbose or not
2449         ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
2450
2451       results.append((idx, success, job_result))
2452
2453     # sort based on the index, then drop it
2454     results.sort()
2455     results = [i[1:] for i in results]
2456
2457     return results
2458
2459   def WaitOrShow(self, wait):
2460     """Wait for job results or only print the job IDs.
2461
2462     @type wait: boolean
2463     @param wait: whether to wait or not
2464
2465     """
2466     if wait:
2467       return self.GetResults()
2468     else:
2469       if not self.jobs:
2470         self.SubmitPending()
2471       for _, status, result, name in self.jobs:
2472         if status:
2473           ToStdout("%s: %s", result, name)
2474         else:
2475           ToStderr("Failure for %s: %s", name, result)
2476       return [row[1:3] for row in self.jobs]