Add support for modifying instance OS parameters
[ganeti-local] / lib / cli.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module dealing with command line parsing"""
23
24
25 import sys
26 import textwrap
27 import os.path
28 import time
29 import logging
30 from cStringIO import StringIO
31
32 from ganeti import utils
33 from ganeti import errors
34 from ganeti import constants
35 from ganeti import opcodes
36 from ganeti import luxi
37 from ganeti import ssconf
38 from ganeti import rpc
39 from ganeti import ssh
40 from ganeti import compat
41
42 from optparse import (OptionParser, TitledHelpFormatter,
43                       Option, OptionValueError)
44
45
46 __all__ = [
47   # Command line options
48   "ADD_UIDS_OPT",
49   "ALLOCATABLE_OPT",
50   "ALL_OPT",
51   "AUTO_PROMOTE_OPT",
52   "AUTO_REPLACE_OPT",
53   "BACKEND_OPT",
54   "CLEANUP_OPT",
55   "CLUSTER_DOMAIN_SECRET_OPT",
56   "CONFIRM_OPT",
57   "CP_SIZE_OPT",
58   "DEBUG_OPT",
59   "DEBUG_SIMERR_OPT",
60   "DISKIDX_OPT",
61   "DISK_OPT",
62   "DISK_TEMPLATE_OPT",
63   "DRAINED_OPT",
64   "EARLY_RELEASE_OPT",
65   "ENABLED_HV_OPT",
66   "ERROR_CODES_OPT",
67   "FIELDS_OPT",
68   "FILESTORE_DIR_OPT",
69   "FILESTORE_DRIVER_OPT",
70   "FORCE_OPT",
71   "FORCE_VARIANT_OPT",
72   "GLOBAL_FILEDIR_OPT",
73   "HVLIST_OPT",
74   "HVOPTS_OPT",
75   "HYPERVISOR_OPT",
76   "IALLOCATOR_OPT",
77   "IDENTIFY_DEFAULTS_OPT",
78   "IGNORE_CONSIST_OPT",
79   "IGNORE_FAILURES_OPT",
80   "IGNORE_REMOVE_FAILURES_OPT",
81   "IGNORE_SECONDARIES_OPT",
82   "IGNORE_SIZE_OPT",
83   "MAC_PREFIX_OPT",
84   "MAINTAIN_NODE_HEALTH_OPT",
85   "MASTER_NETDEV_OPT",
86   "MC_OPT",
87   "NET_OPT",
88   "NEW_CLUSTER_CERT_OPT",
89   "NEW_CLUSTER_DOMAIN_SECRET_OPT",
90   "NEW_CONFD_HMAC_KEY_OPT",
91   "NEW_RAPI_CERT_OPT",
92   "NEW_SECONDARY_OPT",
93   "NIC_PARAMS_OPT",
94   "NODE_LIST_OPT",
95   "NODE_PLACEMENT_OPT",
96   "NOHDR_OPT",
97   "NOIPCHECK_OPT",
98   "NO_INSTALL_OPT",
99   "NONAMECHECK_OPT",
100   "NOLVM_STORAGE_OPT",
101   "NOMODIFY_ETCHOSTS_OPT",
102   "NOMODIFY_SSH_SETUP_OPT",
103   "NONICS_OPT",
104   "NONLIVE_OPT",
105   "NONPLUS1_OPT",
106   "NOSHUTDOWN_OPT",
107   "NOSTART_OPT",
108   "NOSSH_KEYCHECK_OPT",
109   "NOVOTING_OPT",
110   "NWSYNC_OPT",
111   "ON_PRIMARY_OPT",
112   "ON_SECONDARY_OPT",
113   "OFFLINE_OPT",
114   "OSPARAMS_OPT",
115   "OS_OPT",
116   "OS_SIZE_OPT",
117   "RAPI_CERT_OPT",
118   "READD_OPT",
119   "REBOOT_TYPE_OPT",
120   "REMOVE_INSTANCE_OPT",
121   "REMOVE_UIDS_OPT",
122   "ROMAN_OPT",
123   "SECONDARY_IP_OPT",
124   "SELECT_OS_OPT",
125   "SEP_OPT",
126   "SHOWCMD_OPT",
127   "SHUTDOWN_TIMEOUT_OPT",
128   "SINGLE_NODE_OPT",
129   "SRC_DIR_OPT",
130   "SRC_NODE_OPT",
131   "SUBMIT_OPT",
132   "STATIC_OPT",
133   "SYNC_OPT",
134   "TAG_SRC_OPT",
135   "TIMEOUT_OPT",
136   "UIDPOOL_OPT",
137   "USEUNITS_OPT",
138   "USE_REPL_NET_OPT",
139   "VERBOSE_OPT",
140   "VG_NAME_OPT",
141   "YES_DOIT_OPT",
142   # Generic functions for CLI programs
143   "GenericMain",
144   "GenericInstanceCreate",
145   "GetClient",
146   "GetOnlineNodes",
147   "JobExecutor",
148   "JobSubmittedException",
149   "ParseTimespec",
150   "RunWhileClusterStopped",
151   "SubmitOpCode",
152   "SubmitOrSend",
153   "UsesRPC",
154   # Formatting functions
155   "ToStderr", "ToStdout",
156   "FormatError",
157   "GenerateTable",
158   "AskUser",
159   "FormatTimestamp",
160   # Tags functions
161   "ListTags",
162   "AddTags",
163   "RemoveTags",
164   # command line options support infrastructure
165   "ARGS_MANY_INSTANCES",
166   "ARGS_MANY_NODES",
167   "ARGS_NONE",
168   "ARGS_ONE_INSTANCE",
169   "ARGS_ONE_NODE",
170   "ARGS_ONE_OS",
171   "ArgChoice",
172   "ArgCommand",
173   "ArgFile",
174   "ArgHost",
175   "ArgInstance",
176   "ArgJobId",
177   "ArgNode",
178   "ArgOs",
179   "ArgSuggest",
180   "ArgUnknown",
181   "OPT_COMPL_INST_ADD_NODES",
182   "OPT_COMPL_MANY_NODES",
183   "OPT_COMPL_ONE_IALLOCATOR",
184   "OPT_COMPL_ONE_INSTANCE",
185   "OPT_COMPL_ONE_NODE",
186   "OPT_COMPL_ONE_OS",
187   "cli_option",
188   "SplitNodeOption",
189   "CalculateOSNames",
190   ]
191
192 NO_PREFIX = "no_"
193 UN_PREFIX = "-"
194
195
196 class _Argument:
197   def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
198     self.min = min
199     self.max = max
200
201   def __repr__(self):
202     return ("<%s min=%s max=%s>" %
203             (self.__class__.__name__, self.min, self.max))
204
205
206 class ArgSuggest(_Argument):
207   """Suggesting argument.
208
209   Value can be any of the ones passed to the constructor.
210
211   """
212   # pylint: disable-msg=W0622
213   def __init__(self, min=0, max=None, choices=None):
214     _Argument.__init__(self, min=min, max=max)
215     self.choices = choices
216
217   def __repr__(self):
218     return ("<%s min=%s max=%s choices=%r>" %
219             (self.__class__.__name__, self.min, self.max, self.choices))
220
221
222 class ArgChoice(ArgSuggest):
223   """Choice argument.
224
225   Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
226   but value must be one of the choices.
227
228   """
229
230
231 class ArgUnknown(_Argument):
232   """Unknown argument to program (e.g. determined at runtime).
233
234   """
235
236
237 class ArgInstance(_Argument):
238   """Instances argument.
239
240   """
241
242
243 class ArgNode(_Argument):
244   """Node argument.
245
246   """
247
248 class ArgJobId(_Argument):
249   """Job ID argument.
250
251   """
252
253
254 class ArgFile(_Argument):
255   """File path argument.
256
257   """
258
259
260 class ArgCommand(_Argument):
261   """Command argument.
262
263   """
264
265
266 class ArgHost(_Argument):
267   """Host argument.
268
269   """
270
271
272 class ArgOs(_Argument):
273   """OS argument.
274
275   """
276
277
278 ARGS_NONE = []
279 ARGS_MANY_INSTANCES = [ArgInstance()]
280 ARGS_MANY_NODES = [ArgNode()]
281 ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
282 ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
283 ARGS_ONE_OS = [ArgOs(min=1, max=1)]
284
285
286 def _ExtractTagsObject(opts, args):
287   """Extract the tag type object.
288
289   Note that this function will modify its args parameter.
290
291   """
292   if not hasattr(opts, "tag_type"):
293     raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")
294   kind = opts.tag_type
295   if kind == constants.TAG_CLUSTER:
296     retval = kind, kind
297   elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
298     if not args:
299       raise errors.OpPrereqError("no arguments passed to the command")
300     name = args.pop(0)
301     retval = kind, name
302   else:
303     raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
304   return retval
305
306
307 def _ExtendTags(opts, args):
308   """Extend the args if a source file has been given.
309
310   This function will extend the tags with the contents of the file
311   passed in the 'tags_source' attribute of the opts parameter. A file
312   named '-' will be replaced by stdin.
313
314   """
315   fname = opts.tags_source
316   if fname is None:
317     return
318   if fname == "-":
319     new_fh = sys.stdin
320   else:
321     new_fh = open(fname, "r")
322   new_data = []
323   try:
324     # we don't use the nice 'new_data = [line.strip() for line in fh]'
325     # because of python bug 1633941
326     while True:
327       line = new_fh.readline()
328       if not line:
329         break
330       new_data.append(line.strip())
331   finally:
332     new_fh.close()
333   args.extend(new_data)
334
335
336 def ListTags(opts, args):
337   """List the tags on a given object.
338
339   This is a generic implementation that knows how to deal with all
340   three cases of tag objects (cluster, node, instance). The opts
341   argument is expected to contain a tag_type field denoting what
342   object type we work on.
343
344   """
345   kind, name = _ExtractTagsObject(opts, args)
346   cl = GetClient()
347   result = cl.QueryTags(kind, name)
348   result = list(result)
349   result.sort()
350   for tag in result:
351     ToStdout(tag)
352
353
354 def AddTags(opts, args):
355   """Add tags on a given object.
356
357   This is a generic implementation that knows how to deal with all
358   three cases of tag objects (cluster, node, instance). The opts
359   argument is expected to contain a tag_type field denoting what
360   object type we work on.
361
362   """
363   kind, name = _ExtractTagsObject(opts, args)
364   _ExtendTags(opts, args)
365   if not args:
366     raise errors.OpPrereqError("No tags to be added")
367   op = opcodes.OpAddTags(kind=kind, name=name, tags=args)
368   SubmitOpCode(op)
369
370
371 def RemoveTags(opts, args):
372   """Remove tags from a given object.
373
374   This is a generic implementation that knows how to deal with all
375   three cases of tag objects (cluster, node, instance). The opts
376   argument is expected to contain a tag_type field denoting what
377   object type we work on.
378
379   """
380   kind, name = _ExtractTagsObject(opts, args)
381   _ExtendTags(opts, args)
382   if not args:
383     raise errors.OpPrereqError("No tags to be removed")
384   op = opcodes.OpDelTags(kind=kind, name=name, tags=args)
385   SubmitOpCode(op)
386
387
388 def check_unit(option, opt, value): # pylint: disable-msg=W0613
389   """OptParsers custom converter for units.
390
391   """
392   try:
393     return utils.ParseUnit(value)
394   except errors.UnitParseError, err:
395     raise OptionValueError("option %s: %s" % (opt, err))
396
397
398 def _SplitKeyVal(opt, data):
399   """Convert a KeyVal string into a dict.
400
401   This function will convert a key=val[,...] string into a dict. Empty
402   values will be converted specially: keys which have the prefix 'no_'
403   will have the value=False and the prefix stripped, the others will
404   have value=True.
405
406   @type opt: string
407   @param opt: a string holding the option name for which we process the
408       data, used in building error messages
409   @type data: string
410   @param data: a string of the format key=val,key=val,...
411   @rtype: dict
412   @return: {key=val, key=val}
413   @raises errors.ParameterError: if there are duplicate keys
414
415   """
416   kv_dict = {}
417   if data:
418     for elem in utils.UnescapeAndSplit(data, sep=","):
419       if "=" in elem:
420         key, val = elem.split("=", 1)
421       else:
422         if elem.startswith(NO_PREFIX):
423           key, val = elem[len(NO_PREFIX):], False
424         elif elem.startswith(UN_PREFIX):
425           key, val = elem[len(UN_PREFIX):], None
426         else:
427           key, val = elem, True
428       if key in kv_dict:
429         raise errors.ParameterError("Duplicate key '%s' in option %s" %
430                                     (key, opt))
431       kv_dict[key] = val
432   return kv_dict
433
434
435 def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
436   """Custom parser for ident:key=val,key=val options.
437
438   This will store the parsed values as a tuple (ident, {key: val}). As such,
439   multiple uses of this option via action=append is possible.
440
441   """
442   if ":" not in value:
443     ident, rest = value, ''
444   else:
445     ident, rest = value.split(":", 1)
446
447   if ident.startswith(NO_PREFIX):
448     if rest:
449       msg = "Cannot pass options when removing parameter groups: %s" % value
450       raise errors.ParameterError(msg)
451     retval = (ident[len(NO_PREFIX):], False)
452   elif ident.startswith(UN_PREFIX):
453     if rest:
454       msg = "Cannot pass options when removing parameter groups: %s" % value
455       raise errors.ParameterError(msg)
456     retval = (ident[len(UN_PREFIX):], None)
457   else:
458     kv_dict = _SplitKeyVal(opt, rest)
459     retval = (ident, kv_dict)
460   return retval
461
462
463 def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
464   """Custom parser class for key=val,key=val options.
465
466   This will store the parsed values as a dict {key: val}.
467
468   """
469   return _SplitKeyVal(opt, value)
470
471
472 def check_bool(option, opt, value): # pylint: disable-msg=W0613
473   """Custom parser for yes/no options.
474
475   This will store the parsed value as either True or False.
476
477   """
478   value = value.lower()
479   if value == constants.VALUE_FALSE or value == "no":
480     return False
481   elif value == constants.VALUE_TRUE or value == "yes":
482     return True
483   else:
484     raise errors.ParameterError("Invalid boolean value '%s'" % value)
485
486
487 # completion_suggestion is normally a list. Using numeric values not evaluating
488 # to False for dynamic completion.
489 (OPT_COMPL_MANY_NODES,
490  OPT_COMPL_ONE_NODE,
491  OPT_COMPL_ONE_INSTANCE,
492  OPT_COMPL_ONE_OS,
493  OPT_COMPL_ONE_IALLOCATOR,
494  OPT_COMPL_INST_ADD_NODES) = range(100, 106)
495
496 OPT_COMPL_ALL = frozenset([
497   OPT_COMPL_MANY_NODES,
498   OPT_COMPL_ONE_NODE,
499   OPT_COMPL_ONE_INSTANCE,
500   OPT_COMPL_ONE_OS,
501   OPT_COMPL_ONE_IALLOCATOR,
502   OPT_COMPL_INST_ADD_NODES,
503   ])
504
505
506 class CliOption(Option):
507   """Custom option class for optparse.
508
509   """
510   ATTRS = Option.ATTRS + [
511     "completion_suggest",
512     ]
513   TYPES = Option.TYPES + (
514     "identkeyval",
515     "keyval",
516     "unit",
517     "bool",
518     )
519   TYPE_CHECKER = Option.TYPE_CHECKER.copy()
520   TYPE_CHECKER["identkeyval"] = check_ident_key_val
521   TYPE_CHECKER["keyval"] = check_key_val
522   TYPE_CHECKER["unit"] = check_unit
523   TYPE_CHECKER["bool"] = check_bool
524
525
526 # optparse.py sets make_option, so we do it for our own option class, too
527 cli_option = CliOption
528
529
530 _YORNO = "yes|no"
531
532 DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
533                        help="Increase debugging level")
534
535 NOHDR_OPT = cli_option("--no-headers", default=False,
536                        action="store_true", dest="no_headers",
537                        help="Don't display column headers")
538
539 SEP_OPT = cli_option("--separator", default=None,
540                      action="store", dest="separator",
541                      help=("Separator between output fields"
542                            " (defaults to one space)"))
543
544 USEUNITS_OPT = cli_option("--units", default=None,
545                           dest="units", choices=('h', 'm', 'g', 't'),
546                           help="Specify units for output (one of hmgt)")
547
548 FIELDS_OPT = cli_option("-o", "--output", dest="output", action="store",
549                         type="string", metavar="FIELDS",
550                         help="Comma separated list of output fields")
551
552 FORCE_OPT = cli_option("-f", "--force", dest="force", action="store_true",
553                        default=False, help="Force the operation")
554
555 CONFIRM_OPT = cli_option("--yes", dest="confirm", action="store_true",
556                          default=False, help="Do not require confirmation")
557
558 TAG_SRC_OPT = cli_option("--from", dest="tags_source",
559                          default=None, help="File with tag names")
560
561 SUBMIT_OPT = cli_option("--submit", dest="submit_only",
562                         default=False, action="store_true",
563                         help=("Submit the job and return the job ID, but"
564                               " don't wait for the job to finish"))
565
566 SYNC_OPT = cli_option("--sync", dest="do_locking",
567                       default=False, action="store_true",
568                       help=("Grab locks while doing the queries"
569                             " in order to ensure more consistent results"))
570
571 _DRY_RUN_OPT = cli_option("--dry-run", default=False,
572                           action="store_true",
573                           help=("Do not execute the operation, just run the"
574                                 " check steps and verify it it could be"
575                                 " executed"))
576
577 VERBOSE_OPT = cli_option("-v", "--verbose", default=False,
578                          action="store_true",
579                          help="Increase the verbosity of the operation")
580
581 DEBUG_SIMERR_OPT = cli_option("--debug-simulate-errors", default=False,
582                               action="store_true", dest="simulate_errors",
583                               help="Debugging option that makes the operation"
584                               " treat most runtime checks as failed")
585
586 NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
587                         default=True, action="store_false",
588                         help="Don't wait for sync (DANGEROUS!)")
589
590 DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
591                                help="Custom disk setup (diskless, file,"
592                                " plain or drbd)",
593                                default=None, metavar="TEMPL",
594                                choices=list(constants.DISK_TEMPLATES))
595
596 NONICS_OPT = cli_option("--no-nics", default=False, action="store_true",
597                         help="Do not create any network cards for"
598                         " the instance")
599
600 FILESTORE_DIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
601                                help="Relative path under default cluster-wide"
602                                " file storage dir to store file-based disks",
603                                default=None, metavar="<DIR>")
604
605 FILESTORE_DRIVER_OPT = cli_option("--file-driver", dest="file_driver",
606                                   help="Driver to use for image files",
607                                   default="loop", metavar="<DRIVER>",
608                                   choices=list(constants.FILE_DRIVER))
609
610 IALLOCATOR_OPT = cli_option("-I", "--iallocator", metavar="<NAME>",
611                             help="Select nodes for the instance automatically"
612                             " using the <NAME> iallocator plugin",
613                             default=None, type="string",
614                             completion_suggest=OPT_COMPL_ONE_IALLOCATOR)
615
616 OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
617                     metavar="<os>",
618                     completion_suggest=OPT_COMPL_ONE_OS)
619
620 OSPARAMS_OPT = cli_option("-O", "--os-parameters", dest="osparams",
621                          type="keyval", default={},
622                          help="OS parameters")
623
624 FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
625                                action="store_true", default=False,
626                                help="Force an unknown variant")
627
628 NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
629                             action="store_true", default=False,
630                             help="Do not install the OS (will"
631                             " enable no-start)")
632
633 BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
634                          type="keyval", default={},
635                          help="Backend parameters")
636
637 HVOPTS_OPT =  cli_option("-H", "--hypervisor-parameters", type="keyval",
638                          default={}, dest="hvparams",
639                          help="Hypervisor parameters")
640
641 HYPERVISOR_OPT = cli_option("-H", "--hypervisor-parameters", dest="hypervisor",
642                             help="Hypervisor and hypervisor options, in the"
643                             " format hypervisor:option=value,option=value,...",
644                             default=None, type="identkeyval")
645
646 HVLIST_OPT = cli_option("-H", "--hypervisor-parameters", dest="hvparams",
647                         help="Hypervisor and hypervisor options, in the"
648                         " format hypervisor:option=value,option=value,...",
649                         default=[], action="append", type="identkeyval")
650
651 NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
652                            action="store_false",
653                            help="Don't check that the instance's IP"
654                            " is alive")
655
656 NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
657                              default=True, action="store_false",
658                              help="Don't check that the instance's name"
659                              " is resolvable")
660
661 NET_OPT = cli_option("--net",
662                      help="NIC parameters", default=[],
663                      dest="nics", action="append", type="identkeyval")
664
665 DISK_OPT = cli_option("--disk", help="Disk parameters", default=[],
666                       dest="disks", action="append", type="identkeyval")
667
668 DISKIDX_OPT = cli_option("--disks", dest="disks", default=None,
669                          help="Comma-separated list of disks"
670                          " indices to act on (e.g. 0,2) (optional,"
671                          " defaults to all disks)")
672
673 OS_SIZE_OPT = cli_option("-s", "--os-size", dest="sd_size",
674                          help="Enforces a single-disk configuration using the"
675                          " given disk size, in MiB unless a suffix is used",
676                          default=None, type="unit", metavar="<size>")
677
678 IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
679                                 dest="ignore_consistency",
680                                 action="store_true", default=False,
681                                 help="Ignore the consistency of the disks on"
682                                 " the secondary")
683
684 NONLIVE_OPT = cli_option("--non-live", dest="live",
685                          default=True, action="store_false",
686                          help="Do a non-live migration (this usually means"
687                          " freeze the instance, save the state, transfer and"
688                          " only then resume running on the secondary node)")
689
690 NODE_PLACEMENT_OPT = cli_option("-n", "--node", dest="node",
691                                 help="Target node and optional secondary node",
692                                 metavar="<pnode>[:<snode>]",
693                                 completion_suggest=OPT_COMPL_INST_ADD_NODES)
694
695 NODE_LIST_OPT = cli_option("-n", "--node", dest="nodes", default=[],
696                            action="append", metavar="<node>",
697                            help="Use only this node (can be used multiple"
698                            " times, if not given defaults to all nodes)",
699                            completion_suggest=OPT_COMPL_ONE_NODE)
700
701 SINGLE_NODE_OPT = cli_option("-n", "--node", dest="node", help="Target node",
702                              metavar="<node>",
703                              completion_suggest=OPT_COMPL_ONE_NODE)
704
705 NOSTART_OPT = cli_option("--no-start", dest="start", default=True,
706                          action="store_false",
707                          help="Don't start the instance after creation")
708
709 SHOWCMD_OPT = cli_option("--show-cmd", dest="show_command",
710                          action="store_true", default=False,
711                          help="Show command instead of executing it")
712
713 CLEANUP_OPT = cli_option("--cleanup", dest="cleanup",
714                          default=False, action="store_true",
715                          help="Instead of performing the migration, try to"
716                          " recover from a failed cleanup. This is safe"
717                          " to run even if the instance is healthy, but it"
718                          " will create extra replication traffic and "
719                          " disrupt briefly the replication (like during the"
720                          " migration")
721
722 STATIC_OPT = cli_option("-s", "--static", dest="static",
723                         action="store_true", default=False,
724                         help="Only show configuration data, not runtime data")
725
726 ALL_OPT = cli_option("--all", dest="show_all",
727                      default=False, action="store_true",
728                      help="Show info on all instances on the cluster."
729                      " This can take a long time to run, use wisely")
730
731 SELECT_OS_OPT = cli_option("--select-os", dest="select_os",
732                            action="store_true", default=False,
733                            help="Interactive OS reinstall, lists available"
734                            " OS templates for selection")
735
736 IGNORE_FAILURES_OPT = cli_option("--ignore-failures", dest="ignore_failures",
737                                  action="store_true", default=False,
738                                  help="Remove the instance from the cluster"
739                                  " configuration even if there are failures"
740                                  " during the removal process")
741
742 IGNORE_REMOVE_FAILURES_OPT = cli_option("--ignore-remove-failures",
743                                         dest="ignore_remove_failures",
744                                         action="store_true", default=False,
745                                         help="Remove the instance from the"
746                                         " cluster configuration even if there"
747                                         " are failures during the removal"
748                                         " process")
749
750 REMOVE_INSTANCE_OPT = cli_option("--remove-instance", dest="remove_instance",
751                                  action="store_true", default=False,
752                                  help="Remove the instance from the cluster")
753
754 NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
755                                help="Specifies the new secondary node",
756                                metavar="NODE", default=None,
757                                completion_suggest=OPT_COMPL_ONE_NODE)
758
759 ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
760                             default=False, action="store_true",
761                             help="Replace the disk(s) on the primary"
762                             " node (only for the drbd template)")
763
764 ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
765                               default=False, action="store_true",
766                               help="Replace the disk(s) on the secondary"
767                               " node (only for the drbd template)")
768
769 AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
770                               default=False, action="store_true",
771                               help="Lock all nodes and auto-promote as needed"
772                               " to MC status")
773
774 AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
775                               default=False, action="store_true",
776                               help="Automatically replace faulty disks"
777                               " (only for the drbd template)")
778
779 IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
780                              default=False, action="store_true",
781                              help="Ignore current recorded size"
782                              " (useful for forcing activation when"
783                              " the recorded size is wrong)")
784
785 SRC_NODE_OPT = cli_option("--src-node", dest="src_node", help="Source node",
786                           metavar="<node>",
787                           completion_suggest=OPT_COMPL_ONE_NODE)
788
789 SRC_DIR_OPT = cli_option("--src-dir", dest="src_dir", help="Source directory",
790                          metavar="<dir>")
791
792 SECONDARY_IP_OPT = cli_option("-s", "--secondary-ip", dest="secondary_ip",
793                               help="Specify the secondary ip for the node",
794                               metavar="ADDRESS", default=None)
795
796 READD_OPT = cli_option("--readd", dest="readd",
797                        default=False, action="store_true",
798                        help="Readd old node after replacing it")
799
800 NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
801                                 default=True, action="store_false",
802                                 help="Disable SSH key fingerprint checking")
803
804
805 MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
806                     type="bool", default=None, metavar=_YORNO,
807                     help="Set the master_candidate flag on the node")
808
809 OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
810                          type="bool", default=None,
811                          help="Set the offline flag on the node")
812
813 DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
814                          type="bool", default=None,
815                          help="Set the drained flag on the node")
816
817 ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
818                              type="bool", default=None, metavar=_YORNO,
819                              help="Set the allocatable flag on a volume")
820
821 NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
822                                help="Disable support for lvm based instances"
823                                " (cluster-wide)",
824                                action="store_false", default=True)
825
826 ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
827                             dest="enabled_hypervisors",
828                             help="Comma-separated list of hypervisors",
829                             type="string", default=None)
830
831 NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
832                             type="keyval", default={},
833                             help="NIC parameters")
834
835 CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
836                          dest="candidate_pool_size", type="int",
837                          help="Set the candidate pool size")
838
839 VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
840                          help="Enables LVM and specifies the volume group"
841                          " name (cluster-wide) for disk allocation [xenvg]",
842                          metavar="VG", default=None)
843
844 YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
845                           help="Destroy cluster", action="store_true")
846
847 NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
848                           help="Skip node agreement check (dangerous)",
849                           action="store_true", default=False)
850
851 MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
852                             help="Specify the mac prefix for the instance IP"
853                             " addresses, in the format XX:XX:XX",
854                             metavar="PREFIX",
855                             default=None)
856
857 MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
858                                help="Specify the node interface (cluster-wide)"
859                                " on which the master IP address will be added "
860                                " [%s]" % constants.DEFAULT_BRIDGE,
861                                metavar="NETDEV",
862                                default=constants.DEFAULT_BRIDGE)
863
864 GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
865                                 help="Specify the default directory (cluster-"
866                                 "wide) for storing the file-based disks [%s]" %
867                                 constants.DEFAULT_FILE_STORAGE_DIR,
868                                 metavar="DIR",
869                                 default=constants.DEFAULT_FILE_STORAGE_DIR)
870
871 NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
872                                    help="Don't modify /etc/hosts",
873                                    action="store_false", default=True)
874
875 NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
876                                     help="Don't initialize SSH keys",
877                                     action="store_false", default=True)
878
879 ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
880                              help="Enable parseable error messages",
881                              action="store_true", default=False)
882
883 NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
884                           help="Skip N+1 memory redundancy tests",
885                           action="store_true", default=False)
886
887 REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
888                              help="Type of reboot: soft/hard/full",
889                              default=constants.INSTANCE_REBOOT_HARD,
890                              metavar="<REBOOT>",
891                              choices=list(constants.REBOOT_TYPES))
892
893 IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
894                                     dest="ignore_secondaries",
895                                     default=False, action="store_true",
896                                     help="Ignore errors from secondaries")
897
898 NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
899                             action="store_false", default=True,
900                             help="Don't shutdown the instance (unsafe)")
901
902 TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
903                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
904                          help="Maximum time to wait")
905
906 SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
907                          dest="shutdown_timeout", type="int",
908                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
909                          help="Maximum time to wait for instance shutdown")
910
911 EARLY_RELEASE_OPT = cli_option("--early-release",
912                                dest="early_release", default=False,
913                                action="store_true",
914                                help="Release the locks on the secondary"
915                                " node(s) early")
916
917 NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
918                                   dest="new_cluster_cert",
919                                   default=False, action="store_true",
920                                   help="Generate a new cluster certificate")
921
922 RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
923                            default=None,
924                            help="File containing new RAPI certificate")
925
926 NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
927                                default=None, action="store_true",
928                                help=("Generate a new self-signed RAPI"
929                                      " certificate"))
930
931 NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
932                                     dest="new_confd_hmac_key",
933                                     default=False, action="store_true",
934                                     help=("Create a new HMAC key for %s" %
935                                           constants.CONFD))
936
937 CLUSTER_DOMAIN_SECRET_OPT = cli_option("--cluster-domain-secret",
938                                        dest="cluster_domain_secret",
939                                        default=None,
940                                        help=("Load new new cluster domain"
941                                              " secret from file"))
942
943 NEW_CLUSTER_DOMAIN_SECRET_OPT = cli_option("--new-cluster-domain-secret",
944                                            dest="new_cluster_domain_secret",
945                                            default=False, action="store_true",
946                                            help=("Create a new cluster domain"
947                                                  " secret"))
948
949 USE_REPL_NET_OPT = cli_option("--use-replication-network",
950                               dest="use_replication_network",
951                               help="Whether to use the replication network"
952                               " for talking to the nodes",
953                               action="store_true", default=False)
954
955 MAINTAIN_NODE_HEALTH_OPT = \
956     cli_option("--maintain-node-health", dest="maintain_node_health",
957                metavar=_YORNO, default=None, type="bool",
958                help="Configure the cluster to automatically maintain node"
959                " health, by shutting down unknown instances, shutting down"
960                " unknown DRBD devices, etc.")
961
962 IDENTIFY_DEFAULTS_OPT = \
963     cli_option("--identify-defaults", dest="identify_defaults",
964                default=False, action="store_true",
965                help="Identify which saved instance parameters are equal to"
966                " the current cluster defaults and set them as such, instead"
967                " of marking them as overridden")
968
969 UIDPOOL_OPT = cli_option("--uid-pool", default=None,
970                          action="store", dest="uid_pool",
971                          help=("A list of user-ids or user-id"
972                                " ranges separated by commas"))
973
974 ADD_UIDS_OPT = cli_option("--add-uids", default=None,
975                           action="store", dest="add_uids",
976                           help=("A list of user-ids or user-id"
977                                 " ranges separated by commas, to be"
978                                 " added to the user-id pool"))
979
980 REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
981                              action="store", dest="remove_uids",
982                              help=("A list of user-ids or user-id"
983                                    " ranges separated by commas, to be"
984                                    " removed from the user-id pool"))
985
986 ROMAN_OPT = cli_option("--roman",
987                        dest="roman_integers", default=False,
988                        action="store_true",
989                        help="Use roman numbers for positive integers")
990
991
992
993 def _ParseArgs(argv, commands, aliases):
994   """Parser for the command line arguments.
995
996   This function parses the arguments and returns the function which
997   must be executed together with its (modified) arguments.
998
999   @param argv: the command line
1000   @param commands: dictionary with special contents, see the design
1001       doc for cmdline handling
1002   @param aliases: dictionary with command aliases {'alias': 'target, ...}
1003
1004   """
1005   if len(argv) == 0:
1006     binary = "<command>"
1007   else:
1008     binary = argv[0].split("/")[-1]
1009
1010   if len(argv) > 1 and argv[1] == "--version":
1011     ToStdout("%s (ganeti) %s", binary, constants.RELEASE_VERSION)
1012     # Quit right away. That way we don't have to care about this special
1013     # argument. optparse.py does it the same.
1014     sys.exit(0)
1015
1016   if len(argv) < 2 or not (argv[1] in commands or
1017                            argv[1] in aliases):
1018     # let's do a nice thing
1019     sortedcmds = commands.keys()
1020     sortedcmds.sort()
1021
1022     ToStdout("Usage: %s {command} [options...] [argument...]", binary)
1023     ToStdout("%s <command> --help to see details, or man %s", binary, binary)
1024     ToStdout("")
1025
1026     # compute the max line length for cmd + usage
1027     mlen = max([len(" %s" % cmd) for cmd in commands])
1028     mlen = min(60, mlen) # should not get here...
1029
1030     # and format a nice command list
1031     ToStdout("Commands:")
1032     for cmd in sortedcmds:
1033       cmdstr = " %s" % (cmd,)
1034       help_text = commands[cmd][4]
1035       help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
1036       ToStdout("%-*s - %s", mlen, cmdstr, help_lines.pop(0))
1037       for line in help_lines:
1038         ToStdout("%-*s   %s", mlen, "", line)
1039
1040     ToStdout("")
1041
1042     return None, None, None
1043
1044   # get command, unalias it, and look it up in commands
1045   cmd = argv.pop(1)
1046   if cmd in aliases:
1047     if cmd in commands:
1048       raise errors.ProgrammerError("Alias '%s' overrides an existing"
1049                                    " command" % cmd)
1050
1051     if aliases[cmd] not in commands:
1052       raise errors.ProgrammerError("Alias '%s' maps to non-existing"
1053                                    " command '%s'" % (cmd, aliases[cmd]))
1054
1055     cmd = aliases[cmd]
1056
1057   func, args_def, parser_opts, usage, description = commands[cmd]
1058   parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
1059                         description=description,
1060                         formatter=TitledHelpFormatter(),
1061                         usage="%%prog %s %s" % (cmd, usage))
1062   parser.disable_interspersed_args()
1063   options, args = parser.parse_args()
1064
1065   if not _CheckArguments(cmd, args_def, args):
1066     return None, None, None
1067
1068   return func, options, args
1069
1070
1071 def _CheckArguments(cmd, args_def, args):
1072   """Verifies the arguments using the argument definition.
1073
1074   Algorithm:
1075
1076     1. Abort with error if values specified by user but none expected.
1077
1078     1. For each argument in definition
1079
1080       1. Keep running count of minimum number of values (min_count)
1081       1. Keep running count of maximum number of values (max_count)
1082       1. If it has an unlimited number of values
1083
1084         1. Abort with error if it's not the last argument in the definition
1085
1086     1. If last argument has limited number of values
1087
1088       1. Abort with error if number of values doesn't match or is too large
1089
1090     1. Abort with error if user didn't pass enough values (min_count)
1091
1092   """
1093   if args and not args_def:
1094     ToStderr("Error: Command %s expects no arguments", cmd)
1095     return False
1096
1097   min_count = None
1098   max_count = None
1099   check_max = None
1100
1101   last_idx = len(args_def) - 1
1102
1103   for idx, arg in enumerate(args_def):
1104     if min_count is None:
1105       min_count = arg.min
1106     elif arg.min is not None:
1107       min_count += arg.min
1108
1109     if max_count is None:
1110       max_count = arg.max
1111     elif arg.max is not None:
1112       max_count += arg.max
1113
1114     if idx == last_idx:
1115       check_max = (arg.max is not None)
1116
1117     elif arg.max is None:
1118       raise errors.ProgrammerError("Only the last argument can have max=None")
1119
1120   if check_max:
1121     # Command with exact number of arguments
1122     if (min_count is not None and max_count is not None and
1123         min_count == max_count and len(args) != min_count):
1124       ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
1125       return False
1126
1127     # Command with limited number of arguments
1128     if max_count is not None and len(args) > max_count:
1129       ToStderr("Error: Command %s expects only %d argument(s)",
1130                cmd, max_count)
1131       return False
1132
1133   # Command with some required arguments
1134   if min_count is not None and len(args) < min_count:
1135     ToStderr("Error: Command %s expects at least %d argument(s)",
1136              cmd, min_count)
1137     return False
1138
1139   return True
1140
1141
1142 def SplitNodeOption(value):
1143   """Splits the value of a --node option.
1144
1145   """
1146   if value and ':' in value:
1147     return value.split(':', 1)
1148   else:
1149     return (value, None)
1150
1151
1152 def CalculateOSNames(os_name, os_variants):
1153   """Calculates all the names an OS can be called, according to its variants.
1154
1155   @type os_name: string
1156   @param os_name: base name of the os
1157   @type os_variants: list or None
1158   @param os_variants: list of supported variants
1159   @rtype: list
1160   @return: list of valid names
1161
1162   """
1163   if os_variants:
1164     return ['%s+%s' % (os_name, v) for v in os_variants]
1165   else:
1166     return [os_name]
1167
1168
1169 def UsesRPC(fn):
1170   def wrapper(*args, **kwargs):
1171     rpc.Init()
1172     try:
1173       return fn(*args, **kwargs)
1174     finally:
1175       rpc.Shutdown()
1176   return wrapper
1177
1178
1179 def AskUser(text, choices=None):
1180   """Ask the user a question.
1181
1182   @param text: the question to ask
1183
1184   @param choices: list with elements tuples (input_char, return_value,
1185       description); if not given, it will default to: [('y', True,
1186       'Perform the operation'), ('n', False, 'Do no do the operation')];
1187       note that the '?' char is reserved for help
1188
1189   @return: one of the return values from the choices list; if input is
1190       not possible (i.e. not running with a tty, we return the last
1191       entry from the list
1192
1193   """
1194   if choices is None:
1195     choices = [('y', True, 'Perform the operation'),
1196                ('n', False, 'Do not perform the operation')]
1197   if not choices or not isinstance(choices, list):
1198     raise errors.ProgrammerError("Invalid choices argument to AskUser")
1199   for entry in choices:
1200     if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == '?':
1201       raise errors.ProgrammerError("Invalid choices element to AskUser")
1202
1203   answer = choices[-1][1]
1204   new_text = []
1205   for line in text.splitlines():
1206     new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
1207   text = "\n".join(new_text)
1208   try:
1209     f = file("/dev/tty", "a+")
1210   except IOError:
1211     return answer
1212   try:
1213     chars = [entry[0] for entry in choices]
1214     chars[-1] = "[%s]" % chars[-1]
1215     chars.append('?')
1216     maps = dict([(entry[0], entry[1]) for entry in choices])
1217     while True:
1218       f.write(text)
1219       f.write('\n')
1220       f.write("/".join(chars))
1221       f.write(": ")
1222       line = f.readline(2).strip().lower()
1223       if line in maps:
1224         answer = maps[line]
1225         break
1226       elif line == '?':
1227         for entry in choices:
1228           f.write(" %s - %s\n" % (entry[0], entry[2]))
1229         f.write("\n")
1230         continue
1231   finally:
1232     f.close()
1233   return answer
1234
1235
1236 class JobSubmittedException(Exception):
1237   """Job was submitted, client should exit.
1238
1239   This exception has one argument, the ID of the job that was
1240   submitted. The handler should print this ID.
1241
1242   This is not an error, just a structured way to exit from clients.
1243
1244   """
1245
1246
1247 def SendJob(ops, cl=None):
1248   """Function to submit an opcode without waiting for the results.
1249
1250   @type ops: list
1251   @param ops: list of opcodes
1252   @type cl: luxi.Client
1253   @param cl: the luxi client to use for communicating with the master;
1254              if None, a new client will be created
1255
1256   """
1257   if cl is None:
1258     cl = GetClient()
1259
1260   job_id = cl.SubmitJob(ops)
1261
1262   return job_id
1263
1264
1265 def GenericPollJob(job_id, cbs, report_cbs):
1266   """Generic job-polling function.
1267
1268   @type job_id: number
1269   @param job_id: Job ID
1270   @type cbs: Instance of L{JobPollCbBase}
1271   @param cbs: Data callbacks
1272   @type report_cbs: Instance of L{JobPollReportCbBase}
1273   @param report_cbs: Reporting callbacks
1274
1275   """
1276   prev_job_info = None
1277   prev_logmsg_serial = None
1278
1279   status = None
1280
1281   while True:
1282     result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
1283                                       prev_logmsg_serial)
1284     if not result:
1285       # job not found, go away!
1286       raise errors.JobLost("Job with id %s lost" % job_id)
1287
1288     if result == constants.JOB_NOTCHANGED:
1289       report_cbs.ReportNotChanged(job_id, status)
1290
1291       # Wait again
1292       continue
1293
1294     # Split result, a tuple of (field values, log entries)
1295     (job_info, log_entries) = result
1296     (status, ) = job_info
1297
1298     if log_entries:
1299       for log_entry in log_entries:
1300         (serial, timestamp, log_type, message) = log_entry
1301         report_cbs.ReportLogMessage(job_id, serial, timestamp,
1302                                     log_type, message)
1303         prev_logmsg_serial = max(prev_logmsg_serial, serial)
1304
1305     # TODO: Handle canceled and archived jobs
1306     elif status in (constants.JOB_STATUS_SUCCESS,
1307                     constants.JOB_STATUS_ERROR,
1308                     constants.JOB_STATUS_CANCELING,
1309                     constants.JOB_STATUS_CANCELED):
1310       break
1311
1312     prev_job_info = job_info
1313
1314   jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
1315   if not jobs:
1316     raise errors.JobLost("Job with id %s lost" % job_id)
1317
1318   status, opstatus, result = jobs[0]
1319
1320   if status == constants.JOB_STATUS_SUCCESS:
1321     return result
1322
1323   if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
1324     raise errors.OpExecError("Job was canceled")
1325
1326   has_ok = False
1327   for idx, (status, msg) in enumerate(zip(opstatus, result)):
1328     if status == constants.OP_STATUS_SUCCESS:
1329       has_ok = True
1330     elif status == constants.OP_STATUS_ERROR:
1331       errors.MaybeRaise(msg)
1332
1333       if has_ok:
1334         raise errors.OpExecError("partial failure (opcode %d): %s" %
1335                                  (idx, msg))
1336
1337       raise errors.OpExecError(str(msg))
1338
1339   # default failure mode
1340   raise errors.OpExecError(result)
1341
1342
1343 class JobPollCbBase:
1344   """Base class for L{GenericPollJob} callbacks.
1345
1346   """
1347   def __init__(self):
1348     """Initializes this class.
1349
1350     """
1351
1352   def WaitForJobChangeOnce(self, job_id, fields,
1353                            prev_job_info, prev_log_serial):
1354     """Waits for changes on a job.
1355
1356     """
1357     raise NotImplementedError()
1358
1359   def QueryJobs(self, job_ids, fields):
1360     """Returns the selected fields for the selected job IDs.
1361
1362     @type job_ids: list of numbers
1363     @param job_ids: Job IDs
1364     @type fields: list of strings
1365     @param fields: Fields
1366
1367     """
1368     raise NotImplementedError()
1369
1370
1371 class JobPollReportCbBase:
1372   """Base class for L{GenericPollJob} reporting callbacks.
1373
1374   """
1375   def __init__(self):
1376     """Initializes this class.
1377
1378     """
1379
1380   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1381     """Handles a log message.
1382
1383     """
1384     raise NotImplementedError()
1385
1386   def ReportNotChanged(self, job_id, status):
1387     """Called for if a job hasn't changed in a while.
1388
1389     @type job_id: number
1390     @param job_id: Job ID
1391     @type status: string or None
1392     @param status: Job status if available
1393
1394     """
1395     raise NotImplementedError()
1396
1397
1398 class _LuxiJobPollCb(JobPollCbBase):
1399   def __init__(self, cl):
1400     """Initializes this class.
1401
1402     """
1403     JobPollCbBase.__init__(self)
1404     self.cl = cl
1405
1406   def WaitForJobChangeOnce(self, job_id, fields,
1407                            prev_job_info, prev_log_serial):
1408     """Waits for changes on a job.
1409
1410     """
1411     return self.cl.WaitForJobChangeOnce(job_id, fields,
1412                                         prev_job_info, prev_log_serial)
1413
1414   def QueryJobs(self, job_ids, fields):
1415     """Returns the selected fields for the selected job IDs.
1416
1417     """
1418     return self.cl.QueryJobs(job_ids, fields)
1419
1420
1421 class FeedbackFnJobPollReportCb(JobPollReportCbBase):
1422   def __init__(self, feedback_fn):
1423     """Initializes this class.
1424
1425     """
1426     JobPollReportCbBase.__init__(self)
1427
1428     self.feedback_fn = feedback_fn
1429
1430     assert callable(feedback_fn)
1431
1432   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1433     """Handles a log message.
1434
1435     """
1436     self.feedback_fn((timestamp, log_type, log_msg))
1437
1438   def ReportNotChanged(self, job_id, status):
1439     """Called if a job hasn't changed in a while.
1440
1441     """
1442     # Ignore
1443
1444
1445 class StdioJobPollReportCb(JobPollReportCbBase):
1446   def __init__(self):
1447     """Initializes this class.
1448
1449     """
1450     JobPollReportCbBase.__init__(self)
1451
1452     self.notified_queued = False
1453     self.notified_waitlock = False
1454
1455   def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
1456     """Handles a log message.
1457
1458     """
1459     ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
1460              utils.SafeEncode(log_msg))
1461
1462   def ReportNotChanged(self, job_id, status):
1463     """Called if a job hasn't changed in a while.
1464
1465     """
1466     if status is None:
1467       return
1468
1469     if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
1470       ToStderr("Job %s is waiting in queue", job_id)
1471       self.notified_queued = True
1472
1473     elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
1474       ToStderr("Job %s is trying to acquire all necessary locks", job_id)
1475       self.notified_waitlock = True
1476
1477
1478 def PollJob(job_id, cl=None, feedback_fn=None):
1479   """Function to poll for the result of a job.
1480
1481   @type job_id: job identified
1482   @param job_id: the job to poll for results
1483   @type cl: luxi.Client
1484   @param cl: the luxi client to use for communicating with the master;
1485              if None, a new client will be created
1486
1487   """
1488   if cl is None:
1489     cl = GetClient()
1490
1491   if feedback_fn:
1492     reporter = FeedbackFnJobPollReportCb(feedback_fn)
1493   else:
1494     reporter = StdioJobPollReportCb()
1495
1496   return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
1497
1498
1499 def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
1500   """Legacy function to submit an opcode.
1501
1502   This is just a simple wrapper over the construction of the processor
1503   instance. It should be extended to better handle feedback and
1504   interaction functions.
1505
1506   """
1507   if cl is None:
1508     cl = GetClient()
1509
1510   SetGenericOpcodeOpts([op], opts)
1511
1512   job_id = SendJob([op], cl)
1513
1514   op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
1515
1516   return op_results[0]
1517
1518
1519 def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
1520   """Wrapper around SubmitOpCode or SendJob.
1521
1522   This function will decide, based on the 'opts' parameter, whether to
1523   submit and wait for the result of the opcode (and return it), or
1524   whether to just send the job and print its identifier. It is used in
1525   order to simplify the implementation of the '--submit' option.
1526
1527   It will also process the opcodes if we're sending the via SendJob
1528   (otherwise SubmitOpCode does it).
1529
1530   """
1531   if opts and opts.submit_only:
1532     job = [op]
1533     SetGenericOpcodeOpts(job, opts)
1534     job_id = SendJob(job, cl=cl)
1535     raise JobSubmittedException(job_id)
1536   else:
1537     return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
1538
1539
1540 def SetGenericOpcodeOpts(opcode_list, options):
1541   """Processor for generic options.
1542
1543   This function updates the given opcodes based on generic command
1544   line options (like debug, dry-run, etc.).
1545
1546   @param opcode_list: list of opcodes
1547   @param options: command line options or None
1548   @return: None (in-place modification)
1549
1550   """
1551   if not options:
1552     return
1553   for op in opcode_list:
1554     op.dry_run = options.dry_run
1555     op.debug_level = options.debug
1556
1557
1558 def GetClient():
1559   # TODO: Cache object?
1560   try:
1561     client = luxi.Client()
1562   except luxi.NoMasterError:
1563     ss = ssconf.SimpleStore()
1564
1565     # Try to read ssconf file
1566     try:
1567       ss.GetMasterNode()
1568     except errors.ConfigurationError:
1569       raise errors.OpPrereqError("Cluster not initialized or this machine is"
1570                                  " not part of a cluster")
1571
1572     master, myself = ssconf.GetMasterAndMyself(ss=ss)
1573     if master != myself:
1574       raise errors.OpPrereqError("This is not the master node, please connect"
1575                                  " to node '%s' and rerun the command" %
1576                                  master)
1577     raise
1578   return client
1579
1580
1581 def FormatError(err):
1582   """Return a formatted error message for a given error.
1583
1584   This function takes an exception instance and returns a tuple
1585   consisting of two values: first, the recommended exit code, and
1586   second, a string describing the error message (not
1587   newline-terminated).
1588
1589   """
1590   retcode = 1
1591   obuf = StringIO()
1592   msg = str(err)
1593   if isinstance(err, errors.ConfigurationError):
1594     txt = "Corrupt configuration file: %s" % msg
1595     logging.error(txt)
1596     obuf.write(txt + "\n")
1597     obuf.write("Aborting.")
1598     retcode = 2
1599   elif isinstance(err, errors.HooksAbort):
1600     obuf.write("Failure: hooks execution failed:\n")
1601     for node, script, out in err.args[0]:
1602       if out:
1603         obuf.write("  node: %s, script: %s, output: %s\n" %
1604                    (node, script, out))
1605       else:
1606         obuf.write("  node: %s, script: %s (no output)\n" %
1607                    (node, script))
1608   elif isinstance(err, errors.HooksFailure):
1609     obuf.write("Failure: hooks general failure: %s" % msg)
1610   elif isinstance(err, errors.ResolverError):
1611     this_host = utils.HostInfo.SysName()
1612     if err.args[0] == this_host:
1613       msg = "Failure: can't resolve my own hostname ('%s')"
1614     else:
1615       msg = "Failure: can't resolve hostname '%s'"
1616     obuf.write(msg % err.args[0])
1617   elif isinstance(err, errors.OpPrereqError):
1618     if len(err.args) == 2:
1619       obuf.write("Failure: prerequisites not met for this"
1620                " operation:\nerror type: %s, error details:\n%s" %
1621                  (err.args[1], err.args[0]))
1622     else:
1623       obuf.write("Failure: prerequisites not met for this"
1624                  " operation:\n%s" % msg)
1625   elif isinstance(err, errors.OpExecError):
1626     obuf.write("Failure: command execution error:\n%s" % msg)
1627   elif isinstance(err, errors.TagError):
1628     obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
1629   elif isinstance(err, errors.JobQueueDrainError):
1630     obuf.write("Failure: the job queue is marked for drain and doesn't"
1631                " accept new requests\n")
1632   elif isinstance(err, errors.JobQueueFull):
1633     obuf.write("Failure: the job queue is full and doesn't accept new"
1634                " job submissions until old jobs are archived\n")
1635   elif isinstance(err, errors.TypeEnforcementError):
1636     obuf.write("Parameter Error: %s" % msg)
1637   elif isinstance(err, errors.ParameterError):
1638     obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
1639   elif isinstance(err, luxi.NoMasterError):
1640     obuf.write("Cannot communicate with the master daemon.\nIs it running"
1641                " and listening for connections?")
1642   elif isinstance(err, luxi.TimeoutError):
1643     obuf.write("Timeout while talking to the master daemon. Error:\n"
1644                "%s" % msg)
1645   elif isinstance(err, luxi.ProtocolError):
1646     obuf.write("Unhandled protocol error while talking to the master daemon:\n"
1647                "%s" % msg)
1648   elif isinstance(err, errors.GenericError):
1649     obuf.write("Unhandled Ganeti error: %s" % msg)
1650   elif isinstance(err, JobSubmittedException):
1651     obuf.write("JobID: %s\n" % err.args[0])
1652     retcode = 0
1653   else:
1654     obuf.write("Unhandled exception: %s" % msg)
1655   return retcode, obuf.getvalue().rstrip('\n')
1656
1657
1658 def GenericMain(commands, override=None, aliases=None):
1659   """Generic main function for all the gnt-* commands.
1660
1661   Arguments:
1662     - commands: a dictionary with a special structure, see the design doc
1663                 for command line handling.
1664     - override: if not None, we expect a dictionary with keys that will
1665                 override command line options; this can be used to pass
1666                 options from the scripts to generic functions
1667     - aliases: dictionary with command aliases {'alias': 'target, ...}
1668
1669   """
1670   # save the program name and the entire command line for later logging
1671   if sys.argv:
1672     binary = os.path.basename(sys.argv[0]) or sys.argv[0]
1673     if len(sys.argv) >= 2:
1674       binary += " " + sys.argv[1]
1675       old_cmdline = " ".join(sys.argv[2:])
1676     else:
1677       old_cmdline = ""
1678   else:
1679     binary = "<unknown program>"
1680     old_cmdline = ""
1681
1682   if aliases is None:
1683     aliases = {}
1684
1685   try:
1686     func, options, args = _ParseArgs(sys.argv, commands, aliases)
1687   except errors.ParameterError, err:
1688     result, err_msg = FormatError(err)
1689     ToStderr(err_msg)
1690     return 1
1691
1692   if func is None: # parse error
1693     return 1
1694
1695   if override is not None:
1696     for key, val in override.iteritems():
1697       setattr(options, key, val)
1698
1699   utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
1700                      stderr_logging=True, program=binary)
1701
1702   if old_cmdline:
1703     logging.info("run with arguments '%s'", old_cmdline)
1704   else:
1705     logging.info("run with no arguments")
1706
1707   try:
1708     result = func(options, args)
1709   except (errors.GenericError, luxi.ProtocolError,
1710           JobSubmittedException), err:
1711     result, err_msg = FormatError(err)
1712     logging.exception("Error during command processing")
1713     ToStderr(err_msg)
1714
1715   return result
1716
1717
1718 def GenericInstanceCreate(mode, opts, args):
1719   """Add an instance to the cluster via either creation or import.
1720
1721   @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
1722   @param opts: the command line options selected by the user
1723   @type args: list
1724   @param args: should contain only one element, the new instance name
1725   @rtype: int
1726   @return: the desired exit code
1727
1728   """
1729   instance = args[0]
1730
1731   (pnode, snode) = SplitNodeOption(opts.node)
1732
1733   hypervisor = None
1734   hvparams = {}
1735   if opts.hypervisor:
1736     hypervisor, hvparams = opts.hypervisor
1737
1738   if opts.nics:
1739     try:
1740       nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
1741     except ValueError, err:
1742       raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
1743     nics = [{}] * nic_max
1744     for nidx, ndict in opts.nics:
1745       nidx = int(nidx)
1746       if not isinstance(ndict, dict):
1747         msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
1748         raise errors.OpPrereqError(msg)
1749       nics[nidx] = ndict
1750   elif opts.no_nics:
1751     # no nics
1752     nics = []
1753   elif mode == constants.INSTANCE_CREATE:
1754     # default of one nic, all auto
1755     nics = [{}]
1756   else:
1757     # mode == import
1758     nics = []
1759
1760   if opts.disk_template == constants.DT_DISKLESS:
1761     if opts.disks or opts.sd_size is not None:
1762       raise errors.OpPrereqError("Diskless instance but disk"
1763                                  " information passed")
1764     disks = []
1765   else:
1766     if (not opts.disks and not opts.sd_size
1767         and mode == constants.INSTANCE_CREATE):
1768       raise errors.OpPrereqError("No disk information specified")
1769     if opts.disks and opts.sd_size is not None:
1770       raise errors.OpPrereqError("Please use either the '--disk' or"
1771                                  " '-s' option")
1772     if opts.sd_size is not None:
1773       opts.disks = [(0, {"size": opts.sd_size})]
1774
1775     if opts.disks:
1776       try:
1777         disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
1778       except ValueError, err:
1779         raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
1780       disks = [{}] * disk_max
1781     else:
1782       disks = []
1783     for didx, ddict in opts.disks:
1784       didx = int(didx)
1785       if not isinstance(ddict, dict):
1786         msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
1787         raise errors.OpPrereqError(msg)
1788       elif "size" in ddict:
1789         if "adopt" in ddict:
1790           raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
1791                                      " (disk %d)" % didx)
1792         try:
1793           ddict["size"] = utils.ParseUnit(ddict["size"])
1794         except ValueError, err:
1795           raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
1796                                      (didx, err))
1797       elif "adopt" in ddict:
1798         if mode == constants.INSTANCE_IMPORT:
1799           raise errors.OpPrereqError("Disk adoption not allowed for instance"
1800                                      " import")
1801         ddict["size"] = 0
1802       else:
1803         raise errors.OpPrereqError("Missing size or adoption source for"
1804                                    " disk %d" % didx)
1805       disks[didx] = ddict
1806
1807   utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
1808   utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
1809
1810   if mode == constants.INSTANCE_CREATE:
1811     start = opts.start
1812     os_type = opts.os
1813     src_node = None
1814     src_path = None
1815     no_install = opts.no_install
1816     identify_defaults = False
1817   elif mode == constants.INSTANCE_IMPORT:
1818     start = False
1819     os_type = None
1820     src_node = opts.src_node
1821     src_path = opts.src_dir
1822     no_install = None
1823     identify_defaults = opts.identify_defaults
1824   else:
1825     raise errors.ProgrammerError("Invalid creation mode %s" % mode)
1826
1827   op = opcodes.OpCreateInstance(instance_name=instance,
1828                                 disks=disks,
1829                                 disk_template=opts.disk_template,
1830                                 nics=nics,
1831                                 pnode=pnode, snode=snode,
1832                                 ip_check=opts.ip_check,
1833                                 name_check=opts.name_check,
1834                                 wait_for_sync=opts.wait_for_sync,
1835                                 file_storage_dir=opts.file_storage_dir,
1836                                 file_driver=opts.file_driver,
1837                                 iallocator=opts.iallocator,
1838                                 hypervisor=hypervisor,
1839                                 hvparams=hvparams,
1840                                 beparams=opts.beparams,
1841                                 osparams=opts.osparams,
1842                                 mode=mode,
1843                                 start=start,
1844                                 os_type=os_type,
1845                                 src_node=src_node,
1846                                 src_path=src_path,
1847                                 no_install=no_install,
1848                                 identify_defaults=identify_defaults)
1849
1850   SubmitOrSend(op, opts)
1851   return 0
1852
1853
1854 class _RunWhileClusterStoppedHelper:
1855   """Helper class for L{RunWhileClusterStopped} to simplify state management
1856
1857   """
1858   def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
1859     """Initializes this class.
1860
1861     @type feedback_fn: callable
1862     @param feedback_fn: Feedback function
1863     @type cluster_name: string
1864     @param cluster_name: Cluster name
1865     @type master_node: string
1866     @param master_node Master node name
1867     @type online_nodes: list
1868     @param online_nodes: List of names of online nodes
1869
1870     """
1871     self.feedback_fn = feedback_fn
1872     self.cluster_name = cluster_name
1873     self.master_node = master_node
1874     self.online_nodes = online_nodes
1875
1876     self.ssh = ssh.SshRunner(self.cluster_name)
1877
1878     self.nonmaster_nodes = [name for name in online_nodes
1879                             if name != master_node]
1880
1881     assert self.master_node not in self.nonmaster_nodes
1882
1883   def _RunCmd(self, node_name, cmd):
1884     """Runs a command on the local or a remote machine.
1885
1886     @type node_name: string
1887     @param node_name: Machine name
1888     @type cmd: list
1889     @param cmd: Command
1890
1891     """
1892     if node_name is None or node_name == self.master_node:
1893       # No need to use SSH
1894       result = utils.RunCmd(cmd)
1895     else:
1896       result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))
1897
1898     if result.failed:
1899       errmsg = ["Failed to run command %s" % result.cmd]
1900       if node_name:
1901         errmsg.append("on node %s" % node_name)
1902       errmsg.append(": exitcode %s and error %s" %
1903                     (result.exit_code, result.output))
1904       raise errors.OpExecError(" ".join(errmsg))
1905
1906   def Call(self, fn, *args):
1907     """Call function while all daemons are stopped.
1908
1909     @type fn: callable
1910     @param fn: Function to be called
1911
1912     """
1913     # Pause watcher by acquiring an exclusive lock on watcher state file
1914     self.feedback_fn("Blocking watcher")
1915     watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
1916     try:
1917       # TODO: Currently, this just blocks. There's no timeout.
1918       # TODO: Should it be a shared lock?
1919       watcher_block.Exclusive(blocking=True)
1920
1921       # Stop master daemons, so that no new jobs can come in and all running
1922       # ones are finished
1923       self.feedback_fn("Stopping master daemons")
1924       self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
1925       try:
1926         # Stop daemons on all nodes
1927         for node_name in self.online_nodes:
1928           self.feedback_fn("Stopping daemons on %s" % node_name)
1929           self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])
1930
1931         # All daemons are shut down now
1932         try:
1933           return fn(self, *args)
1934         except Exception, err:
1935           _, errmsg = FormatError(err)
1936           logging.exception("Caught exception")
1937           self.feedback_fn(errmsg)
1938           raise
1939       finally:
1940         # Start cluster again, master node last
1941         for node_name in self.nonmaster_nodes + [self.master_node]:
1942           self.feedback_fn("Starting daemons on %s" % node_name)
1943           self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
1944     finally:
1945       # Resume watcher
1946       watcher_block.Close()
1947
1948
1949 def RunWhileClusterStopped(feedback_fn, fn, *args):
1950   """Calls a function while all cluster daemons are stopped.
1951
1952   @type feedback_fn: callable
1953   @param feedback_fn: Feedback function
1954   @type fn: callable
1955   @param fn: Function to be called when daemons are stopped
1956
1957   """
1958   feedback_fn("Gathering cluster information")
1959
1960   # This ensures we're running on the master daemon
1961   cl = GetClient()
1962
1963   (cluster_name, master_node) = \
1964     cl.QueryConfigValues(["cluster_name", "master_node"])
1965
1966   online_nodes = GetOnlineNodes([], cl=cl)
1967
1968   # Don't keep a reference to the client. The master daemon will go away.
1969   del cl
1970
1971   assert master_node in online_nodes
1972
1973   return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
1974                                        online_nodes).Call(fn, *args)
1975
1976
1977 def GenerateTable(headers, fields, separator, data,
1978                   numfields=None, unitfields=None,
1979                   units=None):
1980   """Prints a table with headers and different fields.
1981
1982   @type headers: dict
1983   @param headers: dictionary mapping field names to headers for
1984       the table
1985   @type fields: list
1986   @param fields: the field names corresponding to each row in
1987       the data field
1988   @param separator: the separator to be used; if this is None,
1989       the default 'smart' algorithm is used which computes optimal
1990       field width, otherwise just the separator is used between
1991       each field
1992   @type data: list
1993   @param data: a list of lists, each sublist being one row to be output
1994   @type numfields: list
1995   @param numfields: a list with the fields that hold numeric
1996       values and thus should be right-aligned
1997   @type unitfields: list
1998   @param unitfields: a list with the fields that hold numeric
1999       values that should be formatted with the units field
2000   @type units: string or None
2001   @param units: the units we should use for formatting, or None for
2002       automatic choice (human-readable for non-separator usage, otherwise
2003       megabytes); this is a one-letter string
2004
2005   """
2006   if units is None:
2007     if separator:
2008       units = "m"
2009     else:
2010       units = "h"
2011
2012   if numfields is None:
2013     numfields = []
2014   if unitfields is None:
2015     unitfields = []
2016
2017   numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
2018   unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142
2019
2020   format_fields = []
2021   for field in fields:
2022     if headers and field not in headers:
2023       # TODO: handle better unknown fields (either revert to old
2024       # style of raising exception, or deal more intelligently with
2025       # variable fields)
2026       headers[field] = field
2027     if separator is not None:
2028       format_fields.append("%s")
2029     elif numfields.Matches(field):
2030       format_fields.append("%*s")
2031     else:
2032       format_fields.append("%-*s")
2033
2034   if separator is None:
2035     mlens = [0 for name in fields]
2036     format_str = ' '.join(format_fields)
2037   else:
2038     format_str = separator.replace("%", "%%").join(format_fields)
2039
2040   for row in data:
2041     if row is None:
2042       continue
2043     for idx, val in enumerate(row):
2044       if unitfields.Matches(fields[idx]):
2045         try:
2046           val = int(val)
2047         except (TypeError, ValueError):
2048           pass
2049         else:
2050           val = row[idx] = utils.FormatUnit(val, units)
2051       val = row[idx] = str(val)
2052       if separator is None:
2053         mlens[idx] = max(mlens[idx], len(val))
2054
2055   result = []
2056   if headers:
2057     args = []
2058     for idx, name in enumerate(fields):
2059       hdr = headers[name]
2060       if separator is None:
2061         mlens[idx] = max(mlens[idx], len(hdr))
2062         args.append(mlens[idx])
2063       args.append(hdr)
2064     result.append(format_str % tuple(args))
2065
2066   if separator is None:
2067     assert len(mlens) == len(fields)
2068
2069     if fields and not numfields.Matches(fields[-1]):
2070       mlens[-1] = 0
2071
2072   for line in data:
2073     args = []
2074     if line is None:
2075       line = ['-' for _ in fields]
2076     for idx in range(len(fields)):
2077       if separator is None:
2078         args.append(mlens[idx])
2079       args.append(line[idx])
2080     result.append(format_str % tuple(args))
2081
2082   return result
2083
2084
2085 def FormatTimestamp(ts):
2086   """Formats a given timestamp.
2087
2088   @type ts: timestamp
2089   @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
2090
2091   @rtype: string
2092   @return: a string with the formatted timestamp
2093
2094   """
2095   if not isinstance (ts, (tuple, list)) or len(ts) != 2:
2096     return '?'
2097   sec, usec = ts
2098   return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
2099
2100
2101 def ParseTimespec(value):
2102   """Parse a time specification.
2103
2104   The following suffixed will be recognized:
2105
2106     - s: seconds
2107     - m: minutes
2108     - h: hours
2109     - d: day
2110     - w: weeks
2111
2112   Without any suffix, the value will be taken to be in seconds.
2113
2114   """
2115   value = str(value)
2116   if not value:
2117     raise errors.OpPrereqError("Empty time specification passed")
2118   suffix_map = {
2119     's': 1,
2120     'm': 60,
2121     'h': 3600,
2122     'd': 86400,
2123     'w': 604800,
2124     }
2125   if value[-1] not in suffix_map:
2126     try:
2127       value = int(value)
2128     except (TypeError, ValueError):
2129       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2130   else:
2131     multiplier = suffix_map[value[-1]]
2132     value = value[:-1]
2133     if not value: # no data left after stripping the suffix
2134       raise errors.OpPrereqError("Invalid time specification (only"
2135                                  " suffix passed)")
2136     try:
2137       value = int(value) * multiplier
2138     except (TypeError, ValueError):
2139       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
2140   return value
2141
2142
2143 def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
2144                    filter_master=False):
2145   """Returns the names of online nodes.
2146
2147   This function will also log a warning on stderr with the names of
2148   the online nodes.
2149
2150   @param nodes: if not empty, use only this subset of nodes (minus the
2151       offline ones)
2152   @param cl: if not None, luxi client to use
2153   @type nowarn: boolean
2154   @param nowarn: by default, this function will output a note with the
2155       offline nodes that are skipped; if this parameter is True the
2156       note is not displayed
2157   @type secondary_ips: boolean
2158   @param secondary_ips: if True, return the secondary IPs instead of the
2159       names, useful for doing network traffic over the replication interface
2160       (if any)
2161   @type filter_master: boolean
2162   @param filter_master: if True, do not return the master node in the list
2163       (useful in coordination with secondary_ips where we cannot check our
2164       node name against the list)
2165
2166   """
2167   if cl is None:
2168     cl = GetClient()
2169
2170   if secondary_ips:
2171     name_idx = 2
2172   else:
2173     name_idx = 0
2174
2175   if filter_master:
2176     master_node = cl.QueryConfigValues(["master_node"])[0]
2177     filter_fn = lambda x: x != master_node
2178   else:
2179     filter_fn = lambda _: True
2180
2181   result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
2182                          use_locking=False)
2183   offline = [row[0] for row in result if row[1]]
2184   if offline and not nowarn:
2185     ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
2186   return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
2187
2188
2189 def _ToStream(stream, txt, *args):
2190   """Write a message to a stream, bypassing the logging system
2191
2192   @type stream: file object
2193   @param stream: the file to which we should write
2194   @type txt: str
2195   @param txt: the message
2196
2197   """
2198   if args:
2199     args = tuple(args)
2200     stream.write(txt % args)
2201   else:
2202     stream.write(txt)
2203   stream.write('\n')
2204   stream.flush()
2205
2206
2207 def ToStdout(txt, *args):
2208   """Write a message to stdout only, bypassing the logging system
2209
2210   This is just a wrapper over _ToStream.
2211
2212   @type txt: str
2213   @param txt: the message
2214
2215   """
2216   _ToStream(sys.stdout, txt, *args)
2217
2218
2219 def ToStderr(txt, *args):
2220   """Write a message to stderr only, bypassing the logging system
2221
2222   This is just a wrapper over _ToStream.
2223
2224   @type txt: str
2225   @param txt: the message
2226
2227   """
2228   _ToStream(sys.stderr, txt, *args)
2229
2230
2231 class JobExecutor(object):
2232   """Class which manages the submission and execution of multiple jobs.
2233
2234   Note that instances of this class should not be reused between
2235   GetResults() calls.
2236
2237   """
2238   def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
2239     self.queue = []
2240     if cl is None:
2241       cl = GetClient()
2242     self.cl = cl
2243     self.verbose = verbose
2244     self.jobs = []
2245     self.opts = opts
2246     self.feedback_fn = feedback_fn
2247
2248   def QueueJob(self, name, *ops):
2249     """Record a job for later submit.
2250
2251     @type name: string
2252     @param name: a description of the job, will be used in WaitJobSet
2253     """
2254     SetGenericOpcodeOpts(ops, self.opts)
2255     self.queue.append((name, ops))
2256
2257   def SubmitPending(self, each=False):
2258     """Submit all pending jobs.
2259
2260     """
2261     if each:
2262       results = []
2263       for row in self.queue:
2264         # SubmitJob will remove the success status, but raise an exception if
2265         # the submission fails, so we'll notice that anyway.
2266         results.append([True, self.cl.SubmitJob(row[1])])
2267     else:
2268       results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
2269     for (idx, ((status, data), (name, _))) in enumerate(zip(results,
2270                                                             self.queue)):
2271       self.jobs.append((idx, status, data, name))
2272
2273   def _ChooseJob(self):
2274     """Choose a non-waiting/queued job to poll next.
2275
2276     """
2277     assert self.jobs, "_ChooseJob called with empty job list"
2278
2279     result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
2280     assert result
2281
2282     for job_data, status in zip(self.jobs, result):
2283       if status[0] in (constants.JOB_STATUS_QUEUED,
2284                     constants.JOB_STATUS_WAITLOCK,
2285                     constants.JOB_STATUS_CANCELING):
2286         # job is still waiting
2287         continue
2288       # good candidate found
2289       self.jobs.remove(job_data)
2290       return job_data
2291
2292     # no job found
2293     return self.jobs.pop(0)
2294
2295   def GetResults(self):
2296     """Wait for and return the results of all jobs.
2297
2298     @rtype: list
2299     @return: list of tuples (success, job results), in the same order
2300         as the submitted jobs; if a job has failed, instead of the result
2301         there will be the error message
2302
2303     """
2304     if not self.jobs:
2305       self.SubmitPending()
2306     results = []
2307     if self.verbose:
2308       ok_jobs = [row[2] for row in self.jobs if row[1]]
2309       if ok_jobs:
2310         ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
2311
2312     # first, remove any non-submitted jobs
2313     self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
2314     for idx, _, jid, name in failures:
2315       ToStderr("Failed to submit job for %s: %s", name, jid)
2316       results.append((idx, False, jid))
2317
2318     while self.jobs:
2319       (idx, _, jid, name) = self._ChooseJob()
2320       ToStdout("Waiting for job %s for %s...", jid, name)
2321       try:
2322         job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
2323         success = True
2324       except (errors.GenericError, luxi.ProtocolError), err:
2325         _, job_result = FormatError(err)
2326         success = False
2327         # the error message will always be shown, verbose or not
2328         ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
2329
2330       results.append((idx, success, job_result))
2331
2332     # sort based on the index, then drop it
2333     results.sort()
2334     results = [i[1:] for i in results]
2335
2336     return results
2337
2338   def WaitOrShow(self, wait):
2339     """Wait for job results or only print the job IDs.
2340
2341     @type wait: boolean
2342     @param wait: whether to wait or not
2343
2344     """
2345     if wait:
2346       return self.GetResults()
2347     else:
2348       if not self.jobs:
2349         self.SubmitPending()
2350       for _, status, result, name in self.jobs:
2351         if status:
2352           ToStdout("%s: %s", result, name)
2353         else:
2354           ToStderr("Failure for %s: %s", name, result)