Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ 0359e5d0

History | View | Annotate | Download (68.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
from cStringIO import StringIO
30
import os
31
import time
32
import OpenSSL
33
import itertools
34

    
35
from ganeti.cli import *
36
from ganeti import opcodes
37
from ganeti import constants
38
from ganeti import errors
39
from ganeti import utils
40
from ganeti import bootstrap
41
from ganeti import ssh
42
from ganeti import objects
43
from ganeti import uidpool
44
from ganeti import compat
45
from ganeti import netutils
46
from ganeti import ssconf
47
from ganeti import pathutils
48
from ganeti import serializer
49
from ganeti import qlang
50

    
51

    
52
ON_OPT = cli_option("--on", default=False,
53
                    action="store_true", dest="on",
54
                    help="Recover from an EPO")
55

    
56
GROUPS_OPT = cli_option("--groups", default=False,
57
                        action="store_true", dest="groups",
58
                        help="Arguments are node groups instead of nodes")
59

    
60
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
61
                            help="Override interactive check for --no-voting",
62
                            default=False, action="store_true")
63

    
64
FORCE_DISTRIBUTION = cli_option("--yes-do-it", dest="yes_do_it",
65
                                help="Unconditionally distribute the"
66
                                " configuration, even if the queue"
67
                                " is drained",
68
                                default=False, action="store_true")
69

    
70
TO_OPT = cli_option("--to", default=None, type="string",
71
                    help="The Ganeti version to upgrade to")
72

    
73
RESUME_OPT = cli_option("--resume", default=False, action="store_true",
74
                        help="Resume any pending Ganeti upgrades")
75

    
76
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
77
_EPO_PING_TIMEOUT = 1 # 1 second
78
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
79

    
80

    
81
def _InitEnabledDiskTemplates(opts):
82
  """Initialize the list of enabled disk templates.
83

84
  """
85
  if opts.enabled_disk_templates:
86
    return opts.enabled_disk_templates.split(",")
87
  else:
88
    return constants.DEFAULT_ENABLED_DISK_TEMPLATES
89

    
90

    
91
def _InitVgName(opts, enabled_disk_templates):
92
  """Initialize the volume group name.
93

94
  @type enabled_disk_templates: list of strings
95
  @param enabled_disk_templates: cluster-wide enabled disk templates
96

97
  """
98
  vg_name = None
99
  if opts.vg_name is not None:
100
    vg_name = opts.vg_name
101
    if vg_name:
102
      if not utils.IsLvmEnabled(enabled_disk_templates):
103
        ToStdout("You specified a volume group with --vg-name, but you did not"
104
                 " enable any disk template that uses lvm.")
105
    elif utils.IsLvmEnabled(enabled_disk_templates):
106
      raise errors.OpPrereqError(
107
          "LVM disk templates are enabled, but vg name not set.")
108
  elif utils.IsLvmEnabled(enabled_disk_templates):
109
    vg_name = constants.DEFAULT_VG
110
  return vg_name
111

    
112

    
113
def _InitDrbdHelper(opts, enabled_disk_templates):
114
  """Initialize the DRBD usermode helper.
115

116
  """
117
  drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
118

    
119
  if not drbd_enabled and opts.drbd_helper is not None:
120
    ToStdout("Note: You specified a DRBD usermode helper, while DRBD storage"
121
             " is not enabled.")
122

    
123
  if drbd_enabled:
124
    if opts.drbd_helper is None:
125
      return constants.DEFAULT_DRBD_HELPER
126
    if opts.drbd_helper == '':
127
      raise errors.OpPrereqError(
128
          "Unsetting the drbd usermode helper while enabling DRBD is not"
129
          " allowed.")
130

    
131
  return opts.drbd_helper
132

    
133

    
134
@UsesRPC
135
def InitCluster(opts, args):
136
  """Initialize the cluster.
137

138
  @param opts: the command line options selected by the user
139
  @type args: list
140
  @param args: should contain only one element, the desired
141
      cluster name
142
  @rtype: int
143
  @return: the desired exit code
144

145
  """
146
  enabled_disk_templates = _InitEnabledDiskTemplates(opts)
147

    
148
  try:
149
    vg_name = _InitVgName(opts, enabled_disk_templates)
150
    drbd_helper = _InitDrbdHelper(opts, enabled_disk_templates)
151
  except errors.OpPrereqError, e:
152
    ToStderr(str(e))
153
    return 1
154

    
155
  master_netdev = opts.master_netdev
156
  if master_netdev is None:
157
    nic_mode = opts.nicparams.get(constants.NIC_MODE, None)
158
    if not nic_mode:
159
      # default case, use bridging
160
      master_netdev = constants.DEFAULT_BRIDGE
161
    elif nic_mode == constants.NIC_MODE_OVS:
162
      # default ovs is different from default bridge
163
      master_netdev = constants.DEFAULT_OVS
164
      opts.nicparams[constants.NIC_LINK] = constants.DEFAULT_OVS
165

    
166
  hvlist = opts.enabled_hypervisors
167
  if hvlist is None:
168
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
169
  hvlist = hvlist.split(",")
170

    
171
  hvparams = dict(opts.hvparams)
172
  beparams = opts.beparams
173
  nicparams = opts.nicparams
174

    
175
  diskparams = dict(opts.diskparams)
176

    
177
  # check the disk template types here, as we cannot rely on the type check done
178
  # by the opcode parameter types
179
  diskparams_keys = set(diskparams.keys())
180
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
181
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
182
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
183
    return 1
184

    
185
  # prepare beparams dict
186
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
187
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
188

    
189
  # prepare nicparams dict
190
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
191
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
192

    
193
  # prepare ndparams dict
194
  if opts.ndparams is None:
195
    ndparams = dict(constants.NDC_DEFAULTS)
196
  else:
197
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
198
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
199

    
200
  # prepare hvparams dict
201
  for hv in constants.HYPER_TYPES:
202
    if hv not in hvparams:
203
      hvparams[hv] = {}
204
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
205
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
206

    
207
  # prepare diskparams dict
208
  for templ in constants.DISK_TEMPLATES:
209
    if templ not in diskparams:
210
      diskparams[templ] = {}
211
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
212
                                         diskparams[templ])
213
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
214

    
215
  # prepare ipolicy dict
216
  ipolicy = CreateIPolicyFromOpts(
217
    ispecs_mem_size=opts.ispecs_mem_size,
218
    ispecs_cpu_count=opts.ispecs_cpu_count,
219
    ispecs_disk_count=opts.ispecs_disk_count,
220
    ispecs_disk_size=opts.ispecs_disk_size,
221
    ispecs_nic_count=opts.ispecs_nic_count,
222
    minmax_ispecs=opts.ipolicy_bounds_specs,
223
    std_ispecs=opts.ipolicy_std_specs,
224
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
225
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
226
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
227
    fill_all=True)
228

    
229
  if opts.candidate_pool_size is None:
230
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
231

    
232
  if opts.mac_prefix is None:
233
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
234

    
235
  uid_pool = opts.uid_pool
236
  if uid_pool is not None:
237
    uid_pool = uidpool.ParseUidPool(uid_pool)
238

    
239
  if opts.prealloc_wipe_disks is None:
240
    opts.prealloc_wipe_disks = False
241

    
242
  external_ip_setup_script = opts.use_external_mip_script
243
  if external_ip_setup_script is None:
244
    external_ip_setup_script = False
245

    
246
  try:
247
    primary_ip_version = int(opts.primary_ip_version)
248
  except (ValueError, TypeError), err:
249
    ToStderr("Invalid primary ip version value: %s" % str(err))
250
    return 1
251

    
252
  master_netmask = opts.master_netmask
253
  try:
254
    if master_netmask is not None:
255
      master_netmask = int(master_netmask)
256
  except (ValueError, TypeError), err:
257
    ToStderr("Invalid master netmask value: %s" % str(err))
258
    return 1
259

    
260
  if opts.disk_state:
261
    disk_state = utils.FlatToDict(opts.disk_state)
262
  else:
263
    disk_state = {}
264

    
265
  hv_state = dict(opts.hv_state)
266

    
267
  default_ialloc_params = opts.default_iallocator_params
268
  bootstrap.InitCluster(cluster_name=args[0],
269
                        secondary_ip=opts.secondary_ip,
270
                        vg_name=vg_name,
271
                        mac_prefix=opts.mac_prefix,
272
                        master_netmask=master_netmask,
273
                        master_netdev=master_netdev,
274
                        file_storage_dir=opts.file_storage_dir,
275
                        shared_file_storage_dir=opts.shared_file_storage_dir,
276
                        enabled_hypervisors=hvlist,
277
                        hvparams=hvparams,
278
                        beparams=beparams,
279
                        nicparams=nicparams,
280
                        ndparams=ndparams,
281
                        diskparams=diskparams,
282
                        ipolicy=ipolicy,
283
                        candidate_pool_size=opts.candidate_pool_size,
284
                        modify_etc_hosts=opts.modify_etc_hosts,
285
                        modify_ssh_setup=opts.modify_ssh_setup,
286
                        maintain_node_health=opts.maintain_node_health,
287
                        drbd_helper=drbd_helper,
288
                        uid_pool=uid_pool,
289
                        default_iallocator=opts.default_iallocator,
290
                        default_iallocator_params=default_ialloc_params,
291
                        primary_ip_version=primary_ip_version,
292
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
293
                        use_external_mip_script=external_ip_setup_script,
294
                        hv_state=hv_state,
295
                        disk_state=disk_state,
296
                        enabled_disk_templates=enabled_disk_templates,
297
                        )
298
  op = opcodes.OpClusterPostInit()
299
  SubmitOpCode(op, opts=opts)
300
  return 0
301

    
302

    
303
@UsesRPC
304
def DestroyCluster(opts, args):
305
  """Destroy the cluster.
306

307
  @param opts: the command line options selected by the user
308
  @type args: list
309
  @param args: should be an empty list
310
  @rtype: int
311
  @return: the desired exit code
312

313
  """
314
  if not opts.yes_do_it:
315
    ToStderr("Destroying a cluster is irreversible. If you really want"
316
             " destroy this cluster, supply the --yes-do-it option.")
317
    return 1
318

    
319
  op = opcodes.OpClusterDestroy()
320
  master_uuid = SubmitOpCode(op, opts=opts)
321
  # if we reached this, the opcode didn't fail; we can proceed to
322
  # shutdown all the daemons
323
  bootstrap.FinalizeClusterDestroy(master_uuid)
324
  return 0
325

    
326

    
327
def RenameCluster(opts, args):
328
  """Rename the cluster.
329

330
  @param opts: the command line options selected by the user
331
  @type args: list
332
  @param args: should contain only one element, the new cluster name
333
  @rtype: int
334
  @return: the desired exit code
335

336
  """
337
  cl = GetClient()
338

    
339
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
340

    
341
  new_name = args[0]
342
  if not opts.force:
343
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
344
                " connected over the network to the cluster name, the"
345
                " operation is very dangerous as the IP address will be"
346
                " removed from the node and the change may not go through."
347
                " Continue?") % (cluster_name, new_name)
348
    if not AskUser(usertext):
349
      return 1
350

    
351
  op = opcodes.OpClusterRename(name=new_name)
352
  result = SubmitOpCode(op, opts=opts, cl=cl)
353

    
354
  if result:
355
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
356

    
357
  return 0
358

    
359

    
360
def ActivateMasterIp(opts, args):
361
  """Activates the master IP.
362

363
  """
364
  op = opcodes.OpClusterActivateMasterIp()
365
  SubmitOpCode(op)
366
  return 0
367

    
368

    
369
def DeactivateMasterIp(opts, args):
370
  """Deactivates the master IP.
371

372
  """
373
  if not opts.confirm:
374
    usertext = ("This will disable the master IP. All the open connections to"
375
                " the master IP will be closed. To reach the master you will"
376
                " need to use its node IP."
377
                " Continue?")
378
    if not AskUser(usertext):
379
      return 1
380

    
381
  op = opcodes.OpClusterDeactivateMasterIp()
382
  SubmitOpCode(op)
383
  return 0
384

    
385

    
386
def RedistributeConfig(opts, args):
387
  """Forces push of the cluster configuration.
388

389
  @param opts: the command line options selected by the user
390
  @type args: list
391
  @param args: empty list
392
  @rtype: int
393
  @return: the desired exit code
394

395
  """
396
  op = opcodes.OpClusterRedistConf()
397
  if opts.yes_do_it:
398
    SubmitOpCodeToDrainedQueue(op)
399
  else:
400
    SubmitOrSend(op, opts)
401
  return 0
402

    
403

    
404
def ShowClusterVersion(opts, args):
405
  """Write version of ganeti software to the standard output.
406

407
  @param opts: the command line options selected by the user
408
  @type args: list
409
  @param args: should be an empty list
410
  @rtype: int
411
  @return: the desired exit code
412

413
  """
414
  cl = GetClient(query=True)
415
  result = cl.QueryClusterInfo()
416
  ToStdout("Software version: %s", result["software_version"])
417
  ToStdout("Internode protocol: %s", result["protocol_version"])
418
  ToStdout("Configuration format: %s", result["config_version"])
419
  ToStdout("OS api version: %s", result["os_api_version"])
420
  ToStdout("Export interface: %s", result["export_version"])
421
  ToStdout("VCS version: %s", result["vcs_version"])
422
  return 0
423

    
424

    
425
def ShowClusterMaster(opts, args):
426
  """Write name of master node to the standard output.
427

428
  @param opts: the command line options selected by the user
429
  @type args: list
430
  @param args: should be an empty list
431
  @rtype: int
432
  @return: the desired exit code
433

434
  """
435
  master = bootstrap.GetMaster()
436
  ToStdout(master)
437
  return 0
438

    
439

    
440
def _FormatGroupedParams(paramsdict, roman=False):
441
  """Format Grouped parameters (be, nic, disk) by group.
442

443
  @type paramsdict: dict of dicts
444
  @param paramsdict: {group: {param: value, ...}, ...}
445
  @rtype: dict of dicts
446
  @return: copy of the input dictionaries with strings as values
447

448
  """
449
  ret = {}
450
  for (item, val) in paramsdict.items():
451
    if isinstance(val, dict):
452
      ret[item] = _FormatGroupedParams(val, roman=roman)
453
    elif roman and isinstance(val, int):
454
      ret[item] = compat.TryToRoman(val)
455
    else:
456
      ret[item] = str(val)
457
  return ret
458

    
459

    
460
def ShowClusterConfig(opts, args):
461
  """Shows cluster information.
462

463
  @param opts: the command line options selected by the user
464
  @type args: list
465
  @param args: should be an empty list
466
  @rtype: int
467
  @return: the desired exit code
468

469
  """
470
  cl = GetClient(query=True)
471
  result = cl.QueryClusterInfo()
472

    
473
  if result["tags"]:
474
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
475
  else:
476
    tags = "(none)"
477
  if result["reserved_lvs"]:
478
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
479
  else:
480
    reserved_lvs = "(none)"
481

    
482
  enabled_hv = result["enabled_hypervisors"]
483
  hvparams = dict((k, v) for k, v in result["hvparams"].iteritems()
484
                  if k in enabled_hv)
485

    
486
  info = [
487
    ("Cluster name", result["name"]),
488
    ("Cluster UUID", result["uuid"]),
489

    
490
    ("Creation time", utils.FormatTime(result["ctime"])),
491
    ("Modification time", utils.FormatTime(result["mtime"])),
492

    
493
    ("Master node", result["master"]),
494

    
495
    ("Architecture (this node)",
496
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),
497

    
498
    ("Tags", tags),
499

    
500
    ("Default hypervisor", result["default_hypervisor"]),
501
    ("Enabled hypervisors", utils.CommaJoin(enabled_hv)),
502

    
503
    ("Hypervisor parameters", _FormatGroupedParams(hvparams)),
504

    
505
    ("OS-specific hypervisor parameters",
506
     _FormatGroupedParams(result["os_hvp"])),
507

    
508
    ("OS parameters", _FormatGroupedParams(result["osparams"])),
509

    
510
    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
511
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),
512

    
513
    ("Cluster parameters", [
514
      ("candidate pool size",
515
       compat.TryToRoman(result["candidate_pool_size"],
516
                         convert=opts.roman_integers)),
517
      ("master netdev", result["master_netdev"]),
518
      ("master netmask", result["master_netmask"]),
519
      ("use external master IP address setup script",
520
       result["use_external_mip_script"]),
521
      ("lvm volume group", result["volume_group_name"]),
522
      ("lvm reserved volumes", reserved_lvs),
523
      ("drbd usermode helper", result["drbd_usermode_helper"]),
524
      ("file storage path", result["file_storage_dir"]),
525
      ("shared file storage path", result["shared_file_storage_dir"]),
526
      ("maintenance of node health", result["maintain_node_health"]),
527
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
528
      ("default instance allocator", result["default_iallocator"]),
529
      ("default instance allocator parameters",
530
       result["default_iallocator_params"]),
531
      ("primary ip version", result["primary_ip_version"]),
532
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
533
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
534
      ("ExtStorage Providers search path",
535
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
536
      ("enabled disk templates",
537
       utils.CommaJoin(result["enabled_disk_templates"])),
538
      ]),
539

    
540
    ("Default node parameters",
541
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),
542

    
543
    ("Default instance parameters",
544
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),
545

    
546
    ("Default nic parameters",
547
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),
548

    
549
    ("Default disk parameters",
550
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),
551

    
552
    ("Instance policy - limits for instances",
553
     FormatPolicyInfo(result["ipolicy"], None, True)),
554
    ]
555

    
556
  PrintGenericInfo(info)
557
  return 0
558

    
559

    
560
def ClusterCopyFile(opts, args):
561
  """Copy a file from master to some nodes.
562

563
  @param opts: the command line options selected by the user
564
  @type args: list
565
  @param args: should contain only one element, the path of
566
      the file to be copied
567
  @rtype: int
568
  @return: the desired exit code
569

570
  """
571
  filename = args[0]
572
  if not os.path.exists(filename):
573
    raise errors.OpPrereqError("No such filename '%s'" % filename,
574
                               errors.ECODE_INVAL)
575

    
576
  cl = GetClient()
577
  qcl = GetClient(query=True)
578
  try:
579
    cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
580

    
581
    results = GetOnlineNodes(nodes=opts.nodes, cl=qcl, filter_master=True,
582
                             secondary_ips=opts.use_replication_network,
583
                             nodegroup=opts.nodegroup)
584
    ports = GetNodesSshPorts(opts.nodes, qcl)
585
  finally:
586
    cl.Close()
587
    qcl.Close()
588

    
589
  srun = ssh.SshRunner(cluster_name)
590
  for (node, port) in zip(results, ports):
591
    if not srun.CopyFileToNode(node, port, filename):
592
      ToStderr("Copy of file %s to node %s:%d failed", filename, node, port)
593

    
594
  return 0
595

    
596

    
597
def RunClusterCommand(opts, args):
598
  """Run a command on some nodes.
599

600
  @param opts: the command line options selected by the user
601
  @type args: list
602
  @param args: should contain the command to be run and its arguments
603
  @rtype: int
604
  @return: the desired exit code
605

606
  """
607
  cl = GetClient()
608
  qcl = GetClient(query=True)
609

    
610
  command = " ".join(args)
611

    
612
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=qcl, nodegroup=opts.nodegroup)
613
  ports = GetNodesSshPorts(nodes, qcl)
614

    
615
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
616
                                                    "master_node"])
617

    
618
  srun = ssh.SshRunner(cluster_name=cluster_name)
619

    
620
  # Make sure master node is at list end
621
  if master_node in nodes:
622
    nodes.remove(master_node)
623
    nodes.append(master_node)
624

    
625
  for (name, port) in zip(nodes, ports):
626
    result = srun.Run(name, constants.SSH_LOGIN_USER, command, port=port)
627

    
628
    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
629
      # Do not output anything for successful commands
630
      continue
631

    
632
    ToStdout("------------------------------------------------")
633
    if opts.show_machine_names:
634
      for line in result.output.splitlines():
635
        ToStdout("%s: %s", name, line)
636
    else:
637
      ToStdout("node: %s", name)
638
      ToStdout("%s", result.output)
639
    ToStdout("return code = %s", result.exit_code)
640

    
641
  return 0
642

    
643

    
644
def VerifyCluster(opts, args):
645
  """Verify integrity of cluster, performing various test on nodes.
646

647
  @param opts: the command line options selected by the user
648
  @type args: list
649
  @param args: should be an empty list
650
  @rtype: int
651
  @return: the desired exit code
652

653
  """
654
  skip_checks = []
655

    
656
  if opts.skip_nplusone_mem:
657
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
658

    
659
  cl = GetClient()
660

    
661
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
662
                               error_codes=opts.error_codes,
663
                               debug_simulate_errors=opts.simulate_errors,
664
                               skip_checks=skip_checks,
665
                               ignore_errors=opts.ignore_errors,
666
                               group_name=opts.nodegroup)
667
  result = SubmitOpCode(op, cl=cl, opts=opts)
668

    
669
  # Keep track of submitted jobs
670
  jex = JobExecutor(cl=cl, opts=opts)
671

    
672
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
673
    jex.AddJobId(None, status, job_id)
674

    
675
  results = jex.GetResults()
676

    
677
  (bad_jobs, bad_results) = \
678
    map(len,
679
        # Convert iterators to lists
680
        map(list,
681
            # Count errors
682
            map(compat.partial(itertools.ifilterfalse, bool),
683
                # Convert result to booleans in a tuple
684
                zip(*((job_success, len(op_results) == 1 and op_results[0])
685
                      for (job_success, op_results) in results)))))
686

    
687
  if bad_jobs == 0 and bad_results == 0:
688
    rcode = constants.EXIT_SUCCESS
689
  else:
690
    rcode = constants.EXIT_FAILURE
691
    if bad_jobs > 0:
692
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
693

    
694
  return rcode
695

    
696

    
697
def VerifyDisks(opts, args):
698
  """Verify integrity of cluster disks.
699

700
  @param opts: the command line options selected by the user
701
  @type args: list
702
  @param args: should be an empty list
703
  @rtype: int
704
  @return: the desired exit code
705

706
  """
707
  cl = GetClient()
708

    
709
  op = opcodes.OpClusterVerifyDisks()
710

    
711
  result = SubmitOpCode(op, cl=cl, opts=opts)
712

    
713
  # Keep track of submitted jobs
714
  jex = JobExecutor(cl=cl, opts=opts)
715

    
716
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
717
    jex.AddJobId(None, status, job_id)
718

    
719
  retcode = constants.EXIT_SUCCESS
720

    
721
  for (status, result) in jex.GetResults():
722
    if not status:
723
      ToStdout("Job failed: %s", result)
724
      continue
725

    
726
    ((bad_nodes, instances, missing), ) = result
727

    
728
    for node, text in bad_nodes.items():
729
      ToStdout("Error gathering data on node %s: %s",
730
               node, utils.SafeEncode(text[-400:]))
731
      retcode = constants.EXIT_FAILURE
732
      ToStdout("You need to fix these nodes first before fixing instances")
733

    
734
    for iname in instances:
735
      if iname in missing:
736
        continue
737
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
738
      try:
739
        ToStdout("Activating disks for instance '%s'", iname)
740
        SubmitOpCode(op, opts=opts, cl=cl)
741
      except errors.GenericError, err:
742
        nret, msg = FormatError(err)
743
        retcode |= nret
744
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
745

    
746
    if missing:
747
      for iname, ival in missing.iteritems():
748
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
749
        if all_missing:
750
          ToStdout("Instance %s cannot be verified as it lives on"
751
                   " broken nodes", iname)
752
        else:
753
          ToStdout("Instance %s has missing logical volumes:", iname)
754
          ival.sort()
755
          for node, vol in ival:
756
            if node in bad_nodes:
757
              ToStdout("\tbroken node %s /dev/%s", node, vol)
758
            else:
759
              ToStdout("\t%s /dev/%s", node, vol)
760

    
761
      ToStdout("You need to replace or recreate disks for all the above"
762
               " instances if this message persists after fixing broken nodes.")
763
      retcode = constants.EXIT_FAILURE
764
    elif not instances:
765
      ToStdout("No disks need to be activated.")
766

    
767
  return retcode
768

    
769

    
770
def RepairDiskSizes(opts, args):
771
  """Verify sizes of cluster disks.
772

773
  @param opts: the command line options selected by the user
774
  @type args: list
775
  @param args: optional list of instances to restrict check to
776
  @rtype: int
777
  @return: the desired exit code
778

779
  """
780
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
781
  SubmitOpCode(op, opts=opts)
782

    
783

    
784
@UsesRPC
785
def MasterFailover(opts, args):
786
  """Failover the master node.
787

788
  This command, when run on a non-master node, will cause the current
789
  master to cease being master, and the non-master to become new
790
  master.
791

792
  @param opts: the command line options selected by the user
793
  @type args: list
794
  @param args: should be an empty list
795
  @rtype: int
796
  @return: the desired exit code
797

798
  """
799
  if opts.no_voting and not opts.yes_do_it:
800
    usertext = ("This will perform the failover even if most other nodes"
801
                " are down, or if this node is outdated. This is dangerous"
802
                " as it can lead to a non-consistent cluster. Check the"
803
                " gnt-cluster(8) man page before proceeding. Continue?")
804
    if not AskUser(usertext):
805
      return 1
806

    
807
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
808

    
809

    
810
def MasterPing(opts, args):
811
  """Checks if the master is alive.
812

813
  @param opts: the command line options selected by the user
814
  @type args: list
815
  @param args: should be an empty list
816
  @rtype: int
817
  @return: the desired exit code
818

819
  """
820
  try:
821
    cl = GetClient()
822
    cl.QueryClusterInfo()
823
    return 0
824
  except Exception: # pylint: disable=W0703
825
    return 1
826

    
827

    
828
def SearchTags(opts, args):
829
  """Searches the tags on all the cluster.
830

831
  @param opts: the command line options selected by the user
832
  @type args: list
833
  @param args: should contain only one element, the tag pattern
834
  @rtype: int
835
  @return: the desired exit code
836

837
  """
838
  op = opcodes.OpTagsSearch(pattern=args[0])
839
  result = SubmitOpCode(op, opts=opts)
840
  if not result:
841
    return 1
842
  result = list(result)
843
  result.sort()
844
  for path, tag in result:
845
    ToStdout("%s %s", path, tag)
846

    
847

    
848
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
849
  """Reads and verifies an X509 certificate.
850

851
  @type cert_filename: string
852
  @param cert_filename: the path of the file containing the certificate to
853
                        verify encoded in PEM format
854
  @type verify_private_key: bool
855
  @param verify_private_key: whether to verify the private key in addition to
856
                             the public certificate
857
  @rtype: string
858
  @return: a string containing the PEM-encoded certificate.
859

860
  """
861
  try:
862
    pem = utils.ReadFile(cert_filename)
863
  except IOError, err:
864
    raise errors.X509CertError(cert_filename,
865
                               "Unable to read certificate: %s" % str(err))
866

    
867
  try:
868
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
869
  except Exception, err:
870
    raise errors.X509CertError(cert_filename,
871
                               "Unable to load certificate: %s" % str(err))
872

    
873
  if verify_private_key:
874
    try:
875
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
876
    except Exception, err:
877
      raise errors.X509CertError(cert_filename,
878
                                 "Unable to load private key: %s" % str(err))
879

    
880
  return pem
881

    
882

    
883
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
884
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
885
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
886
                 cds_filename, force):
887
  """Renews cluster certificates, keys and secrets.
888

889
  @type new_cluster_cert: bool
890
  @param new_cluster_cert: Whether to generate a new cluster certificate
891
  @type new_rapi_cert: bool
892
  @param new_rapi_cert: Whether to generate a new RAPI certificate
893
  @type rapi_cert_filename: string
894
  @param rapi_cert_filename: Path to file containing new RAPI certificate
895
  @type new_spice_cert: bool
896
  @param new_spice_cert: Whether to generate a new SPICE certificate
897
  @type spice_cert_filename: string
898
  @param spice_cert_filename: Path to file containing new SPICE certificate
899
  @type spice_cacert_filename: string
900
  @param spice_cacert_filename: Path to file containing the certificate of the
901
                                CA that signed the SPICE certificate
902
  @type new_confd_hmac_key: bool
903
  @param new_confd_hmac_key: Whether to generate a new HMAC key
904
  @type new_cds: bool
905
  @param new_cds: Whether to generate a new cluster domain secret
906
  @type cds_filename: string
907
  @param cds_filename: Path to file containing new cluster domain secret
908
  @type force: bool
909
  @param force: Whether to ask user for confirmation
910

911
  """
912
  if new_rapi_cert and rapi_cert_filename:
913
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
914
             " options can be specified at the same time.")
915
    return 1
916

    
917
  if new_cds and cds_filename:
918
    ToStderr("Only one of the --new-cluster-domain-secret and"
919
             " --cluster-domain-secret options can be specified at"
920
             " the same time.")
921
    return 1
922

    
923
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
924
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
925
             " and --spice-ca-certificate must not be used.")
926
    return 1
927

    
928
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
929
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
930
             " specified.")
931
    return 1
932

    
933
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
934
  try:
935
    if rapi_cert_filename:
936
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
937
    if spice_cert_filename:
938
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
939
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
940
  except errors.X509CertError, err:
941
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
942
    return 1
943

    
944
  if cds_filename:
945
    try:
946
      cds = utils.ReadFile(cds_filename)
947
    except Exception, err: # pylint: disable=W0703
948
      ToStderr("Can't load new cluster domain secret from %s: %s" %
949
               (cds_filename, str(err)))
950
      return 1
951
  else:
952
    cds = None
953

    
954
  if not force:
955
    usertext = ("This requires all daemons on all nodes to be restarted and"
956
                " may take some time. Continue?")
957
    if not AskUser(usertext):
958
      return 1
959

    
960
  def _RenewCryptoInner(ctx):
961
    ctx.feedback_fn("Updating certificates and keys")
962
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
963
                                    new_rapi_cert,
964
                                    new_spice_cert,
965
                                    new_confd_hmac_key,
966
                                    new_cds,
967
                                    rapi_cert_pem=rapi_cert_pem,
968
                                    spice_cert_pem=spice_cert_pem,
969
                                    spice_cacert_pem=spice_cacert_pem,
970
                                    cds=cds)
971

    
972
    files_to_copy = []
973

    
974
    if new_cluster_cert:
975
      files_to_copy.append(pathutils.NODED_CERT_FILE)
976

    
977
    if new_rapi_cert or rapi_cert_pem:
978
      files_to_copy.append(pathutils.RAPI_CERT_FILE)
979

    
980
    if new_spice_cert or spice_cert_pem:
981
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
982
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)
983

    
984
    if new_confd_hmac_key:
985
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)
986

    
987
    if new_cds or cds:
988
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)
989

    
990
    if files_to_copy:
991
      for node_name in ctx.nonmaster_nodes:
992
        port = ctx.ssh_ports[node_name]
993
        ctx.feedback_fn("Copying %s to %s:%d" %
994
                        (", ".join(files_to_copy), node_name, port))
995
        for file_name in files_to_copy:
996
          ctx.ssh.CopyFileToNode(node_name, port, file_name)
997

    
998
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
999

    
1000
  ToStdout("All requested certificates and keys have been replaced."
1001
           " Running \"gnt-cluster verify\" now is recommended.")
1002

    
1003
  return 0
1004

    
1005

    
1006
def RenewCrypto(opts, args):
1007
  """Renews cluster certificates, keys and secrets.
1008

1009
  """
1010
  return _RenewCrypto(opts.new_cluster_cert,
1011
                      opts.new_rapi_cert,
1012
                      opts.rapi_cert,
1013
                      opts.new_spice_cert,
1014
                      opts.spice_cert,
1015
                      opts.spice_cacert,
1016
                      opts.new_confd_hmac_key,
1017
                      opts.new_cluster_domain_secret,
1018
                      opts.cluster_domain_secret,
1019
                      opts.force)
1020

    
1021

    
1022
def _GetEnabledDiskTemplates(opts):
1023
  """Determine the list of enabled disk templates.
1024

1025
  """
1026
  if opts.enabled_disk_templates:
1027
    return opts.enabled_disk_templates.split(",")
1028
  else:
1029
    return None
1030

    
1031

    
1032
def _GetVgName(opts, enabled_disk_templates):
1033
  """Determine the volume group name.
1034

1035
  @type enabled_disk_templates: list of strings
1036
  @param enabled_disk_templates: cluster-wide enabled disk-templates
1037

1038
  """
1039
  # consistency between vg name and enabled disk templates
1040
  vg_name = None
1041
  if opts.vg_name is not None:
1042
    vg_name = opts.vg_name
1043
  if enabled_disk_templates:
1044
    if vg_name and not utils.IsLvmEnabled(enabled_disk_templates):
1045
      ToStdout("You specified a volume group with --vg-name, but you did not"
1046
               " enable any of the following lvm-based disk templates: %s" %
1047
               utils.CommaJoin(constants.DTS_LVM))
1048
  return vg_name
1049

    
1050

    
1051
def _GetDrbdHelper(opts, enabled_disk_templates):
1052
  """Determine the DRBD usermode helper.
1053

1054
  """
1055
  drbd_helper = opts.drbd_helper
1056
  if enabled_disk_templates:
1057
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
1058
    if not drbd_enabled and opts.drbd_helper:
1059
      ToStdout("You specified a DRBD usermode helper with "
1060
               " --drbd-usermode-helper while DRBD is not enabled.")
1061
  return drbd_helper
1062

    
1063

    
1064
def SetClusterParams(opts, args):
1065
  """Modify the cluster.
1066

1067
  @param opts: the command line options selected by the user
1068
  @type args: list
1069
  @param args: should be an empty list
1070
  @rtype: int
1071
  @return: the desired exit code
1072

1073
  """
1074
  if not (opts.vg_name is not None or
1075
          opts.drbd_helper is not None or
1076
          opts.enabled_hypervisors or opts.hvparams or
1077
          opts.beparams or opts.nicparams or
1078
          opts.ndparams or opts.diskparams or
1079
          opts.candidate_pool_size is not None or
1080
          opts.uid_pool is not None or
1081
          opts.maintain_node_health is not None or
1082
          opts.add_uids is not None or
1083
          opts.remove_uids is not None or
1084
          opts.default_iallocator is not None or
1085
          opts.default_iallocator_params or
1086
          opts.reserved_lvs is not None or
1087
          opts.master_netdev is not None or
1088
          opts.master_netmask is not None or
1089
          opts.use_external_mip_script is not None or
1090
          opts.prealloc_wipe_disks is not None or
1091
          opts.hv_state or
1092
          opts.enabled_disk_templates or
1093
          opts.disk_state or
1094
          opts.ipolicy_bounds_specs is not None or
1095
          opts.ipolicy_std_specs is not None or
1096
          opts.ipolicy_disk_templates is not None or
1097
          opts.ipolicy_vcpu_ratio is not None or
1098
          opts.ipolicy_spindle_ratio is not None or
1099
          opts.modify_etc_hosts is not None or
1100
          opts.file_storage_dir is not None):
1101
    ToStderr("Please give at least one of the parameters.")
1102
    return 1
1103

    
1104
  enabled_disk_templates = _GetEnabledDiskTemplates(opts)
1105
  vg_name = _GetVgName(opts, enabled_disk_templates)
1106

    
1107
  try:
1108
    drbd_helper = _GetDrbdHelper(opts, enabled_disk_templates)
1109
  except errors.OpPrereqError, e:
1110
    ToStderr(str(e))
1111
    return 1
1112

    
1113
  hvlist = opts.enabled_hypervisors
1114
  if hvlist is not None:
1115
    hvlist = hvlist.split(",")
1116

    
1117
  # a list of (name, dict) we can pass directly to dict() (or [])
1118
  hvparams = dict(opts.hvparams)
1119
  for hv_params in hvparams.values():
1120
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1121

    
1122
  diskparams = dict(opts.diskparams)
1123

    
1124
  for dt_params in diskparams.values():
1125
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1126

    
1127
  beparams = opts.beparams
1128
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1129

    
1130
  nicparams = opts.nicparams
1131
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1132

    
1133
  ndparams = opts.ndparams
1134
  if ndparams is not None:
1135
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1136

    
1137
  ipolicy = CreateIPolicyFromOpts(
1138
    minmax_ispecs=opts.ipolicy_bounds_specs,
1139
    std_ispecs=opts.ipolicy_std_specs,
1140
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
1141
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1142
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1143
    )
1144

    
1145
  mnh = opts.maintain_node_health
1146

    
1147
  uid_pool = opts.uid_pool
1148
  if uid_pool is not None:
1149
    uid_pool = uidpool.ParseUidPool(uid_pool)
1150

    
1151
  add_uids = opts.add_uids
1152
  if add_uids is not None:
1153
    add_uids = uidpool.ParseUidPool(add_uids)
1154

    
1155
  remove_uids = opts.remove_uids
1156
  if remove_uids is not None:
1157
    remove_uids = uidpool.ParseUidPool(remove_uids)
1158

    
1159
  if opts.reserved_lvs is not None:
1160
    if opts.reserved_lvs == "":
1161
      opts.reserved_lvs = []
1162
    else:
1163
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1164

    
1165
  if opts.master_netmask is not None:
1166
    try:
1167
      opts.master_netmask = int(opts.master_netmask)
1168
    except ValueError:
1169
      ToStderr("The --master-netmask option expects an int parameter.")
1170
      return 1
1171

    
1172
  ext_ip_script = opts.use_external_mip_script
1173

    
1174
  if opts.disk_state:
1175
    disk_state = utils.FlatToDict(opts.disk_state)
1176
  else:
1177
    disk_state = {}
1178

    
1179
  hv_state = dict(opts.hv_state)
1180

    
1181
  op = opcodes.OpClusterSetParams(
1182
    vg_name=vg_name,
1183
    drbd_helper=drbd_helper,
1184
    enabled_hypervisors=hvlist,
1185
    hvparams=hvparams,
1186
    os_hvp=None,
1187
    beparams=beparams,
1188
    nicparams=nicparams,
1189
    ndparams=ndparams,
1190
    diskparams=diskparams,
1191
    ipolicy=ipolicy,
1192
    candidate_pool_size=opts.candidate_pool_size,
1193
    maintain_node_health=mnh,
1194
    modify_etc_hosts=opts.modify_etc_hosts,
1195
    uid_pool=uid_pool,
1196
    add_uids=add_uids,
1197
    remove_uids=remove_uids,
1198
    default_iallocator=opts.default_iallocator,
1199
    default_iallocator_params=opts.default_iallocator_params,
1200
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
1201
    master_netdev=opts.master_netdev,
1202
    master_netmask=opts.master_netmask,
1203
    reserved_lvs=opts.reserved_lvs,
1204
    use_external_mip_script=ext_ip_script,
1205
    hv_state=hv_state,
1206
    disk_state=disk_state,
1207
    enabled_disk_templates=enabled_disk_templates,
1208
    force=opts.force,
1209
    file_storage_dir=opts.file_storage_dir,
1210
    )
1211
  SubmitOrSend(op, opts)
1212
  return 0
1213

    
1214

    
1215
def QueueOps(opts, args):
1216
  """Queue operations.
1217

1218
  @param opts: the command line options selected by the user
1219
  @type args: list
1220
  @param args: should contain only one element, the subcommand
1221
  @rtype: int
1222
  @return: the desired exit code
1223

1224
  """
1225
  command = args[0]
1226
  client = GetClient()
1227
  if command in ("drain", "undrain"):
1228
    drain_flag = command == "drain"
1229
    client.SetQueueDrainFlag(drain_flag)
1230
  elif command == "info":
1231
    result = client.QueryConfigValues(["drain_flag"])
1232
    if result[0]:
1233
      val = "set"
1234
    else:
1235
      val = "unset"
1236
    ToStdout("The drain flag is %s" % val)
1237
  else:
1238
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1239
                               errors.ECODE_INVAL)
1240

    
1241
  return 0
1242

    
1243

    
1244
def _ShowWatcherPause(until):
1245
  if until is None or until < time.time():
1246
    ToStdout("The watcher is not paused.")
1247
  else:
1248
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1249

    
1250

    
1251
def WatcherOps(opts, args):
1252
  """Watcher operations.
1253

1254
  @param opts: the command line options selected by the user
1255
  @type args: list
1256
  @param args: should contain only one element, the subcommand
1257
  @rtype: int
1258
  @return: the desired exit code
1259

1260
  """
1261
  command = args[0]
1262
  client = GetClient()
1263

    
1264
  if command == "continue":
1265
    client.SetWatcherPause(None)
1266
    ToStdout("The watcher is no longer paused.")
1267

    
1268
  elif command == "pause":
1269
    if len(args) < 2:
1270
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1271

    
1272
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1273
    _ShowWatcherPause(result)
1274

    
1275
  elif command == "info":
1276
    result = client.QueryConfigValues(["watcher_pause"])
1277
    _ShowWatcherPause(result[0])
1278

    
1279
  else:
1280
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1281
                               errors.ECODE_INVAL)
1282

    
1283
  return 0
1284

    
1285

    
1286
def _OobPower(opts, node_list, power):
1287
  """Puts the node in the list to desired power state.
1288

1289
  @param opts: The command line options selected by the user
1290
  @param node_list: The list of nodes to operate on
1291
  @param power: True if they should be powered on, False otherwise
1292
  @return: The success of the operation (none failed)
1293

1294
  """
1295
  if power:
1296
    command = constants.OOB_POWER_ON
1297
  else:
1298
    command = constants.OOB_POWER_OFF
1299

    
1300
  op = opcodes.OpOobCommand(node_names=node_list,
1301
                            command=command,
1302
                            ignore_status=True,
1303
                            timeout=opts.oob_timeout,
1304
                            power_delay=opts.power_delay)
1305
  result = SubmitOpCode(op, opts=opts)
1306
  errs = 0
1307
  for node_result in result:
1308
    (node_tuple, data_tuple) = node_result
1309
    (_, node_name) = node_tuple
1310
    (data_status, _) = data_tuple
1311
    if data_status != constants.RS_NORMAL:
1312
      assert data_status != constants.RS_UNAVAIL
1313
      errs += 1
1314
      ToStderr("There was a problem changing power for %s, please investigate",
1315
               node_name)
1316

    
1317
  if errs > 0:
1318
    return False
1319

    
1320
  return True
1321

    
1322

    
1323
def _InstanceStart(opts, inst_list, start, no_remember=False):
1324
  """Puts the instances in the list to desired state.
1325

1326
  @param opts: The command line options selected by the user
1327
  @param inst_list: The list of instances to operate on
1328
  @param start: True if they should be started, False for shutdown
1329
  @param no_remember: If the instance state should be remembered
1330
  @return: The success of the operation (none failed)
1331

1332
  """
1333
  if start:
1334
    opcls = opcodes.OpInstanceStartup
1335
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1336
  else:
1337
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1338
                           timeout=opts.shutdown_timeout,
1339
                           no_remember=no_remember)
1340
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1341

    
1342
  jex = JobExecutor(opts=opts)
1343

    
1344
  for inst in inst_list:
1345
    ToStdout("Submit %s of instance %s", text_submit, inst)
1346
    op = opcls(instance_name=inst)
1347
    jex.QueueJob(inst, op)
1348

    
1349
  results = jex.GetResults()
1350
  bad_cnt = len([1 for (success, _) in results if not success])
1351

    
1352
  if bad_cnt == 0:
1353
    ToStdout("All instances have been %s successfully", text_success)
1354
  else:
1355
    ToStderr("There were errors while %s instances:\n"
1356
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1357
             len(results))
1358
    return False
1359

    
1360
  return True
1361

    
1362

    
1363
class _RunWhenNodesReachableHelper:
1364
  """Helper class to make shared internal state sharing easier.
1365

1366
  @ivar success: Indicates if all action_cb calls were successful
1367

1368
  """
1369
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1370
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1371
    """Init the object.
1372

1373
    @param node_list: The list of nodes to be reachable
1374
    @param action_cb: Callback called when a new host is reachable
1375
    @type node2ip: dict
1376
    @param node2ip: Node to ip mapping
1377
    @param port: The port to use for the TCP ping
1378
    @param feedback_fn: The function used for feedback
1379
    @param _ping_fn: Function to check reachabilty (for unittest use only)
1380
    @param _sleep_fn: Function to sleep (for unittest use only)
1381

1382
    """
1383
    self.down = set(node_list)
1384
    self.up = set()
1385
    self.node2ip = node2ip
1386
    self.success = True
1387
    self.action_cb = action_cb
1388
    self.port = port
1389
    self.feedback_fn = feedback_fn
1390
    self._ping_fn = _ping_fn
1391
    self._sleep_fn = _sleep_fn
1392

    
1393
  def __call__(self):
1394
    """When called we run action_cb.
1395

1396
    @raises utils.RetryAgain: When there are still down nodes
1397

1398
    """
1399
    if not self.action_cb(self.up):
1400
      self.success = False
1401

    
1402
    if self.down:
1403
      raise utils.RetryAgain()
1404
    else:
1405
      return self.success
1406

    
1407
  def Wait(self, secs):
1408
    """Checks if a host is up or waits remaining seconds.
1409

1410
    @param secs: The secs remaining
1411

1412
    """
1413
    start = time.time()
1414
    for node in self.down:
1415
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1416
                       live_port_needed=True):
1417
        self.feedback_fn("Node %s became available" % node)
1418
        self.up.add(node)
1419
        self.down -= self.up
1420
        # If we have a node available there is the possibility to run the
1421
        # action callback successfully, therefore we don't wait and return
1422
        return
1423

    
1424
    self._sleep_fn(max(0.0, start + secs - time.time()))
1425

    
1426

    
1427
def _RunWhenNodesReachable(node_list, action_cb, interval):
1428
  """Run action_cb when nodes become reachable.
1429

1430
  @param node_list: The list of nodes to be reachable
1431
  @param action_cb: Callback called when a new host is reachable
1432
  @param interval: The earliest time to retry
1433

1434
  """
1435
  client = GetClient()
1436
  cluster_info = client.QueryClusterInfo()
1437
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1438
    family = netutils.IPAddress.family
1439
  else:
1440
    family = netutils.IP6Address.family
1441

    
1442
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1443
                 for node in node_list)
1444

    
1445
  port = netutils.GetDaemonPort(constants.NODED)
1446
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1447
                                        ToStdout)
1448

    
1449
  try:
1450
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1451
                       wait_fn=helper.Wait)
1452
  except utils.RetryTimeout:
1453
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1454
             " again:\n  - %s", "  - ".join(helper.down))
1455
    return False
1456

    
1457

    
1458
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1459
                          _instance_start_fn=_InstanceStart):
1460
  """Start the instances conditional based on node_states.
1461

1462
  @param opts: The command line options selected by the user
1463
  @param inst_map: A dict of inst -> nodes mapping
1464
  @param nodes_online: A list of nodes online
1465
  @param _instance_start_fn: Callback to start instances (unittest use only)
1466
  @return: Success of the operation on all instances
1467

1468
  """
1469
  start_inst_list = []
1470
  for (inst, nodes) in inst_map.items():
1471
    if not (nodes - nodes_online):
1472
      # All nodes the instance lives on are back online
1473
      start_inst_list.append(inst)
1474

    
1475
  for inst in start_inst_list:
1476
    del inst_map[inst]
1477

    
1478
  if start_inst_list:
1479
    return _instance_start_fn(opts, start_inst_list, True)
1480

    
1481
  return True
1482

    
1483

    
1484
def _EpoOn(opts, full_node_list, node_list, inst_map):
1485
  """Does the actual power on.
1486

1487
  @param opts: The command line options selected by the user
1488
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1489
                         OOB)
1490
  @param node_list: The list of nodes to operate on (all need to support OOB)
1491
  @param inst_map: A dict of inst -> nodes mapping
1492
  @return: The desired exit status
1493

1494
  """
1495
  if node_list and not _OobPower(opts, node_list, False):
1496
    ToStderr("Not all nodes seem to get back up, investigate and start"
1497
             " manually if needed")
1498

    
1499
  # Wait for the nodes to be back up
1500
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1501

    
1502
  ToStdout("Waiting until all nodes are available again")
1503
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1504
    ToStderr("Please investigate and start stopped instances manually")
1505
    return constants.EXIT_FAILURE
1506

    
1507
  return constants.EXIT_SUCCESS
1508

    
1509

    
1510
def _EpoOff(opts, node_list, inst_map):
1511
  """Does the actual power off.
1512

1513
  @param opts: The command line options selected by the user
1514
  @param node_list: The list of nodes to operate on (all need to support OOB)
1515
  @param inst_map: A dict of inst -> nodes mapping
1516
  @return: The desired exit status
1517

1518
  """
1519
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
1520
    ToStderr("Please investigate and stop instances manually before continuing")
1521
    return constants.EXIT_FAILURE
1522

    
1523
  if not node_list:
1524
    return constants.EXIT_SUCCESS
1525

    
1526
  if _OobPower(opts, node_list, False):
1527
    return constants.EXIT_SUCCESS
1528
  else:
1529
    return constants.EXIT_FAILURE
1530

    
1531

    
1532
def Epo(opts, args, qcl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
1533
        _confirm_fn=ConfirmOperation,
1534
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
1535
  """EPO operations.
1536

1537
  @param opts: the command line options selected by the user
1538
  @type args: list
1539
  @param args: should contain only one element, the subcommand
1540
  @rtype: int
1541
  @return: the desired exit code
1542

1543
  """
1544
  if opts.groups and opts.show_all:
1545
    _stderr_fn("Only one of --groups or --all are allowed")
1546
    return constants.EXIT_FAILURE
1547
  elif args and opts.show_all:
1548
    _stderr_fn("Arguments in combination with --all are not allowed")
1549
    return constants.EXIT_FAILURE
1550

    
1551
  if qcl is None:
1552
    # Query client
1553
    qcl = GetClient(query=True)
1554

    
1555
  if opts.groups:
1556
    node_query_list = \
1557
      itertools.chain(*qcl.QueryGroups(args, ["node_list"], False))
1558
  else:
1559
    node_query_list = args
1560

    
1561
  result = qcl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
1562
                                            "sinst_list", "powered", "offline"],
1563
                          False)
1564

    
1565
  all_nodes = map(compat.fst, result)
1566
  node_list = []
1567
  inst_map = {}
1568
  for (node, master, pinsts, sinsts, powered, offline) in result:
1569
    if not offline:
1570
      for inst in (pinsts + sinsts):
1571
        if inst in inst_map:
1572
          if not master:
1573
            inst_map[inst].add(node)
1574
        elif master:
1575
          inst_map[inst] = set()
1576
        else:
1577
          inst_map[inst] = set([node])
1578

    
1579
    if master and opts.on:
1580
      # We ignore the master for turning on the machines, in fact we are
1581
      # already operating on the master at this point :)
1582
      continue
1583
    elif master and not opts.show_all:
1584
      _stderr_fn("%s is the master node, please do a master-failover to another"
1585
                 " node not affected by the EPO or use --all if you intend to"
1586
                 " shutdown the whole cluster", node)
1587
      return constants.EXIT_FAILURE
1588
    elif powered is None:
1589
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
1590
                 " handled in a fully automated manner", node)
1591
    elif powered == opts.on:
1592
      _stdout_fn("Node %s is already in desired power state, skipping", node)
1593
    elif not offline or (offline and powered):
1594
      node_list.append(node)
1595

    
1596
  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
1597
    return constants.EXIT_FAILURE
1598

    
1599
  if opts.on:
1600
    return _on_fn(opts, all_nodes, node_list, inst_map)
1601
  else:
1602
    return _off_fn(opts, node_list, inst_map)
1603

    
1604

    
1605
def _GetCreateCommand(info):
1606
  buf = StringIO()
1607
  buf.write("gnt-cluster init")
1608
  PrintIPolicyCommand(buf, info["ipolicy"], False)
1609
  buf.write(" ")
1610
  buf.write(info["name"])
1611
  return buf.getvalue()
1612

    
1613

    
1614
def ShowCreateCommand(opts, args):
1615
  """Shows the command that can be used to re-create the cluster.
1616

1617
  Currently it works only for ipolicy specs.
1618

1619
  """
1620
  cl = GetClient(query=True)
1621
  result = cl.QueryClusterInfo()
1622
  ToStdout(_GetCreateCommand(result))
1623

    
1624

    
1625
def _RunCommandAndReport(cmd):
1626
  """Run a command and report its output, iff it failed.
1627

1628
  @param cmd: the command to execute
1629
  @type cmd: list
1630
  @rtype: bool
1631
  @return: False, if the execution failed.
1632

1633
  """
1634
  result = utils.RunCmd(cmd)
1635
  if result.failed:
1636
    ToStderr("Command %s failed: %s; Output %s" %
1637
             (cmd, result.fail_reason, result.output))
1638
    return False
1639
  return True
1640

    
1641

    
1642
def _VerifyCommand(cmd):
1643
  """Verify that a given command succeeds on all online nodes.
1644

1645
  As this function is intended to run during upgrades, it
1646
  is implemented in such a way that it still works, if all Ganeti
1647
  daemons are down.
1648

1649
  @param cmd: the command to execute
1650
  @type cmd: list
1651
  @rtype: list
1652
  @return: the list of node names that are online where
1653
      the command failed.
1654

1655
  """
1656
  command = utils.text.ShellQuoteArgs([str(val) for val in cmd])
1657

    
1658
  nodes = ssconf.SimpleStore().GetOnlineNodeList()
1659
  master_node = ssconf.SimpleStore().GetMasterNode()
1660
  cluster_name = ssconf.SimpleStore().GetClusterName()
1661

    
1662
  # If master node is in 'nodes', make sure master node is at list end
1663
  if master_node in nodes:
1664
    nodes.remove(master_node)
1665
    nodes.append(master_node)
1666

    
1667
  failed = []
1668

    
1669
  srun = ssh.SshRunner(cluster_name=cluster_name)
1670
  for name in nodes:
1671
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
1672
    if result.exit_code != 0:
1673
      failed.append(name)
1674

    
1675
  return failed
1676

    
1677

    
1678
def _VerifyVersionInstalled(versionstring):
1679
  """Verify that the given version of ganeti is installed on all online nodes.
1680

1681
  Do nothing, if this is the case, otherwise print an appropriate
1682
  message to stderr.
1683

1684
  @param versionstring: the version to check for
1685
  @type versionstring: string
1686
  @rtype: bool
1687
  @return: True, if the version is installed on all online nodes
1688

1689
  """
1690
  badnodes = _VerifyCommand(["test", "-d",
1691
                             os.path.join(pathutils.PKGLIBDIR, versionstring)])
1692
  if badnodes:
1693
    ToStderr("Ganeti version %s not installed on nodes %s"
1694
             % (versionstring, ", ".join(badnodes)))
1695
    return False
1696

    
1697
  return True
1698

    
1699

    
1700
def _GetRunning():
1701
  """Determine the list of running jobs.
1702

1703
  @rtype: list
1704
  @return: the number of jobs still running
1705

1706
  """
1707
  cl = GetClient()
1708
  qfilter = qlang.MakeSimpleFilter("status",
1709
                                   frozenset([constants.JOB_STATUS_RUNNING]))
1710
  return len(cl.Query(constants.QR_JOB, [], qfilter).data)
1711

    
1712

    
1713
def _SetGanetiVersion(versionstring):
1714
  """Set the active version of ganeti to the given versionstring
1715

1716
  @type versionstring: string
1717
  @rtype: list
1718
  @return: the list of nodes where the version change failed
1719

1720
  """
1721
  failed = []
1722
  if constants.HAS_GNU_LN:
1723
    failed.extend(_VerifyCommand(
1724
        ["ln", "-s", "-f", "-T",
1725
         os.path.join(pathutils.PKGLIBDIR, versionstring),
1726
         os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
1727
    failed.extend(_VerifyCommand(
1728
        ["ln", "-s", "-f", "-T",
1729
         os.path.join(pathutils.SHAREDIR, versionstring),
1730
         os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
1731
  else:
1732
    failed.extend(_VerifyCommand(
1733
        ["rm", "-f", os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
1734
    failed.extend(_VerifyCommand(
1735
        ["ln", "-s", "-f", os.path.join(pathutils.PKGLIBDIR, versionstring),
1736
         os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
1737
    failed.extend(_VerifyCommand(
1738
        ["rm", "-f", os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
1739
    failed.extend(_VerifyCommand(
1740
        ["ln", "-s", "-f", os.path.join(pathutils.SHAREDIR, versionstring),
1741
         os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
1742
  return list(set(failed))
1743

    
1744

    
1745
def _ExecuteCommands(fns):
1746
  """Execute a list of functions, in reverse order.
1747

1748
  @type fns: list of functions.
1749
  @param fns: the functions to be executed.
1750

1751
  """
1752
  for fn in reversed(fns):
1753
    fn()
1754

    
1755

    
1756
def _GetConfigVersion():
1757
  """Determine the version the configuration file currently has.
1758

1759
  @rtype: tuple or None
1760
  @return: (major, minor, revision) if the version can be determined,
1761
      None otherwise
1762

1763
  """
1764
  config_data = serializer.LoadJson(utils.ReadFile(pathutils.CLUSTER_CONF_FILE))
1765
  try:
1766
    config_version = config_data["version"]
1767
  except KeyError:
1768
    return None
1769
  return utils.SplitVersion(config_version)
1770

    
1771

    
1772
def _ReadIntentToUpgrade():
1773
  """Read the file documenting the intent to upgrade the cluster.
1774

1775
  @rtype: string or None
1776
  @return: the version to upgrade to, if the file exists, and None
1777
      otherwise.
1778

1779
  """
1780
  if not os.path.isfile(pathutils.INTENT_TO_UPGRADE):
1781
    return None
1782

    
1783
  contentstring = utils.ReadFile(pathutils.INTENT_TO_UPGRADE)
1784
  contents = utils.UnescapeAndSplit(contentstring)
1785
  if len(contents) != 2:
1786
    # file syntactically mal-formed
1787
    return None
1788
  return contents[0]
1789

    
1790

    
1791
def _WriteIntentToUpgrade(version):
1792
  """Write file documenting the intent to upgrade the cluster.
1793

1794
  @type version: string
1795
  @param version: the version we intent to upgrade to
1796

1797
  """
1798
  utils.WriteFile(pathutils.INTENT_TO_UPGRADE,
1799
                  data=utils.EscapeAndJoin([version, "%d" % os.getpid()]))
1800

    
1801

    
1802
def _UpgradeBeforeConfigurationChange(versionstring):
1803
  """
1804
  Carry out all the tasks necessary for an upgrade that happen before
1805
  the configuration file, or Ganeti version, changes.
1806

1807
  @type versionstring: string
1808
  @param versionstring: the version to upgrade to
1809
  @rtype: (bool, list)
1810
  @return: tuple of a bool indicating success and a list of rollback tasks
1811

1812
  """
1813
  rollback = []
1814

    
1815
  if not _VerifyVersionInstalled(versionstring):
1816
    return (False, rollback)
1817

    
1818
  _WriteIntentToUpgrade(versionstring)
1819
  rollback.append(
1820
    lambda: utils.RunCmd(["rm", "-f", pathutils.INTENT_TO_UPGRADE]))
1821

    
1822
  ToStdout("Draining queue")
1823
  client = GetClient()
1824
  client.SetQueueDrainFlag(True)
1825

    
1826
  rollback.append(lambda: GetClient().SetQueueDrainFlag(False))
1827

    
1828
  if utils.SimpleRetry(0, _GetRunning,
1829
                       constants.UPGRADE_QUEUE_POLL_INTERVAL,
1830
                       constants.UPGRADE_QUEUE_DRAIN_TIMEOUT):
1831
    ToStderr("Failed to completely empty the queue.")
1832
    return (False, rollback)
1833

    
1834
  ToStdout("Stopping daemons on master node.")
1835
  if not _RunCommandAndReport([pathutils.DAEMON_UTIL, "stop-all"]):
1836
    return (False, rollback)
1837

    
1838
  if not _VerifyVersionInstalled(versionstring):
1839
    utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"])
1840
    return (False, rollback)
1841

    
1842
  ToStdout("Stopping daemons everywhere.")
1843
  rollback.append(lambda: _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
1844
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
1845
  if badnodes:
1846
    ToStderr("Failed to stop daemons on %s." % (", ".join(badnodes),))
1847
    return (False, rollback)
1848

    
1849
  backuptar = os.path.join(pathutils.LOCALSTATEDIR,
1850
                           "lib/ganeti%d.tar" % time.time())
1851
  ToStdout("Backing up configuration as %s" % backuptar)
1852
  if not _RunCommandAndReport(["tar", "cf", backuptar,
1853
                               pathutils.DATA_DIR]):
1854
    return (False, rollback)
1855

    
1856
  return (True, rollback)
1857

    
1858

    
1859
def _SwitchVersionAndConfig(versionstring, downgrade):
1860
  """
1861
  Switch to the new Ganeti version and change the configuration,
1862
  in correct order.
1863

1864
  @type versionstring: string
1865
  @param versionstring: the version to change to
1866
  @type downgrade: bool
1867
  @param downgrade: True, if the configuration should be downgraded
1868
  @rtype: (bool, list)
1869
  @return: tupe of a bool indicating success, and a list of
1870
      additional rollback tasks
1871

1872
  """
1873
  rollback = []
1874
  if downgrade:
1875
    ToStdout("Downgrading configuration")
1876
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "--downgrade", "-f"]):
1877
      return (False, rollback)
1878

    
1879
  # Configuration change is the point of no return. From then onwards, it is
1880
  # safer to push through the up/dowgrade than to try to roll it back.
1881

    
1882
  ToStdout("Switching to version %s on all nodes" % versionstring)
1883
  rollback.append(lambda: _SetGanetiVersion(constants.DIR_VERSION))
1884
  badnodes = _SetGanetiVersion(versionstring)
1885
  if badnodes:
1886
    ToStderr("Failed to switch to Ganeti version %s on nodes %s"
1887
             % (versionstring, ", ".join(badnodes)))
1888
    if not downgrade:
1889
      return (False, rollback)
1890

    
1891
  # Now that we have changed to the new version of Ganeti we should
1892
  # not communicate over luxi any more, as luxi might have changed in
1893
  # incompatible ways. Therefore, manually call the corresponding ganeti
1894
  # commands using their canonical (version independent) path.
1895

    
1896
  if not downgrade:
1897
    ToStdout("Upgrading configuration")
1898
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "-f"]):
1899
      return (False, rollback)
1900

    
1901
  return (True, rollback)
1902

    
1903

    
1904
def _UpgradeAfterConfigurationChange():
1905
  """
1906
  Carry out the upgrade actions necessary after switching to the new
1907
  Ganeti version and updating the configuration.
1908

1909
  As this part is run at a time where the new version of Ganeti is already
1910
  running, no communication should happen via luxi, as this is not a stable
1911
  interface. Also, as the configuration change is the point of no return,
1912
  all actions are pushed trough, even if some of them fail.
1913

1914
  @rtype: int
1915
  @return: the intended return value
1916

1917
  """
1918
  returnvalue = 0
1919

    
1920
  ToStdout("Starting daemons everywhere.")
1921
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
1922
  if badnodes:
1923
    ToStderr("Warning: failed to start daemons on %s." % (", ".join(badnodes),))
1924
    returnvalue = 1
1925

    
1926
  ToStdout("Ensuring directories everywhere.")
1927
  badnodes = _VerifyCommand([pathutils.ENSURE_DIRS])
1928
  if badnodes:
1929
    ToStderr("Warning: failed to ensure directories on %s." %
1930
             (", ".join(badnodes)))
1931
    returnvalue = 1
1932

    
1933
  ToStdout("Redistributing the configuration.")
1934
  if not _RunCommandAndReport(["gnt-cluster", "redist-conf", "--yes-do-it"]):
1935
    returnvalue = 1
1936

    
1937
  ToStdout("Restarting daemons everywhere.")
1938
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
1939
  badnodes.extend(_VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
1940
  if badnodes:
1941
    ToStderr("Warning: failed to start daemons on %s." %
1942
             (", ".join(list(set(badnodes))),))
1943
    returnvalue = 1
1944

    
1945
  ToStdout("Undraining the queue.")
1946
  if not _RunCommandAndReport(["gnt-cluster", "queue", "undrain"]):
1947
    returnvalue = 1
1948

    
1949
  _RunCommandAndReport(["rm", "-f", pathutils.INTENT_TO_UPGRADE])
1950

    
1951
  ToStdout("Verifying cluster.")
1952
  if not _RunCommandAndReport(["gnt-cluster", "verify"]):
1953
    returnvalue = 1
1954

    
1955
  return returnvalue
1956

    
1957

    
1958
def UpgradeGanetiCommand(opts, args):
1959
  """Upgrade a cluster to a new ganeti version.
1960

1961
  @param opts: the command line options selected by the user
1962
  @type args: list
1963
  @param args: should be an empty list
1964
  @rtype: int
1965
  @return: the desired exit code
1966

1967
  """
1968
  if ((not opts.resume and opts.to is None)
1969
      or (opts.resume and opts.to is not None)):
1970
    ToStderr("Precisely one of the options --to and --resume"
1971
             " has to be given")
1972
    return 1
1973

    
1974
  if opts.resume:
1975
    ssconf.CheckMaster(False)
1976
    versionstring = _ReadIntentToUpgrade()
1977
    if versionstring is None:
1978
      return 0
1979
    version = utils.version.ParseVersion(versionstring)
1980
    if version is None:
1981
      return 1
1982
    configversion = _GetConfigVersion()
1983
    if configversion is None:
1984
      return 1
1985
    # If the upgrade we resume was an upgrade between compatible
1986
    # versions (like 2.10.0 to 2.10.1), the correct configversion
1987
    # does not guarantee that the config has been updated.
1988
    # However, in the case of a compatible update with the configuration
1989
    # not touched, we are running a different dirversion with the same
1990
    # config version.
1991
    config_already_modified = \
1992
      (utils.IsCorrectConfigVersion(version, configversion) and
1993
       not (versionstring != constants.DIR_VERSION and
1994
            configversion == (constants.CONFIG_MAJOR, constants.CONFIG_MINOR,
1995
                              constants.CONFIG_REVISION)))
1996
    if not config_already_modified:
1997
      # We have to start from the beginning; however, some daemons might have
1998
      # already been stopped, so the only way to get into a well-defined state
1999
      # is by starting all daemons again.
2000
      _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
2001
  else:
2002
    versionstring = opts.to
2003
    config_already_modified = False
2004
    version = utils.version.ParseVersion(versionstring)
2005
    if version is None:
2006
      ToStderr("Could not parse version string %s" % versionstring)
2007
      return 1
2008

    
2009
  msg = utils.version.UpgradeRange(version)
2010
  if msg is not None:
2011
    ToStderr("Cannot upgrade to %s: %s" % (versionstring, msg))
2012
    return 1
2013

    
2014
  if not config_already_modified:
2015
    success, rollback = _UpgradeBeforeConfigurationChange(versionstring)
2016
    if not success:
2017
      _ExecuteCommands(rollback)
2018
      return 1
2019
  else:
2020
    rollback = []
2021

    
2022
  downgrade = utils.version.ShouldCfgdowngrade(version)
2023

    
2024
  success, additionalrollback =  \
2025
      _SwitchVersionAndConfig(versionstring, downgrade)
2026
  if not success:
2027
    rollback.extend(additionalrollback)
2028
    _ExecuteCommands(rollback)
2029
    return 1
2030

    
2031
  return _UpgradeAfterConfigurationChange()
2032

    
2033

    
2034
commands = {
2035
  "init": (
2036
    InitCluster, [ArgHost(min=1, max=1)],
2037
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
2038
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
2039
     NIC_PARAMS_OPT, NOMODIFY_ETCHOSTS_OPT, NOMODIFY_SSH_SETUP_OPT,
2040
     SECONDARY_IP_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT,
2041
     DRBD_HELPER_OPT, DEFAULT_IALLOCATOR_OPT, DEFAULT_IALLOCATOR_PARAMS_OPT,
2042
     PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT,
2043
     GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT,
2044
     HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
2045
     IPOLICY_STD_SPECS_OPT]
2046
     + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
2047
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
2048
  "destroy": (
2049
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
2050
    "", "Destroy cluster"),
2051
  "rename": (
2052
    RenameCluster, [ArgHost(min=1, max=1)],
2053
    [FORCE_OPT, DRY_RUN_OPT],
2054
    "<new_name>",
2055
    "Renames the cluster"),
2056
  "redist-conf": (
2057
    RedistributeConfig, ARGS_NONE, SUBMIT_OPTS +
2058
    [DRY_RUN_OPT, PRIORITY_OPT, FORCE_DISTRIBUTION],
2059
    "", "Forces a push of the configuration file and ssconf files"
2060
    " to the nodes in the cluster"),
2061
  "verify": (
2062
    VerifyCluster, ARGS_NONE,
2063
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
2064
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
2065
    "", "Does a check on the cluster configuration"),
2066
  "verify-disks": (
2067
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
2068
    "", "Does a check on the cluster disk status"),
2069
  "repair-disk-sizes": (
2070
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
2071
    "[instance...]", "Updates mismatches in recorded disk sizes"),
2072
  "master-failover": (
2073
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
2074
    "", "Makes the current node the master"),
2075
  "master-ping": (
2076
    MasterPing, ARGS_NONE, [],
2077
    "", "Checks if the master is alive"),
2078
  "version": (
2079
    ShowClusterVersion, ARGS_NONE, [],
2080
    "", "Shows the cluster version"),
2081
  "getmaster": (
2082
    ShowClusterMaster, ARGS_NONE, [],
2083
    "", "Shows the cluster master"),
2084
  "copyfile": (
2085
    ClusterCopyFile, [ArgFile(min=1, max=1)],
2086
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
2087
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
2088
  "command": (
2089
    RunClusterCommand, [ArgCommand(min=1)],
2090
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
2091
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
2092
  "info": (
2093
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
2094
    "[--roman]", "Show cluster configuration"),
2095
  "list-tags": (
2096
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
2097
  "add-tags": (
2098
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
2099
    "tag...", "Add tags to the cluster"),
2100
  "remove-tags": (
2101
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
2102
    "tag...", "Remove tags from the cluster"),
2103
  "search-tags": (
2104
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
2105
    "Searches the tags on all objects on"
2106
    " the cluster for a given pattern (regex)"),
2107
  "queue": (
2108
    QueueOps,
2109
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
2110
    [], "drain|undrain|info", "Change queue properties"),
2111
  "watcher": (
2112
    WatcherOps,
2113
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
2114
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
2115
    [],
2116
    "{pause <timespec>|continue|info}", "Change watcher properties"),
2117
  "modify": (
2118
    SetClusterParams, ARGS_NONE,
2119
    [FORCE_OPT,
2120
     BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
2121
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT,
2122
     UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT, DRBD_HELPER_OPT,
2123
     DEFAULT_IALLOCATOR_OPT, DEFAULT_IALLOCATOR_PARAMS_OPT, RESERVED_LVS_OPT,
2124
     DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT,
2125
     USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] +
2126
     SUBMIT_OPTS +
2127
     [ENABLED_DISK_TEMPLATES_OPT, IPOLICY_STD_SPECS_OPT, MODIFY_ETCHOSTS_OPT] +
2128
     INSTANCE_POLICY_OPTS + [GLOBAL_FILEDIR_OPT],
2129
    "[opts...]",
2130
    "Alters the parameters of the cluster"),
2131
  "renew-crypto": (
2132
    RenewCrypto, ARGS_NONE,
2133
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
2134
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
2135
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
2136
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
2137
    "[opts...]",
2138
    "Renews cluster certificates, keys and secrets"),
2139
  "epo": (
2140
    Epo, [ArgUnknown()],
2141
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
2142
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
2143
    "[opts...] [args]",
2144
    "Performs an emergency power-off on given args"),
2145
  "activate-master-ip": (
2146
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
2147
  "deactivate-master-ip": (
2148
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
2149
    "Deactivates the master IP"),
2150
  "show-ispecs-cmd": (
2151
    ShowCreateCommand, ARGS_NONE, [], "",
2152
    "Show the command line to re-create the cluster"),
2153
  "upgrade": (
2154
    UpgradeGanetiCommand, ARGS_NONE, [TO_OPT, RESUME_OPT], "",
2155
    "Upgrade (or downgrade) to a new Ganeti version"),
2156
  }
2157

    
2158

    
2159
#: dictionary with aliases for commands
2160
aliases = {
2161
  "masterfailover": "master-failover",
2162
  "show": "info",
2163
}
2164

    
2165

    
2166
def Main():
2167
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
2168
                     aliases=aliases)