Daemons conditionally setup console logging
[ganeti-local] / lib / cli.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module dealing with command line parsing"""
23
24
25 import sys
26 import textwrap
27 import os.path
28 import time
29 import logging
30 from cStringIO import StringIO
31
32 from ganeti import utils
33 from ganeti import errors
34 from ganeti import constants
35 from ganeti import opcodes
36 from ganeti import luxi
37 from ganeti import ssconf
38 from ganeti import rpc
39 from ganeti import ssh
40 from ganeti import compat
41
42 from optparse import (OptionParser, TitledHelpFormatter,
43                       Option, OptionValueError)
44
45
46 __all__ = [
47   # Command line options
48   "ADD_UIDS_OPT",
49   "ALLOCATABLE_OPT",
50   "ALL_OPT",
51   "AUTO_PROMOTE_OPT",
52   "AUTO_REPLACE_OPT",
53   "BACKEND_OPT",
54   "CLEANUP_OPT",
55   "CONFIRM_OPT",
56   "CP_SIZE_OPT",
57   "DEBUG_OPT",
58   "DEBUG_SIMERR_OPT",
59   "DISKIDX_OPT",
60   "DISK_OPT",
61   "DISK_TEMPLATE_OPT",
62   "DRAINED_OPT",
63   "EARLY_RELEASE_OPT",
64   "ENABLED_HV_OPT",
65   "ERROR_CODES_OPT",
66   "FIELDS_OPT",
67   "FILESTORE_DIR_OPT",
68   "FILESTORE_DRIVER_OPT",
69   "FORCE_OPT",
70   "FORCE_VARIANT_OPT",
71   "GLOBAL_FILEDIR_OPT",
72   "HVLIST_OPT",
73   "HVOPTS_OPT",
74   "HYPERVISOR_OPT",
75   "IALLOCATOR_OPT",
76   "IDENTIFY_DEFAULTS_OPT",
77   "IGNORE_CONSIST_OPT",
78   "IGNORE_FAILURES_OPT",
79   "IGNORE_SECONDARIES_OPT",
80   "IGNORE_SIZE_OPT",
81   "MAC_PREFIX_OPT",
82   "MAINTAIN_NODE_HEALTH_OPT",
83   "MASTER_NETDEV_OPT",
84   "MC_OPT",
85   "NET_OPT",
86   "NEW_CLUSTER_CERT_OPT",
87   "NEW_CONFD_HMAC_KEY_OPT",
88   "NEW_RAPI_CERT_OPT",
89   "NEW_SECONDARY_OPT",
90   "NIC_PARAMS_OPT",
91   "NODE_LIST_OPT",
92   "NODE_PLACEMENT_OPT",
93   "NOHDR_OPT",
94   "NOIPCHECK_OPT",
95   "NO_INSTALL_OPT",
96   "NONAMECHECK_OPT",
97   "NOLVM_STORAGE_OPT",
98   "NOMODIFY_ETCHOSTS_OPT",
99   "NOMODIFY_SSH_SETUP_OPT",
100   "NONICS_OPT",
101   "NONLIVE_OPT",
102   "NONPLUS1_OPT",
103   "NOSHUTDOWN_OPT",
104   "NOSTART_OPT",
105   "NOSSH_KEYCHECK_OPT",
106   "NOVOTING_OPT",
107   "NWSYNC_OPT",
108   "ON_PRIMARY_OPT",
109   "ON_SECONDARY_OPT",
110   "OFFLINE_OPT",
111   "OS_OPT",
112   "OS_SIZE_OPT",
113   "RAPI_CERT_OPT",
114   "READD_OPT",
115   "REBOOT_TYPE_OPT",
116   "REMOVE_UIDS_OPT",
117   "SECONDARY_IP_OPT",
118   "SELECT_OS_OPT",
119   "SEP_OPT",
120   "SHOWCMD_OPT",
121   "SHUTDOWN_TIMEOUT_OPT",
122   "SINGLE_NODE_OPT",
123   "SRC_DIR_OPT",
124   "SRC_NODE_OPT",
125   "SUBMIT_OPT",
126   "STATIC_OPT",
127   "SYNC_OPT",
128   "TAG_SRC_OPT",
129   "TIMEOUT_OPT",
130   "UIDPOOL_OPT",
131   "USEUNITS_OPT",
132   "USE_REPL_NET_OPT",
133   "VERBOSE_OPT",
134   "VG_NAME_OPT",
135   "YES_DOIT_OPT",
136   # Generic functions for CLI programs
137   "GenericMain",
138   "GenericInstanceCreate",
139   "GetClient",
140   "GetOnlineNodes",
141   "JobExecutor",
142   "JobSubmittedException",
143   "ParseTimespec",
144   "RunWhileClusterStopped",
145   "SubmitOpCode",
146   "SubmitOrSend",
147   "UsesRPC",
148   # Formatting functions
149   "ToStderr", "ToStdout",
150   "FormatError",
151   "GenerateTable",
152   "AskUser",
153   "FormatTimestamp",
154   # Tags functions
155   "ListTags",
156   "AddTags",
157   "RemoveTags",
158   # command line options support infrastructure
159   "ARGS_MANY_INSTANCES",
160   "ARGS_MANY_NODES",
161   "ARGS_NONE",
162   "ARGS_ONE_INSTANCE",
163   "ARGS_ONE_NODE",
164   "ARGS_ONE_OS",
165   "ArgChoice",
166   "ArgCommand",
167   "ArgFile",
168   "ArgHost",
169   "ArgInstance",
170   "ArgJobId",
171   "ArgNode",
172   "ArgOs",
173   "ArgSuggest",
174   "ArgUnknown",
175   "OPT_COMPL_INST_ADD_NODES",
176   "OPT_COMPL_MANY_NODES",
177   "OPT_COMPL_ONE_IALLOCATOR",
178   "OPT_COMPL_ONE_INSTANCE",
179   "OPT_COMPL_ONE_NODE",
180   "OPT_COMPL_ONE_OS",
181   "cli_option",
182   "SplitNodeOption",
183   "CalculateOSNames",
184   ]
185
186 NO_PREFIX = "no_"
187 UN_PREFIX = "-"
188
189
190 class _Argument:
191   def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
192     self.min = min
193     self.max = max
194
195   def __repr__(self):
196     return ("<%s min=%s max=%s>" %
197             (self.__class__.__name__, self.min, self.max))
198
199
200 class ArgSuggest(_Argument):
201   """Suggesting argument.
202
203   Value can be any of the ones passed to the constructor.
204
205   """
206   # pylint: disable-msg=W0622
207   def __init__(self, min=0, max=None, choices=None):
208     _Argument.__init__(self, min=min, max=max)
209     self.choices = choices
210
211   def __repr__(self):
212     return ("<%s min=%s max=%s choices=%r>" %
213             (self.__class__.__name__, self.min, self.max, self.choices))
214
215
216 class ArgChoice(ArgSuggest):
217   """Choice argument.
218
219   Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
220   but value must be one of the choices.
221
222   """
223
224
225 class ArgUnknown(_Argument):
226   """Unknown argument to program (e.g. determined at runtime).
227
228   """
229
230
231 class ArgInstance(_Argument):
232   """Instances argument.
233
234   """
235
236
237 class ArgNode(_Argument):
238   """Node argument.
239
240   """
241
242 class ArgJobId(_Argument):
243   """Job ID argument.
244
245   """
246
247
248 class ArgFile(_Argument):
249   """File path argument.
250
251   """
252
253
254 class ArgCommand(_Argument):
255   """Command argument.
256
257   """
258
259
260 class ArgHost(_Argument):
261   """Host argument.
262
263   """
264
265
266 class ArgOs(_Argument):
267   """OS argument.
268
269   """
270
271
272 ARGS_NONE = []
273 ARGS_MANY_INSTANCES = [ArgInstance()]
274 ARGS_MANY_NODES = [ArgNode()]
275 ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
276 ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
277 ARGS_ONE_OS = [ArgOs(min=1, max=1)]
278
279
280 def _ExtractTagsObject(opts, args):
281   """Extract the tag type object.
282
283   Note that this function will modify its args parameter.
284
285   """
286   if not hasattr(opts, "tag_type"):
287     raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
288   kind = opts.tag_type
289   if kind == constants.TAG_CLUSTER:
290     retval = kind, kind
291   elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
292     if not args:
293       raise errors.OpPrereqError("no arguments passed to the command")
294     name = args.pop(0)
295     retval = kind, name
296   else:
297     raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
298   return retval
299
300
301 def _ExtendTags(opts, args):
302   """Extend the args if a source file has been given.
303
304   This function will extend the tags with the contents of the file
305   passed in the 'tags_source' attribute of the opts parameter. A file
306   named '-' will be replaced by stdin.
307
308   """
309   fname = opts.tags_source
310   if fname is None:
311     return
312   if fname == "-":
313     new_fh = sys.stdin
314   else:
315     new_fh = open(fname, "r")
316   new_data = []
317   try:
318     # we don't use the nice 'new_data = [line.strip() for line in fh]'
319     # because of python bug 1633941
320     while True:
321       line = new_fh.readline()
322       if not line:
323         break
324       new_data.append(line.strip())
325   finally:
326     new_fh.close()
327   args.extend(new_data)
328
329
330 def ListTags(opts, args):
331   """List the tags on a given object.
332
333   This is a generic implementation that knows how to deal with all
334   three cases of tag objects (cluster, node, instance). The opts
335   argument is expected to contain a tag_type field denoting what
336   object type we work on.
337
338   """
339   kind, name = _ExtractTagsObject(opts, args)
340   cl = GetClient()
341   result = cl.QueryTags(kind, name)
342   result = list(result)
343   result.sort()
344   for tag in result:
345     ToStdout(tag)
346
347
348 def AddTags(opts, args):
349   """Add tags on a given object.
350
351   This is a generic implementation that knows how to deal with all
352   three cases of tag objects (cluster, node, instance). The opts
353   argument is expected to contain a tag_type field denoting what
354   object type we work on.
355
356   """
357   kind, name = _ExtractTagsObject(opts, args)
358   _ExtendTags(opts, args)
359   if not args:
360     raise errors.OpPrereqError("No tags to be added")
361   op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
362   SubmitOpCode(op)
363
364
365 def RemoveTags(opts, args):
366   """Remove tags from a given object.
367
368   This is a generic implementation that knows how to deal with all
369   three cases of tag objects (cluster, node, instance). The opts
370   argument is expected to contain a tag_type field denoting what
371   object type we work on.
372
373   """
374   kind, name = _ExtractTagsObject(opts, args)
375   _ExtendTags(opts, args)
376   if not args:
377     raise errors.OpPrereqError("No tags to be removed")
378   op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
379   SubmitOpCode(op)
380
381
382 def check_unit(option, opt, value): # pylint: disable-msg=W0613
383   """OptParsers custom converter for units.
384
385   """
386   try:
387     return utils.ParseUnit(value)
388   except errors.UnitParseError, err:
389     raise OptionValueError("option %s: %s" % (opt, err))
390
391
392 def _SplitKeyVal(opt, data):
393   """Convert a KeyVal string into a dict.
394
395   This function will convert a key=val[,...] string into a dict. Empty
396   values will be converted specially: keys which have the prefix 'no_'
397   will have the value=False and the prefix stripped, the others will
398   have value=True.
399
400   @type opt: string
401   @param opt: a string holding the option name for which we process the
402       data, used in building error messages
403   @type data: string
404   @param data: a string of the format key=val,key=val,...
405   @rtype: dict
406   @return: {key=val, key=val}
407   @raises errors.ParameterError: if there are duplicate keys
408
409   """
410   kv_dict = {}
411   if data:
412     for elem in utils.UnescapeAndSplit(data, sep=","):
413       if "=" in elem:
414         key, val = elem.split("=", 1)
415       else:
416         if elem.startswith(NO_PREFIX):
417           key, val = elem[len(NO_PREFIX):], False
418         elif elem.startswith(UN_PREFIX):
419           key, val = elem[len(UN_PREFIX):], None
420         else:
421           key, val = elem, True
422       if key in kv_dict:
423         raise errors.ParameterError("Duplicate key '%s' in option %s" %
424                                     (key, opt))
425       kv_dict[key] = val
426   return kv_dict
427
428
429 def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
430   """Custom parser for ident:key=val,key=val options.
431
432   This will store the parsed values as a tuple (ident, {key: val}). As such,
433   multiple uses of this option via action=append is possible.
434
435   """
436   if ":" not in value:
437     ident, rest = value, ''
438   else:
439     ident, rest = value.split(":", 1)
440
441   if ident.startswith(NO_PREFIX):
442     if rest:
443       msg = "Cannot pass options when removing parameter groups: %s" % value
444       raise errors.ParameterError(msg)
445     retval = (ident[len(NO_PREFIX):], False)
446   elif ident.startswith(UN_PREFIX):
447     if rest:
448       msg = "Cannot pass options when removing parameter groups: %s" % value
449       raise errors.ParameterError(msg)
450     retval = (ident[len(UN_PREFIX):], None)
451   else:
452     kv_dict = _SplitKeyVal(opt, rest)
453     retval = (ident, kv_dict)
454   return retval
455
456
457 def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
458   """Custom parser class for key=val,key=val options.
459
460   This will store the parsed values as a dict {key: val}.
461
462   """
463   return _SplitKeyVal(opt, value)
464
465
466 def check_bool(option, opt, value): # pylint: disable-msg=W0613
467   """Custom parser for yes/no options.
468
469   This will store the parsed value as either True or False.
470
471   """
472   value = value.lower()
473   if value == constants.VALUE_FALSE or value == "no":
474     return False
475   elif value == constants.VALUE_TRUE or value == "yes":
476     return True
477   else:
478     raise errors.ParameterError("Invalid boolean value '%s'" % value)
479
480
481 # completion_suggestion is normally a list. Using numeric values not evaluating
482 # to False for dynamic completion.
483 (OPT_COMPL_MANY_NODES,
484  OPT_COMPL_ONE_NODE,
485  OPT_COMPL_ONE_INSTANCE,
486  OPT_COMPL_ONE_OS,
487  OPT_COMPL_ONE_IALLOCATOR,
488  OPT_COMPL_INST_ADD_NODES) = range(100, 106)
489
490 OPT_COMPL_ALL = frozenset([
491   OPT_COMPL_MANY_NODES,
492   OPT_COMPL_ONE_NODE,
493   OPT_COMPL_ONE_INSTANCE,
494   OPT_COMPL_ONE_OS,
495   OPT_COMPL_ONE_IALLOCATOR,
496   OPT_COMPL_INST_ADD_NODES,
497   ])
498
499
500 class CliOption(Option):
501   """Custom option class for optparse.
502
503   """
504   ATTRS = Option.ATTRS + [
505     "completion_suggest",
506     ]
507   TYPES = Option.TYPES + (
508     "identkeyval",
509     "keyval",
510     "unit",
511     "bool",
512     )
513   TYPE_CHECKER = Option.TYPE_CHECKER.copy()
514   TYPE_CHECKER["identkeyval"] = check_ident_key_val
515   TYPE_CHECKER["keyval"] = check_key_val
516   TYPE_CHECKER["unit"] = check_unit
517   TYPE_CHECKER["bool"] = check_bool
518
519
520 # optparse.py sets make_option, so we do it for our own option class, too
521 cli_option = CliOption
522
523
524 _YORNO = "yes|no"
525
526 DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
527                        help="Increase debugging level")
528
529 NOHDR_OPT = cli_option("--no-headers", default=False,
530                        action="store_true", dest="no_headers",
531                        help="Don't display column headers")
532
533 SEP_OPT = cli_option("--separator", default=None,
534                      action="store", dest="separator",
535                      help=("Separator between output fields"
536                            " (defaults to one space)"))
537
538 USEUNITS_OPT = cli_option("--units", default=None,
539                           dest="units", choices=('h', 'm', 'g', 't'),
540                           help="Specify units for output (one of hmgt)")
541
542 FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
543                         type="string", metavar="FIELDS",
544                         help="Comma separated list of output fields")
545
546 FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
547                        default=False, help="Force the operation")
548
549 CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
550                          default=False, help="Do not require confirmation")
551
552 TAG_SRC_OPT = cli_option("--from", dest="tags_source",
553                          default=None, help="File with tag names")
554
555 SUBMIT_OPT = cli_option("--submit", dest="submit_only",
556                         default=False, action="store_true",
557                         help=("Submit the job and return the job ID, but"
558                               " don't wait for the job to finish"))
559
560 SYNC_OPT = cli_option("--sync", dest="do_locking",
561                       default=False, action="store_true",
562                       help=("Grab locks while doing the queries"
563                             " in order to ensure more consistent results"))
564
565 _DRY_RUN_OPT = cli_option("--dry-run", default=False,
566                           action="store_true",
567                           help=("Do not execute the operation, just run the"
568                                 " check steps and verify it it could be"
569                                 " executed"))
570
571 VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
572                          action="store_true",
573                          help="Increase the verbosity of the operation")
574
575 DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
576                               action="store_true", dest="simulate_errors",
577                               help="Debugging option that makes the operation"
578                               " treat most runtime checks as failed")
579
580 NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
581                         default=True, action="store_false",
582                         help="Don't wait for sync (DANGEROUS!)")
583
584 DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
585                                help="Custom disk setup (diskless, file,"
586                                " plain or drbd)",
587                                default=None, metavar="TEMPL",
588                                choices=list(constants.DISK_TEMPLATES))
589
590 NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
591                         help="Do not create any network cards for"
592                         " the instance")
593
594 FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
595                                help="Relative path under default cluster-wide"
596                                " file storage dir to store file-based disks",
597                                default=None, metavar="<DIR>")
598
599 FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
600                                   help="Driver to use for image files",
601                                   default="loop", metavar="<DRIVER>",
602                                   choices=list(constants.FILE_DRIVER))
603
604 IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
605                             help="Select nodes for the instance automatically"
606                             " using the <NAME> iallocator plugin",
607                             default=None, type="string",
608                             completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
609
610 OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
611                     metavar="<os>",
612                     completion_suggest=OPT_COMPL_ONE_OS)
613
614 FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
615                                action="store_true", default=False,
616                                help="Force an unknown variant")
617
618 NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
619                             action="store_true", default=False,
620                             help="Do not install the OS (will"
621                             " enable no-start)")
622
623 BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
624                          type="keyval", default={},
625                          help="Backend parameters")
626
627 HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
628                          default={}, dest="hvparams",
629                          help="Hypervisor parameters")
630
631 HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
632                             help="Hypervisor and hypervisor options, in the"
633                             " format hypervisor:option=value,option=value,...",
634                             default=None, type="identkeyval")
635
636 HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
637                         help="Hypervisor and hypervisor options, in the"
638                         " format hypervisor:option=value,option=value,...",
639                         default=[], action="append", type="identkeyval")
640
641 NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
642                            action="store_false",
643                            help="Don't check that the instance's IP"
644                            " is alive")
645
646 NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
647                              default=True, action="store_false",
648                              help="Don't check that the instance's name"
649                              " is resolvable")
650
651 NET_OPT = cli_option("--net",
652                      help="NIC parameters", default=[],
653                      dest="nics", action="append", type="identkeyval")
654
655 DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
656                       dest="disks", action="append", type="identkeyval")
657
658 DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
659                          help="Comma-separated list of disks"
660                          " indices to act on (e.g. 0,2) (optional,"
661                          " defaults to all disks)")
662
663 OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
664                          help="Enforces a single-disk configuration using the"
665                          " given disk size, in MiB unless a suffix is used",
666                          default=None, type="unit", metavar="<size>")
667
668 IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
669                                 dest="ignore_consistency",
670                                 action="store_true", default=False,
671                                 help="Ignore the consistency of the disks on"
672                                 " the secondary")
673
674 NONLIVE_OPT = cli_option("--non-live", dest="live",
675                          default=True, action="store_false",
676                          help="Do a non-live migration (this usually means"
677                          " freeze the instance, save the state, transfer and"
678                          " only then resume running on the secondary node)")
679
680 NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
681                                 help="Target node and optional secondary node",
682                                 metavar="<pnode>[:<snode>]",
683                                 completion_suggest=OPT_COMPL_INST_ADD_NODES)
684
685 NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
686                            action="append", metavar="<node>",
687                            help="Use only this node (can be used multiple"
688                            " times, if not given defaults to all nodes)",
689                            completion_suggest=OPT_COMPL_ONE_NODE)
690
691 SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
692                              metavar="<node>",
693                              completion_suggest=OPT_COMPL_ONE_NODE)
694
695 NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
696                          action="store_false",
697                          help="Don't start the instance after creation")
698
699 SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
700                          action="store_true", default=False,
701                          help="Show command instead of executing it")
702
703 CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
704                          default=False, action="store_true",
705                          help="Instead of performing the migration, try to"
706                          " recover from a failed cleanup. This is safe"
707                          " to run even if the instance is healthy, but it"
708                          " will create extra replication traffic and "
709                          " disrupt briefly the replication (like during the"
710                          " migration")
711
712 STATIC_OPT = cli_option("-s", "--static", dest="static",
713                         action="store_true", default=False,
714                         help="Only show configuration data, not runtime data")
715
716 ALL_OPT = cli_option("--all", dest="show_all",
717                      default=False, action="store_true",
718                      help="Show info on all instances on the cluster."
719                      " This can take a long time to run, use wisely")
720
721 SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
722                            action="store_true", default=False,
723                            help="Interactive OS reinstall, lists available"
724                            " OS templates for selection")
725
726 IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
727                                  action="store_true", default=False,
728                                  help="Remove the instance from the cluster"
729                                  " configuration even if there are failures"
730                                  " during the removal process")
731
732 NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
733                                help="Specifies the new secondary node",
734                                metavar="NODE", default=None,
735                                completion_suggest=OPT_COMPL_ONE_NODE)
736
737 ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
738                             default=False, action="store_true",
739                             help="Replace the disk(s) on the primary"
740                             " node (only for the drbd template)")
741
742 ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
743                               default=False, action="store_true",
744                               help="Replace the disk(s) on the secondary"
745                               " node (only for the drbd template)")
746
747 AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
748                               default=False, action="store_true",
749                               help="Lock all nodes and auto-promote as needed"
750                               " to MC status")
751
752 AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
753                               default=False, action="store_true",
754                               help="Automatically replace faulty disks"
755                               " (only for the drbd template)")
756
757 IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
758                              default=False, action="store_true",
759                              help="Ignore current recorded size"
760                              " (useful for forcing activation when"
761                              " the recorded size is wrong)")
762
763 SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
764                           metavar="<node>",
765                           completion_suggest=OPT_COMPL_ONE_NODE)
766
767 SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
768                          metavar="<dir>")
769
770 SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
771                               help="Specify the secondary ip for the node",
772                               metavar="ADDRESS", default=None)
773
774 READD_OPT = cli_option("--readd", dest="readd",
775                        default=False, action="store_true",
776                        help="Readd old node after replacing it")
777
778 NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
779                                 default=True, action="store_false",
780                                 help="Disable SSH key fingerprint checking")
781
782
783 MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
784                     type="bool", default=None, metavar=_YORNO,
785                     help="Set the master_candidate flag on the node")
786
787 OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
788                          type="bool", default=None,
789                          help="Set the offline flag on the node")
790
791 DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
792                          type="bool", default=None,
793                          help="Set the drained flag on the node")
794
795 ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
796                              type="bool", default=None, metavar=_YORNO,
797                              help="Set the allocatable flag on a volume")
798
799 NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
800                                help="Disable support for lvm based instances"
801                                " (cluster-wide)",
802                                action="store_false", default=True)
803
804 ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
805                             dest="enabled_hypervisors",
806                             help="Comma-separated list of hypervisors",
807                             type="string", default=None)
808
809 NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
810                             type="keyval", default={},
811                             help="NIC parameters")
812
813 CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
814                          dest="candidate_pool_size", type="int",
815                          help="Set the candidate pool size")
816
817 VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
818                          help="Enables LVM and specifies the volume group"
819                          " name (cluster-wide) for disk allocation [xenvg]",
820                          metavar="VG", default=None)
821
822 YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
823                           help="Destroy cluster", action="store_true")
824
825 NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
826                           help="Skip node agreement check (dangerous)",
827                           action="store_true", default=False)
828
829 MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
830                             help="Specify the mac prefix for the instance IP"
831                             " addresses, in the format XX:XX:XX",
832                             metavar="PREFIX",
833                             default=None)
834
835 MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
836                                help="Specify the node interface (cluster-wide)"
837                                " on which the master IP address will be added "
838                                " [%s]" % constants.DEFAULT_BRIDGE,
839                                metavar="NETDEV",
840                                default=constants.DEFAULT_BRIDGE)
841
842
843 GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
844                                 help="Specify the default directory (cluster-"
845                                 "wide) for storing the file-based disks [%s]" %
846                                 constants.DEFAULT_FILE_STORAGE_DIR,
847                                 metavar="DIR",
848                                 default=constants.DEFAULT_FILE_STORAGE_DIR)
849
850 NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
851                                    help="Don't modify /etc/hosts",
852                                    action="store_false", default=True)
853
854 NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
855                                     help="Don't initialize SSH keys",
856                                     action="store_false", default=True)
857
858 ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
859                              help="Enable parseable error messages",
860                              action="store_true", default=False)
861
862 NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
863                           help="Skip N+1 memory redundancy tests",
864                           action="store_true", default=False)
865
866 REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
867                              help="Type of reboot: soft/hard/full",
868                              default=constants.INSTANCE_REBOOT_HARD,
869                              metavar="<REBOOT>",
870                              choices=list(constants.REBOOT_TYPES))
871
872 IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
873                                     dest="ignore_secondaries",
874                                     default=False, action="store_true",
875                                     help="Ignore errors from secondaries")
876
877 NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
878                             action="store_false", default=True,
879                             help="Don't shutdown the instance (unsafe)")
880
881 TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
882                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
883                          help="Maximum time to wait")
884
885 SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
886                          dest="shutdown_timeout", type="int",
887                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
888                          help="Maximum time to wait for instance shutdown")
889
890 EARLY_RELEASE_OPT = cli_option("--early-release",
891                                dest="early_release", default=False,
892                                action="store_true",
893                                help="Release the locks on the secondary"
894                                " node(s) early")
895
896 NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
897                                   dest="new_cluster_cert",
898                                   default=False, action="store_true",
899                                   help="Generate a new cluster certificate")
900
901 RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
902                            default=None,
903                            help="File containing new RAPI certificate")
904
905 NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
906                                default=None, action="store_true",
907                                help=("Generate a new self-signed RAPI"
908                                      " certificate"))
909
910 NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
911                                     dest="new_confd_hmac_key",
912                                     default=False, action="store_true",
913                                     help=("Create a new HMAC key for %s" %
914                                           constants.CONFD))
915
916 USE_REPL_NET_OPT = cli_option("--use-replication-network",
917                               dest="use_replication_network",
918                               help="Whether to use the replication network"
919                               " for talking to the nodes",
920                               action="store_true", default=False)
921
922 MAINTAIN_NODE_HEALTH_OPT = \
923     cli_option("--maintain-node-health", dest="maintain_node_health",
924                metavar=_YORNO, default=None, type="bool",
925                help="Configure the cluster to automatically maintain node"
926                " health, by shutting down unknown instances, shutting down"
927                " unknown DRBD devices, etc.")
928
929 IDENTIFY_DEFAULTS_OPT = \
930     cli_option("--identify-defaults", dest="identify_defaults",
931                default=False, action="store_true",
932                help="Identify which saved instance parameters are equal to"
933                " the current cluster defaults and set them as such, instead"
934                " of marking them as overridden")
935
936 UIDPOOL_OPT = cli_option("--uid-pool", default=None,
937                          action="store", dest="uid_pool",
938                          help=("A list of user-ids or user-id"
939                                " ranges separated by commas"))
940
941 ADD_UIDS_OPT = cli_option("--add-uids", default=None,
942                           action="store", dest="add_uids",
943                           help=("A list of user-ids or user-id"
944                                 " ranges separated by commas, to be"
945                                 " added to the user-id pool"))
946
947 REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
948                              action="store", dest="remove_uids",
949                              help=("A list of user-ids or user-id"
950                                    " ranges separated by commas, to be"
951                                    " removed from the user-id pool"))
952
953
954 def _ParseArgs(argv, commands, aliases):
955   """Parser for the command line arguments.
956
957   This function parses the arguments and returns the function which
958   must be executed together with its (modified) arguments.
959
960   @param argv: the command line
961   @param commands: dictionary with special contents, see the design
962       doc for cmdline handling
963   @param aliases: dictionary with command aliases {'alias': 'target, ...}
964
965   """
966   if len(argv) == 0:
967     binary = "<command>"
968   else:
969     binary = argv[0].split("/")[-1]
970
971   if len(argv) > 1 and argv[1] == "--version":
972     ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
973     # Quit right away. That way we don't have to care about this special
974     # argument. optparse.py does it the same.
975     sys.exit(0)
976
977   if len(argv) < 2 or not (argv[1] in commands or
978                            argv[1] in aliases):
979     # let's do a nice thing
980     sortedcmds = commands.keys()
981     sortedcmds.sort()
982
983     ToStdout("Usage: %s {command} [options...] [argument...]", binary)
984     ToStdout("%s <command> --help to see details, or man %s", binary, binary)
985     ToStdout("")
986
987     # compute the max line length for cmd + usage
988     mlen = max([len(" %s" % cmd) for cmd in commands])
989     mlen = min(60, mlen) # should not get here...
990
991     # and format a nice command list
992     ToStdout("Commands:")
993     for cmd in sortedcmds:
994       cmdstr = " %s" % (cmd,)
995       help_text = commands[cmd][4]
996       help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
997       ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
998       for line in help_lines:
999         ToStdout("%-*s   %s", mlen, "", line)
1000
1001     ToStdout("")
1002
1003     return None, None, None
1004
1005   # get command, unalias it, and look it up in commands
1006   cmd = argv.pop(1)
1007   if cmd in aliases:
1008     if cmd in commands:
1009       raise errors.ProgrammerError("Alias '%s' overrides an existing"
1010                                    " command" % cmd)
1011
1012     if aliases[cmd] not in commands:
1013       raise errors.ProgrammerError("Alias '%s' maps to non-existing"
1014                                    " command '%s'" % (cmd, aliases[cmd]))
1015
1016     cmd = aliases[cmd]
1017
1018   func, args_def, parser_opts, usage, description = commands[cmd]
1019   parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
1020                         description=description,
1021                         formatter=TitledHelpFormatter(),
1022                         usage="%%prog %s %s" % (cmd, usage))
1023   parser.disable_interspersed_args()
1024   options, args = parser.parse_args()
1025
1026   if not _CheckArguments(cmd, args_def, args):
1027     return None, None, None
1028
1029   return func, options, args
1030
1031
1032 def _CheckArguments(cmd, args_def, args):
1033   """Verifies the arguments using the argument definition.
1034
1035   Algorithm:
1036
1037     1. Abort with error if values specified by user but none expected.
1038
1039     1. For each argument in definition
1040
1041       1. Keep running count of minimum number of values (min_count)
1042       1. Keep running count of maximum number of values (max_count)
1043       1. If it has an unlimited number of values
1044
1045         1. Abort with error if it's not the last argument in the definition
1046
1047     1. If last argument has limited number of values
1048
1049       1. Abort with error if number of values doesn't match or is too large
1050
1051     1. Abort with error if user didn't pass enough values (min_count)
1052
1053   """
1054   if args and not args_def:
1055     ToStderr("Error: Command %s expects no arguments", cmd)
1056     return False
1057
1058   min_count = None
1059   max_count = None
1060   check_max = None
1061
1062   last_idx = len(args_def) - 1
1063
1064   for idx, arg in enumerate(args_def):
1065     if min_count is None:
1066       min_count = arg.min
1067     elif arg.min is not None:
1068       min_count += arg.min
1069
1070     if max_count is None:
1071       max_count = arg.max
1072     elif arg.max is not None:
1073       max_count += arg.max
1074
1075     if idx == last_idx:
1076       check_max = (arg.max is not None)
1077
1078     elif arg.max is None:
1079       raise errors.ProgrammerError("Only the last argument can have max=None")
1080
1081   if check_max:
1082     # Command with exact number of arguments
1083     if (min_count is not None and max_count is not None and
1084         min_count == max_count and len(args) != min_count):
1085       ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1086       return False
1087
1088     # Command with limited number of arguments
1089     if max_count is not None and len(args) > max_count:
1090       ToStderr("Error: Command %s expects only %d argument(s)",
1091                cmd, max_count)
1092       return False
1093
1094   # Command with some required arguments
1095   if min_count is not None and len(args) < min_count:
1096     ToStderr("Error: Command %s expects at least %d argument(s)",
1097              cmd, min_count)
1098     return False
1099
1100   return True
1101
1102
1103 def SplitNodeOption(value):
1104   """Splits the value of a --node option.
1105
1106   """
1107   if value and ':' in value:
1108     return value.split(':', 1)
1109   else:
1110     return (value, None)
1111
1112
1113 def CalculateOSNames(os_name, os_variants):
1114   """Calculates all the names an OS can be called, according to its variants.
1115
1116   @type os_name: string
1117   @param os_name: base name of the os
1118   @type os_variants: list or None
1119   @param os_variants: list of supported variants
1120   @rtype: list
1121   @return: list of valid names
1122
1123   """
1124   if os_variants:
1125     return ['%s+%s' % (os_name, v) for v in os_variants]
1126   else:
1127     return [os_name]
1128
1129
1130 def UsesRPC(fn):
1131   def wrapper(*args, **kwargs):
1132     rpc.Init()
1133     try:
1134       return fn(*args, **kwargs)
1135     finally:
1136       rpc.Shutdown()
1137   return wrapper
1138
1139
1140 def AskUser(text, choices=None):
1141   """Ask the user a question.
1142
1143   @param text: the question to ask
1144
1145   @param choices: list with elements tuples (input_char, return_value,
1146       description); if not given, it will default to: [('y', True,
1147       'Perform the operation'), ('n', False, 'Do no do the operation')];
1148       note that the '?' char is reserved for help
1149
1150   @return: one of the return values from the choices list; if input is
1151       not possible (i.e. not running with a tty, we return the last
1152       entry from the list
1153
1154   """
1155   if choices is None:
1156     choices = [('y', True, 'Perform the operation'),
1157                ('n', False, 'Do not perform the operation')]
1158   if not choices or not isinstance(choices, list):
1159     raise errors.ProgrammerError("Invalid choices argument to AskUser")
1160   for entry in choices:
1161     if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
1162       raise errors.ProgrammerError("Invalid choices element to AskUser")
1163
1164   answer = choices[-1][1]
1165   new_text = []
1166   for line in text.splitlines():
1167     new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
1168   text = "\n".join(new_text)
1169   try:
1170     f = file("/dev/tty", "a+")
1171   except IOError:
1172     return answer
1173   try:
1174     chars = [entry[0] for entry in choices]
1175     chars[-1] = "[%s]" % chars[-1]
1176     chars.append('?')
1177     maps = dict([(entry[0], entry[1]) for entry in choices])
1178     while True:
1179       f.write(text)
1180       f.write('\n')
1181       f.write("/".join(chars))
1182       f.write(": ")
1183       line = f.readline(2).strip().lower()
1184       if line in maps:
1185         answer = maps[line]
1186         break
1187       elif line == '?':
1188         for entry in choices:
1189           f.write(" %s - %s\n" % (entry[0], entry[2]))
1190         f.write("\n")
1191         continue
1192   finally:
1193     f.close()
1194   return answer
1195
1196
1197 class JobSubmittedException(Exception):
1198   """Job was submitted, client should exit.
1199
1200   This exception has one argument, the ID of the job that was
1201   submitted. The handler should print this ID.
1202
1203   This is not an error, just a structured way to exit from clients.
1204
1205   """
1206
1207
1208 def SendJob(ops, cl=None):
1209   """Function to submit an opcode without waiting for the results.
1210
1211   @type ops: list
1212   @param ops: list of opcodes
1213   @type cl: luxi.Client
1214   @param cl: the luxi client to use for communicating with the master;
1215              if None, a new client will be created
1216
1217   """
1218   if cl is None:
1219     cl = GetClient()
1220
1221   job_id = cl.SubmitJob(ops)
1222
1223   return job_id
1224
1225
1226 def GenericPollJob(job_id, cbs, report_cbs):
1227   """Generic job-polling function.
1228
1229   @type job_id: number
1230   @param job_id: Job ID
1231   @type cbs: Instance of L{JobPollCbBase}
1232   @param cbs: Data callbacks
1233   @type report_cbs: Instance of L{JobPollReportCbBase}
1234   @param report_cbs: Reporting callbacks
1235
1236   """
1237   prev_job_info = None
1238   prev_logmsg_serial = None
1239
1240   status = None
1241
1242   while True:
1243     result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
1244                                       prev_logmsg_serial)
1245     if not result:
1246       # job not found, go away!
1247       raise errors.JobLost("Job with id %s lost" % job_id)
1248
1249     if result == constants.JOB_NOTCHANGED:
1250       report_cbs.ReportNotChanged(job_id, status)
1251
1252       # Wait again
1253       continue
1254
1255     # Split result, a tuple of (field values, log entries)
1256     (job_info, log_entries) = result
1257     (status, ) = job_info
1258
1259     if log_entries:
1260       for log_entry in log_entries:
1261         (serial, timestamp, log_type, message) = log_entry
1262         report_cbs.ReportLogMessage(job_id, serial, timestamp,
1263                                     log_type, message)
1264         prev_logmsg_serial = max(prev_logmsg_serial, serial)
1265
1266     # TODO: Handle canceled and archived jobs
1267     elif status in (constants.JOB_STATUS_SUCCESS,
1268                     constants.JOB_STATUS_ERROR,
1269                     constants.JOB_STATUS_CANCELING,
1270                     constants.JOB_STATUS_CANCELED):
1271       break
1272
1273     prev_job_info = job_info
1274
1275   jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
1276   if not jobs:
1277     raise errors.JobLost("Job with id %s lost" % job_id)
1278
1279   status, opstatus, result = jobs[0]
1280
1281   if status == constants.JOB_STATUS_SUCCESS:
1282     return result
1283
1284   if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
1285     raise errors.OpExecError("Job was canceled")
1286
1287   has_ok = False
1288   for idx, (status, msg) in enumerate(zip(opstatus, result)):
1289     if status == constants.OP_STATUS_SUCCESS:
1290       has_ok = True
1291     elif status == constants.OP_STATUS_ERROR:
1292       errors.MaybeRaise(msg)
1293
1294       if has_ok:
1295         raise errors.OpExecError("partial failure (opcode %d): %s" %
1296                                  (idx, msg))
1297
1298       raise errors.OpExecError(str(msg))
1299
1300   # default failure mode
1301   raise errors.OpExecError(result)
1302
1303
1304 class JobPollCbBase:
1305   """Base class for L{GenericPollJob} callbacks.
1306
1307   """
1308   def __init__(self):
1309     """Initializes this class.
1310
1311     """
1312
1313   def WaitForJobChangeOnce(self, job_id, fields,
1314                            prev_job_info, prev_log_serial):
1315     """Waits for changes on a job.
1316
1317     """
1318     raise NotImplementedError()
1319
1320   def QueryJobs(self, job_ids, fields):
1321     """Returns the selected fields for the selected job IDs.
1322
1323     @type job_ids: list of numbers
1324     @param job_ids: Job IDs
1325     @type fields: list of strings
1326     @param fields: Fields
1327
1328     """
1329     raise NotImplementedError()
1330
1331
1332 class JobPollReportCbBase:
1333   """Base class for L{GenericPollJob} reporting callbacks.
1334
1335   """
1336   def __init__(self):
1337     """Initializes this class.
1338
1339     """
1340
1341   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1342     """Handles a log message.
1343
1344     """
1345     raise NotImplementedError()
1346
1347   def ReportNotChanged(self, job_id, status):
1348     """Called for if a job hasn't changed in a while.
1349
1350     @type job_id: number
1351     @param job_id: Job ID
1352     @type status: string or None
1353     @param status: Job status if available
1354
1355     """
1356     raise NotImplementedError()
1357
1358
1359 class _LuxiJobPollCb(JobPollCbBase):
1360   def __init__(self, cl):
1361     """Initializes this class.
1362
1363     """
1364     JobPollCbBase.__init__(self)
1365     self.cl = cl
1366
1367   def WaitForJobChangeOnce(self, job_id, fields,
1368                            prev_job_info, prev_log_serial):
1369     """Waits for changes on a job.
1370
1371     """
1372     return self.cl.WaitForJobChangeOnce(job_id, fields,
1373                                         prev_job_info, prev_log_serial)
1374
1375   def QueryJobs(self, job_ids, fields):
1376     """Returns the selected fields for the selected job IDs.
1377
1378     """
1379     return self.cl.QueryJobs(job_ids, fields)
1380
1381
1382 class FeedbackFnJobPollReportCb(JobPollReportCbBase):
1383   def __init__(self, feedback_fn):
1384     """Initializes this class.
1385
1386     """
1387     JobPollReportCbBase.__init__(self)
1388
1389     self.feedback_fn = feedback_fn
1390
1391     assert callable(feedback_fn)
1392
1393   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1394     """Handles a log message.
1395
1396     """
1397     self.feedback_fn((timestamp, log_type, log_msg))
1398
1399   def ReportNotChanged(self, job_id, status):
1400     """Called if a job hasn't changed in a while.
1401
1402     """
1403     # Ignore
1404
1405
1406 class StdioJobPollReportCb(JobPollReportCbBase):
1407   def __init__(self):
1408     """Initializes this class.
1409
1410     """
1411     JobPollReportCbBase.__init__(self)
1412
1413     self.notified_queued = False
1414     self.notified_waitlock = False
1415
1416   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1417     """Handles a log message.
1418
1419     """
1420     ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
1421              utils.SafeEncode(log_msg))
1422
1423   def ReportNotChanged(self, job_id, status):
1424     """Called if a job hasn't changed in a while.
1425
1426     """
1427     if status is None:
1428       return
1429
1430     if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
1431       ToStderr("Job %s is waiting in queue", job_id)
1432       self.notified_queued = True
1433
1434     elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
1435       ToStderr("Job %s is trying to acquire all necessary locks", job_id)
1436       self.notified_waitlock = True
1437
1438
1439 def PollJob(job_id, cl=None, feedback_fn=None):
1440   """Function to poll for the result of a job.
1441
1442   @type job_id: job identified
1443   @param job_id: the job to poll for results
1444   @type cl: luxi.Client
1445   @param cl: the luxi client to use for communicating with the master;
1446              if None, a new client will be created
1447
1448   """
1449   if cl is None:
1450     cl = GetClient()
1451
1452   if feedback_fn:
1453     reporter = FeedbackFnJobPollReportCb(feedback_fn)
1454   else:
1455     reporter = StdioJobPollReportCb()
1456
1457   return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
1458
1459
1460 def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
1461   """Legacy function to submit an opcode.
1462
1463   This is just a simple wrapper over the construction of the processor
1464   instance. It should be extended to better handle feedback and
1465   interaction functions.
1466
1467   """
1468   if cl is None:
1469     cl = GetClient()
1470
1471   SetGenericOpcodeOpts([op], opts)
1472
1473   job_id = SendJob([op], cl)
1474
1475   op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
1476
1477   return op_results[0]
1478
1479
1480 def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
1481   """Wrapper around SubmitOpCode or SendJob.
1482
1483   This function will decide, based on the 'opts' parameter, whether to
1484   submit and wait for the result of the opcode (and return it), or
1485   whether to just send the job and print its identifier. It is used in
1486   order to simplify the implementation of the '--submit' option.
1487
1488   It will also process the opcodes if we're sending the via SendJob
1489   (otherwise SubmitOpCode does it).
1490
1491   """
1492   if opts and opts.submit_only:
1493     job = [op]
1494     SetGenericOpcodeOpts(job, opts)
1495     job_id = SendJob(job, cl=cl)
1496     raise JobSubmittedException(job_id)
1497   else:
1498     return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
1499
1500
1501 def SetGenericOpcodeOpts(opcode_list, options):
1502   """Processor for generic options.
1503
1504   This function updates the given opcodes based on generic command
1505   line options (like debug, dry-run, etc.).
1506
1507   @param opcode_list: list of opcodes
1508   @param options: command line options or None
1509   @return: None (in-place modification)
1510
1511   """
1512   if not options:
1513     return
1514   for op in opcode_list:
1515     op.dry_run = options.dry_run
1516     op.debug_level = options.debug
1517
1518
1519 def GetClient():
1520   # TODO: Cache object?
1521   try:
1522     client = luxi.Client()
1523   except luxi.NoMasterError:
1524     ss = ssconf.SimpleStore()
1525
1526     # Try to read ssconf file
1527     try:
1528       ss.GetMasterNode()
1529     except errors.ConfigurationError:
1530       raise errors.OpPrereqError("Cluster not initialized or this machine is"
1531                                  " not part of a cluster")
1532
1533     master, myself = ssconf.GetMasterAndMyself(ss=ss)
1534     if master != myself:
1535       raise errors.OpPrereqError("This is not the master node, please connect"
1536                                  " to node '%s' and rerun the command" %
1537                                  master)
1538     raise
1539   return client
1540
1541
1542 def FormatError(err):
1543   """Return a formatted error message for a given error.
1544
1545   This function takes an exception instance and returns a tuple
1546   consisting of two values: first, the recommended exit code, and
1547   second, a string describing the error message (not
1548   newline-terminated).
1549
1550   """
1551   retcode = 1
1552   obuf = StringIO()
1553   msg = str(err)
1554   if isinstance(err, errors.ConfigurationError):
1555     txt = "Corrupt configuration file: %s" % msg
1556     logging.error(txt)
1557     obuf.write(txt + "\n")
1558     obuf.write("Aborting.")
1559     retcode = 2
1560   elif isinstance(err, errors.HooksAbort):
1561     obuf.write("Failure: hooks execution failed:\n")
1562     for node, script, out in err.args[0]:
1563       if out:
1564         obuf.write("  node: %s, script: %s, output: %s\n" %
1565                    (node, script, out))
1566       else:
1567         obuf.write("  node: %s, script: %s (no output)\n" %
1568                    (node, script))
1569   elif isinstance(err, errors.HooksFailure):
1570     obuf.write("Failure: hooks general failure: %s" % msg)
1571   elif isinstance(err, errors.ResolverError):
1572     this_host = utils.HostInfo.SysName()
1573     if err.args[0] == this_host:
1574       msg = "Failure: can't resolve my own hostname ('%s')"
1575     else:
1576       msg = "Failure: can't resolve hostname '%s'"
1577     obuf.write(msg % err.args[0])
1578   elif isinstance(err, errors.OpPrereqError):
1579     if len(err.args) == 2:
1580       obuf.write("Failure: prerequisites not met for this"
1581                " operation:\nerror type: %s, error details:\n%s" %
1582                  (err.args[1], err.args[0]))
1583     else:
1584       obuf.write("Failure: prerequisites not met for this"
1585                  " operation:\n%s" % msg)
1586   elif isinstance(err, errors.OpExecError):
1587     obuf.write("Failure: command execution error:\n%s" % msg)
1588   elif isinstance(err, errors.TagError):
1589     obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
1590   elif isinstance(err, errors.JobQueueDrainError):
1591     obuf.write("Failure: the job queue is marked for drain and doesn't"
1592                " accept new requests\n")
1593   elif isinstance(err, errors.JobQueueFull):
1594     obuf.write("Failure: the job queue is full and doesn't accept new"
1595                " job submissions until old jobs are archived\n")
1596   elif isinstance(err, errors.TypeEnforcementError):
1597     obuf.write("Parameter Error: %s" % msg)
1598   elif isinstance(err, errors.ParameterError):
1599     obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
1600   elif isinstance(err, errors.GenericError):
1601     obuf.write("Unhandled Ganeti error: %s" % msg)
1602   elif isinstance(err, luxi.NoMasterError):
1603     obuf.write("Cannot communicate with the master daemon.\nIs it running"
1604                " and listening for connections?")
1605   elif isinstance(err, luxi.TimeoutError):
1606     obuf.write("Timeout while talking to the master daemon. Error:\n"
1607                "%s" % msg)
1608   elif isinstance(err, luxi.ProtocolError):
1609     obuf.write("Unhandled protocol error while talking to the master daemon:\n"
1610                "%s" % msg)
1611   elif isinstance(err, JobSubmittedException):
1612     obuf.write("JobID: %s\n" % err.args[0])
1613     retcode = 0
1614   else:
1615     obuf.write("Unhandled exception: %s" % msg)
1616   return retcode, obuf.getvalue().rstrip('\n')
1617
1618
1619 def GenericMain(commands, override=None, aliases=None):
1620   """Generic main function for all the gnt-* commands.
1621
1622   Arguments:
1623     - commands: a dictionary with a special structure, see the design doc
1624                 for command line handling.
1625     - override: if not None, we expect a dictionary with keys that will
1626                 override command line options; this can be used to pass
1627                 options from the scripts to generic functions
1628     - aliases: dictionary with command aliases {'alias': 'target, ...}
1629
1630   """
1631   # save the program name and the entire command line for later logging
1632   if sys.argv:
1633     binary = os.path.basename(sys.argv[0]) or sys.argv[0]
1634     if len(sys.argv) >= 2:
1635       binary += " " + sys.argv[1]
1636       old_cmdline = " ".join(sys.argv[2:])
1637     else:
1638       old_cmdline = ""
1639   else:
1640     binary = "<unknown program>"
1641     old_cmdline = ""
1642
1643   if aliases is None:
1644     aliases = {}
1645
1646   try:
1647     func, options, args = _ParseArgs(sys.argv, commands, aliases)
1648   except errors.ParameterError, err:
1649     result, err_msg = FormatError(err)
1650     ToStderr(err_msg)
1651     return 1
1652
1653   if func is None: # parse error
1654     return 1
1655
1656   if override is not None:
1657     for key, val in override.iteritems():
1658       setattr(options, key, val)
1659
1660   utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
1661                      stderr_logging=True, program=binary)
1662
1663   if old_cmdline:
1664     logging.info("run with arguments '%s'", old_cmdline)
1665   else:
1666     logging.info("run with no arguments")
1667
1668   try:
1669     result = func(options, args)
1670   except (errors.GenericError, luxi.ProtocolError,
1671           JobSubmittedException), err:
1672     result, err_msg = FormatError(err)
1673     logging.exception("Error during command processing")
1674     ToStderr(err_msg)
1675
1676   return result
1677
1678
1679 def GenericInstanceCreate(mode, opts, args):
1680   """Add an instance to the cluster via either creation or import.
1681
1682   @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
1683   @param opts: the command line options selected by the user
1684   @type args: list
1685   @param args: should contain only one element, the new instance name
1686   @rtype: int
1687   @return: the desired exit code
1688
1689   """
1690   instance = args[0]
1691
1692   (pnode, snode) = SplitNodeOption(opts.node)
1693
1694   hypervisor = None
1695   hvparams = {}
1696   if opts.hypervisor:
1697     hypervisor, hvparams = opts.hypervisor
1698
1699   if opts.nics:
1700     try:
1701       nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
1702     except ValueError, err:
1703       raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
1704     nics = [{}] * nic_max
1705     for nidx, ndict in opts.nics:
1706       nidx = int(nidx)
1707       if not isinstance(ndict, dict):
1708         msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
1709         raise errors.OpPrereqError(msg)
1710       nics[nidx] = ndict
1711   elif opts.no_nics:
1712     # no nics
1713     nics = []
1714   elif mode == constants.INSTANCE_CREATE:
1715     # default of one nic, all auto
1716     nics = [{}]
1717   else:
1718     # mode == import
1719     nics = []
1720
1721   if opts.disk_template == constants.DT_DISKLESS:
1722     if opts.disks or opts.sd_size is not None:
1723       raise errors.OpPrereqError("Diskless instance but disk"
1724                                  " information passed")
1725     disks = []
1726   else:
1727     if (not opts.disks and not opts.sd_size
1728         and mode == constants.INSTANCE_CREATE):
1729       raise errors.OpPrereqError("No disk information specified")
1730     if opts.disks and opts.sd_size is not None:
1731       raise errors.OpPrereqError("Please use either the '--disk' or"
1732                                  " '-s' option")
1733     if opts.sd_size is not None:
1734       opts.disks = [(0, {"size": opts.sd_size})]
1735
1736     if opts.disks:
1737       try:
1738         disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
1739       except ValueError, err:
1740         raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
1741       disks = [{}] * disk_max
1742     else:
1743       disks = []
1744     for didx, ddict in opts.disks:
1745       didx = int(didx)
1746       if not isinstance(ddict, dict):
1747         msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
1748         raise errors.OpPrereqError(msg)
1749       elif "size" in ddict:
1750         if "adopt" in ddict:
1751           raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
1752                                      " (disk %d)" % didx)
1753         try:
1754           ddict["size"] = utils.ParseUnit(ddict["size"])
1755         except ValueError, err:
1756           raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
1757                                      (didx, err))
1758       elif "adopt" in ddict:
1759         if mode == constants.INSTANCE_IMPORT:
1760           raise errors.OpPrereqError("Disk adoption not allowed for instance"
1761                                      " import")
1762         ddict["size"] = 0
1763       else:
1764         raise errors.OpPrereqError("Missing size or adoption source for"
1765                                    " disk %d" % didx)
1766       disks[didx] = ddict
1767
1768   utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
1769   utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
1770
1771   if mode == constants.INSTANCE_CREATE:
1772     start = opts.start
1773     os_type = opts.os
1774     src_node = None
1775     src_path = None
1776     no_install = opts.no_install
1777     identify_defaults = False
1778   elif mode == constants.INSTANCE_IMPORT:
1779     start = False
1780     os_type = None
1781     src_node = opts.src_node
1782     src_path = opts.src_dir
1783     no_install = None
1784     identify_defaults = opts.identify_defaults
1785   else:
1786     raise errors.ProgrammerError("Invalid creation mode %s" % mode)
1787
1788   op = opcodes.OpCreateInstance(instance_name=instance,
1789                                 disks=disks,
1790                                 disk_template=opts.disk_template,
1791                                 nics=nics,
1792                                 pnode=pnode, snode=snode,
1793                                 ip_check=opts.ip_check,
1794                                 name_check=opts.name_check,
1795                                 wait_for_sync=opts.wait_for_sync,
1796                                 file_storage_dir=opts.file_storage_dir,
1797                                 file_driver=opts.file_driver,
1798                                 iallocator=opts.iallocator,
1799                                 hypervisor=hypervisor,
1800                                 hvparams=hvparams,
1801                                 beparams=opts.beparams,
1802                                 mode=mode,
1803                                 start=start,
1804                                 os_type=os_type,
1805                                 src_node=src_node,
1806                                 src_path=src_path,
1807                                 no_install=no_install,
1808                                 identify_defaults=identify_defaults)
1809
1810   SubmitOrSend(op, opts)
1811   return 0
1812
1813
1814 class _RunWhileClusterStoppedHelper:
1815   """Helper class for L{RunWhileClusterStopped} to simplify state management
1816
1817   """
1818   def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
1819     """Initializes this class.
1820
1821     @type feedback_fn: callable
1822     @param feedback_fn: Feedback function
1823     @type cluster_name: string
1824     @param cluster_name: Cluster name
1825     @type master_node: string
1826     @param master_node Master node name
1827     @type online_nodes: list
1828     @param online_nodes: List of names of online nodes
1829
1830     """
1831     self.feedback_fn = feedback_fn
1832     self.cluster_name = cluster_name
1833     self.master_node = master_node
1834     self.online_nodes = online_nodes
1835
1836     self.ssh = ssh.SshRunner(self.cluster_name)
1837
1838     self.nonmaster_nodes = [name for name in online_nodes
1839                             if name != master_node]
1840
1841     assert self.master_node not in self.nonmaster_nodes
1842
1843   def _RunCmd(self, node_name, cmd):
1844     """Runs a command on the local or a remote machine.
1845
1846     @type node_name: string
1847     @param node_name: Machine name
1848     @type cmd: list
1849     @param cmd: Command
1850
1851     """
1852     if node_name is None or node_name == self.master_node:
1853       # No need to use SSH
1854       result = utils.RunCmd(cmd)
1855     else:
1856       result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))
1857
1858     if result.failed:
1859       errmsg = ["Failed to run command %s" % result.cmd]
1860       if node_name:
1861         errmsg.append("on node %s" % node_name)
1862       errmsg.append(": exitcode %s and error %s" %
1863                     (result.exit_code, result.output))
1864       raise errors.OpExecError(" ".join(errmsg))
1865
1866   def Call(self, fn, *args):
1867     """Call function while all daemons are stopped.
1868
1869     @type fn: callable
1870     @param fn: Function to be called
1871
1872     """
1873     # Pause watcher by acquiring an exclusive lock on watcher state file
1874     self.feedback_fn("Blocking watcher")
1875     watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
1876     try:
1877       # TODO: Currently, this just blocks. There's no timeout.
1878       # TODO: Should it be a shared lock?
1879       watcher_block.Exclusive(blocking=True)
1880
1881       # Stop master daemons, so that no new jobs can come in and all running
1882       # ones are finished
1883       self.feedback_fn("Stopping master daemons")
1884       self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
1885       try:
1886         # Stop daemons on all nodes
1887         for node_name in self.online_nodes:
1888           self.feedback_fn("Stopping daemons on %s" % node_name)
1889           self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])
1890
1891         # All daemons are shut down now
1892         try:
1893           return fn(self, *args)
1894         except Exception, err:
1895           _, errmsg = FormatError(err)
1896           logging.exception("Caught exception")
1897           self.feedback_fn(errmsg)
1898           raise
1899       finally:
1900         # Start cluster again, master node last
1901         for node_name in self.nonmaster_nodes + [self.master_node]:
1902           self.feedback_fn("Starting daemons on %s" % node_name)
1903           self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
1904     finally:
1905       # Resume watcher
1906       watcher_block.Close()
1907
1908
1909 def RunWhileClusterStopped(feedback_fn, fn, *args):
1910   """Calls a function while all cluster daemons are stopped.
1911
1912   @type feedback_fn: callable
1913   @param feedback_fn: Feedback function
1914   @type fn: callable
1915   @param fn: Function to be called when daemons are stopped
1916
1917   """
1918   feedback_fn("Gathering cluster information")
1919
1920   # This ensures we're running on the master daemon
1921   cl = GetClient()
1922
1923   (cluster_name, master_node) = \
1924     cl.QueryConfigValues(["cluster_name", "master_node"])
1925
1926   online_nodes = GetOnlineNodes([], cl=cl)
1927
1928   # Don't keep a reference to the client. The master daemon will go away.
1929   del cl
1930
1931   assert master_node in online_nodes
1932
1933   return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
1934                                        online_nodes).Call(fn, *args)
1935
1936
1937 def GenerateTable(headers, fields, separator, data,
1938                   numfields=None, unitfields=None,
1939                   units=None):
1940   """Prints a table with headers and different fields.
1941
1942   @type headers: dict
1943   @param headers: dictionary mapping field names to headers for
1944       the table
1945   @type fields: list
1946   @param fields: the field names corresponding to each row in
1947       the data field
1948   @param separator: the separator to be used; if this is None,
1949       the default 'smart' algorithm is used which computes optimal
1950       field width, otherwise just the separator is used between
1951       each field
1952   @type data: list
1953   @param data: a list of lists, each sublist being one row to be output
1954   @type numfields: list
1955   @param numfields: a list with the fields that hold numeric
1956       values and thus should be right-aligned
1957   @type unitfields: list
1958   @param unitfields: a list with the fields that hold numeric
1959       values that should be formatted with the units field
1960   @type units: string or None
1961   @param units: the units we should use for formatting, or None for
1962       automatic choice (human-readable for non-separator usage, otherwise
1963       megabytes); this is a one-letter string
1964
1965   """
1966   if units is None:
1967     if separator:
1968       units = "m"
1969     else:
1970       units = "h"
1971
1972   if numfields is None:
1973     numfields = []
1974   if unitfields is None:
1975     unitfields = []
1976
1977   numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
1978   unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142
1979
1980   format_fields = []
1981   for field in fields:
1982     if headers and field not in headers:
1983       # TODO: handle better unknown fields (either revert to old
1984       # style of raising exception, or deal more intelligently with
1985       # variable fields)
1986       headers[field] = field
1987     if separator is not None:
1988       format_fields.append("%s")
1989     elif numfields.Matches(field):
1990       format_fields.append("%*s")
1991     else:
1992       format_fields.append("%-*s")
1993
1994   if separator is None:
1995     mlens = [0 for name in fields]
1996     format = ' '.join(format_fields)
1997   else:
1998     format = separator.replace("%", "%%").join(format_fields)
1999
2000   for row in data:
2001     if row is None:
2002       continue
2003     for idx, val in enumerate(row):
2004       if unitfields.Matches(fields[idx]):
2005         try:
2006           val = int(val)
2007         except (TypeError, ValueError):
2008           pass
2009         else:
2010           val = row[idx] = utils.FormatUnit(val, units)
2011       val = row[idx] = str(val)
2012       if separator is None:
2013         mlens[idx] = max(mlens[idx], len(val))
2014
2015   result = []
2016   if headers:
2017     args = []
2018     for idx, name in enumerate(fields):
2019       hdr = headers[name]
2020       if separator is None:
2021         mlens[idx] = max(mlens[idx], len(hdr))
2022         args.append(mlens[idx])
2023       args.append(hdr)
2024     result.append(format % tuple(args))
2025
2026   if separator is None:
2027     assert len(mlens) == len(fields)
2028
2029     if fields and not numfields.Matches(fields[-1]):
2030       mlens[-1] = 0
2031
2032   for line in data:
2033     args = []
2034     if line is None:
2035       line = ['-' for _ in fields]
2036     for idx in range(len(fields)):
2037       if separator is None:
2038         args.append(mlens[idx])
2039       args.append(line[idx])
2040     result.append(format % tuple(args))
2041
2042   return result
2043
2044
2045 def FormatTimestamp(ts):
2046   """Formats a given timestamp.
2047
2048   @type ts: timestamp
2049   @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
2050
2051   @rtype: string
2052   @return: a string with the formatted timestamp
2053
2054   """
2055   if not isinstance (ts, (tuple, list)) or len(ts) != 2:
2056     return '?'
2057   sec, usec = ts
2058   return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
2059
2060
2061 def ParseTimespec(value):
2062   """Parse a time specification.
2063
2064   The following suffixed will be recognized:
2065
2066     - s: seconds
2067     - m: minutes
2068     - h: hours
2069     - d: day
2070     - w: weeks
2071
2072   Without any suffix, the value will be taken to be in seconds.
2073
2074   """
2075   value = str(value)
2076   if not value:
2077     raise errors.OpPrereqError("Empty time specification passed")
2078   suffix_map = {
2079     's': 1,
2080     'm': 60,
2081     'h': 3600,
2082     'd': 86400,
2083     'w': 604800,
2084     }
2085   if value[-1] not in suffix_map:
2086     try:
2087       value = int(value)
2088     except (TypeError, ValueError):
2089       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2090   else:
2091     multiplier = suffix_map[value[-1]]
2092     value = value[:-1]
2093     if not value: # no data left after stripping the suffix
2094       raise errors.OpPrereqError("Invalid time specification (only"
2095                                  " suffix passed)")
2096     try:
2097       value = int(value) * multiplier
2098     except (TypeError, ValueError):
2099       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2100   return value
2101
2102
2103 def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
2104                    filter_master=False):
2105   """Returns the names of online nodes.
2106
2107   This function will also log a warning on stderr with the names of
2108   the online nodes.
2109
2110   @param nodes: if not empty, use only this subset of nodes (minus the
2111       offline ones)
2112   @param cl: if not None, luxi client to use
2113   @type nowarn: boolean
2114   @param nowarn: by default, this function will output a note with the
2115       offline nodes that are skipped; if this parameter is True the
2116       note is not displayed
2117   @type secondary_ips: boolean
2118   @param secondary_ips: if True, return the secondary IPs instead of the
2119       names, useful for doing network traffic over the replication interface
2120       (if any)
2121   @type filter_master: boolean
2122   @param filter_master: if True, do not return the master node in the list
2123       (useful in coordination with secondary_ips where we cannot check our
2124       node name against the list)
2125
2126   """
2127   if cl is None:
2128     cl = GetClient()
2129
2130   if secondary_ips:
2131     name_idx = 2
2132   else:
2133     name_idx = 0
2134
2135   if filter_master:
2136     master_node = cl.QueryConfigValues(["master_node"])[0]
2137     filter_fn = lambda x: x != master_node
2138   else:
2139     filter_fn = lambda _: True
2140
2141   result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
2142                          use_locking=False)
2143   offline = [row[0] for row in result if row[1]]
2144   if offline and not nowarn:
2145     ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
2146   return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
2147
2148
2149 def _ToStream(stream, txt, *args):
2150   """Write a message to a stream, bypassing the logging system
2151
2152   @type stream: file object
2153   @param stream: the file to which we should write
2154   @type txt: str
2155   @param txt: the message
2156
2157   """
2158   if args:
2159     args = tuple(args)
2160     stream.write(txt % args)
2161   else:
2162     stream.write(txt)
2163   stream.write('\n')
2164   stream.flush()
2165
2166
2167 def ToStdout(txt, *args):
2168   """Write a message to stdout only, bypassing the logging system
2169
2170   This is just a wrapper over _ToStream.
2171
2172   @type txt: str
2173   @param txt: the message
2174
2175   """
2176   _ToStream(sys.stdout, txt, *args)
2177
2178
2179 def ToStderr(txt, *args):
2180   """Write a message to stderr only, bypassing the logging system
2181
2182   This is just a wrapper over _ToStream.
2183
2184   @type txt: str
2185   @param txt: the message
2186
2187   """
2188   _ToStream(sys.stderr, txt, *args)
2189
2190
2191 class JobExecutor(object):
2192   """Class which manages the submission and execution of multiple jobs.
2193
2194   Note that instances of this class should not be reused between
2195   GetResults() calls.
2196
2197   """
2198   def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
2199     self.queue = []
2200     if cl is None:
2201       cl = GetClient()
2202     self.cl = cl
2203     self.verbose = verbose
2204     self.jobs = []
2205     self.opts = opts
2206     self.feedback_fn = feedback_fn
2207
2208   def QueueJob(self, name, *ops):
2209     """Record a job for later submit.
2210
2211     @type name: string
2212     @param name: a description of the job, will be used in WaitJobSet
2213     """
2214     SetGenericOpcodeOpts(ops, self.opts)
2215     self.queue.append((name, ops))
2216
2217   def SubmitPending(self):
2218     """Submit all pending jobs.
2219
2220     """
2221     results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
2222     for (idx, ((status, data), (name, _))) in enumerate(zip(results,
2223                                                             self.queue)):
2224       self.jobs.append((idx, status, data, name))
2225
2226   def _ChooseJob(self):
2227     """Choose a non-waiting/queued job to poll next.
2228
2229     """
2230     assert self.jobs, "_ChooseJob called with empty job list"
2231
2232     result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
2233     assert result
2234
2235     for job_data, status in zip(self.jobs, result):
2236       if status[0] in (constants.JOB_STATUS_QUEUED,
2237                     constants.JOB_STATUS_WAITLOCK,
2238                     constants.JOB_STATUS_CANCELING):
2239         # job is still waiting
2240         continue
2241       # good candidate found
2242       self.jobs.remove(job_data)
2243       return job_data
2244
2245     # no job found
2246     return self.jobs.pop(0)
2247
2248   def GetResults(self):
2249     """Wait for and return the results of all jobs.
2250
2251     @rtype: list
2252     @return: list of tuples (success, job results), in the same order
2253         as the submitted jobs; if a job has failed, instead of the result
2254         there will be the error message
2255
2256     """
2257     if not self.jobs:
2258       self.SubmitPending()
2259     results = []
2260     if self.verbose:
2261       ok_jobs = [row[2] for row in self.jobs if row[1]]
2262       if ok_jobs:
2263         ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
2264
2265     # first, remove any non-submitted jobs
2266     self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
2267     for idx, _, jid, name in failures:
2268       ToStderr("Failed to submit job for %s: %s", name, jid)
2269       results.append((idx, False, jid))
2270
2271     while self.jobs:
2272       (idx, _, jid, name) = self._ChooseJob()
2273       ToStdout("Waiting for job %s for %s...", jid, name)
2274       try:
2275         job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
2276         success = True
2277       except (errors.GenericError, luxi.ProtocolError), err:
2278         _, job_result = FormatError(err)
2279         success = False
2280         # the error message will always be shown, verbose or not
2281         ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
2282
2283       results.append((idx, success, job_result))
2284
2285     # sort based on the index, then drop it
2286     results.sort()
2287     results = [i[1:] for i in results]
2288
2289     return results
2290
2291   def WaitOrShow(self, wait):
2292     """Wait for job results or only print the job IDs.
2293
2294     @type wait: boolean
2295     @param wait: whether to wait or not
2296
2297     """
2298     if wait:
2299       return self.GetResults()
2300     else:
2301       if not self.jobs:
2302         self.SubmitPending()
2303       for _, status, result, name in self.jobs:
2304         if status:
2305           ToStdout("%s: %s", result, name)
2306         else:
2307           ToStderr("Failure for %s: %s", name, result)