Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ 651ce6a3

History | View | Annotate | Download (68.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
from cStringIO import StringIO
30
import os
31
import time
32
import OpenSSL
33
import itertools
34

    
35
from ganeti.cli import *
36
from ganeti import opcodes
37
from ganeti import constants
38
from ganeti import errors
39
from ganeti import utils
40
from ganeti import bootstrap
41
from ganeti import ssh
42
from ganeti import objects
43
from ganeti import uidpool
44
from ganeti import compat
45
from ganeti import netutils
46
from ganeti import ssconf
47
from ganeti import pathutils
48
from ganeti import serializer
49
from ganeti import qlang
50

    
51

    
52
ON_OPT = cli_option("--on", default=False,
53
                    action="store_true", dest="on",
54
                    help="Recover from an EPO")
55

    
56
GROUPS_OPT = cli_option("--groups", default=False,
57
                        action="store_true", dest="groups",
58
                        help="Arguments are node groups instead of nodes")
59

    
60
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
61
                            help="Override interactive check for --no-voting",
62
                            default=False, action="store_true")
63

    
64
FORCE_DISTRIBUTION = cli_option("--yes-do-it", dest="yes_do_it",
65
                                help="Unconditionally distribute the"
66
                                " configuration, even if the queue"
67
                                " is drained",
68
                                default=False, action="store_true")
69

    
70
TO_OPT = cli_option("--to", default=None, type="string",
71
                    help="The Ganeti version to upgrade to")
72

    
73
RESUME_OPT = cli_option("--resume", default=False, action="store_true",
74
                        help="Resume any pending Ganeti upgrades")
75

    
76
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
77
_EPO_PING_TIMEOUT = 1 # 1 second
78
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
79

    
80

    
81
def _InitEnabledDiskTemplates(opts):
82
  """Initialize the list of enabled disk templates.
83

84
  """
85
  if opts.enabled_disk_templates:
86
    return opts.enabled_disk_templates.split(",")
87
  else:
88
    return constants.DEFAULT_ENABLED_DISK_TEMPLATES
89

    
90

    
91
def _InitVgName(opts, enabled_disk_templates):
92
  """Initialize the volume group name.
93

94
  @type enabled_disk_templates: list of strings
95
  @param enabled_disk_templates: cluster-wide enabled disk templates
96

97
  """
98
  vg_name = None
99
  if opts.vg_name is not None:
100
    vg_name = opts.vg_name
101
    if vg_name:
102
      if not utils.IsLvmEnabled(enabled_disk_templates):
103
        ToStdout("You specified a volume group with --vg-name, but you did not"
104
                 " enable any disk template that uses lvm.")
105
    elif utils.IsLvmEnabled(enabled_disk_templates):
106
      raise errors.OpPrereqError(
107
          "LVM disk templates are enabled, but vg name not set.")
108
  elif utils.IsLvmEnabled(enabled_disk_templates):
109
    vg_name = constants.DEFAULT_VG
110
  return vg_name
111

    
112

    
113
def _InitDrbdHelper(opts, enabled_disk_templates):
114
  """Initialize the DRBD usermode helper.
115

116
  """
117
  drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
118

    
119
  if not drbd_enabled and opts.drbd_helper is not None:
120
    ToStdout("Note: You specified a DRBD usermode helper, while DRBD storage"
121
             " is not enabled.")
122

    
123
  if drbd_enabled:
124
    if opts.drbd_helper is None:
125
      return constants.DEFAULT_DRBD_HELPER
126
    if opts.drbd_helper == '':
127
      raise errors.OpPrereqError(
128
          "Unsetting the drbd usermode helper while enabling DRBD is not"
129
          " allowed.")
130

    
131
  return opts.drbd_helper
132

    
133

    
134
@UsesRPC
135
def InitCluster(opts, args):
136
  """Initialize the cluster.
137

138
  @param opts: the command line options selected by the user
139
  @type args: list
140
  @param args: should contain only one element, the desired
141
      cluster name
142
  @rtype: int
143
  @return: the desired exit code
144

145
  """
146
  enabled_disk_templates = _InitEnabledDiskTemplates(opts)
147

    
148
  try:
149
    vg_name = _InitVgName(opts, enabled_disk_templates)
150
    drbd_helper = _InitDrbdHelper(opts, enabled_disk_templates)
151
  except errors.OpPrereqError, e:
152
    ToStderr(str(e))
153
    return 1
154

    
155
  master_netdev = opts.master_netdev
156
  if master_netdev is None:
157
    nic_mode = opts.nicparams.get(constants.NIC_MODE, None)
158
    if not nic_mode:
159
      # default case, use bridging
160
      master_netdev = constants.DEFAULT_BRIDGE
161
    elif nic_mode == constants.NIC_MODE_OVS:
162
      # default ovs is different from default bridge
163
      master_netdev = constants.DEFAULT_OVS
164
      opts.nicparams[constants.NIC_LINK] = constants.DEFAULT_OVS
165

    
166
  hvlist = opts.enabled_hypervisors
167
  if hvlist is None:
168
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
169
  hvlist = hvlist.split(",")
170

    
171
  hvparams = dict(opts.hvparams)
172
  beparams = opts.beparams
173
  nicparams = opts.nicparams
174

    
175
  diskparams = dict(opts.diskparams)
176

    
177
  # check the disk template types here, as we cannot rely on the type check done
178
  # by the opcode parameter types
179
  diskparams_keys = set(diskparams.keys())
180
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
181
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
182
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
183
    return 1
184

    
185
  # prepare beparams dict
186
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
187
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
188

    
189
  # prepare nicparams dict
190
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
191
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
192

    
193
  # prepare ndparams dict
194
  if opts.ndparams is None:
195
    ndparams = dict(constants.NDC_DEFAULTS)
196
  else:
197
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
198
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
199

    
200
  # prepare hvparams dict
201
  for hv in constants.HYPER_TYPES:
202
    if hv not in hvparams:
203
      hvparams[hv] = {}
204
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
205
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
206

    
207
  # prepare diskparams dict
208
  for templ in constants.DISK_TEMPLATES:
209
    if templ not in diskparams:
210
      diskparams[templ] = {}
211
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
212
                                         diskparams[templ])
213
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
214

    
215
  # prepare ipolicy dict
216
  ipolicy = CreateIPolicyFromOpts(
217
    ispecs_mem_size=opts.ispecs_mem_size,
218
    ispecs_cpu_count=opts.ispecs_cpu_count,
219
    ispecs_disk_count=opts.ispecs_disk_count,
220
    ispecs_disk_size=opts.ispecs_disk_size,
221
    ispecs_nic_count=opts.ispecs_nic_count,
222
    minmax_ispecs=opts.ipolicy_bounds_specs,
223
    std_ispecs=opts.ipolicy_std_specs,
224
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
225
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
226
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
227
    fill_all=True)
228

    
229
  if opts.candidate_pool_size is None:
230
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
231

    
232
  if opts.mac_prefix is None:
233
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
234

    
235
  uid_pool = opts.uid_pool
236
  if uid_pool is not None:
237
    uid_pool = uidpool.ParseUidPool(uid_pool)
238

    
239
  if opts.prealloc_wipe_disks is None:
240
    opts.prealloc_wipe_disks = False
241

    
242
  external_ip_setup_script = opts.use_external_mip_script
243
  if external_ip_setup_script is None:
244
    external_ip_setup_script = False
245

    
246
  try:
247
    primary_ip_version = int(opts.primary_ip_version)
248
  except (ValueError, TypeError), err:
249
    ToStderr("Invalid primary ip version value: %s" % str(err))
250
    return 1
251

    
252
  master_netmask = opts.master_netmask
253
  try:
254
    if master_netmask is not None:
255
      master_netmask = int(master_netmask)
256
  except (ValueError, TypeError), err:
257
    ToStderr("Invalid master netmask value: %s" % str(err))
258
    return 1
259

    
260
  if opts.disk_state:
261
    disk_state = utils.FlatToDict(opts.disk_state)
262
  else:
263
    disk_state = {}
264

    
265
  hv_state = dict(opts.hv_state)
266

    
267
  bootstrap.InitCluster(cluster_name=args[0],
268
                        secondary_ip=opts.secondary_ip,
269
                        vg_name=vg_name,
270
                        mac_prefix=opts.mac_prefix,
271
                        master_netmask=master_netmask,
272
                        master_netdev=master_netdev,
273
                        file_storage_dir=opts.file_storage_dir,
274
                        shared_file_storage_dir=opts.shared_file_storage_dir,
275
                        enabled_hypervisors=hvlist,
276
                        hvparams=hvparams,
277
                        beparams=beparams,
278
                        nicparams=nicparams,
279
                        ndparams=ndparams,
280
                        diskparams=diskparams,
281
                        ipolicy=ipolicy,
282
                        candidate_pool_size=opts.candidate_pool_size,
283
                        modify_etc_hosts=opts.modify_etc_hosts,
284
                        modify_ssh_setup=opts.modify_ssh_setup,
285
                        maintain_node_health=opts.maintain_node_health,
286
                        drbd_helper=drbd_helper,
287
                        uid_pool=uid_pool,
288
                        default_iallocator=opts.default_iallocator,
289
                        primary_ip_version=primary_ip_version,
290
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
291
                        use_external_mip_script=external_ip_setup_script,
292
                        hv_state=hv_state,
293
                        disk_state=disk_state,
294
                        enabled_disk_templates=enabled_disk_templates,
295
                        )
296
  op = opcodes.OpClusterPostInit()
297
  SubmitOpCode(op, opts=opts)
298
  return 0
299

    
300

    
301
@UsesRPC
302
def DestroyCluster(opts, args):
303
  """Destroy the cluster.
304

305
  @param opts: the command line options selected by the user
306
  @type args: list
307
  @param args: should be an empty list
308
  @rtype: int
309
  @return: the desired exit code
310

311
  """
312
  if not opts.yes_do_it:
313
    ToStderr("Destroying a cluster is irreversible. If you really want"
314
             " destroy this cluster, supply the --yes-do-it option.")
315
    return 1
316

    
317
  op = opcodes.OpClusterDestroy()
318
  master_uuid = SubmitOpCode(op, opts=opts)
319
  # if we reached this, the opcode didn't fail; we can proceed to
320
  # shutdown all the daemons
321
  bootstrap.FinalizeClusterDestroy(master_uuid)
322
  return 0
323

    
324

    
325
def RenameCluster(opts, args):
326
  """Rename the cluster.
327

328
  @param opts: the command line options selected by the user
329
  @type args: list
330
  @param args: should contain only one element, the new cluster name
331
  @rtype: int
332
  @return: the desired exit code
333

334
  """
335
  cl = GetClient()
336

    
337
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
338

    
339
  new_name = args[0]
340
  if not opts.force:
341
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
342
                " connected over the network to the cluster name, the"
343
                " operation is very dangerous as the IP address will be"
344
                " removed from the node and the change may not go through."
345
                " Continue?") % (cluster_name, new_name)
346
    if not AskUser(usertext):
347
      return 1
348

    
349
  op = opcodes.OpClusterRename(name=new_name)
350
  result = SubmitOpCode(op, opts=opts, cl=cl)
351

    
352
  if result:
353
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
354

    
355
  return 0
356

    
357

    
358
def ActivateMasterIp(opts, args):
359
  """Activates the master IP.
360

361
  """
362
  op = opcodes.OpClusterActivateMasterIp()
363
  SubmitOpCode(op)
364
  return 0
365

    
366

    
367
def DeactivateMasterIp(opts, args):
368
  """Deactivates the master IP.
369

370
  """
371
  if not opts.confirm:
372
    usertext = ("This will disable the master IP. All the open connections to"
373
                " the master IP will be closed. To reach the master you will"
374
                " need to use its node IP."
375
                " Continue?")
376
    if not AskUser(usertext):
377
      return 1
378

    
379
  op = opcodes.OpClusterDeactivateMasterIp()
380
  SubmitOpCode(op)
381
  return 0
382

    
383

    
384
def RedistributeConfig(opts, args):
385
  """Forces push of the cluster configuration.
386

387
  @param opts: the command line options selected by the user
388
  @type args: list
389
  @param args: empty list
390
  @rtype: int
391
  @return: the desired exit code
392

393
  """
394
  op = opcodes.OpClusterRedistConf()
395
  if opts.yes_do_it:
396
    SubmitOpCodeToDrainedQueue(op)
397
  else:
398
    SubmitOrSend(op, opts)
399
  return 0
400

    
401

    
402
def ShowClusterVersion(opts, args):
403
  """Write version of ganeti software to the standard output.
404

405
  @param opts: the command line options selected by the user
406
  @type args: list
407
  @param args: should be an empty list
408
  @rtype: int
409
  @return: the desired exit code
410

411
  """
412
  cl = GetClient(query=True)
413
  result = cl.QueryClusterInfo()
414
  ToStdout("Software version: %s", result["software_version"])
415
  ToStdout("Internode protocol: %s", result["protocol_version"])
416
  ToStdout("Configuration format: %s", result["config_version"])
417
  ToStdout("OS api version: %s", result["os_api_version"])
418
  ToStdout("Export interface: %s", result["export_version"])
419
  ToStdout("VCS version: %s", result["vcs_version"])
420
  return 0
421

    
422

    
423
def ShowClusterMaster(opts, args):
424
  """Write name of master node to the standard output.
425

426
  @param opts: the command line options selected by the user
427
  @type args: list
428
  @param args: should be an empty list
429
  @rtype: int
430
  @return: the desired exit code
431

432
  """
433
  master = bootstrap.GetMaster()
434
  ToStdout(master)
435
  return 0
436

    
437

    
438
def _FormatGroupedParams(paramsdict, roman=False):
439
  """Format Grouped parameters (be, nic, disk) by group.
440

441
  @type paramsdict: dict of dicts
442
  @param paramsdict: {group: {param: value, ...}, ...}
443
  @rtype: dict of dicts
444
  @return: copy of the input dictionaries with strings as values
445

446
  """
447
  ret = {}
448
  for (item, val) in paramsdict.items():
449
    if isinstance(val, dict):
450
      ret[item] = _FormatGroupedParams(val, roman=roman)
451
    elif roman and isinstance(val, int):
452
      ret[item] = compat.TryToRoman(val)
453
    else:
454
      ret[item] = str(val)
455
  return ret
456

    
457

    
458
def ShowClusterConfig(opts, args):
459
  """Shows cluster information.
460

461
  @param opts: the command line options selected by the user
462
  @type args: list
463
  @param args: should be an empty list
464
  @rtype: int
465
  @return: the desired exit code
466

467
  """
468
  cl = GetClient(query=True)
469
  result = cl.QueryClusterInfo()
470

    
471
  if result["tags"]:
472
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
473
  else:
474
    tags = "(none)"
475
  if result["reserved_lvs"]:
476
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
477
  else:
478
    reserved_lvs = "(none)"
479

    
480
  enabled_hv = result["enabled_hypervisors"]
481
  hvparams = dict((k, v) for k, v in result["hvparams"].iteritems()
482
                  if k in enabled_hv)
483

    
484
  info = [
485
    ("Cluster name", result["name"]),
486
    ("Cluster UUID", result["uuid"]),
487

    
488
    ("Creation time", utils.FormatTime(result["ctime"])),
489
    ("Modification time", utils.FormatTime(result["mtime"])),
490

    
491
    ("Master node", result["master"]),
492

    
493
    ("Architecture (this node)",
494
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),
495

    
496
    ("Tags", tags),
497

    
498
    ("Default hypervisor", result["default_hypervisor"]),
499
    ("Enabled hypervisors", utils.CommaJoin(enabled_hv)),
500

    
501
    ("Hypervisor parameters", _FormatGroupedParams(hvparams)),
502

    
503
    ("OS-specific hypervisor parameters",
504
     _FormatGroupedParams(result["os_hvp"])),
505

    
506
    ("OS parameters", _FormatGroupedParams(result["osparams"])),
507

    
508
    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
509
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),
510

    
511
    ("Cluster parameters", [
512
      ("candidate pool size",
513
       compat.TryToRoman(result["candidate_pool_size"],
514
                         convert=opts.roman_integers)),
515
      ("master netdev", result["master_netdev"]),
516
      ("master netmask", result["master_netmask"]),
517
      ("use external master IP address setup script",
518
       result["use_external_mip_script"]),
519
      ("lvm volume group", result["volume_group_name"]),
520
      ("lvm reserved volumes", reserved_lvs),
521
      ("drbd usermode helper", result["drbd_usermode_helper"]),
522
      ("file storage path", result["file_storage_dir"]),
523
      ("shared file storage path", result["shared_file_storage_dir"]),
524
      ("maintenance of node health", result["maintain_node_health"]),
525
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
526
      ("default instance allocator", result["default_iallocator"]),
527
      ("primary ip version", result["primary_ip_version"]),
528
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
529
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
530
      ("ExtStorage Providers search path",
531
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
532
      ("enabled disk templates",
533
       utils.CommaJoin(result["enabled_disk_templates"])),
534
      ]),
535

    
536
    ("Default node parameters",
537
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),
538

    
539
    ("Default instance parameters",
540
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),
541

    
542
    ("Default nic parameters",
543
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),
544

    
545
    ("Default disk parameters",
546
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),
547

    
548
    ("Instance policy - limits for instances",
549
     FormatPolicyInfo(result["ipolicy"], None, True)),
550
    ]
551

    
552
  PrintGenericInfo(info)
553
  return 0
554

    
555

    
556
def ClusterCopyFile(opts, args):
557
  """Copy a file from master to some nodes.
558

559
  @param opts: the command line options selected by the user
560
  @type args: list
561
  @param args: should contain only one element, the path of
562
      the file to be copied
563
  @rtype: int
564
  @return: the desired exit code
565

566
  """
567
  filename = args[0]
568
  if not os.path.exists(filename):
569
    raise errors.OpPrereqError("No such filename '%s'" % filename,
570
                               errors.ECODE_INVAL)
571

    
572
  cl = GetClient()
573
  try:
574
    cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
575

    
576
    results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
577
                             secondary_ips=opts.use_replication_network,
578
                             nodegroup=opts.nodegroup)
579
    ports = GetNodesSshPorts(opts.nodes, cl)
580
  finally:
581
    cl.Close()
582

    
583
  srun = ssh.SshRunner(cluster_name)
584
  for (node, port) in zip(results, ports):
585
    if not srun.CopyFileToNode(node, port, filename):
586
      ToStderr("Copy of file %s to node %s:%d failed", filename, node, port)
587

    
588
  return 0
589

    
590

    
591
def RunClusterCommand(opts, args):
592
  """Run a command on some nodes.
593

594
  @param opts: the command line options selected by the user
595
  @type args: list
596
  @param args: should contain the command to be run and its arguments
597
  @rtype: int
598
  @return: the desired exit code
599

600
  """
601
  cl = GetClient()
602

    
603
  command = " ".join(args)
604

    
605
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)
606
  ports = GetNodesSshPorts(nodes, cl)
607

    
608
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
609
                                                    "master_node"])
610

    
611
  srun = ssh.SshRunner(cluster_name=cluster_name)
612

    
613
  # Make sure master node is at list end
614
  if master_node in nodes:
615
    nodes.remove(master_node)
616
    nodes.append(master_node)
617

    
618
  for (name, port) in zip(nodes, ports):
619
    result = srun.Run(name, constants.SSH_LOGIN_USER, command, port=port)
620

    
621
    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
622
      # Do not output anything for successful commands
623
      continue
624

    
625
    ToStdout("------------------------------------------------")
626
    if opts.show_machine_names:
627
      for line in result.output.splitlines():
628
        ToStdout("%s: %s", name, line)
629
    else:
630
      ToStdout("node: %s", name)
631
      ToStdout("%s", result.output)
632
    ToStdout("return code = %s", result.exit_code)
633

    
634
  return 0
635

    
636

    
637
def VerifyCluster(opts, args):
638
  """Verify integrity of cluster, performing various test on nodes.
639

640
  @param opts: the command line options selected by the user
641
  @type args: list
642
  @param args: should be an empty list
643
  @rtype: int
644
  @return: the desired exit code
645

646
  """
647
  skip_checks = []
648

    
649
  if opts.skip_nplusone_mem:
650
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
651

    
652
  cl = GetClient()
653

    
654
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
655
                               error_codes=opts.error_codes,
656
                               debug_simulate_errors=opts.simulate_errors,
657
                               skip_checks=skip_checks,
658
                               ignore_errors=opts.ignore_errors,
659
                               group_name=opts.nodegroup)
660
  result = SubmitOpCode(op, cl=cl, opts=opts)
661

    
662
  # Keep track of submitted jobs
663
  jex = JobExecutor(cl=cl, opts=opts)
664

    
665
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
666
    jex.AddJobId(None, status, job_id)
667

    
668
  results = jex.GetResults()
669

    
670
  (bad_jobs, bad_results) = \
671
    map(len,
672
        # Convert iterators to lists
673
        map(list,
674
            # Count errors
675
            map(compat.partial(itertools.ifilterfalse, bool),
676
                # Convert result to booleans in a tuple
677
                zip(*((job_success, len(op_results) == 1 and op_results[0])
678
                      for (job_success, op_results) in results)))))
679

    
680
  if bad_jobs == 0 and bad_results == 0:
681
    rcode = constants.EXIT_SUCCESS
682
  else:
683
    rcode = constants.EXIT_FAILURE
684
    if bad_jobs > 0:
685
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
686

    
687
  return rcode
688

    
689

    
690
def VerifyDisks(opts, args):
691
  """Verify integrity of cluster disks.
692

693
  @param opts: the command line options selected by the user
694
  @type args: list
695
  @param args: should be an empty list
696
  @rtype: int
697
  @return: the desired exit code
698

699
  """
700
  cl = GetClient()
701

    
702
  op = opcodes.OpClusterVerifyDisks()
703

    
704
  result = SubmitOpCode(op, cl=cl, opts=opts)
705

    
706
  # Keep track of submitted jobs
707
  jex = JobExecutor(cl=cl, opts=opts)
708

    
709
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
710
    jex.AddJobId(None, status, job_id)
711

    
712
  retcode = constants.EXIT_SUCCESS
713

    
714
  for (status, result) in jex.GetResults():
715
    if not status:
716
      ToStdout("Job failed: %s", result)
717
      continue
718

    
719
    ((bad_nodes, instances, missing), ) = result
720

    
721
    for node, text in bad_nodes.items():
722
      ToStdout("Error gathering data on node %s: %s",
723
               node, utils.SafeEncode(text[-400:]))
724
      retcode = constants.EXIT_FAILURE
725
      ToStdout("You need to fix these nodes first before fixing instances")
726

    
727
    for iname in instances:
728
      if iname in missing:
729
        continue
730
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
731
      try:
732
        ToStdout("Activating disks for instance '%s'", iname)
733
        SubmitOpCode(op, opts=opts, cl=cl)
734
      except errors.GenericError, err:
735
        nret, msg = FormatError(err)
736
        retcode |= nret
737
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
738

    
739
    if missing:
740
      for iname, ival in missing.iteritems():
741
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
742
        if all_missing:
743
          ToStdout("Instance %s cannot be verified as it lives on"
744
                   " broken nodes", iname)
745
        else:
746
          ToStdout("Instance %s has missing logical volumes:", iname)
747
          ival.sort()
748
          for node, vol in ival:
749
            if node in bad_nodes:
750
              ToStdout("\tbroken node %s /dev/%s", node, vol)
751
            else:
752
              ToStdout("\t%s /dev/%s", node, vol)
753

    
754
      ToStdout("You need to replace or recreate disks for all the above"
755
               " instances if this message persists after fixing broken nodes.")
756
      retcode = constants.EXIT_FAILURE
757
    elif not instances:
758
      ToStdout("No disks need to be activated.")
759

    
760
  return retcode
761

    
762

    
763
def RepairDiskSizes(opts, args):
764
  """Verify sizes of cluster disks.
765

766
  @param opts: the command line options selected by the user
767
  @type args: list
768
  @param args: optional list of instances to restrict check to
769
  @rtype: int
770
  @return: the desired exit code
771

772
  """
773
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
774
  SubmitOpCode(op, opts=opts)
775

    
776

    
777
@UsesRPC
778
def MasterFailover(opts, args):
779
  """Failover the master node.
780

781
  This command, when run on a non-master node, will cause the current
782
  master to cease being master, and the non-master to become new
783
  master.
784

785
  @param opts: the command line options selected by the user
786
  @type args: list
787
  @param args: should be an empty list
788
  @rtype: int
789
  @return: the desired exit code
790

791
  """
792
  if opts.no_voting and not opts.yes_do_it:
793
    usertext = ("This will perform the failover even if most other nodes"
794
                " are down, or if this node is outdated. This is dangerous"
795
                " as it can lead to a non-consistent cluster. Check the"
796
                " gnt-cluster(8) man page before proceeding. Continue?")
797
    if not AskUser(usertext):
798
      return 1
799

    
800
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
801

    
802

    
803
def MasterPing(opts, args):
804
  """Checks if the master is alive.
805

806
  @param opts: the command line options selected by the user
807
  @type args: list
808
  @param args: should be an empty list
809
  @rtype: int
810
  @return: the desired exit code
811

812
  """
813
  try:
814
    cl = GetClient()
815
    cl.QueryClusterInfo()
816
    return 0
817
  except Exception: # pylint: disable=W0703
818
    return 1
819

    
820

    
821
def SearchTags(opts, args):
822
  """Searches the tags on all the cluster.
823

824
  @param opts: the command line options selected by the user
825
  @type args: list
826
  @param args: should contain only one element, the tag pattern
827
  @rtype: int
828
  @return: the desired exit code
829

830
  """
831
  op = opcodes.OpTagsSearch(pattern=args[0])
832
  result = SubmitOpCode(op, opts=opts)
833
  if not result:
834
    return 1
835
  result = list(result)
836
  result.sort()
837
  for path, tag in result:
838
    ToStdout("%s %s", path, tag)
839

    
840

    
841
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
842
  """Reads and verifies an X509 certificate.
843

844
  @type cert_filename: string
845
  @param cert_filename: the path of the file containing the certificate to
846
                        verify encoded in PEM format
847
  @type verify_private_key: bool
848
  @param verify_private_key: whether to verify the private key in addition to
849
                             the public certificate
850
  @rtype: string
851
  @return: a string containing the PEM-encoded certificate.
852

853
  """
854
  try:
855
    pem = utils.ReadFile(cert_filename)
856
  except IOError, err:
857
    raise errors.X509CertError(cert_filename,
858
                               "Unable to read certificate: %s" % str(err))
859

    
860
  try:
861
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
862
  except Exception, err:
863
    raise errors.X509CertError(cert_filename,
864
                               "Unable to load certificate: %s" % str(err))
865

    
866
  if verify_private_key:
867
    try:
868
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
869
    except Exception, err:
870
      raise errors.X509CertError(cert_filename,
871
                                 "Unable to load private key: %s" % str(err))
872

    
873
  return pem
874

    
875

    
876
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
877
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
878
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
879
                 cds_filename, force):
880
  """Renews cluster certificates, keys and secrets.
881

882
  @type new_cluster_cert: bool
883
  @param new_cluster_cert: Whether to generate a new cluster certificate
884
  @type new_rapi_cert: bool
885
  @param new_rapi_cert: Whether to generate a new RAPI certificate
886
  @type rapi_cert_filename: string
887
  @param rapi_cert_filename: Path to file containing new RAPI certificate
888
  @type new_spice_cert: bool
889
  @param new_spice_cert: Whether to generate a new SPICE certificate
890
  @type spice_cert_filename: string
891
  @param spice_cert_filename: Path to file containing new SPICE certificate
892
  @type spice_cacert_filename: string
893
  @param spice_cacert_filename: Path to file containing the certificate of the
894
                                CA that signed the SPICE certificate
895
  @type new_confd_hmac_key: bool
896
  @param new_confd_hmac_key: Whether to generate a new HMAC key
897
  @type new_cds: bool
898
  @param new_cds: Whether to generate a new cluster domain secret
899
  @type cds_filename: string
900
  @param cds_filename: Path to file containing new cluster domain secret
901
  @type force: bool
902
  @param force: Whether to ask user for confirmation
903

904
  """
905
  if new_rapi_cert and rapi_cert_filename:
906
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
907
             " options can be specified at the same time.")
908
    return 1
909

    
910
  if new_cds and cds_filename:
911
    ToStderr("Only one of the --new-cluster-domain-secret and"
912
             " --cluster-domain-secret options can be specified at"
913
             " the same time.")
914
    return 1
915

    
916
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
917
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
918
             " and --spice-ca-certificate must not be used.")
919
    return 1
920

    
921
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
922
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
923
             " specified.")
924
    return 1
925

    
926
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
927
  try:
928
    if rapi_cert_filename:
929
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
930
    if spice_cert_filename:
931
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
932
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
933
  except errors.X509CertError, err:
934
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
935
    return 1
936

    
937
  if cds_filename:
938
    try:
939
      cds = utils.ReadFile(cds_filename)
940
    except Exception, err: # pylint: disable=W0703
941
      ToStderr("Can't load new cluster domain secret from %s: %s" %
942
               (cds_filename, str(err)))
943
      return 1
944
  else:
945
    cds = None
946

    
947
  if not force:
948
    usertext = ("This requires all daemons on all nodes to be restarted and"
949
                " may take some time. Continue?")
950
    if not AskUser(usertext):
951
      return 1
952

    
953
  def _RenewCryptoInner(ctx):
954
    ctx.feedback_fn("Updating certificates and keys")
955
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
956
                                    new_rapi_cert,
957
                                    new_spice_cert,
958
                                    new_confd_hmac_key,
959
                                    new_cds,
960
                                    rapi_cert_pem=rapi_cert_pem,
961
                                    spice_cert_pem=spice_cert_pem,
962
                                    spice_cacert_pem=spice_cacert_pem,
963
                                    cds=cds)
964

    
965
    files_to_copy = []
966

    
967
    if new_cluster_cert:
968
      files_to_copy.append(pathutils.NODED_CERT_FILE)
969

    
970
    if new_rapi_cert or rapi_cert_pem:
971
      files_to_copy.append(pathutils.RAPI_CERT_FILE)
972

    
973
    if new_spice_cert or spice_cert_pem:
974
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
975
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)
976

    
977
    if new_confd_hmac_key:
978
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)
979

    
980
    if new_cds or cds:
981
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)
982

    
983
    if files_to_copy:
984
      for node_name in ctx.nonmaster_nodes:
985
        port = ctx.ssh_ports[node_name]
986
        ctx.feedback_fn("Copying %s to %s:%d" %
987
                        (", ".join(files_to_copy), node_name, port))
988
        for file_name in files_to_copy:
989
          ctx.ssh.CopyFileToNode(node_name, port, file_name)
990

    
991
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
992

    
993
  ToStdout("All requested certificates and keys have been replaced."
994
           " Running \"gnt-cluster verify\" now is recommended.")
995

    
996
  return 0
997

    
998

    
999
def RenewCrypto(opts, args):
1000
  """Renews cluster certificates, keys and secrets.
1001

1002
  """
1003
  return _RenewCrypto(opts.new_cluster_cert,
1004
                      opts.new_rapi_cert,
1005
                      opts.rapi_cert,
1006
                      opts.new_spice_cert,
1007
                      opts.spice_cert,
1008
                      opts.spice_cacert,
1009
                      opts.new_confd_hmac_key,
1010
                      opts.new_cluster_domain_secret,
1011
                      opts.cluster_domain_secret,
1012
                      opts.force)
1013

    
1014

    
1015
def _GetEnabledDiskTemplates(opts):
1016
  """Determine the list of enabled disk templates.
1017

1018
  """
1019
  if opts.enabled_disk_templates:
1020
    return opts.enabled_disk_templates.split(",")
1021
  else:
1022
    return None
1023

    
1024

    
1025
def _GetVgName(opts, enabled_disk_templates):
1026
  """Determine the volume group name.
1027

1028
  @type enabled_disk_templates: list of strings
1029
  @param enabled_disk_templates: cluster-wide enabled disk-templates
1030

1031
  """
1032
  # consistency between vg name and enabled disk templates
1033
  vg_name = None
1034
  if opts.vg_name is not None:
1035
    vg_name = opts.vg_name
1036
  if enabled_disk_templates:
1037
    if vg_name and not utils.IsLvmEnabled(enabled_disk_templates):
1038
      ToStdout("You specified a volume group with --vg-name, but you did not"
1039
               " enable any of the following lvm-based disk templates: %s" %
1040
               utils.CommaJoin(constants.DTS_LVM))
1041
  return vg_name
1042

    
1043

    
1044
def _GetDrbdHelper(opts, enabled_disk_templates):
1045
  """Determine the DRBD usermode helper.
1046

1047
  """
1048
  drbd_helper = opts.drbd_helper
1049
  if enabled_disk_templates:
1050
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
1051
    if not drbd_enabled and opts.drbd_helper:
1052
      ToStdout("You specified a DRBD usermode helper with "
1053
               " --drbd-usermode-helper while DRBD is not enabled.")
1054
  return drbd_helper
1055

    
1056

    
1057
def SetClusterParams(opts, args):
1058
  """Modify the cluster.
1059

1060
  @param opts: the command line options selected by the user
1061
  @type args: list
1062
  @param args: should be an empty list
1063
  @rtype: int
1064
  @return: the desired exit code
1065

1066
  """
1067
  if not (opts.vg_name is not None or
1068
          opts.drbd_helper is not None or
1069
          opts.enabled_hypervisors or opts.hvparams or
1070
          opts.beparams or opts.nicparams or
1071
          opts.ndparams or opts.diskparams or
1072
          opts.candidate_pool_size is not None or
1073
          opts.uid_pool is not None or
1074
          opts.maintain_node_health is not None or
1075
          opts.add_uids is not None or
1076
          opts.remove_uids is not None or
1077
          opts.default_iallocator is not None or
1078
          opts.reserved_lvs is not None or
1079
          opts.master_netdev is not None or
1080
          opts.master_netmask is not None or
1081
          opts.use_external_mip_script is not None or
1082
          opts.prealloc_wipe_disks is not None or
1083
          opts.hv_state or
1084
          opts.enabled_disk_templates or
1085
          opts.disk_state or
1086
          opts.ipolicy_bounds_specs is not None or
1087
          opts.ipolicy_std_specs is not None or
1088
          opts.ipolicy_disk_templates is not None or
1089
          opts.ipolicy_vcpu_ratio is not None or
1090
          opts.ipolicy_spindle_ratio is not None or
1091
          opts.modify_etc_hosts is not None or
1092
          opts.file_storage_dir is not None):
1093
    ToStderr("Please give at least one of the parameters.")
1094
    return 1
1095

    
1096
  enabled_disk_templates = _GetEnabledDiskTemplates(opts)
1097
  vg_name = _GetVgName(opts, enabled_disk_templates)
1098

    
1099
  try:
1100
    drbd_helper = _GetDrbdHelper(opts, enabled_disk_templates)
1101
  except errors.OpPrereqError, e:
1102
    ToStderr(str(e))
1103
    return 1
1104

    
1105
  hvlist = opts.enabled_hypervisors
1106
  if hvlist is not None:
1107
    hvlist = hvlist.split(",")
1108

    
1109
  # a list of (name, dict) we can pass directly to dict() (or [])
1110
  hvparams = dict(opts.hvparams)
1111
  for hv_params in hvparams.values():
1112
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1113

    
1114
  diskparams = dict(opts.diskparams)
1115

    
1116
  for dt_params in diskparams.values():
1117
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1118

    
1119
  beparams = opts.beparams
1120
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1121

    
1122
  nicparams = opts.nicparams
1123
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1124

    
1125
  ndparams = opts.ndparams
1126
  if ndparams is not None:
1127
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1128

    
1129
  ipolicy = CreateIPolicyFromOpts(
1130
    minmax_ispecs=opts.ipolicy_bounds_specs,
1131
    std_ispecs=opts.ipolicy_std_specs,
1132
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
1133
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1134
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1135
    )
1136

    
1137
  mnh = opts.maintain_node_health
1138

    
1139
  uid_pool = opts.uid_pool
1140
  if uid_pool is not None:
1141
    uid_pool = uidpool.ParseUidPool(uid_pool)
1142

    
1143
  add_uids = opts.add_uids
1144
  if add_uids is not None:
1145
    add_uids = uidpool.ParseUidPool(add_uids)
1146

    
1147
  remove_uids = opts.remove_uids
1148
  if remove_uids is not None:
1149
    remove_uids = uidpool.ParseUidPool(remove_uids)
1150

    
1151
  if opts.reserved_lvs is not None:
1152
    if opts.reserved_lvs == "":
1153
      opts.reserved_lvs = []
1154
    else:
1155
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1156

    
1157
  if opts.master_netmask is not None:
1158
    try:
1159
      opts.master_netmask = int(opts.master_netmask)
1160
    except ValueError:
1161
      ToStderr("The --master-netmask option expects an int parameter.")
1162
      return 1
1163

    
1164
  ext_ip_script = opts.use_external_mip_script
1165

    
1166
  if opts.disk_state:
1167
    disk_state = utils.FlatToDict(opts.disk_state)
1168
  else:
1169
    disk_state = {}
1170

    
1171
  hv_state = dict(opts.hv_state)
1172

    
1173
  op = opcodes.OpClusterSetParams(
1174
    vg_name=vg_name,
1175
    drbd_helper=drbd_helper,
1176
    enabled_hypervisors=hvlist,
1177
    hvparams=hvparams,
1178
    os_hvp=None,
1179
    beparams=beparams,
1180
    nicparams=nicparams,
1181
    ndparams=ndparams,
1182
    diskparams=diskparams,
1183
    ipolicy=ipolicy,
1184
    candidate_pool_size=opts.candidate_pool_size,
1185
    maintain_node_health=mnh,
1186
    modify_etc_hosts=opts.modify_etc_hosts,
1187
    uid_pool=uid_pool,
1188
    add_uids=add_uids,
1189
    remove_uids=remove_uids,
1190
    default_iallocator=opts.default_iallocator,
1191
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
1192
    master_netdev=opts.master_netdev,
1193
    master_netmask=opts.master_netmask,
1194
    reserved_lvs=opts.reserved_lvs,
1195
    use_external_mip_script=ext_ip_script,
1196
    hv_state=hv_state,
1197
    disk_state=disk_state,
1198
    enabled_disk_templates=enabled_disk_templates,
1199
    force=opts.force,
1200
    file_storage_dir=opts.file_storage_dir,
1201
    )
1202
  SubmitOrSend(op, opts)
1203
  return 0
1204

    
1205

    
1206
def QueueOps(opts, args):
1207
  """Queue operations.
1208

1209
  @param opts: the command line options selected by the user
1210
  @type args: list
1211
  @param args: should contain only one element, the subcommand
1212
  @rtype: int
1213
  @return: the desired exit code
1214

1215
  """
1216
  command = args[0]
1217
  client = GetClient()
1218
  if command in ("drain", "undrain"):
1219
    drain_flag = command == "drain"
1220
    client.SetQueueDrainFlag(drain_flag)
1221
  elif command == "info":
1222
    result = client.QueryConfigValues(["drain_flag"])
1223
    if result[0]:
1224
      val = "set"
1225
    else:
1226
      val = "unset"
1227
    ToStdout("The drain flag is %s" % val)
1228
  else:
1229
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1230
                               errors.ECODE_INVAL)
1231

    
1232
  return 0
1233

    
1234

    
1235
def _ShowWatcherPause(until):
1236
  if until is None or until < time.time():
1237
    ToStdout("The watcher is not paused.")
1238
  else:
1239
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1240

    
1241

    
1242
def WatcherOps(opts, args):
1243
  """Watcher operations.
1244

1245
  @param opts: the command line options selected by the user
1246
  @type args: list
1247
  @param args: should contain only one element, the subcommand
1248
  @rtype: int
1249
  @return: the desired exit code
1250

1251
  """
1252
  command = args[0]
1253
  client = GetClient()
1254

    
1255
  if command == "continue":
1256
    client.SetWatcherPause(None)
1257
    ToStdout("The watcher is no longer paused.")
1258

    
1259
  elif command == "pause":
1260
    if len(args) < 2:
1261
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1262

    
1263
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1264
    _ShowWatcherPause(result)
1265

    
1266
  elif command == "info":
1267
    result = client.QueryConfigValues(["watcher_pause"])
1268
    _ShowWatcherPause(result[0])
1269

    
1270
  else:
1271
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1272
                               errors.ECODE_INVAL)
1273

    
1274
  return 0
1275

    
1276

    
1277
def _OobPower(opts, node_list, power):
1278
  """Puts the node in the list to desired power state.
1279

1280
  @param opts: The command line options selected by the user
1281
  @param node_list: The list of nodes to operate on
1282
  @param power: True if they should be powered on, False otherwise
1283
  @return: The success of the operation (none failed)
1284

1285
  """
1286
  if power:
1287
    command = constants.OOB_POWER_ON
1288
  else:
1289
    command = constants.OOB_POWER_OFF
1290

    
1291
  op = opcodes.OpOobCommand(node_names=node_list,
1292
                            command=command,
1293
                            ignore_status=True,
1294
                            timeout=opts.oob_timeout,
1295
                            power_delay=opts.power_delay)
1296
  result = SubmitOpCode(op, opts=opts)
1297
  errs = 0
1298
  for node_result in result:
1299
    (node_tuple, data_tuple) = node_result
1300
    (_, node_name) = node_tuple
1301
    (data_status, _) = data_tuple
1302
    if data_status != constants.RS_NORMAL:
1303
      assert data_status != constants.RS_UNAVAIL
1304
      errs += 1
1305
      ToStderr("There was a problem changing power for %s, please investigate",
1306
               node_name)
1307

    
1308
  if errs > 0:
1309
    return False
1310

    
1311
  return True
1312

    
1313

    
1314
def _InstanceStart(opts, inst_list, start, no_remember=False):
1315
  """Puts the instances in the list to desired state.
1316

1317
  @param opts: The command line options selected by the user
1318
  @param inst_list: The list of instances to operate on
1319
  @param start: True if they should be started, False for shutdown
1320
  @param no_remember: If the instance state should be remembered
1321
  @return: The success of the operation (none failed)
1322

1323
  """
1324
  if start:
1325
    opcls = opcodes.OpInstanceStartup
1326
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1327
  else:
1328
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1329
                           timeout=opts.shutdown_timeout,
1330
                           no_remember=no_remember)
1331
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1332

    
1333
  jex = JobExecutor(opts=opts)
1334

    
1335
  for inst in inst_list:
1336
    ToStdout("Submit %s of instance %s", text_submit, inst)
1337
    op = opcls(instance_name=inst)
1338
    jex.QueueJob(inst, op)
1339

    
1340
  results = jex.GetResults()
1341
  bad_cnt = len([1 for (success, _) in results if not success])
1342

    
1343
  if bad_cnt == 0:
1344
    ToStdout("All instances have been %s successfully", text_success)
1345
  else:
1346
    ToStderr("There were errors while %s instances:\n"
1347
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1348
             len(results))
1349
    return False
1350

    
1351
  return True
1352

    
1353

    
1354
class _RunWhenNodesReachableHelper:
1355
  """Helper class to make shared internal state sharing easier.
1356

1357
  @ivar success: Indicates if all action_cb calls were successful
1358

1359
  """
1360
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1361
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1362
    """Init the object.
1363

1364
    @param node_list: The list of nodes to be reachable
1365
    @param action_cb: Callback called when a new host is reachable
1366
    @type node2ip: dict
1367
    @param node2ip: Node to ip mapping
1368
    @param port: The port to use for the TCP ping
1369
    @param feedback_fn: The function used for feedback
1370
    @param _ping_fn: Function to check reachabilty (for unittest use only)
1371
    @param _sleep_fn: Function to sleep (for unittest use only)
1372

1373
    """
1374
    self.down = set(node_list)
1375
    self.up = set()
1376
    self.node2ip = node2ip
1377
    self.success = True
1378
    self.action_cb = action_cb
1379
    self.port = port
1380
    self.feedback_fn = feedback_fn
1381
    self._ping_fn = _ping_fn
1382
    self._sleep_fn = _sleep_fn
1383

    
1384
  def __call__(self):
1385
    """When called we run action_cb.
1386

1387
    @raises utils.RetryAgain: When there are still down nodes
1388

1389
    """
1390
    if not self.action_cb(self.up):
1391
      self.success = False
1392

    
1393
    if self.down:
1394
      raise utils.RetryAgain()
1395
    else:
1396
      return self.success
1397

    
1398
  def Wait(self, secs):
1399
    """Checks if a host is up or waits remaining seconds.
1400

1401
    @param secs: The secs remaining
1402

1403
    """
1404
    start = time.time()
1405
    for node in self.down:
1406
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1407
                       live_port_needed=True):
1408
        self.feedback_fn("Node %s became available" % node)
1409
        self.up.add(node)
1410
        self.down -= self.up
1411
        # If we have a node available there is the possibility to run the
1412
        # action callback successfully, therefore we don't wait and return
1413
        return
1414

    
1415
    self._sleep_fn(max(0.0, start + secs - time.time()))
1416

    
1417

    
1418
def _RunWhenNodesReachable(node_list, action_cb, interval):
1419
  """Run action_cb when nodes become reachable.
1420

1421
  @param node_list: The list of nodes to be reachable
1422
  @param action_cb: Callback called when a new host is reachable
1423
  @param interval: The earliest time to retry
1424

1425
  """
1426
  client = GetClient()
1427
  cluster_info = client.QueryClusterInfo()
1428
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1429
    family = netutils.IPAddress.family
1430
  else:
1431
    family = netutils.IP6Address.family
1432

    
1433
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1434
                 for node in node_list)
1435

    
1436
  port = netutils.GetDaemonPort(constants.NODED)
1437
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1438
                                        ToStdout)
1439

    
1440
  try:
1441
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1442
                       wait_fn=helper.Wait)
1443
  except utils.RetryTimeout:
1444
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1445
             " again:\n  - %s", "  - ".join(helper.down))
1446
    return False
1447

    
1448

    
1449
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1450
                          _instance_start_fn=_InstanceStart):
1451
  """Start the instances conditional based on node_states.
1452

1453
  @param opts: The command line options selected by the user
1454
  @param inst_map: A dict of inst -> nodes mapping
1455
  @param nodes_online: A list of nodes online
1456
  @param _instance_start_fn: Callback to start instances (unittest use only)
1457
  @return: Success of the operation on all instances
1458

1459
  """
1460
  start_inst_list = []
1461
  for (inst, nodes) in inst_map.items():
1462
    if not (nodes - nodes_online):
1463
      # All nodes the instance lives on are back online
1464
      start_inst_list.append(inst)
1465

    
1466
  for inst in start_inst_list:
1467
    del inst_map[inst]
1468

    
1469
  if start_inst_list:
1470
    return _instance_start_fn(opts, start_inst_list, True)
1471

    
1472
  return True
1473

    
1474

    
1475
def _EpoOn(opts, full_node_list, node_list, inst_map):
1476
  """Does the actual power on.
1477

1478
  @param opts: The command line options selected by the user
1479
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1480
                         OOB)
1481
  @param node_list: The list of nodes to operate on (all need to support OOB)
1482
  @param inst_map: A dict of inst -> nodes mapping
1483
  @return: The desired exit status
1484

1485
  """
1486
  if node_list and not _OobPower(opts, node_list, False):
1487
    ToStderr("Not all nodes seem to get back up, investigate and start"
1488
             " manually if needed")
1489

    
1490
  # Wait for the nodes to be back up
1491
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1492

    
1493
  ToStdout("Waiting until all nodes are available again")
1494
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1495
    ToStderr("Please investigate and start stopped instances manually")
1496
    return constants.EXIT_FAILURE
1497

    
1498
  return constants.EXIT_SUCCESS
1499

    
1500

    
1501
def _EpoOff(opts, node_list, inst_map):
1502
  """Does the actual power off.
1503

1504
  @param opts: The command line options selected by the user
1505
  @param node_list: The list of nodes to operate on (all need to support OOB)
1506
  @param inst_map: A dict of inst -> nodes mapping
1507
  @return: The desired exit status
1508

1509
  """
1510
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
1511
    ToStderr("Please investigate and stop instances manually before continuing")
1512
    return constants.EXIT_FAILURE
1513

    
1514
  if not node_list:
1515
    return constants.EXIT_SUCCESS
1516

    
1517
  if _OobPower(opts, node_list, False):
1518
    return constants.EXIT_SUCCESS
1519
  else:
1520
    return constants.EXIT_FAILURE
1521

    
1522

    
1523
def Epo(opts, args, cl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
1524
        _confirm_fn=ConfirmOperation,
1525
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
1526
  """EPO operations.
1527

1528
  @param opts: the command line options selected by the user
1529
  @type args: list
1530
  @param args: should contain only one element, the subcommand
1531
  @rtype: int
1532
  @return: the desired exit code
1533

1534
  """
1535
  if opts.groups and opts.show_all:
1536
    _stderr_fn("Only one of --groups or --all are allowed")
1537
    return constants.EXIT_FAILURE
1538
  elif args and opts.show_all:
1539
    _stderr_fn("Arguments in combination with --all are not allowed")
1540
    return constants.EXIT_FAILURE
1541

    
1542
  if cl is None:
1543
    cl = GetClient()
1544

    
1545
  if opts.groups:
1546
    node_query_list = \
1547
      itertools.chain(*cl.QueryGroups(args, ["node_list"], False))
1548
  else:
1549
    node_query_list = args
1550

    
1551
  result = cl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
1552
                                           "sinst_list", "powered", "offline"],
1553
                         False)
1554

    
1555
  all_nodes = map(compat.fst, result)
1556
  node_list = []
1557
  inst_map = {}
1558
  for (node, master, pinsts, sinsts, powered, offline) in result:
1559
    if not offline:
1560
      for inst in (pinsts + sinsts):
1561
        if inst in inst_map:
1562
          if not master:
1563
            inst_map[inst].add(node)
1564
        elif master:
1565
          inst_map[inst] = set()
1566
        else:
1567
          inst_map[inst] = set([node])
1568

    
1569
    if master and opts.on:
1570
      # We ignore the master for turning on the machines, in fact we are
1571
      # already operating on the master at this point :)
1572
      continue
1573
    elif master and not opts.show_all:
1574
      _stderr_fn("%s is the master node, please do a master-failover to another"
1575
                 " node not affected by the EPO or use --all if you intend to"
1576
                 " shutdown the whole cluster", node)
1577
      return constants.EXIT_FAILURE
1578
    elif powered is None:
1579
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
1580
                 " handled in a fully automated manner", node)
1581
    elif powered == opts.on:
1582
      _stdout_fn("Node %s is already in desired power state, skipping", node)
1583
    elif not offline or (offline and powered):
1584
      node_list.append(node)
1585

    
1586
  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
1587
    return constants.EXIT_FAILURE
1588

    
1589
  if opts.on:
1590
    return _on_fn(opts, all_nodes, node_list, inst_map)
1591
  else:
1592
    return _off_fn(opts, node_list, inst_map)
1593

    
1594

    
1595
def _GetCreateCommand(info):
1596
  buf = StringIO()
1597
  buf.write("gnt-cluster init")
1598
  PrintIPolicyCommand(buf, info["ipolicy"], False)
1599
  buf.write(" ")
1600
  buf.write(info["name"])
1601
  return buf.getvalue()
1602

    
1603

    
1604
def ShowCreateCommand(opts, args):
1605
  """Shows the command that can be used to re-create the cluster.
1606

1607
  Currently it works only for ipolicy specs.
1608

1609
  """
1610
  cl = GetClient(query=True)
1611
  result = cl.QueryClusterInfo()
1612
  ToStdout(_GetCreateCommand(result))
1613

    
1614

    
1615
def _RunCommandAndReport(cmd):
1616
  """Run a command and report its output, iff it failed.
1617

1618
  @param cmd: the command to execute
1619
  @type cmd: list
1620
  @rtype: bool
1621
  @return: False, if the execution failed.
1622

1623
  """
1624
  result = utils.RunCmd(cmd)
1625
  if result.failed:
1626
    ToStderr("Command %s failed: %s; Output %s" %
1627
             (cmd, result.fail_reason, result.output))
1628
    return False
1629
  return True
1630

    
1631

    
1632
def _VerifyCommand(cmd):
1633
  """Verify that a given command succeeds on all online nodes.
1634

1635
  As this function is intended to run during upgrades, it
1636
  is implemented in such a way that it still works, if all Ganeti
1637
  daemons are down.
1638

1639
  @param cmd: the command to execute
1640
  @type cmd: list
1641
  @rtype: list
1642
  @return: the list of node names that are online where
1643
      the command failed.
1644

1645
  """
1646
  command = utils.text.ShellQuoteArgs([str(val) for val in cmd])
1647

    
1648
  nodes = ssconf.SimpleStore().GetOnlineNodeList()
1649
  master_node = ssconf.SimpleStore().GetMasterNode()
1650
  cluster_name = ssconf.SimpleStore().GetClusterName()
1651

    
1652
  # If master node is in 'nodes', make sure master node is at list end
1653
  if master_node in nodes:
1654
    nodes.remove(master_node)
1655
    nodes.append(master_node)
1656

    
1657
  failed = []
1658

    
1659
  srun = ssh.SshRunner(cluster_name=cluster_name)
1660
  for name in nodes:
1661
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
1662
    if result.exit_code != 0:
1663
      failed.append(name)
1664

    
1665
  return failed
1666

    
1667

    
1668
def _VerifyVersionInstalled(versionstring):
1669
  """Verify that the given version of ganeti is installed on all online nodes.
1670

1671
  Do nothing, if this is the case, otherwise print an appropriate
1672
  message to stderr.
1673

1674
  @param versionstring: the version to check for
1675
  @type versionstring: string
1676
  @rtype: bool
1677
  @return: True, if the version is installed on all online nodes
1678

1679
  """
1680
  badnodes = _VerifyCommand(["test", "-d",
1681
                             os.path.join(pathutils.PKGLIBDIR, versionstring)])
1682
  if badnodes:
1683
    ToStderr("Ganeti version %s not installed on nodes %s"
1684
             % (versionstring, ", ".join(badnodes)))
1685
    return False
1686

    
1687
  return True
1688

    
1689

    
1690
def _GetRunning():
1691
  """Determine the list of running jobs.
1692

1693
  @rtype: list
1694
  @return: the number of jobs still running
1695

1696
  """
1697
  cl = GetClient()
1698
  qfilter = qlang.MakeSimpleFilter("status",
1699
                                   frozenset([constants.JOB_STATUS_RUNNING]))
1700
  return len(cl.Query(constants.QR_JOB, [], qfilter).data)
1701

    
1702

    
1703
def _SetGanetiVersion(versionstring):
1704
  """Set the active version of ganeti to the given versionstring
1705

1706
  @type versionstring: string
1707
  @rtype: list
1708
  @return: the list of nodes where the version change failed
1709

1710
  """
1711
  failed = []
1712
  if constants.HAS_GNU_LN:
1713
    failed.extend(_VerifyCommand(
1714
        ["ln", "-s", "-f", "-T",
1715
         os.path.join(pathutils.PKGLIBDIR, versionstring),
1716
         os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
1717
    failed.extend(_VerifyCommand(
1718
        ["ln", "-s", "-f", "-T",
1719
         os.path.join(pathutils.SHAREDIR, versionstring),
1720
         os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
1721
  else:
1722
    failed.extend(_VerifyCommand(
1723
        ["rm", "-f", os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
1724
    failed.extend(_VerifyCommand(
1725
        ["ln", "-s", "-f", os.path.join(pathutils.PKGLIBDIR, versionstring),
1726
         os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
1727
    failed.extend(_VerifyCommand(
1728
        ["rm", "-f", os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
1729
    failed.extend(_VerifyCommand(
1730
        ["ln", "-s", "-f", os.path.join(pathutils.SHAREDIR, versionstring),
1731
         os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
1732
  return list(set(failed))
1733

    
1734

    
1735
def _ExecuteCommands(fns):
1736
  """Execute a list of functions, in reverse order.
1737

1738
  @type fns: list of functions.
1739
  @param fns: the functions to be executed.
1740

1741
  """
1742
  for fn in reversed(fns):
1743
    fn()
1744

    
1745

    
1746
def _GetConfigVersion():
1747
  """Determine the version the configuration file currently has.
1748

1749
  @rtype: tuple or None
1750
  @return: (major, minor, revision) if the version can be determined,
1751
      None otherwise
1752

1753
  """
1754
  config_data = serializer.LoadJson(utils.ReadFile(pathutils.CLUSTER_CONF_FILE))
1755
  try:
1756
    config_version = config_data["version"]
1757
  except KeyError:
1758
    return None
1759
  return utils.SplitVersion(config_version)
1760

    
1761

    
1762
def _ReadIntentToUpgrade():
1763
  """Read the file documenting the intent to upgrade the cluster.
1764

1765
  @rtype: string or None
1766
  @return: the version to upgrade to, if the file exists, and None
1767
      otherwise.
1768

1769
  """
1770
  if not os.path.isfile(pathutils.INTENT_TO_UPGRADE):
1771
    return None
1772

    
1773
  contentstring = utils.ReadFile(pathutils.INTENT_TO_UPGRADE)
1774
  contents = utils.UnescapeAndSplit(contentstring)
1775
  if len(contents) != 2:
1776
    # file syntactically mal-formed
1777
    return None
1778
  return contents[0]
1779

    
1780

    
1781
def _WriteIntentToUpgrade(version):
1782
  """Write file documenting the intent to upgrade the cluster.
1783

1784
  @type version: string
1785
  @param version: the version we intent to upgrade to
1786

1787
  """
1788
  utils.WriteFile(pathutils.INTENT_TO_UPGRADE,
1789
                  data=utils.EscapeAndJoin([version, "%d" % os.getpid()]))
1790

    
1791

    
1792
def _UpgradeBeforeConfigurationChange(versionstring):
1793
  """
1794
  Carry out all the tasks necessary for an upgrade that happen before
1795
  the configuration file, or Ganeti version, changes.
1796

1797
  @type versionstring: string
1798
  @param versionstring: the version to upgrade to
1799
  @rtype: (bool, list)
1800
  @return: tuple of a bool indicating success and a list of rollback tasks
1801

1802
  """
1803
  rollback = []
1804

    
1805
  if not _VerifyVersionInstalled(versionstring):
1806
    return (False, rollback)
1807

    
1808
  _WriteIntentToUpgrade(versionstring)
1809
  rollback.append(
1810
    lambda: utils.RunCmd(["rm", "-f", pathutils.INTENT_TO_UPGRADE]))
1811

    
1812
  ToStdout("Draining queue")
1813
  client = GetClient()
1814
  client.SetQueueDrainFlag(True)
1815

    
1816
  rollback.append(lambda: GetClient().SetQueueDrainFlag(False))
1817

    
1818
  if utils.SimpleRetry(0, _GetRunning,
1819
                       constants.UPGRADE_QUEUE_POLL_INTERVAL,
1820
                       constants.UPGRADE_QUEUE_DRAIN_TIMEOUT):
1821
    ToStderr("Failed to completely empty the queue.")
1822
    return (False, rollback)
1823

    
1824
  ToStdout("Stopping daemons on master node.")
1825
  if not _RunCommandAndReport([pathutils.DAEMON_UTIL, "stop-all"]):
1826
    return (False, rollback)
1827

    
1828
  if not _VerifyVersionInstalled(versionstring):
1829
    utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"])
1830
    return (False, rollback)
1831

    
1832
  ToStdout("Stopping daemons everywhere.")
1833
  rollback.append(lambda: _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
1834
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
1835
  if badnodes:
1836
    ToStderr("Failed to stop daemons on %s." % (", ".join(badnodes),))
1837
    return (False, rollback)
1838

    
1839
  backuptar = os.path.join(pathutils.LOCALSTATEDIR,
1840
                           "lib/ganeti%d.tar" % time.time())
1841
  ToStdout("Backing up configuration as %s" % backuptar)
1842
  if not _RunCommandAndReport(["tar", "cf", backuptar,
1843
                               pathutils.DATA_DIR]):
1844
    return (False, rollback)
1845

    
1846
  return (True, rollback)
1847

    
1848

    
1849
def _SwitchVersionAndConfig(versionstring, downgrade):
1850
  """
1851
  Switch to the new Ganeti version and change the configuration,
1852
  in correct order.
1853

1854
  @type versionstring: string
1855
  @param versionstring: the version to change to
1856
  @type downgrade: bool
1857
  @param downgrade: True, if the configuration should be downgraded
1858
  @rtype: (bool, list)
1859
  @return: tupe of a bool indicating success, and a list of
1860
      additional rollback tasks
1861

1862
  """
1863
  rollback = []
1864
  if downgrade:
1865
    ToStdout("Downgrading configuration")
1866
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "--downgrade", "-f"]):
1867
      return (False, rollback)
1868

    
1869
  # Configuration change is the point of no return. From then onwards, it is
1870
  # safer to push through the up/dowgrade than to try to roll it back.
1871

    
1872
  ToStdout("Switching to version %s on all nodes" % versionstring)
1873
  rollback.append(lambda: _SetGanetiVersion(constants.DIR_VERSION))
1874
  badnodes = _SetGanetiVersion(versionstring)
1875
  if badnodes:
1876
    ToStderr("Failed to switch to Ganeti version %s on nodes %s"
1877
             % (versionstring, ", ".join(badnodes)))
1878
    if not downgrade:
1879
      return (False, rollback)
1880

    
1881
  # Now that we have changed to the new version of Ganeti we should
1882
  # not communicate over luxi any more, as luxi might have changed in
1883
  # incompatible ways. Therefore, manually call the corresponding ganeti
1884
  # commands using their canonical (version independent) path.
1885

    
1886
  if not downgrade:
1887
    ToStdout("Upgrading configuration")
1888
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "-f"]):
1889
      return (False, rollback)
1890

    
1891
  return (True, rollback)
1892

    
1893

    
1894
def _UpgradeAfterConfigurationChange():
1895
  """
1896
  Carry out the upgrade actions necessary after switching to the new
1897
  Ganeti version and updating the configuration.
1898

1899
  As this part is run at a time where the new version of Ganeti is already
1900
  running, no communication should happen via luxi, as this is not a stable
1901
  interface. Also, as the configuration change is the point of no return,
1902
  all actions are pushed trough, even if some of them fail.
1903

1904
  @rtype: int
1905
  @return: the intended return value
1906

1907
  """
1908
  returnvalue = 0
1909

    
1910
  ToStdout("Starting daemons everywhere.")
1911
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
1912
  if badnodes:
1913
    ToStderr("Warning: failed to start daemons on %s." % (", ".join(badnodes),))
1914
    returnvalue = 1
1915

    
1916
  ToStdout("Ensuring directories everywhere.")
1917
  badnodes = _VerifyCommand([pathutils.ENSURE_DIRS])
1918
  if badnodes:
1919
    ToStderr("Warning: failed to ensure directories on %s." %
1920
             (", ".join(badnodes)))
1921
    returnvalue = 1
1922

    
1923
  ToStdout("Redistributing the configuration.")
1924
  if not _RunCommandAndReport(["gnt-cluster", "redist-conf", "--yes-do-it"]):
1925
    returnvalue = 1
1926

    
1927
  ToStdout("Restarting daemons everywhere.")
1928
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
1929
  badnodes.extend(_VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
1930
  if badnodes:
1931
    ToStderr("Warning: failed to start daemons on %s." %
1932
             (", ".join(list(set(badnodes))),))
1933
    returnvalue = 1
1934

    
1935
  ToStdout("Undraining the queue.")
1936
  if not _RunCommandAndReport(["gnt-cluster", "queue", "undrain"]):
1937
    returnvalue = 1
1938

    
1939
  _RunCommandAndReport(["rm", "-f", pathutils.INTENT_TO_UPGRADE])
1940

    
1941
  ToStdout("Verifying cluster.")
1942
  if not _RunCommandAndReport(["gnt-cluster", "verify"]):
1943
    returnvalue = 1
1944

    
1945
  return returnvalue
1946

    
1947

    
1948
def UpgradeGanetiCommand(opts, args):
1949
  """Upgrade a cluster to a new ganeti version.
1950

1951
  @param opts: the command line options selected by the user
1952
  @type args: list
1953
  @param args: should be an empty list
1954
  @rtype: int
1955
  @return: the desired exit code
1956

1957
  """
1958
  if ((not opts.resume and opts.to is None)
1959
      or (opts.resume and opts.to is not None)):
1960
    ToStderr("Precisely one of the options --to and --resume"
1961
             " has to be given")
1962
    return 1
1963

    
1964
  if opts.resume:
1965
    ssconf.CheckMaster(False)
1966
    versionstring = _ReadIntentToUpgrade()
1967
    if versionstring is None:
1968
      return 0
1969
    version = utils.version.ParseVersion(versionstring)
1970
    if version is None:
1971
      return 1
1972
    configversion = _GetConfigVersion()
1973
    if configversion is None:
1974
      return 1
1975
    # If the upgrade we resume was an upgrade between compatible
1976
    # versions (like 2.10.0 to 2.10.1), the correct configversion
1977
    # does not guarantee that the config has been updated.
1978
    # However, in the case of a compatible update with the configuration
1979
    # not touched, we are running a different dirversion with the same
1980
    # config version.
1981
    config_already_modified = \
1982
      (utils.IsCorrectConfigVersion(version, configversion) and
1983
       not (versionstring != constants.DIR_VERSION and
1984
            configversion == (constants.CONFIG_MAJOR, constants.CONFIG_MINOR,
1985
                              constants.CONFIG_REVISION)))
1986
    if not config_already_modified:
1987
      # We have to start from the beginning; however, some daemons might have
1988
      # already been stopped, so the only way to get into a well-defined state
1989
      # is by starting all daemons again.
1990
      _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
1991
  else:
1992
    versionstring = opts.to
1993
    config_already_modified = False
1994
    version = utils.version.ParseVersion(versionstring)
1995
    if version is None:
1996
      ToStderr("Could not parse version string %s" % versionstring)
1997
      return 1
1998

    
1999
  msg = utils.version.UpgradeRange(version)
2000
  if msg is not None:
2001
    ToStderr("Cannot upgrade to %s: %s" % (versionstring, msg))
2002
    return 1
2003

    
2004
  if not config_already_modified:
2005
    success, rollback = _UpgradeBeforeConfigurationChange(versionstring)
2006
    if not success:
2007
      _ExecuteCommands(rollback)
2008
      return 1
2009
  else:
2010
    rollback = []
2011

    
2012
  downgrade = utils.version.ShouldCfgdowngrade(version)
2013

    
2014
  success, additionalrollback =  \
2015
      _SwitchVersionAndConfig(versionstring, downgrade)
2016
  if not success:
2017
    rollback.extend(additionalrollback)
2018
    _ExecuteCommands(rollback)
2019
    return 1
2020

    
2021
  return _UpgradeAfterConfigurationChange()
2022

    
2023

    
2024
commands = {
2025
  "init": (
2026
    InitCluster, [ArgHost(min=1, max=1)],
2027
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
2028
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
2029
     NIC_PARAMS_OPT, NOMODIFY_ETCHOSTS_OPT, NOMODIFY_SSH_SETUP_OPT,
2030
     SECONDARY_IP_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT,
2031
     DRBD_HELPER_OPT, DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT,
2032
     PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT,
2033
     USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT,
2034
     ENABLED_DISK_TEMPLATES_OPT, IPOLICY_STD_SPECS_OPT]
2035
     + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
2036
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
2037
  "destroy": (
2038
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
2039
    "", "Destroy cluster"),
2040
  "rename": (
2041
    RenameCluster, [ArgHost(min=1, max=1)],
2042
    [FORCE_OPT, DRY_RUN_OPT],
2043
    "<new_name>",
2044
    "Renames the cluster"),
2045
  "redist-conf": (
2046
    RedistributeConfig, ARGS_NONE, SUBMIT_OPTS +
2047
    [DRY_RUN_OPT, PRIORITY_OPT, FORCE_DISTRIBUTION],
2048
    "", "Forces a push of the configuration file and ssconf files"
2049
    " to the nodes in the cluster"),
2050
  "verify": (
2051
    VerifyCluster, ARGS_NONE,
2052
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
2053
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
2054
    "", "Does a check on the cluster configuration"),
2055
  "verify-disks": (
2056
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
2057
    "", "Does a check on the cluster disk status"),
2058
  "repair-disk-sizes": (
2059
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
2060
    "[instance...]", "Updates mismatches in recorded disk sizes"),
2061
  "master-failover": (
2062
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
2063
    "", "Makes the current node the master"),
2064
  "master-ping": (
2065
    MasterPing, ARGS_NONE, [],
2066
    "", "Checks if the master is alive"),
2067
  "version": (
2068
    ShowClusterVersion, ARGS_NONE, [],
2069
    "", "Shows the cluster version"),
2070
  "getmaster": (
2071
    ShowClusterMaster, ARGS_NONE, [],
2072
    "", "Shows the cluster master"),
2073
  "copyfile": (
2074
    ClusterCopyFile, [ArgFile(min=1, max=1)],
2075
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
2076
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
2077
  "command": (
2078
    RunClusterCommand, [ArgCommand(min=1)],
2079
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
2080
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
2081
  "info": (
2082
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
2083
    "[--roman]", "Show cluster configuration"),
2084
  "list-tags": (
2085
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
2086
  "add-tags": (
2087
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
2088
    "tag...", "Add tags to the cluster"),
2089
  "remove-tags": (
2090
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
2091
    "tag...", "Remove tags from the cluster"),
2092
  "search-tags": (
2093
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
2094
    "Searches the tags on all objects on"
2095
    " the cluster for a given pattern (regex)"),
2096
  "queue": (
2097
    QueueOps,
2098
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
2099
    [], "drain|undrain|info", "Change queue properties"),
2100
  "watcher": (
2101
    WatcherOps,
2102
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
2103
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
2104
    [],
2105
    "{pause <timespec>|continue|info}", "Change watcher properties"),
2106
  "modify": (
2107
    SetClusterParams, ARGS_NONE,
2108
    [FORCE_OPT,
2109
     BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
2110
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT,
2111
     UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT, DRBD_HELPER_OPT,
2112
     DEFAULT_IALLOCATOR_OPT, RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT,
2113
     PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT,
2114
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] + SUBMIT_OPTS +
2115
     [ENABLED_DISK_TEMPLATES_OPT, IPOLICY_STD_SPECS_OPT, MODIFY_ETCHOSTS_OPT] +
2116
     INSTANCE_POLICY_OPTS + [GLOBAL_FILEDIR_OPT],
2117
    "[opts...]",
2118
    "Alters the parameters of the cluster"),
2119
  "renew-crypto": (
2120
    RenewCrypto, ARGS_NONE,
2121
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
2122
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
2123
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
2124
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
2125
    "[opts...]",
2126
    "Renews cluster certificates, keys and secrets"),
2127
  "epo": (
2128
    Epo, [ArgUnknown()],
2129
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
2130
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
2131
    "[opts...] [args]",
2132
    "Performs an emergency power-off on given args"),
2133
  "activate-master-ip": (
2134
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
2135
  "deactivate-master-ip": (
2136
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
2137
    "Deactivates the master IP"),
2138
  "show-ispecs-cmd": (
2139
    ShowCreateCommand, ARGS_NONE, [], "",
2140
    "Show the command line to re-create the cluster"),
2141
  "upgrade": (
2142
    UpgradeGanetiCommand, ARGS_NONE, [TO_OPT, RESUME_OPT], "",
2143
    "Upgrade (or downgrade) to a new Ganeti version"),
2144
  }
2145

    
2146

    
2147
#: dictionary with aliases for commands
2148
aliases = {
2149
  "masterfailover": "master-failover",
2150
  "show": "info",
2151
}
2152

    
2153

    
2154
def Main():
2155
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
2156
                     aliases=aliases)