Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ d48c944b

History | View | Annotate | Download (68.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
from cStringIO import StringIO
30
import os
31
import time
32
import OpenSSL
33
import itertools
34

    
35
from ganeti.cli import *
36
from ganeti import opcodes
37
from ganeti import constants
38
from ganeti import errors
39
from ganeti import utils
40
from ganeti import bootstrap
41
from ganeti import ssh
42
from ganeti import objects
43
from ganeti import uidpool
44
from ganeti import compat
45
from ganeti import netutils
46
from ganeti import ssconf
47
from ganeti import pathutils
48
from ganeti import serializer
49
from ganeti import qlang
50

    
51

    
52
# Recover-from-EPO flag for the "epo" command (default is to power off)
ON_OPT = cli_option("--on", default=False,
                    action="store_true", dest="on",
                    help="Recover from an EPO")

# When set, positional arguments name node groups rather than nodes
GROUPS_OPT = cli_option("--groups", default=False,
                        action="store_true", dest="groups",
                        help="Arguments are node groups instead of nodes")

# NOTE: both FORCE_* options register the same "--yes-do-it" flag and dest,
# but they are attached to different commands (failover vs. redistribution)
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
                            help="Override interactive check for --no-voting",
                            default=False, action="store_true")

FORCE_DISTRIBUTION = cli_option("--yes-do-it", dest="yes_do_it",
                                help="Unconditionally distribute the"
                                " configuration, even if the queue"
                                " is drained",
                                default=False, action="store_true")

# Target version for the "upgrade" command
TO_OPT = cli_option("--to", default=None, type="string",
                    help="The Ganeti version to upgrade to")

RESUME_OPT = cli_option("--resume", default=False, action="store_true",
                        help="Resume any pending Ganeti upgrades")

# Timing constants used while waiting for nodes during an EPO
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
_EPO_PING_TIMEOUT = 1 # 1 second
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
79

    
80

    
81
def _CheckNoLvmStorageOptDeprecated(opts):
  """Checks if the legacy option '--no-lvm-storage' is used.

  @param opts: the command line options selected by the user
  @rtype: int or None
  @return: 1 if the deprecated option was given, None otherwise

  """
  if not opts.lvm_storage:
    # The option has been removed; tell the user what replaces it instead of
    # silently ignoring it.  (Fixed typo: "lvm-base" -> "lvm-based", and the
    # stray double space in the original message.)
    ToStderr("The option --no-lvm-storage is no longer supported. If you want"
             " to disable lvm-based storage cluster-wide, use the option"
             " --enabled-disk-templates to disable all of these lvm-based"
             " disk templates: %s" %
             utils.CommaJoin(constants.DTS_LVM))
    return 1
92

    
93

    
94
def _InitEnabledDiskTemplates(opts):
95
  """Initialize the list of enabled disk templates.
96

97
  """
98
  if opts.enabled_disk_templates:
99
    return opts.enabled_disk_templates.split(",")
100
  else:
101
    return constants.DEFAULT_ENABLED_DISK_TEMPLATES
102

    
103

    
104
def _InitVgName(opts, enabled_disk_templates):
  """Initialize the volume group name.

  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk templates

  """
  if opts.vg_name is None:
    # No --vg-name given: use the default VG only when some lvm-based
    # template actually needs one.
    if utils.IsLvmEnabled(enabled_disk_templates):
      return constants.DEFAULT_VG
    return None

  explicit_name = opts.vg_name
  lvm_needed = utils.IsLvmEnabled(enabled_disk_templates)
  if explicit_name:
    if not lvm_needed:
      # Harmless but probably unintended combination; warn only.
      ToStdout("You specified a volume group with --vg-name, but you did not"
               " enable any disk template that uses lvm.")
  elif lvm_needed:
    # An empty --vg-name means "unset", which conflicts with enabled lvm.
    raise errors.OpPrereqError(
        "LVM disk templates are enabled, but vg name not set.")
  return explicit_name
124

    
125

    
126
def _InitDrbdHelper(opts, enabled_disk_templates):
  """Initialize the DRBD usermode helper.

  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk templates

  """
  drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates

  if opts.drbd_helper is not None and not drbd_enabled:
    # Harmless combination, just tell the user about it.
    ToStdout("Note: You specified a DRBD usermode helper, while DRBD storage"
             " is not enabled.")

  if drbd_enabled:
    if opts.drbd_helper is None:
      # DRBD is on but no helper given: fall back to the default.
      return constants.DEFAULT_DRBD_HELPER
    if not opts.drbd_helper:
      # An empty string means "unset the helper" - contradictory here.
      raise errors.OpPrereqError(
          "Unsetting the drbd usermode helper while enabling DRBD is not"
          " allowed.")

  return opts.drbd_helper
145

    
146

    
147
@UsesRPC
def InitCluster(opts, args):
  """Initialize the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the desired
      cluster name
  @rtype: int
  @return: the desired exit code

  """
  # Reject the removed --no-lvm-storage option early
  if _CheckNoLvmStorageOptDeprecated(opts):
    return 1

  enabled_disk_templates = _InitEnabledDiskTemplates(opts)

  # Both helpers report configuration conflicts via OpPrereqError
  try:
    vg_name = _InitVgName(opts, enabled_disk_templates)
    drbd_helper = _InitDrbdHelper(opts, enabled_disk_templates)
  except errors.OpPrereqError, e:
    ToStderr(str(e))
    return 1

  # Pick a master network device based on the default NIC mode when the
  # user did not specify one explicitly
  master_netdev = opts.master_netdev
  if master_netdev is None:
    nic_mode = opts.nicparams.get(constants.NIC_MODE, None)
    if not nic_mode:
      # default case, use bridging
      master_netdev = constants.DEFAULT_BRIDGE
    elif nic_mode == constants.NIC_MODE_OVS:
      # default ovs is different from default bridge
      master_netdev = constants.DEFAULT_OVS
      opts.nicparams[constants.NIC_LINK] = constants.DEFAULT_OVS

  # Enabled hypervisors are given as a comma-separated string
  hvlist = opts.enabled_hypervisors
  if hvlist is None:
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
  hvlist = hvlist.split(",")

  hvparams = dict(opts.hvparams)
  beparams = opts.beparams
  nicparams = opts.nicparams

  diskparams = dict(opts.diskparams)

  # check the disk template types here, as we cannot rely on the type check done
  # by the opcode parameter types
  diskparams_keys = set(diskparams.keys())
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
    return 1

  # prepare beparams dict (fill in defaults, then enforce value types)
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)

  # prepare nicparams dict
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  # prepare ndparams dict
  if opts.ndparams is None:
    ndparams = dict(constants.NDC_DEFAULTS)
  else:
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)

  # prepare hvparams dict (one entry per known hypervisor type)
  for hv in constants.HYPER_TYPES:
    if hv not in hvparams:
      hvparams[hv] = {}
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)

  # prepare diskparams dict (one entry per known disk template)
  for templ in constants.DISK_TEMPLATES:
    if templ not in diskparams:
      diskparams[templ] = {}
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
                                         diskparams[templ])
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)

  # prepare ipolicy dict
  ipolicy = CreateIPolicyFromOpts(
    ispecs_mem_size=opts.ispecs_mem_size,
    ispecs_cpu_count=opts.ispecs_cpu_count,
    ispecs_disk_count=opts.ispecs_disk_count,
    ispecs_disk_size=opts.ispecs_disk_size,
    ispecs_nic_count=opts.ispecs_nic_count,
    minmax_ispecs=opts.ipolicy_bounds_specs,
    std_ispecs=opts.ipolicy_std_specs,
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
    fill_all=True)

  # NOTE: the remaining defaults are written back onto opts, as the values
  # are later read from there when calling bootstrap.InitCluster
  if opts.candidate_pool_size is None:
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT

  if opts.mac_prefix is None:
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX

  uid_pool = opts.uid_pool
  if uid_pool is not None:
    uid_pool = uidpool.ParseUidPool(uid_pool)

  if opts.prealloc_wipe_disks is None:
    opts.prealloc_wipe_disks = False

  external_ip_setup_script = opts.use_external_mip_script
  if external_ip_setup_script is None:
    external_ip_setup_script = False

  try:
    primary_ip_version = int(opts.primary_ip_version)
  except (ValueError, TypeError), err:
    ToStderr("Invalid primary ip version value: %s" % str(err))
    return 1

  master_netmask = opts.master_netmask
  try:
    if master_netmask is not None:
      master_netmask = int(master_netmask)
  except (ValueError, TypeError), err:
    ToStderr("Invalid master netmask value: %s" % str(err))
    return 1

  if opts.disk_state:
    disk_state = utils.FlatToDict(opts.disk_state)
  else:
    disk_state = {}

  hv_state = dict(opts.hv_state)

  # Actually create the cluster on this node, ...
  bootstrap.InitCluster(cluster_name=args[0],
                        secondary_ip=opts.secondary_ip,
                        vg_name=vg_name,
                        mac_prefix=opts.mac_prefix,
                        master_netmask=master_netmask,
                        master_netdev=master_netdev,
                        file_storage_dir=opts.file_storage_dir,
                        shared_file_storage_dir=opts.shared_file_storage_dir,
                        enabled_hypervisors=hvlist,
                        hvparams=hvparams,
                        beparams=beparams,
                        nicparams=nicparams,
                        ndparams=ndparams,
                        diskparams=diskparams,
                        ipolicy=ipolicy,
                        candidate_pool_size=opts.candidate_pool_size,
                        modify_etc_hosts=opts.modify_etc_hosts,
                        modify_ssh_setup=opts.modify_ssh_setup,
                        maintain_node_health=opts.maintain_node_health,
                        drbd_helper=drbd_helper,
                        uid_pool=uid_pool,
                        default_iallocator=opts.default_iallocator,
                        primary_ip_version=primary_ip_version,
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
                        use_external_mip_script=external_ip_setup_script,
                        hv_state=hv_state,
                        disk_state=disk_state,
                        enabled_disk_templates=enabled_disk_templates,
                        )
  # ... then run the post-init opcode through the now-running daemons
  op = opcodes.OpClusterPostInit()
  SubmitOpCode(op, opts=opts)
  return 0
315

    
316

    
317
@UsesRPC
def DestroyCluster(opts, args):
  """Destroy the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if not opts.yes_do_it:
    # Fixed grammar in the user-facing message ("want destroy" -> "want to
    # destroy"); destruction always requires the explicit override flag.
    ToStderr("Destroying a cluster is irreversible. If you really want to"
             " destroy this cluster, supply the --yes-do-it option.")
    return 1

  op = opcodes.OpClusterDestroy()
  master_uuid = SubmitOpCode(op, opts=opts)
  # if we reached this, the opcode didn't fail; we can proceed to
  # shutdown all the daemons
  bootstrap.FinalizeClusterDestroy(master_uuid)
  return 0
339

    
340

    
341
def RenameCluster(opts, args):
  """Rename the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new cluster name
  @rtype: int
  @return: the desired exit code

  """
  new_name = args[0]
  cl = GetClient()
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])

  if not opts.force:
    # Renaming drops the old master IP, which can cut off remote sessions;
    # make sure the user understands before proceeding.
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
                " connected over the network to the cluster name, the"
                " operation is very dangerous as the IP address will be"
                " removed from the node and the change may not go through."
                " Continue?") % (cluster_name, new_name)
    if not AskUser(usertext):
      return 1

  result = SubmitOpCode(opcodes.OpClusterRename(name=new_name),
                        opts=opts, cl=cl)
  if result:
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)

  return 0
372

    
373

    
374
def ActivateMasterIp(opts, args):
  """Activates the master IP.

  """
  # Unconditional: simply submit the activation opcode.
  SubmitOpCode(opcodes.OpClusterActivateMasterIp())
  return 0
381

    
382

    
383
def DeactivateMasterIp(opts, args):
  """Deactivates the master IP.

  """
  if not opts.confirm:
    # Dropping the master IP kills existing connections to it; confirm first.
    prompt = ("This will disable the master IP. All the open connections to"
              " the master IP will be closed. To reach the master you will"
              " need to use its node IP."
              " Continue?")
    if not AskUser(prompt):
      return 1

  SubmitOpCode(opcodes.OpClusterDeactivateMasterIp())
  return 0
398

    
399

    
400
def RedistributeConfig(opts, args):
  """Forces push of the cluster configuration.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: empty list
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRedistConf()
  if not opts.yes_do_it:
    SubmitOrSend(op, opts)
  else:
    # --yes-do-it pushes the configuration even while the queue is drained
    SubmitOpCodeToDrainedQueue(op)
  return 0
416

    
417

    
418
def ShowClusterVersion(opts, args):
  """Write version of ganeti software to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  info = GetClient(query=True).QueryClusterInfo()
  # Table-driven output: (label, cluster-info key) pairs, printed in order
  for (label, key) in [("Software version", "software_version"),
                       ("Internode protocol", "protocol_version"),
                       ("Configuration format", "config_version"),
                       ("OS api version", "os_api_version"),
                       ("Export interface", "export_version"),
                       ("VCS version", "vcs_version")]:
    ToStdout("%s: %s", label, info[key])
  return 0
437

    
438

    
439
def ShowClusterMaster(opts, args):
  """Write name of master node to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # The master name comes from local configuration, no RPC needed
  ToStdout(bootstrap.GetMaster())
  return 0
452

    
453

    
454
def _FormatGroupedParams(paramsdict, roman=False):
455
  """Format Grouped parameters (be, nic, disk) by group.
456

457
  @type paramsdict: dict of dicts
458
  @param paramsdict: {group: {param: value, ...}, ...}
459
  @rtype: dict of dicts
460
  @return: copy of the input dictionaries with strings as values
461

462
  """
463
  ret = {}
464
  for (item, val) in paramsdict.items():
465
    if isinstance(val, dict):
466
      ret[item] = _FormatGroupedParams(val, roman=roman)
467
    elif roman and isinstance(val, int):
468
      ret[item] = compat.TryToRoman(val)
469
    else:
470
      ret[item] = str(val)
471
  return ret
472

    
473

    
474
def ShowClusterConfig(opts, args):
  """Shows cluster information.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient(query=True)
  result = cl.QueryClusterInfo()

  # Human-friendly placeholders for empty lists
  if result["tags"]:
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
  else:
    tags = "(none)"
  if result["reserved_lvs"]:
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
  else:
    reserved_lvs = "(none)"

  # Only display hypervisor parameters for hypervisors that are enabled
  enabled_hv = result["enabled_hypervisors"]
  hvparams = dict((k, v) for k, v in result["hvparams"].iteritems()
                  if k in enabled_hv)

  # List of (label, value) pairs, rendered by PrintGenericInfo below
  info = [
    ("Cluster name", result["name"]),
    ("Cluster UUID", result["uuid"]),

    ("Creation time", utils.FormatTime(result["ctime"])),
    ("Modification time", utils.FormatTime(result["mtime"])),

    ("Master node", result["master"]),

    ("Architecture (this node)",
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),

    ("Tags", tags),

    ("Default hypervisor", result["default_hypervisor"]),
    ("Enabled hypervisors", utils.CommaJoin(enabled_hv)),

    ("Hypervisor parameters", _FormatGroupedParams(hvparams)),

    ("OS-specific hypervisor parameters",
     _FormatGroupedParams(result["os_hvp"])),

    ("OS parameters", _FormatGroupedParams(result["osparams"])),

    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),

    ("Cluster parameters", [
      ("candidate pool size",
       compat.TryToRoman(result["candidate_pool_size"],
                         convert=opts.roman_integers)),
      ("master netdev", result["master_netdev"]),
      ("master netmask", result["master_netmask"]),
      ("use external master IP address setup script",
       result["use_external_mip_script"]),
      ("lvm volume group", result["volume_group_name"]),
      ("lvm reserved volumes", reserved_lvs),
      ("drbd usermode helper", result["drbd_usermode_helper"]),
      ("file storage path", result["file_storage_dir"]),
      ("shared file storage path", result["shared_file_storage_dir"]),
      ("maintenance of node health", result["maintain_node_health"]),
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
      ("default instance allocator", result["default_iallocator"]),
      ("primary ip version", result["primary_ip_version"]),
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
      ("ExtStorage Providers search path",
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
      ("enabled disk templates",
       utils.CommaJoin(result["enabled_disk_templates"])),
      ]),

    ("Default node parameters",
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),

    ("Default instance parameters",
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),

    ("Default nic parameters",
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),

    ("Default disk parameters",
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),

    ("Instance policy - limits for instances",
     FormatPolicyInfo(result["ipolicy"], None, True)),
    ]

  PrintGenericInfo(info)
  return 0
570

    
571

    
572
def ClusterCopyFile(opts, args):
  """Copy a file from master to some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the path of
      the file to be copied
  @rtype: int
  @return: the desired exit code

  """
  filename = args[0]
  if not os.path.exists(filename):
    raise errors.OpPrereqError("No such filename '%s'" % filename,
                               errors.ECODE_INVAL)

  cl = GetClient()
  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]

  # The master already has the file, so it is filtered out of the targets
  target_nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
                                secondary_ips=opts.use_replication_network,
                                nodegroup=opts.nodegroup)

  runner = ssh.SshRunner(cluster_name)
  for node in target_nodes:
    # Failures are reported but do not abort the remaining copies
    if not runner.CopyFileToNode(node, filename):
      ToStderr("Copy of file %s to node %s failed", filename, node)

  return 0
602

    
603

    
604
def RunClusterCommand(opts, args):
  """Run a command on some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain the command to be run and its arguments
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  cmd = " ".join(args)

  target_nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl,
                                nodegroup=opts.nodegroup)

  (cluster_name, master_node) = cl.QueryConfigValues(["cluster_name",
                                                      "master_node"])

  runner = ssh.SshRunner(cluster_name=cluster_name)

  # Run on the master last, so a command that disrupts it cannot prevent
  # the other nodes from being processed
  if master_node in target_nodes:
    target_nodes.remove(master_node)
    target_nodes.append(master_node)

  for node_name in target_nodes:
    result = runner.Run(node_name, constants.SSH_LOGIN_USER, cmd)

    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
      # Do not output anything for successful commands
      continue

    ToStdout("------------------------------------------------")
    if opts.show_machine_names:
      # Prefix every output line with the node name
      for line in result.output.splitlines():
        ToStdout("%s: %s", node_name, line)
    else:
      ToStdout("node: %s", node_name)
      ToStdout("%s", result.output)
    ToStdout("return code = %s", result.exit_code)

  return 0
647

    
648

    
649
def VerifyCluster(opts, args):
  """Verify integrity of cluster, performing various test on nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  skip_checks = []

  if opts.skip_nplusone_mem:
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)

  cl = GetClient()

  op = opcodes.OpClusterVerify(verbose=opts.verbose,
                               error_codes=opts.error_codes,
                               debug_simulate_errors=opts.simulate_errors,
                               skip_checks=skip_checks,
                               ignore_errors=opts.ignore_errors,
                               group_name=opts.nodegroup)
  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs (the verify opcode spawns one job per
  # node group)
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  results = jex.GetResults()

  # results is a list of (job_success, op_results) pairs; the pipeline
  # below transposes it into two sequences, filters out the truthy
  # entries, and counts what remains, yielding the number of failed jobs
  # and the number of jobs with failed/unexpected verification results.
  (bad_jobs, bad_results) = \
    map(len,
        # Convert iterators to lists
        map(list,
            # Count errors
            map(compat.partial(itertools.ifilterfalse, bool),
                # Convert result to booleans in a tuple
                zip(*((job_success, len(op_results) == 1 and op_results[0])
                      for (job_success, op_results) in results)))))

  if bad_jobs == 0 and bad_results == 0:
    rcode = constants.EXIT_SUCCESS
  else:
    rcode = constants.EXIT_FAILURE
    if bad_jobs > 0:
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)

  return rcode
700

    
701

    
702
def VerifyDisks(opts, args):
  """Verify integrity of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  op = opcodes.OpClusterVerifyDisks()

  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  retcode = constants.EXIT_SUCCESS

  for (status, result) in jex.GetResults():
    if not status:
      ToStdout("Job failed: %s", result)
      continue

    # Each successful job returns a 1-tuple with a (bad_nodes, instances,
    # missing) triple: nodes that could not be queried, instances with
    # inactive disks, and instances with missing logical volumes.
    ((bad_nodes, instances, missing), ) = result

    for node, text in bad_nodes.items():
      ToStdout("Error gathering data on node %s: %s",
               node, utils.SafeEncode(text[-400:]))
      retcode = constants.EXIT_FAILURE
      ToStdout("You need to fix these nodes first before fixing instances")

    # Try to re-activate disks for instances that are not missing volumes
    for iname in instances:
      if iname in missing:
        continue
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
      try:
        ToStdout("Activating disks for instance '%s'", iname)
        SubmitOpCode(op, opts=opts, cl=cl)
      except errors.GenericError, err:
        nret, msg = FormatError(err)
        # Accumulate exit codes from individual activation failures
        retcode |= nret
        ToStderr("Error activating disks for instance %s: %s", iname, msg)

    if missing:
      for iname, ival in missing.iteritems():
        # If every missing volume lives on an unreachable node, we cannot
        # tell whether the volumes are really missing
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
        if all_missing:
          ToStdout("Instance %s cannot be verified as it lives on"
                   " broken nodes", iname)
        else:
          ToStdout("Instance %s has missing logical volumes:", iname)
          ival.sort()
          for node, vol in ival:
            if node in bad_nodes:
              ToStdout("\tbroken node %s /dev/%s", node, vol)
            else:
              ToStdout("\t%s /dev/%s", node, vol)

      ToStdout("You need to replace or recreate disks for all the above"
               " instances if this message persists after fixing broken nodes.")
      retcode = constants.EXIT_FAILURE
    elif not instances:
      ToStdout("No disks need to be activated.")

  return retcode
773

    
774

    
775
def RepairDiskSizes(opts, args):
  """Verify sizes of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: optional list of instances to restrict check to
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
  SubmitOpCode(op, opts=opts)
  # Return an explicit exit code, matching the documented contract and the
  # other cluster commands (previously this fell through returning None).
  return 0
787

    
788

    
789
@UsesRPC
def MasterFailover(opts, args):
  """Failover the master node.

  This command, when run on a non-master node, will cause the current
  master to cease being master, and the non-master to become new
  master.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # --no-voting needs an interactive confirmation unless --yes-do-it was
  # given as well.
  needs_confirmation = opts.no_voting and not opts.yes_do_it
  if needs_confirmation:
    warning = ("This will perform the failover even if most other nodes"
               " are down, or if this node is outdated. This is dangerous"
               " as it can lead to a non-consistent cluster. Check the"
               " gnt-cluster(8) man page before proceeding. Continue?")
    if not AskUser(warning):
      return 1

  return bootstrap.MasterFailover(no_voting=opts.no_voting)
813

    
814

    
815
def MasterPing(opts, args):
  """Checks if the master is alive.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  try:
    GetClient().QueryClusterInfo()
  except Exception: # pylint: disable=W0703
    # Any failure at all (connection refused, RPC error, ...) means the
    # master daemon is not reachable.
    return 1
  return 0
831

    
832

    
833
def SearchTags(opts, args):
  """Searches the tags on all the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the tag pattern
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpTagsSearch(pattern=args[0])
  result = SubmitOpCode(op, opts=opts)
  if not result:
    # No matches: signal failure like grep does
    return 1
  # Sort (path, tag) pairs for stable, readable output
  for path, tag in sorted(result):
    ToStdout("%s %s", path, tag)
  # Previously the function fell through returning None despite the
  # documented exit-code contract; return success explicitly.
  return 0
851

    
852

    
853
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
  """Reads and verifies an X509 certificate.

  @type cert_filename: string
  @param cert_filename: the path of the file containing the certificate to
                        verify encoded in PEM format
  @type verify_private_key: bool
  @param verify_private_key: whether to verify the private key in addition to
                             the public certificate
  @rtype: string
  @return: a string containing the PEM-encoded certificate.

  """
  def _Fail(reason, err):
    # All failure modes are reported uniformly as X509CertError
    raise errors.X509CertError(cert_filename, "%s: %s" % (reason, err))

  try:
    pem = utils.ReadFile(cert_filename)
  except IOError as err:
    _Fail("Unable to read certificate", err)

  try:
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
  except Exception as err:
    _Fail("Unable to load certificate", err)

  if verify_private_key:
    try:
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
    except Exception as err:
      _Fail("Unable to load private key", err)

  return pem
886

    
887

    
888
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
                 cds_filename, force):
  """Renews cluster certificates, keys and secrets.

  @type new_cluster_cert: bool
  @param new_cluster_cert: Whether to generate a new cluster certificate
  @type new_rapi_cert: bool
  @param new_rapi_cert: Whether to generate a new RAPI certificate
  @type rapi_cert_filename: string
  @param rapi_cert_filename: Path to file containing new RAPI certificate
  @type new_spice_cert: bool
  @param new_spice_cert: Whether to generate a new SPICE certificate
  @type spice_cert_filename: string
  @param spice_cert_filename: Path to file containing new SPICE certificate
  @type spice_cacert_filename: string
  @param spice_cacert_filename: Path to file containing the certificate of the
                                CA that signed the SPICE certificate
  @type new_confd_hmac_key: bool
  @param new_confd_hmac_key: Whether to generate a new HMAC key
  @type new_cds: bool
  @param new_cds: Whether to generate a new cluster domain secret
  @type cds_filename: string
  @param cds_filename: Path to file containing new cluster domain secret
  @type force: bool
  @param force: Whether to ask user for confirmation
  @rtype: int
  @return: the desired exit code

  """
  # Reject contradictory option combinations up front; checked in order.
  conflicts = [
    (new_rapi_cert and rapi_cert_filename,
     "Only one of the --new-rapi-certificate and --rapi-certificate"
     " options can be specified at the same time."),
    (new_cds and cds_filename,
     "Only one of the --new-cluster-domain-secret and"
     " --cluster-domain-secret options can be specified at"
     " the same time."),
    (new_spice_cert and (spice_cert_filename or spice_cacert_filename),
     "When using --new-spice-certificate, the --spice-certificate"
     " and --spice-ca-certificate must not be used."),
    # The SPICE certificate and its CA certificate go together
    (bool(spice_cacert_filename) ^ bool(spice_cert_filename),
     "Both --spice-certificate and --spice-ca-certificate must be"
     " specified."),
    ]
  for (bad, message) in conflicts:
    if bad:
      ToStderr(message)
      return 1

  rapi_cert_pem = spice_cert_pem = spice_cacert_pem = None
  try:
    if rapi_cert_filename:
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
    if spice_cert_filename:
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
  except errors.X509CertError as err:
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
    return 1

  cds = None
  if cds_filename:
    try:
      cds = utils.ReadFile(cds_filename)
    except Exception as err: # pylint: disable=W0703
      ToStderr("Can't load new cluster domain secret from %s: %s" %
               (cds_filename, str(err)))
      return 1

  if not force:
    usertext = ("This requires all daemons on all nodes to be restarted and"
                " may take some time. Continue?")
    if not AskUser(usertext):
      return 1

  def _RenewCryptoInner(ctx):
    """Regenerates crypto material and distributes it to non-master nodes."""
    ctx.feedback_fn("Updating certificates and keys")
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
                                    new_rapi_cert,
                                    new_spice_cert,
                                    new_confd_hmac_key,
                                    new_cds,
                                    rapi_cert_pem=rapi_cert_pem,
                                    spice_cert_pem=spice_cert_pem,
                                    spice_cacert_pem=spice_cacert_pem,
                                    cds=cds)

    # Collect the files that were (re)generated and must be copied out
    files_to_copy = []
    if new_cluster_cert:
      files_to_copy.append(pathutils.NODED_CERT_FILE)
    if new_rapi_cert or rapi_cert_pem:
      files_to_copy.append(pathutils.RAPI_CERT_FILE)
    if new_spice_cert or spice_cert_pem:
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)
    if new_confd_hmac_key:
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)
    if new_cds or cds:
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)

    if files_to_copy:
      for node_name in ctx.nonmaster_nodes:
        ctx.feedback_fn("Copying %s to %s" %
                        (", ".join(files_to_copy), node_name))
        for file_name in files_to_copy:
          ctx.ssh.CopyFileToNode(node_name, file_name)

  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)

  ToStdout("All requested certificates and keys have been replaced."
           " Running \"gnt-cluster verify\" now is recommended.")

  return 0


def RenewCrypto(opts, args):
  """Renews cluster certificates, keys and secrets.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # Pure delegation; argument order must match _RenewCrypto's signature.
  return _RenewCrypto(opts.new_cluster_cert, opts.new_rapi_cert,
                      opts.rapi_cert, opts.new_spice_cert, opts.spice_cert,
                      opts.spice_cacert, opts.new_confd_hmac_key,
                      opts.new_cluster_domain_secret,
                      opts.cluster_domain_secret, opts.force)


def _GetEnabledDiskTemplates(opts):
1027
  """Determine the list of enabled disk templates.
1028

1029
  """
1030
  if opts.enabled_disk_templates:
1031
    return opts.enabled_disk_templates.split(",")
1032
  else:
1033
    return None
1034

    
1035

    
1036
def _GetVgName(opts, enabled_disk_templates):
  """Determine the volume group name.

  @param opts: the command line options selected by the user
  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk-templates
  @return: the volume group name given with --vg-name (possibly None)

  """
  # If --vg-name was not given, vg_name stays None
  vg_name = opts.vg_name
  # Warn (but don't fail) when a volume group is specified although no
  # lvm-based disk template is enabled
  if (enabled_disk_templates and vg_name
      and not utils.IsLvmEnabled(enabled_disk_templates)):
    ToStdout("You specified a volume group with --vg-name, but you did not"
             " enable any of the following lvm-based disk templates: %s" %
             utils.CommaJoin(constants.DTS_LVM))
  return vg_name


def _GetDrbdHelper(opts, enabled_disk_templates):
  """Determine the DRBD usermode helper.

  @param opts: the command line options selected by the user
  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk templates
  @return: the value of the --drbd-usermode-helper option (possibly None)

  """
  drbd_helper = opts.drbd_helper
  if enabled_disk_templates:
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
    if not drbd_enabled and opts.drbd_helper:
      # Warn but do not fail: the helper is simply pointless without DRBD.
      # Fixed: the two concatenated string parts previously produced a
      # doubled space ("with  --drbd-usermode-helper") in the message.
      ToStdout("You specified a DRBD usermode helper with"
               " --drbd-usermode-helper while DRBD is not enabled.")
  return drbd_helper


def SetClusterParams(opts, args):
  """Modify the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # Options that count as "given" when simply truthy
  flag_opts = [opts.enabled_hypervisors, opts.hvparams, opts.beparams,
               opts.nicparams, opts.ndparams, opts.diskparams, opts.hv_state,
               opts.enabled_disk_templates, opts.disk_state]
  # Options where any non-None value (even 0 or "") is a real request
  value_opts = [opts.vg_name, opts.drbd_helper, opts.candidate_pool_size,
                opts.uid_pool, opts.maintain_node_health, opts.add_uids,
                opts.remove_uids, opts.default_iallocator, opts.reserved_lvs,
                opts.master_netdev, opts.master_netmask,
                opts.use_external_mip_script, opts.prealloc_wipe_disks,
                opts.ipolicy_bounds_specs, opts.ipolicy_std_specs,
                opts.ipolicy_disk_templates, opts.ipolicy_vcpu_ratio,
                opts.ipolicy_spindle_ratio, opts.modify_etc_hosts,
                opts.file_storage_dir]
  if not (any(flag_opts) or
          any(val is not None for val in value_opts)):
    ToStderr("Please give at least one of the parameters.")
    return 1

  if _CheckNoLvmStorageOptDeprecated(opts):
    return 1

  enabled_disk_templates = _GetEnabledDiskTemplates(opts)
  vg_name = _GetVgName(opts, enabled_disk_templates)

  try:
    drbd_helper = _GetDrbdHelper(opts, enabled_disk_templates)
  except errors.OpPrereqError as err:
    ToStderr(str(err))
    return 1

  hvlist = opts.enabled_hypervisors
  if hvlist is not None:
    hvlist = hvlist.split(",")

  # opts.hvparams is a list of (name, dict) pairs suitable for dict()
  hvparams = dict(opts.hvparams)
  for params in hvparams.values():
    utils.ForceDictType(params, constants.HVS_PARAMETER_TYPES)

  diskparams = dict(opts.diskparams)
  for params in diskparams.values():
    utils.ForceDictType(params, constants.DISK_DT_TYPES)

  beparams = opts.beparams
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)

  nicparams = opts.nicparams
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  ndparams = opts.ndparams
  if ndparams is not None:
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)

  ipolicy = CreateIPolicyFromOpts(
    minmax_ispecs=opts.ipolicy_bounds_specs,
    std_ispecs=opts.ipolicy_std_specs,
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
    )

  # The uid pool options all share the same parsing step
  uid_pool = opts.uid_pool
  if uid_pool is not None:
    uid_pool = uidpool.ParseUidPool(uid_pool)

  add_uids = opts.add_uids
  if add_uids is not None:
    add_uids = uidpool.ParseUidPool(add_uids)

  remove_uids = opts.remove_uids
  if remove_uids is not None:
    remove_uids = uidpool.ParseUidPool(remove_uids)

  # An empty string means "clear the reserved LV list"
  if opts.reserved_lvs is not None:
    if opts.reserved_lvs == "":
      opts.reserved_lvs = []
    else:
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")

  if opts.master_netmask is not None:
    try:
      opts.master_netmask = int(opts.master_netmask)
    except ValueError:
      ToStderr("The --master-netmask option expects an int parameter.")
      return 1

  if opts.disk_state:
    disk_state = utils.FlatToDict(opts.disk_state)
  else:
    disk_state = {}

  hv_state = dict(opts.hv_state)

  op = opcodes.OpClusterSetParams(
    vg_name=vg_name,
    drbd_helper=drbd_helper,
    enabled_hypervisors=hvlist,
    hvparams=hvparams,
    os_hvp=None,
    beparams=beparams,
    nicparams=nicparams,
    ndparams=ndparams,
    diskparams=diskparams,
    ipolicy=ipolicy,
    candidate_pool_size=opts.candidate_pool_size,
    maintain_node_health=opts.maintain_node_health,
    modify_etc_hosts=opts.modify_etc_hosts,
    uid_pool=uid_pool,
    add_uids=add_uids,
    remove_uids=remove_uids,
    default_iallocator=opts.default_iallocator,
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
    master_netdev=opts.master_netdev,
    master_netmask=opts.master_netmask,
    reserved_lvs=opts.reserved_lvs,
    use_external_mip_script=opts.use_external_mip_script,
    hv_state=hv_state,
    disk_state=disk_state,
    enabled_disk_templates=enabled_disk_templates,
    force=opts.force,
    file_storage_dir=opts.file_storage_dir,
    )
  SubmitOrSend(op, opts)
  return 0


def QueueOps(opts, args):
  """Queue operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  subcommand = args[0]
  cl = GetClient()
  if subcommand in ("drain", "undrain"):
    # "drain" sets the flag, "undrain" clears it
    cl.SetQueueDrainFlag(subcommand == "drain")
  elif subcommand == "info":
    drained = cl.QueryConfigValues(["drain_flag"])[0]
    ToStdout("The drain flag is %s" % ("set" if drained else "unset"))
  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % subcommand,
                               errors.ECODE_INVAL)

  return 0


def _ShowWatcherPause(until):
  """Prints whether, and until when, the watcher is paused.

  @param until: timestamp of the end of the pause, or None

  """
  # A pause timestamp in the past means the pause has already expired
  if until is not None and until >= time.time():
    ToStdout("The watcher is paused until %s.", time.ctime(until))
  else:
    ToStdout("The watcher is not paused.")


def WatcherOps(opts, args):
  """Watcher operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  subcommand = args[0]
  cl = GetClient()

  if subcommand == "continue":
    cl.SetWatcherPause(None)
    ToStdout("The watcher is no longer paused.")
  elif subcommand == "pause":
    if len(args) < 2:
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
    # The duration argument is relative; convert it to an absolute timestamp
    until = time.time() + ParseTimespec(args[1])
    _ShowWatcherPause(cl.SetWatcherPause(until))
  elif subcommand == "info":
    _ShowWatcherPause(cl.QueryConfigValues(["watcher_pause"])[0])
  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % subcommand,
                               errors.ECODE_INVAL)

  return 0


def _OobPower(opts, node_list, power):
  """Puts the node in the list to desired power state.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on
  @param power: True if they should be powered on, False otherwise
  @return: The success of the operation (none failed)

  """
  command = constants.OOB_POWER_ON if power else constants.OOB_POWER_OFF

  op = opcodes.OpOobCommand(node_names=node_list,
                            command=command,
                            ignore_status=True,
                            timeout=opts.oob_timeout,
                            power_delay=opts.power_delay)
  result = SubmitOpCode(op, opts=opts)

  # Each entry is ((?, node_name), (data_status, ?)); count the failures
  failures = 0
  for ((_, node_name), (data_status, _)) in result:
    if data_status != constants.RS_NORMAL:
      assert data_status != constants.RS_UNAVAIL
      failures += 1
      ToStderr("There was a problem changing power for %s, please investigate",
               node_name)

  return failures == 0


def _InstanceStart(opts, inst_list, start, no_remember=False):
  """Puts the instances in the list to desired state.

  @param opts: The command line options selected by the user
  @param inst_list: The list of instances to operate on
  @param start: True if they should be started, False for shutdown
  @param no_remember: If the instance state should be remembered
  @return: The success of the operation (none failed)

  """
  if start:
    opcls = opcodes.OpInstanceStartup
    (text_submit, text_success, text_failed) = ("startup", "started",
                                                "starting")
  else:
    # Bind the shutdown-only arguments so both branches share a call shape
    opcls = compat.partial(opcodes.OpInstanceShutdown,
                           timeout=opts.shutdown_timeout,
                           no_remember=no_remember)
    (text_submit, text_success, text_failed) = ("shutdown", "stopped",
                                                "stopping")

  jex = JobExecutor(opts=opts)
  for instance in inst_list:
    ToStdout("Submit %s of instance %s", text_submit, instance)
    jex.QueueJob(instance, opcls(instance_name=instance))

  results = jex.GetResults()
  bad_cnt = len([1 for (success, _) in results if not success])

  if bad_cnt:
    ToStderr("There were errors while %s instances:\n"
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
             len(results))
    return False

  ToStdout("All instances have been %s successfully", text_success)
  return True


class _RunWhenNodesReachableHelper:
  """Helper class to make shared internal state sharing easier.

  @ivar success: Indicates if all action_cb calls were successful

  """
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
    """Init the object.

    @param node_list: The list of nodes to be reachable
    @param action_cb: Callback called when a new host is reachable
    @type node2ip: dict
    @param node2ip: Node to ip mapping
    @param port: The port to use for the TCP ping
    @param feedback_fn: The function used for feedback
    @param _ping_fn: Function to check reachabilty (for unittest use only)
    @param _sleep_fn: Function to sleep (for unittest use only)

    """
    # Nodes still unreachable / already seen reachable
    self.down = set(node_list)
    self.up = set()
    self.node2ip = node2ip
    self.success = True
    self.action_cb = action_cb
    self.port = port
    self.feedback_fn = feedback_fn
    self._ping_fn = _ping_fn
    self._sleep_fn = _sleep_fn

  def __call__(self):
    """Runs the action callback over the currently reachable nodes.

    @raises utils.RetryAgain: When there are still down nodes

    """
    if not self.action_cb(self.up):
      # Remember any failure; the overall result is reported at the end
      self.success = False

    if self.down:
      raise utils.RetryAgain()

    return self.success

  def Wait(self, secs):
    """Checks if a host is up or waits remaining seconds.

    @param secs: The secs remaining

    """
    started_at = time.time()
    for node in self.down:
      if self._ping_fn(self.node2ip[node], self.port,
                       timeout=_EPO_PING_TIMEOUT, live_port_needed=True):
        self.feedback_fn("Node %s became available" % node)
        self.up.add(node)
        self.down -= self.up
        # A node just came up, so the action callback may now succeed;
        # return immediately instead of sleeping out the interval
        return

    self._sleep_fn(max(0.0, started_at + secs - time.time()))


def _RunWhenNodesReachable(node_list, action_cb, interval):
  """Run action_cb when nodes become reachable.

  @param node_list: The list of nodes to be reachable
  @param action_cb: Callback called when a new host is reachable
  @param interval: The earliest time to retry

  """
  client = GetClient()
  cluster_info = client.QueryClusterInfo()
  # Resolve the node names with the address family the cluster uses
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
    family = netutils.IPAddress.family
  else:
    family = netutils.IP6Address.family

  node2ip = dict((name, netutils.GetHostname(name, family=family).ip)
                 for name in node_list)

  port = netutils.GetDaemonPort(constants.NODED)
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
                                        ToStdout)

  try:
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
                       wait_fn=helper.Wait)
  except utils.RetryTimeout:
    ToStderr("Time exceeded while waiting for nodes to become reachable"
             " again:\n  - %s", "  - ".join(helper.down))
    return False


def _MaybeInstanceStartup(opts, inst_map, nodes_online,
                          _instance_start_fn=_InstanceStart):
  """Start the instances conditional based on node_states.

  @param opts: The command line options selected by the user
  @param inst_map: A dict of inst -> nodes mapping
  @param nodes_online: A list of nodes online
  @param _instance_start_fn: Callback to start instances (unittest use only)
  @return: Success of the operation on all instances

  """
  # Instances whose nodes are all back online can be started now
  startable = [inst for (inst, nodes) in inst_map.items()
               if not (nodes - nodes_online)]

  # Drop them from the pending map so they are not started twice
  for inst in startable:
    del inst_map[inst]

  if startable:
    return _instance_start_fn(opts, startable, True)

  return True


def _EpoOn(opts, full_node_list, node_list, inst_map):
  """Does the actual power on.

  @param opts: The command line options selected by the user
  @param full_node_list: All nodes to operate on (includes nodes not supporting
                         OOB)
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  # Power the OOB-capable nodes back ON (power=True requests OOB_POWER_ON);
  # previously False was passed here, which requested a power-off while
  # recovering from an EPO.
  if node_list and not _OobPower(opts, node_list, True):
    ToStderr("Not all nodes seem to get back up, investigate and start"
             " manually if needed")

  # Wait for the nodes to be back up
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))

  ToStdout("Waiting until all nodes are available again")
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
    ToStderr("Please investigate and start stopped instances manually")
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS


def _EpoOff(opts, node_list, inst_map):
  """Does the actual power off.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  # First shut down all instances (without remembering the state); abort if
  # any shutdown failed
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
    ToStderr("Please investigate and stop instances manually before continuing")
    return constants.EXIT_FAILURE

  # Nothing left to do if no node supports out-of-band power handling
  if not node_list:
    return constants.EXIT_SUCCESS

  if _OobPower(opts, node_list, False):
    return constants.EXIT_SUCCESS
  return constants.EXIT_FAILURE


def Epo(opts, args, cl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
        _confirm_fn=ConfirmOperation,
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
  """EPO operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  if opts.groups and opts.show_all:
    _stderr_fn("Only one of --groups or --all are allowed")
    return constants.EXIT_FAILURE
  elif args and opts.show_all:
    _stderr_fn("Arguments in combination with --all are not allowed")
    return constants.EXIT_FAILURE

  if cl is None:
    cl = GetClient()

  if opts.groups:
    # Expand the given groups into their member nodes
    node_query_list = \
      itertools.chain(*cl.QueryGroups(args, ["node_list"], False))
  else:
    node_query_list = args

  result = cl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
                                           "sinst_list", "powered", "offline"],
                         False)

  all_nodes = [compat.fst(row) for row in result]
  node_list = []
  inst_map = {}
  for (node, master, pinsts, sinsts, powered, offline) in result:
    if not offline:
      # Record on which online nodes each instance lives; instances on the
      # master node start with an empty set
      for inst in (pinsts + sinsts):
        if inst in inst_map:
          if not master:
            inst_map[inst].add(node)
        elif master:
          inst_map[inst] = set()
        else:
          inst_map[inst] = set([node])

    if master and opts.on:
      # We ignore the master for turning on the machines, in fact we are
      # already operating on the master at this point :)
      continue
    elif master and not opts.show_all:
      _stderr_fn("%s is the master node, please do a master-failover to another"
                 " node not affected by the EPO or use --all if you intend to"
                 " shutdown the whole cluster", node)
      return constants.EXIT_FAILURE
    elif powered is None:
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
                 " handled in a fully automated manner", node)
    elif powered == opts.on:
      _stdout_fn("Node %s is already in desired power state, skipping", node)
    elif not offline or powered:
      node_list.append(node)

  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
    return constants.EXIT_FAILURE

  if opts.on:
    return _on_fn(opts, all_nodes, node_list, inst_map)
  return _off_fn(opts, node_list, inst_map)


def _GetCreateCommand(info):
  """Builds a "gnt-cluster init" command line from cluster info.

  @type info: dict
  @param info: the result of C{QueryClusterInfo}, must contain at least the
      "ipolicy" and "name" keys
  @rtype: string
  @return: the command line

  """
  out = StringIO()
  out.write("gnt-cluster init")
  # Append the instance-policy options, if any
  PrintIPolicyCommand(out, info["ipolicy"], False)
  out.write(" " + info["name"])
  return out.getvalue()


def ShowCreateCommand(opts, args):
  """Shows the command that can be used to re-create the cluster.

  Currently it works only for ipolicy specs.

  """
  cl = GetClient(query=True)
  ToStdout(_GetCreateCommand(cl.QueryClusterInfo()))


def _RunCommandAndReport(cmd):
  """Run a command and report its output, iff it failed.

  @param cmd: the command to execute
  @type cmd: list
  @rtype: bool
  @return: False, if the execution failed.

  """
  result = utils.RunCmd(cmd)
  if not result.failed:
    return True
  ToStderr("Command %s failed: %s; Output %s" %
           (cmd, result.fail_reason, result.output))
  return False


1646
def _VerifyCommand(cmd):
  """Verify that a given command succeeds on all online nodes.

  As this function is intended to run during upgrades, it
  is implemented in such a way that it still works, if all Ganeti
  daemons are down.

  @param cmd: the command to execute
  @type cmd: list
  @rtype: list
  @return: the list of node names that are online where
      the command failed.

  """
  command = utils.text.ShellQuoteArgs([str(val) for val in cmd])

  store = ssconf.SimpleStore()
  nodes = store.GetOnlineNodeList()
  master_node = store.GetMasterNode()
  cluster_name = store.GetClusterName()

  # If the master node is in 'nodes', move it to the end of the list
  if master_node in nodes:
    nodes.remove(master_node)
    nodes.append(master_node)

  srun = ssh.SshRunner(cluster_name=cluster_name)
  # Run the command on every node via SSH and collect the failures
  failed = [name for name in nodes
            if srun.Run(name, constants.SSH_LOGIN_USER, command).exit_code != 0]

  return failed


def _VerifyVersionInstalled(versionstring):
  """Verify that the given version of ganeti is installed on all online nodes.

  Do nothing, if this is the case, otherwise print an appropriate
  message to stderr.

  @param versionstring: the version to check for
  @type versionstring: string
  @rtype: bool
  @return: True, if the version is installed on all online nodes

  """
  # A version is "installed" iff its directory exists under PKGLIBDIR
  version_dir = os.path.join(pathutils.PKGLIBDIR, versionstring)
  badnodes = _VerifyCommand(["test", "-d", version_dir])
  if badnodes:
    ToStderr("Ganeti version %s not installed on nodes %s"
             % (versionstring, ", ".join(badnodes)))
    return False
  return True


def _GetRunning():
  """Determine the number of jobs still running.

  @rtype: int
  @return: the number of jobs still running

  """
  # NOTE: docstring fixed — the function returns a count (len of the query
  # result), not a list, as the previous documentation claimed.
  cl = GetClient()
  qfilter = qlang.MakeSimpleFilter("status",
                                   frozenset([constants.JOB_STATUS_RUNNING]))
  return len(cl.Query(constants.QR_JOB, [], qfilter).data)


def _SetGanetiVersion(versionstring):
  """Set the active version of ganeti to the given versionstring

  @type versionstring: string
  @rtype: list
  @return: the list of nodes where the version change failed

  """
  failed = []
  # The two symlinks under SYSCONFDIR select the active lib and share trees
  links = [
    (os.path.join(pathutils.PKGLIBDIR, versionstring),
     os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")),
    (os.path.join(pathutils.SHAREDIR, versionstring),
     os.path.join(pathutils.SYSCONFDIR, "ganeti/share")),
    ]

  if constants.HAS_GNU_LN:
    # GNU ln supports -T, replacing the link in a single command
    for (target, link) in links:
      failed.extend(_VerifyCommand(["ln", "-s", "-f", "-T", target, link]))
  else:
    # Without GNU ln, remove the old link first and then re-create it
    for (target, link) in links:
      failed.extend(_VerifyCommand(["rm", "-f", link]))
      failed.extend(_VerifyCommand(["ln", "-s", "-f", target, link]))

  return list(set(failed))


def _ExecuteCommands(fns):
1750
  """Execute a list of functions, in reverse order.
1751

1752
  @type fns: list of functions.
1753
  @param fns: the functions to be executed.
1754

1755
  """
1756
  for fn in reversed(fns):
1757
    fn()
1758

    
1759

    
1760
def _GetConfigVersion():
  """Determine the version the configuration file currently has.

  @rtype: tuple or None
  @return: (major, minor, revision) if the version can be determined,
      None otherwise

  """
  config_data = serializer.LoadJson(utils.ReadFile(pathutils.CLUSTER_CONF_FILE))
  # A configuration without a "version" entry cannot be interpreted.
  if "version" not in config_data:
    return None
  return utils.SplitVersion(config_data["version"])


def _ReadIntentToUpgrade():
  """Read the file documenting the intent to upgrade the cluster.

  @rtype: string or None
  @return: the version to upgrade to, if the file exists, and None
      otherwise.

  """
  if not os.path.isfile(pathutils.INTENT_TO_UPGRADE):
    return None

  contents = utils.UnescapeAndSplit(utils.ReadFile(pathutils.INTENT_TO_UPGRADE))
  if len(contents) == 2:
    # The file holds (version, pid); only the version is of interest here.
    return contents[0]
  # file syntactically mal-formed
  return None


def _WriteIntentToUpgrade(version):
  """Write file documenting the intent to upgrade the cluster.

  @type version: string
  @param version: the version we intent to upgrade to

  """
  # Record the target version together with the pid of this process.
  content = utils.EscapeAndJoin([version, "%d" % os.getpid()])
  utils.WriteFile(pathutils.INTENT_TO_UPGRADE, data=content)


def _UpgradeBeforeConfigurationChange(versionstring):
  """
  Carry out all the tasks necessary for an upgrade that happen before
  the configuration file, or Ganeti version, changes.

  Rollback actions are accumulated as the steps succeed, so that a
  failure at any point returns the actions needed to undo what was
  already done; they are returned even on failure.

  @type versionstring: string
  @param versionstring: the version to upgrade to
  @rtype: (bool, list)
  @return: tuple of a bool indicating success and a list of rollback tasks

  """
  rollback = []

  if not _VerifyVersionInstalled(versionstring):
    return (False, rollback)

  # Persist the intent so an interrupted upgrade can be resumed later.
  _WriteIntentToUpgrade(versionstring)
  rollback.append(
    lambda: utils.RunCmd(["rm", "-f", pathutils.INTENT_TO_UPGRADE]))

  ToStdout("Draining queue")
  client = GetClient()
  client.SetQueueDrainFlag(True)

  rollback.append(lambda: GetClient().SetQueueDrainFlag(False))

  # Wait for the running jobs to finish; a truthy (non-zero) result after
  # the timeout means jobs were still running.
  if utils.SimpleRetry(0, _GetRunning,
                       constants.UPGRADE_QUEUE_POLL_INTERVAL,
                       constants.UPGRADE_QUEUE_DRAIN_TIMEOUT):
    ToStderr("Failed to completely empty the queue.")
    return (False, rollback)

  ToStdout("Stopping daemons on master node.")
  if not _RunCommandAndReport([pathutils.DAEMON_UTIL, "stop-all"]):
    return (False, rollback)

  # Re-check the installed version; if it no longer checks out, restart
  # the daemons we just stopped before giving up.
  if not _VerifyVersionInstalled(versionstring):
    utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"])
    return (False, rollback)

  ToStdout("Stopping daemons everywhere.")
  rollback.append(lambda: _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
  if badnodes:
    ToStderr("Failed to stop daemons on %s." % (", ".join(badnodes),))
    return (False, rollback)

  # Archive the whole data directory, timestamped, before anything
  # touches the configuration.
  backuptar = os.path.join(pathutils.LOCALSTATEDIR,
                           "lib/ganeti%d.tar" % time.time())
  ToStdout("Backing up configuration as %s" % backuptar)
  if not _RunCommandAndReport(["tar", "cf", backuptar,
                               pathutils.DATA_DIR]):
    return (False, rollback)

  return (True, rollback)


def _SwitchVersionAndConfig(versionstring, downgrade):
  """
  Switch to the new Ganeti version and change the configuration,
  in correct order.

  @type versionstring: string
  @param versionstring: the version to change to
  @type downgrade: bool
  @param downgrade: True, if the configuration should be downgraded
  @rtype: (bool, list)
  @return: tuple of a bool indicating success, and a list of
      additional rollback tasks

  """
  rollback = []
  if downgrade:
    # When downgrading, the configuration must be converted BEFORE
    # switching to the (older) binaries.
    ToStdout("Downgrading configuration")
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "--downgrade", "-f"]):
      return (False, rollback)

  # Configuration change is the point of no return. From then onwards, it is
  # safer to push through the up/downgrade than to try to roll it back.

  ToStdout("Switching to version %s on all nodes" % versionstring)
  rollback.append(lambda: _SetGanetiVersion(constants.DIR_VERSION))
  badnodes = _SetGanetiVersion(versionstring)
  if badnodes:
    ToStderr("Failed to switch to Ganeti version %s on nodes %s"
             % (versionstring, ", ".join(badnodes)))
    # On a downgrade the configuration was already changed above (past the
    # point of no return), so push on despite the failing nodes.
    if not downgrade:
      return (False, rollback)

  # Now that we have changed to the new version of Ganeti we should
  # not communicate over luxi any more, as luxi might have changed in
  # incompatible ways. Therefore, manually call the corresponding ganeti
  # commands using their canonical (version independent) path.

  if not downgrade:
    # When upgrading, the configuration is converted AFTER switching to
    # the new binaries.
    ToStdout("Upgrading configuration")
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "-f"]):
      return (False, rollback)

  return (True, rollback)


def _UpgradeAfterConfigurationChange():
  """
  Carry out the upgrade actions necessary after switching to the new
  Ganeti version and updating the configuration.

  As this part is run at a time where the new version of Ganeti is already
  running, no communication should happen via luxi, as this is not a stable
  interface. Also, as the configuration change is the point of no return,
  all actions are pushed through, even if some of them fail.

  @rtype: int
  @return: the intended return value

  """
  # Failures below only degrade the exit code; they never abort.
  returnvalue = 0

  ToStdout("Starting daemons everywhere.")
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
  if badnodes:
    ToStderr("Warning: failed to start daemons on %s." % (", ".join(badnodes),))
    returnvalue = 1

  ToStdout("Ensuring directories everywhere.")
  badnodes = _VerifyCommand([pathutils.ENSURE_DIRS])
  if badnodes:
    ToStderr("Warning: failed to ensure directories on %s." %
             (", ".join(badnodes)))
    returnvalue = 1

  ToStdout("Redistributing the configuration.")
  if not _RunCommandAndReport(["gnt-cluster", "redist-conf", "--yes-do-it"]):
    returnvalue = 1

  # Full stop/start cycle so every daemon picks up the redistributed
  # configuration; de-duplicate nodes that failed both phases.
  ToStdout("Restarting daemons everywhere.")
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
  badnodes.extend(_VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
  if badnodes:
    ToStderr("Warning: failed to start daemons on %s." %
             (", ".join(list(set(badnodes))),))
    returnvalue = 1

  ToStdout("Undraining the queue.")
  if not _RunCommandAndReport(["gnt-cluster", "queue", "undrain"]):
    returnvalue = 1

  # The upgrade is essentially done: remove the intent-to-upgrade marker.
  _RunCommandAndReport(["rm", "-f", pathutils.INTENT_TO_UPGRADE])

  ToStdout("Verifying cluster.")
  if not _RunCommandAndReport(["gnt-cluster", "verify"]):
    returnvalue = 1

  return returnvalue


def UpgradeGanetiCommand(opts, args):
  """Upgrade a cluster to a new ganeti version.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # Exactly one of --to / --resume must be given.
  if ((not opts.resume and opts.to is None)
      or (opts.resume and opts.to is not None)):
    ToStderr("Precisely one of the options --to and --resume"
             " has to be given")
    return 1

  if opts.resume:
    ssconf.CheckMaster(False)
    versionstring = _ReadIntentToUpgrade()
    # None means no well-formed intent file exists: nothing to resume.
    if versionstring is None:
      return 0
    version = utils.version.ParseVersion(versionstring)
    if version is None:
      return 1
    configversion = _GetConfigVersion()
    if configversion is None:
      return 1
    # If the upgrade we resume was an upgrade between compatible
    # versions (like 2.10.0 to 2.10.1), the correct configversion
    # does not guarantee that the config has been updated.
    # However, in the case of a compatible update with the configuration
    # not touched, we are running a different dirversion with the same
    # config version.
    config_already_modified = \
      (utils.IsCorrectConfigVersion(version, configversion) and
       not (versionstring != constants.DIR_VERSION and
            configversion == (constants.CONFIG_MAJOR, constants.CONFIG_MINOR,
                              constants.CONFIG_REVISION)))
    if not config_already_modified:
      # We have to start from the beginning; however, some daemons might have
      # already been stopped, so the only way to get into a well-defined state
      # is by starting all daemons again.
      _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
  else:
    versionstring = opts.to
    config_already_modified = False
    version = utils.version.ParseVersion(versionstring)
    if version is None:
      ToStderr("Could not parse version string %s" % versionstring)
      return 1

  msg = utils.version.UpgradeRange(version)
  if msg is not None:
    ToStderr("Cannot upgrade to %s: %s" % (versionstring, msg))
    return 1

  # Skip the pre-configuration phase when resuming past the point where
  # the configuration was already modified.
  if not config_already_modified:
    success, rollback = _UpgradeBeforeConfigurationChange(versionstring)
    if not success:
      _ExecuteCommands(rollback)
      return 1
  else:
    rollback = []

  downgrade = utils.version.ShouldCfgdowngrade(version)

  success, additionalrollback =  \
      _SwitchVersionAndConfig(versionstring, downgrade)
  if not success:
    rollback.extend(additionalrollback)
    _ExecuteCommands(rollback)
    return 1

  return _UpgradeAfterConfigurationChange()


#: dictionary with the gnt-cluster sub-commands; each entry maps the
#: command name to a tuple of (handler function, argument specification,
#: option list, usage synopsis, short description), as consumed by
#: GenericMain
commands = {
  "init": (
    InitCluster, [ArgHost(min=1, max=1)],
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT,
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
     IPOLICY_STD_SPECS_OPT] + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
  "destroy": (
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
    "", "Destroy cluster"),
  "rename": (
    RenameCluster, [ArgHost(min=1, max=1)],
    [FORCE_OPT, DRY_RUN_OPT],
    "<new_name>",
    "Renames the cluster"),
  "redist-conf": (
    RedistributeConfig, ARGS_NONE, SUBMIT_OPTS +
    [DRY_RUN_OPT, PRIORITY_OPT, FORCE_DISTRIBUTION],
    "", "Forces a push of the configuration file and ssconf files"
    " to the nodes in the cluster"),
  "verify": (
    VerifyCluster, ARGS_NONE,
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
    "", "Does a check on the cluster configuration"),
  "verify-disks": (
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
    "", "Does a check on the cluster disk status"),
  "repair-disk-sizes": (
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
    "[instance...]", "Updates mismatches in recorded disk sizes"),
  "master-failover": (
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
    "", "Makes the current node the master"),
  "master-ping": (
    MasterPing, ARGS_NONE, [],
    "", "Checks if the master is alive"),
  "version": (
    ShowClusterVersion, ARGS_NONE, [],
    "", "Shows the cluster version"),
  "getmaster": (
    ShowClusterMaster, ARGS_NONE, [],
    "", "Shows the cluster master"),
  "copyfile": (
    ClusterCopyFile, [ArgFile(min=1, max=1)],
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
  "command": (
    RunClusterCommand, [ArgCommand(min=1)],
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
  "info": (
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
    "[--roman]", "Show cluster configuration"),
  "list-tags": (
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
  "add-tags": (
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Add tags to the cluster"),
  "remove-tags": (
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Remove tags from the cluster"),
  "search-tags": (
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
    "Searches the tags on all objects on"
    " the cluster for a given pattern (regex)"),
  "queue": (
    QueueOps,
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
    [], "drain|undrain|info", "Change queue properties"),
  "watcher": (
    WatcherOps,
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
    [],
    "{pause <timespec>|continue|info}", "Change watcher properties"),
  "modify": (
    SetClusterParams, ARGS_NONE,
    [FORCE_OPT,
     BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
     DRBD_HELPER_OPT, DEFAULT_IALLOCATOR_OPT,
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
     DISK_STATE_OPT] + SUBMIT_OPTS +
     [ENABLED_DISK_TEMPLATES_OPT, IPOLICY_STD_SPECS_OPT, MODIFY_ETCHOSTS_OPT] +
     INSTANCE_POLICY_OPTS + [GLOBAL_FILEDIR_OPT],
    "[opts...]",
    "Alters the parameters of the cluster"),
  "renew-crypto": (
    RenewCrypto, ARGS_NONE,
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
    "[opts...]",
    "Renews cluster certificates, keys and secrets"),
  "epo": (
    Epo, [ArgUnknown()],
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
    "[opts...] [args]",
    "Performs an emergency power-off on given args"),
  "activate-master-ip": (
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
  "deactivate-master-ip": (
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
    "Deactivates the master IP"),
  "show-ispecs-cmd": (
    ShowCreateCommand, ARGS_NONE, [], "",
    "Show the command line to re-create the cluster"),
  "upgrade": (
    UpgradeGanetiCommand, ARGS_NONE, [TO_OPT, RESUME_OPT], "",
    "Upgrade (or downgrade) to a new Ganeti version"),
  }


#: dictionary with aliases for commands (alias name -> canonical
#: command name in the commands dictionary)
aliases = {
  "masterfailover": "master-failover",
  "show": "info",
}


def Main():
  """Entry point of the gnt-cluster tool."""
  return GenericMain(commands, aliases=aliases,
                     override={"tag_type": constants.TAG_CLUSTER})