Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ ea22736b

History | View | Annotate | Download (71.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
from cStringIO import StringIO
30
import os
31
import time
32
import OpenSSL
33
import itertools
34

    
35
from ganeti.cli import *
36
from ganeti import opcodes
37
from ganeti import constants
38
from ganeti import errors
39
from ganeti import utils
40
from ganeti import bootstrap
41
from ganeti import ssh
42
from ganeti import objects
43
from ganeti import uidpool
44
from ganeti import compat
45
from ganeti import netutils
46
from ganeti import ssconf
47
from ganeti import pathutils
48
from ganeti import serializer
49
from ganeti import qlang
50

    
51

    
52
ON_OPT = cli_option("--on", default=False,
53
                    action="store_true", dest="on",
54
                    help="Recover from an EPO")
55

    
56
GROUPS_OPT = cli_option("--groups", default=False,
57
                        action="store_true", dest="groups",
58
                        help="Arguments are node groups instead of nodes")
59

    
60
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
61
                            help="Override interactive check for --no-voting",
62
                            default=False, action="store_true")
63

    
64
FORCE_DISTRIBUTION = cli_option("--yes-do-it", dest="yes_do_it",
65
                                help="Unconditionally distribute the"
66
                                " configuration, even if the queue"
67
                                " is drained",
68
                                default=False, action="store_true")
69

    
70
TO_OPT = cli_option("--to", default=None, type="string",
71
                    help="The Ganeti version to upgrade to")
72

    
73
RESUME_OPT = cli_option("--resume", default=False, action="store_true",
74
                        help="Resume any pending Ganeti upgrades")
75

    
76
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
77
_EPO_PING_TIMEOUT = 1 # 1 second
78
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
79

    
80

    
81
def _InitEnabledDiskTemplates(opts):
82
  """Initialize the list of enabled disk templates.
83

84
  """
85
  if opts.enabled_disk_templates:
86
    return opts.enabled_disk_templates.split(",")
87
  else:
88
    return constants.DEFAULT_ENABLED_DISK_TEMPLATES
89

    
90

    
91
def _InitVgName(opts, enabled_disk_templates):
92
  """Initialize the volume group name.
93

94
  @type enabled_disk_templates: list of strings
95
  @param enabled_disk_templates: cluster-wide enabled disk templates
96

97
  """
98
  vg_name = None
99
  if opts.vg_name is not None:
100
    vg_name = opts.vg_name
101
    if vg_name:
102
      if not utils.IsLvmEnabled(enabled_disk_templates):
103
        ToStdout("You specified a volume group with --vg-name, but you did not"
104
                 " enable any disk template that uses lvm.")
105
    elif utils.IsLvmEnabled(enabled_disk_templates):
106
      raise errors.OpPrereqError(
107
          "LVM disk templates are enabled, but vg name not set.")
108
  elif utils.IsLvmEnabled(enabled_disk_templates):
109
    vg_name = constants.DEFAULT_VG
110
  return vg_name
111

    
112

    
113
def _InitDrbdHelper(opts, enabled_disk_templates):
114
  """Initialize the DRBD usermode helper.
115

116
  """
117
  drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
118

    
119
  if not drbd_enabled and opts.drbd_helper is not None:
120
    ToStdout("Note: You specified a DRBD usermode helper, while DRBD storage"
121
             " is not enabled.")
122

    
123
  if drbd_enabled:
124
    if opts.drbd_helper is None:
125
      return constants.DEFAULT_DRBD_HELPER
126
    if opts.drbd_helper == '':
127
      raise errors.OpPrereqError(
128
          "Unsetting the drbd usermode helper while enabling DRBD is not"
129
          " allowed.")
130

    
131
  return opts.drbd_helper
132

    
133

    
134
@UsesRPC
135
def InitCluster(opts, args):
136
  """Initialize the cluster.
137

138
  @param opts: the command line options selected by the user
139
  @type args: list
140
  @param args: should contain only one element, the desired
141
      cluster name
142
  @rtype: int
143
  @return: the desired exit code
144

145
  """
146
  enabled_disk_templates = _InitEnabledDiskTemplates(opts)
147

    
148
  try:
149
    vg_name = _InitVgName(opts, enabled_disk_templates)
150
    drbd_helper = _InitDrbdHelper(opts, enabled_disk_templates)
151
  except errors.OpPrereqError, e:
152
    ToStderr(str(e))
153
    return 1
154

    
155
  master_netdev = opts.master_netdev
156
  if master_netdev is None:
157
    nic_mode = opts.nicparams.get(constants.NIC_MODE, None)
158
    if not nic_mode:
159
      # default case, use bridging
160
      master_netdev = constants.DEFAULT_BRIDGE
161
    elif nic_mode == constants.NIC_MODE_OVS:
162
      # default ovs is different from default bridge
163
      master_netdev = constants.DEFAULT_OVS
164
      opts.nicparams[constants.NIC_LINK] = constants.DEFAULT_OVS
165

    
166
  hvlist = opts.enabled_hypervisors
167
  if hvlist is None:
168
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
169
  hvlist = hvlist.split(",")
170

    
171
  hvparams = dict(opts.hvparams)
172
  beparams = opts.beparams
173
  nicparams = opts.nicparams
174

    
175
  diskparams = dict(opts.diskparams)
176

    
177
  # check the disk template types here, as we cannot rely on the type check done
178
  # by the opcode parameter types
179
  diskparams_keys = set(diskparams.keys())
180
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
181
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
182
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
183
    return 1
184

    
185
  # prepare beparams dict
186
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
187
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
188

    
189
  # prepare nicparams dict
190
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
191
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
192

    
193
  # prepare ndparams dict
194
  if opts.ndparams is None:
195
    ndparams = dict(constants.NDC_DEFAULTS)
196
  else:
197
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
198
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
199

    
200
  # prepare hvparams dict
201
  for hv in constants.HYPER_TYPES:
202
    if hv not in hvparams:
203
      hvparams[hv] = {}
204
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
205
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
206

    
207
  # prepare diskparams dict
208
  for templ in constants.DISK_TEMPLATES:
209
    if templ not in diskparams:
210
      diskparams[templ] = {}
211
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
212
                                         diskparams[templ])
213
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
214

    
215
  # prepare ipolicy dict
216
  ipolicy = CreateIPolicyFromOpts(
217
    ispecs_mem_size=opts.ispecs_mem_size,
218
    ispecs_cpu_count=opts.ispecs_cpu_count,
219
    ispecs_disk_count=opts.ispecs_disk_count,
220
    ispecs_disk_size=opts.ispecs_disk_size,
221
    ispecs_nic_count=opts.ispecs_nic_count,
222
    minmax_ispecs=opts.ipolicy_bounds_specs,
223
    std_ispecs=opts.ipolicy_std_specs,
224
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
225
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
226
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
227
    fill_all=True)
228

    
229
  if opts.candidate_pool_size is None:
230
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
231

    
232
  if opts.mac_prefix is None:
233
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
234

    
235
  uid_pool = opts.uid_pool
236
  if uid_pool is not None:
237
    uid_pool = uidpool.ParseUidPool(uid_pool)
238

    
239
  if opts.prealloc_wipe_disks is None:
240
    opts.prealloc_wipe_disks = False
241

    
242
  external_ip_setup_script = opts.use_external_mip_script
243
  if external_ip_setup_script is None:
244
    external_ip_setup_script = False
245

    
246
  try:
247
    primary_ip_version = int(opts.primary_ip_version)
248
  except (ValueError, TypeError), err:
249
    ToStderr("Invalid primary ip version value: %s" % str(err))
250
    return 1
251

    
252
  master_netmask = opts.master_netmask
253
  try:
254
    if master_netmask is not None:
255
      master_netmask = int(master_netmask)
256
  except (ValueError, TypeError), err:
257
    ToStderr("Invalid master netmask value: %s" % str(err))
258
    return 1
259

    
260
  if opts.disk_state:
261
    disk_state = utils.FlatToDict(opts.disk_state)
262
  else:
263
    disk_state = {}
264

    
265
  hv_state = dict(opts.hv_state)
266

    
267
  default_ialloc_params = opts.default_iallocator_params
268
  bootstrap.InitCluster(cluster_name=args[0],
269
                        secondary_ip=opts.secondary_ip,
270
                        vg_name=vg_name,
271
                        mac_prefix=opts.mac_prefix,
272
                        master_netmask=master_netmask,
273
                        master_netdev=master_netdev,
274
                        file_storage_dir=opts.file_storage_dir,
275
                        shared_file_storage_dir=opts.shared_file_storage_dir,
276
                        gluster_storage_dir=opts.gluster_storage_dir,
277
                        enabled_hypervisors=hvlist,
278
                        hvparams=hvparams,
279
                        beparams=beparams,
280
                        nicparams=nicparams,
281
                        ndparams=ndparams,
282
                        diskparams=diskparams,
283
                        ipolicy=ipolicy,
284
                        candidate_pool_size=opts.candidate_pool_size,
285
                        modify_etc_hosts=opts.modify_etc_hosts,
286
                        modify_ssh_setup=opts.modify_ssh_setup,
287
                        maintain_node_health=opts.maintain_node_health,
288
                        drbd_helper=drbd_helper,
289
                        uid_pool=uid_pool,
290
                        default_iallocator=opts.default_iallocator,
291
                        default_iallocator_params=default_ialloc_params,
292
                        primary_ip_version=primary_ip_version,
293
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
294
                        use_external_mip_script=external_ip_setup_script,
295
                        hv_state=hv_state,
296
                        disk_state=disk_state,
297
                        enabled_disk_templates=enabled_disk_templates,
298
                        )
299
  op = opcodes.OpClusterPostInit()
300
  SubmitOpCode(op, opts=opts)
301
  return 0
302

    
303

    
304
@UsesRPC
305
def DestroyCluster(opts, args):
306
  """Destroy the cluster.
307

308
  @param opts: the command line options selected by the user
309
  @type args: list
310
  @param args: should be an empty list
311
  @rtype: int
312
  @return: the desired exit code
313

314
  """
315
  if not opts.yes_do_it:
316
    ToStderr("Destroying a cluster is irreversible. If you really want"
317
             " destroy this cluster, supply the --yes-do-it option.")
318
    return 1
319

    
320
  op = opcodes.OpClusterDestroy()
321
  master_uuid = SubmitOpCode(op, opts=opts)
322
  # if we reached this, the opcode didn't fail; we can proceed to
323
  # shutdown all the daemons
324
  bootstrap.FinalizeClusterDestroy(master_uuid)
325
  return 0
326

    
327

    
328
def RenameCluster(opts, args):
329
  """Rename the cluster.
330

331
  @param opts: the command line options selected by the user
332
  @type args: list
333
  @param args: should contain only one element, the new cluster name
334
  @rtype: int
335
  @return: the desired exit code
336

337
  """
338
  cl = GetClient()
339

    
340
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
341

    
342
  new_name = args[0]
343
  if not opts.force:
344
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
345
                " connected over the network to the cluster name, the"
346
                " operation is very dangerous as the IP address will be"
347
                " removed from the node and the change may not go through."
348
                " Continue?") % (cluster_name, new_name)
349
    if not AskUser(usertext):
350
      return 1
351

    
352
  op = opcodes.OpClusterRename(name=new_name)
353
  result = SubmitOpCode(op, opts=opts, cl=cl)
354

    
355
  if result:
356
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
357

    
358
  return 0
359

    
360

    
361
def ActivateMasterIp(opts, args):
362
  """Activates the master IP.
363

364
  """
365
  op = opcodes.OpClusterActivateMasterIp()
366
  SubmitOpCode(op)
367
  return 0
368

    
369

    
370
def DeactivateMasterIp(opts, args):
371
  """Deactivates the master IP.
372

373
  """
374
  if not opts.confirm:
375
    usertext = ("This will disable the master IP. All the open connections to"
376
                " the master IP will be closed. To reach the master you will"
377
                " need to use its node IP."
378
                " Continue?")
379
    if not AskUser(usertext):
380
      return 1
381

    
382
  op = opcodes.OpClusterDeactivateMasterIp()
383
  SubmitOpCode(op)
384
  return 0
385

    
386

    
387
def RedistributeConfig(opts, args):
388
  """Forces push of the cluster configuration.
389

390
  @param opts: the command line options selected by the user
391
  @type args: list
392
  @param args: empty list
393
  @rtype: int
394
  @return: the desired exit code
395

396
  """
397
  op = opcodes.OpClusterRedistConf()
398
  if opts.yes_do_it:
399
    SubmitOpCodeToDrainedQueue(op)
400
  else:
401
    SubmitOrSend(op, opts)
402
  return 0
403

    
404

    
405
def ShowClusterVersion(opts, args):
406
  """Write version of ganeti software to the standard output.
407

408
  @param opts: the command line options selected by the user
409
  @type args: list
410
  @param args: should be an empty list
411
  @rtype: int
412
  @return: the desired exit code
413

414
  """
415
  cl = GetClient(query=True)
416
  result = cl.QueryClusterInfo()
417
  ToStdout("Software version: %s", result["software_version"])
418
  ToStdout("Internode protocol: %s", result["protocol_version"])
419
  ToStdout("Configuration format: %s", result["config_version"])
420
  ToStdout("OS api version: %s", result["os_api_version"])
421
  ToStdout("Export interface: %s", result["export_version"])
422
  ToStdout("VCS version: %s", result["vcs_version"])
423
  return 0
424

    
425

    
426
def ShowClusterMaster(opts, args):
427
  """Write name of master node to the standard output.
428

429
  @param opts: the command line options selected by the user
430
  @type args: list
431
  @param args: should be an empty list
432
  @rtype: int
433
  @return: the desired exit code
434

435
  """
436
  master = bootstrap.GetMaster()
437
  ToStdout(master)
438
  return 0
439

    
440

    
441
def _FormatGroupedParams(paramsdict, roman=False):
442
  """Format Grouped parameters (be, nic, disk) by group.
443

444
  @type paramsdict: dict of dicts
445
  @param paramsdict: {group: {param: value, ...}, ...}
446
  @rtype: dict of dicts
447
  @return: copy of the input dictionaries with strings as values
448

449
  """
450
  ret = {}
451
  for (item, val) in paramsdict.items():
452
    if isinstance(val, dict):
453
      ret[item] = _FormatGroupedParams(val, roman=roman)
454
    elif roman and isinstance(val, int):
455
      ret[item] = compat.TryToRoman(val)
456
    else:
457
      ret[item] = str(val)
458
  return ret
459

    
460

    
461
def ShowClusterConfig(opts, args):
462
  """Shows cluster information.
463

464
  @param opts: the command line options selected by the user
465
  @type args: list
466
  @param args: should be an empty list
467
  @rtype: int
468
  @return: the desired exit code
469

470
  """
471
  cl = GetClient(query=True)
472
  result = cl.QueryClusterInfo()
473

    
474
  if result["tags"]:
475
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
476
  else:
477
    tags = "(none)"
478
  if result["reserved_lvs"]:
479
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
480
  else:
481
    reserved_lvs = "(none)"
482

    
483
  enabled_hv = result["enabled_hypervisors"]
484
  hvparams = dict((k, v) for k, v in result["hvparams"].iteritems()
485
                  if k in enabled_hv)
486

    
487
  info = [
488
    ("Cluster name", result["name"]),
489
    ("Cluster UUID", result["uuid"]),
490

    
491
    ("Creation time", utils.FormatTime(result["ctime"])),
492
    ("Modification time", utils.FormatTime(result["mtime"])),
493

    
494
    ("Master node", result["master"]),
495

    
496
    ("Architecture (this node)",
497
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),
498

    
499
    ("Tags", tags),
500

    
501
    ("Default hypervisor", result["default_hypervisor"]),
502
    ("Enabled hypervisors", utils.CommaJoin(enabled_hv)),
503

    
504
    ("Hypervisor parameters", _FormatGroupedParams(hvparams)),
505

    
506
    ("OS-specific hypervisor parameters",
507
     _FormatGroupedParams(result["os_hvp"])),
508

    
509
    ("OS parameters", _FormatGroupedParams(result["osparams"])),
510

    
511
    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
512
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),
513

    
514
    ("Cluster parameters", [
515
      ("candidate pool size",
516
       compat.TryToRoman(result["candidate_pool_size"],
517
                         convert=opts.roman_integers)),
518
      ("master netdev", result["master_netdev"]),
519
      ("master netmask", result["master_netmask"]),
520
      ("use external master IP address setup script",
521
       result["use_external_mip_script"]),
522
      ("lvm volume group", result["volume_group_name"]),
523
      ("lvm reserved volumes", reserved_lvs),
524
      ("drbd usermode helper", result["drbd_usermode_helper"]),
525
      ("file storage path", result["file_storage_dir"]),
526
      ("shared file storage path", result["shared_file_storage_dir"]),
527
      ("gluster storage path", result["gluster_storage_dir"]),
528
      ("maintenance of node health", result["maintain_node_health"]),
529
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
530
      ("default instance allocator", result["default_iallocator"]),
531
      ("default instance allocator parameters",
532
       result["default_iallocator_params"]),
533
      ("primary ip version", result["primary_ip_version"]),
534
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
535
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
536
      ("ExtStorage Providers search path",
537
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
538
      ("enabled disk templates",
539
       utils.CommaJoin(result["enabled_disk_templates"])),
540
      ]),
541

    
542
    ("Default node parameters",
543
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),
544

    
545
    ("Default instance parameters",
546
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),
547

    
548
    ("Default nic parameters",
549
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),
550

    
551
    ("Default disk parameters",
552
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),
553

    
554
    ("Instance policy - limits for instances",
555
     FormatPolicyInfo(result["ipolicy"], None, True)),
556
    ]
557

    
558
  PrintGenericInfo(info)
559
  return 0
560

    
561

    
562
def ClusterCopyFile(opts, args):
563
  """Copy a file from master to some nodes.
564

565
  @param opts: the command line options selected by the user
566
  @type args: list
567
  @param args: should contain only one element, the path of
568
      the file to be copied
569
  @rtype: int
570
  @return: the desired exit code
571

572
  """
573
  filename = args[0]
574
  if not os.path.exists(filename):
575
    raise errors.OpPrereqError("No such filename '%s'" % filename,
576
                               errors.ECODE_INVAL)
577

    
578
  cl = GetClient()
579
  qcl = GetClient(query=True)
580
  try:
581
    cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
582

    
583
    results = GetOnlineNodes(nodes=opts.nodes, cl=qcl, filter_master=True,
584
                             secondary_ips=opts.use_replication_network,
585
                             nodegroup=opts.nodegroup)
586
    ports = GetNodesSshPorts(opts.nodes, qcl)
587
  finally:
588
    cl.Close()
589
    qcl.Close()
590

    
591
  srun = ssh.SshRunner(cluster_name)
592
  for (node, port) in zip(results, ports):
593
    if not srun.CopyFileToNode(node, port, filename):
594
      ToStderr("Copy of file %s to node %s:%d failed", filename, node, port)
595

    
596
  return 0
597

    
598

    
599
def RunClusterCommand(opts, args):
600
  """Run a command on some nodes.
601

602
  @param opts: the command line options selected by the user
603
  @type args: list
604
  @param args: should contain the command to be run and its arguments
605
  @rtype: int
606
  @return: the desired exit code
607

608
  """
609
  cl = GetClient()
610
  qcl = GetClient(query=True)
611

    
612
  command = " ".join(args)
613

    
614
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=qcl, nodegroup=opts.nodegroup)
615
  ports = GetNodesSshPorts(nodes, qcl)
616

    
617
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
618
                                                    "master_node"])
619

    
620
  srun = ssh.SshRunner(cluster_name=cluster_name)
621

    
622
  # Make sure master node is at list end
623
  if master_node in nodes:
624
    nodes.remove(master_node)
625
    nodes.append(master_node)
626

    
627
  for (name, port) in zip(nodes, ports):
628
    result = srun.Run(name, constants.SSH_LOGIN_USER, command, port=port)
629

    
630
    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
631
      # Do not output anything for successful commands
632
      continue
633

    
634
    ToStdout("------------------------------------------------")
635
    if opts.show_machine_names:
636
      for line in result.output.splitlines():
637
        ToStdout("%s: %s", name, line)
638
    else:
639
      ToStdout("node: %s", name)
640
      ToStdout("%s", result.output)
641
    ToStdout("return code = %s", result.exit_code)
642

    
643
  return 0
644

    
645

    
646
def VerifyCluster(opts, args):
647
  """Verify integrity of cluster, performing various test on nodes.
648

649
  @param opts: the command line options selected by the user
650
  @type args: list
651
  @param args: should be an empty list
652
  @rtype: int
653
  @return: the desired exit code
654

655
  """
656
  skip_checks = []
657

    
658
  if opts.skip_nplusone_mem:
659
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
660

    
661
  cl = GetClient()
662

    
663
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
664
                               error_codes=opts.error_codes,
665
                               debug_simulate_errors=opts.simulate_errors,
666
                               skip_checks=skip_checks,
667
                               ignore_errors=opts.ignore_errors,
668
                               group_name=opts.nodegroup)
669
  result = SubmitOpCode(op, cl=cl, opts=opts)
670

    
671
  # Keep track of submitted jobs
672
  jex = JobExecutor(cl=cl, opts=opts)
673

    
674
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
675
    jex.AddJobId(None, status, job_id)
676

    
677
  results = jex.GetResults()
678

    
679
  (bad_jobs, bad_results) = \
680
    map(len,
681
        # Convert iterators to lists
682
        map(list,
683
            # Count errors
684
            map(compat.partial(itertools.ifilterfalse, bool),
685
                # Convert result to booleans in a tuple
686
                zip(*((job_success, len(op_results) == 1 and op_results[0])
687
                      for (job_success, op_results) in results)))))
688

    
689
  if bad_jobs == 0 and bad_results == 0:
690
    rcode = constants.EXIT_SUCCESS
691
  else:
692
    rcode = constants.EXIT_FAILURE
693
    if bad_jobs > 0:
694
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
695

    
696
  return rcode
697

    
698

    
699
def VerifyDisks(opts, args):
700
  """Verify integrity of cluster disks.
701

702
  @param opts: the command line options selected by the user
703
  @type args: list
704
  @param args: should be an empty list
705
  @rtype: int
706
  @return: the desired exit code
707

708
  """
709
  cl = GetClient()
710

    
711
  op = opcodes.OpClusterVerifyDisks()
712

    
713
  result = SubmitOpCode(op, cl=cl, opts=opts)
714

    
715
  # Keep track of submitted jobs
716
  jex = JobExecutor(cl=cl, opts=opts)
717

    
718
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
719
    jex.AddJobId(None, status, job_id)
720

    
721
  retcode = constants.EXIT_SUCCESS
722

    
723
  for (status, result) in jex.GetResults():
724
    if not status:
725
      ToStdout("Job failed: %s", result)
726
      continue
727

    
728
    ((bad_nodes, instances, missing), ) = result
729

    
730
    for node, text in bad_nodes.items():
731
      ToStdout("Error gathering data on node %s: %s",
732
               node, utils.SafeEncode(text[-400:]))
733
      retcode = constants.EXIT_FAILURE
734
      ToStdout("You need to fix these nodes first before fixing instances")
735

    
736
    for iname in instances:
737
      if iname in missing:
738
        continue
739
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
740
      try:
741
        ToStdout("Activating disks for instance '%s'", iname)
742
        SubmitOpCode(op, opts=opts, cl=cl)
743
      except errors.GenericError, err:
744
        nret, msg = FormatError(err)
745
        retcode |= nret
746
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
747

    
748
    if missing:
749
      for iname, ival in missing.iteritems():
750
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
751
        if all_missing:
752
          ToStdout("Instance %s cannot be verified as it lives on"
753
                   " broken nodes", iname)
754
        else:
755
          ToStdout("Instance %s has missing logical volumes:", iname)
756
          ival.sort()
757
          for node, vol in ival:
758
            if node in bad_nodes:
759
              ToStdout("\tbroken node %s /dev/%s", node, vol)
760
            else:
761
              ToStdout("\t%s /dev/%s", node, vol)
762

    
763
      ToStdout("You need to replace or recreate disks for all the above"
764
               " instances if this message persists after fixing broken nodes.")
765
      retcode = constants.EXIT_FAILURE
766
    elif not instances:
767
      ToStdout("No disks need to be activated.")
768

    
769
  return retcode
770

    
771

    
772
def RepairDiskSizes(opts, args):
773
  """Verify sizes of cluster disks.
774

775
  @param opts: the command line options selected by the user
776
  @type args: list
777
  @param args: optional list of instances to restrict check to
778
  @rtype: int
779
  @return: the desired exit code
780

781
  """
782
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
783
  SubmitOpCode(op, opts=opts)
784

    
785

    
786
@UsesRPC
787
def MasterFailover(opts, args):
788
  """Failover the master node.
789

790
  This command, when run on a non-master node, will cause the current
791
  master to cease being master, and the non-master to become new
792
  master.
793

794
  @param opts: the command line options selected by the user
795
  @type args: list
796
  @param args: should be an empty list
797
  @rtype: int
798
  @return: the desired exit code
799

800
  """
801
  if opts.no_voting and not opts.yes_do_it:
802
    usertext = ("This will perform the failover even if most other nodes"
803
                " are down, or if this node is outdated. This is dangerous"
804
                " as it can lead to a non-consistent cluster. Check the"
805
                " gnt-cluster(8) man page before proceeding. Continue?")
806
    if not AskUser(usertext):
807
      return 1
808

    
809
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
810

    
811

    
812
def MasterPing(opts, args):
813
  """Checks if the master is alive.
814

815
  @param opts: the command line options selected by the user
816
  @type args: list
817
  @param args: should be an empty list
818
  @rtype: int
819
  @return: the desired exit code
820

821
  """
822
  try:
823
    cl = GetClient()
824
    cl.QueryClusterInfo()
825
    return 0
826
  except Exception: # pylint: disable=W0703
827
    return 1
828

    
829

    
830
def SearchTags(opts, args):
831
  """Searches the tags on all the cluster.
832

833
  @param opts: the command line options selected by the user
834
  @type args: list
835
  @param args: should contain only one element, the tag pattern
836
  @rtype: int
837
  @return: the desired exit code
838

839
  """
840
  op = opcodes.OpTagsSearch(pattern=args[0])
841
  result = SubmitOpCode(op, opts=opts)
842
  if not result:
843
    return 1
844
  result = list(result)
845
  result.sort()
846
  for path, tag in result:
847
    ToStdout("%s %s", path, tag)
848

    
849

    
850
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
851
  """Reads and verifies an X509 certificate.
852

853
  @type cert_filename: string
854
  @param cert_filename: the path of the file containing the certificate to
855
                        verify encoded in PEM format
856
  @type verify_private_key: bool
857
  @param verify_private_key: whether to verify the private key in addition to
858
                             the public certificate
859
  @rtype: string
860
  @return: a string containing the PEM-encoded certificate.
861

862
  """
863
  try:
864
    pem = utils.ReadFile(cert_filename)
865
  except IOError, err:
866
    raise errors.X509CertError(cert_filename,
867
                               "Unable to read certificate: %s" % str(err))
868

    
869
  try:
870
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
871
  except Exception, err:
872
    raise errors.X509CertError(cert_filename,
873
                               "Unable to load certificate: %s" % str(err))
874

    
875
  if verify_private_key:
876
    try:
877
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
878
    except Exception, err:
879
      raise errors.X509CertError(cert_filename,
880
                                 "Unable to load private key: %s" % str(err))
881

    
882
  return pem
883

    
884

    
885
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
886
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
887
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
888
                 cds_filename, force, new_node_cert):
889
  """Renews cluster certificates, keys and secrets.
890

891
  @type new_cluster_cert: bool
892
  @param new_cluster_cert: Whether to generate a new cluster certificate
893
  @type new_rapi_cert: bool
894
  @param new_rapi_cert: Whether to generate a new RAPI certificate
895
  @type rapi_cert_filename: string
896
  @param rapi_cert_filename: Path to file containing new RAPI certificate
897
  @type new_spice_cert: bool
898
  @param new_spice_cert: Whether to generate a new SPICE certificate
899
  @type spice_cert_filename: string
900
  @param spice_cert_filename: Path to file containing new SPICE certificate
901
  @type spice_cacert_filename: string
902
  @param spice_cacert_filename: Path to file containing the certificate of the
903
                                CA that signed the SPICE certificate
904
  @type new_confd_hmac_key: bool
905
  @param new_confd_hmac_key: Whether to generate a new HMAC key
906
  @type new_cds: bool
907
  @param new_cds: Whether to generate a new cluster domain secret
908
  @type cds_filename: string
909
  @param cds_filename: Path to file containing new cluster domain secret
910
  @type force: bool
911
  @param force: Whether to ask user for confirmation
912
  @type new_node_cert: string
913
  @param new_node_cert: Whether to generate new node certificates
914

915
  """
916
  if new_rapi_cert and rapi_cert_filename:
917
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
918
             " options can be specified at the same time.")
919
    return 1
920

    
921
  if new_cds and cds_filename:
922
    ToStderr("Only one of the --new-cluster-domain-secret and"
923
             " --cluster-domain-secret options can be specified at"
924
             " the same time.")
925
    return 1
926

    
927
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
928
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
929
             " and --spice-ca-certificate must not be used.")
930
    return 1
931

    
932
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
933
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
934
             " specified.")
935
    return 1
936

    
937
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
938
  try:
939
    if rapi_cert_filename:
940
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
941
    if spice_cert_filename:
942
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
943
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
944
  except errors.X509CertError, err:
945
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
946
    return 1
947

    
948
  if cds_filename:
949
    try:
950
      cds = utils.ReadFile(cds_filename)
951
    except Exception, err: # pylint: disable=W0703
952
      ToStderr("Can't load new cluster domain secret from %s: %s" %
953
               (cds_filename, str(err)))
954
      return 1
955
  else:
956
    cds = None
957

    
958
  if not force:
959
    usertext = ("This requires all daemons on all nodes to be restarted and"
960
                " may take some time. Continue?")
961
    if not AskUser(usertext):
962
      return 1
963

    
964
  def _RenewCryptoInner(ctx):
965
    ctx.feedback_fn("Updating certificates and keys")
966
    # Note: the node certificate will be generated in the LU
967
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
968
                                    new_rapi_cert,
969
                                    new_spice_cert,
970
                                    new_confd_hmac_key,
971
                                    new_cds,
972
                                    rapi_cert_pem=rapi_cert_pem,
973
                                    spice_cert_pem=spice_cert_pem,
974
                                    spice_cacert_pem=spice_cacert_pem,
975
                                    cds=cds)
976

    
977
    files_to_copy = []
978

    
979
    if new_cluster_cert:
980
      files_to_copy.append(pathutils.NODED_CERT_FILE)
981

    
982
    if new_rapi_cert or rapi_cert_pem:
983
      files_to_copy.append(pathutils.RAPI_CERT_FILE)
984

    
985
    if new_spice_cert or spice_cert_pem:
986
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
987
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)
988

    
989
    if new_confd_hmac_key:
990
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)
991

    
992
    if new_cds or cds:
993
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)
994

    
995
    if files_to_copy:
996
      for node_name in ctx.nonmaster_nodes:
997
        port = ctx.ssh_ports[node_name]
998
        ctx.feedback_fn("Copying %s to %s:%d" %
999
                        (", ".join(files_to_copy), node_name, port))
1000
        for file_name in files_to_copy:
1001
          ctx.ssh.CopyFileToNode(node_name, port, file_name)
1002

    
1003
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
1004

    
1005
  ToStdout("All requested certificates and keys have been replaced."
1006
           " Running \"gnt-cluster verify\" now is recommended.")
1007

    
1008
  if new_node_cert:
1009
    cl = GetClient()
1010
    renew_op = opcodes.OpClusterRenewCrypto()
1011
    SubmitOpCode(renew_op, cl=cl)
1012

    
1013
  return 0
1014

    
1015

    
1016
def RenewCrypto(opts, args):
1017
  """Renews cluster certificates, keys and secrets.
1018

1019
  """
1020
  return _RenewCrypto(opts.new_cluster_cert,
1021
                      opts.new_rapi_cert,
1022
                      opts.rapi_cert,
1023
                      opts.new_spice_cert,
1024
                      opts.spice_cert,
1025
                      opts.spice_cacert,
1026
                      opts.new_confd_hmac_key,
1027
                      opts.new_cluster_domain_secret,
1028
                      opts.cluster_domain_secret,
1029
                      opts.force,
1030
                      opts.new_node_cert)
1031

    
1032

    
1033
def _GetEnabledDiskTemplates(opts):
1034
  """Determine the list of enabled disk templates.
1035

1036
  """
1037
  if opts.enabled_disk_templates:
1038
    return opts.enabled_disk_templates.split(",")
1039
  else:
1040
    return None
1041

    
1042

    
1043
def _GetVgName(opts, enabled_disk_templates):
1044
  """Determine the volume group name.
1045

1046
  @type enabled_disk_templates: list of strings
1047
  @param enabled_disk_templates: cluster-wide enabled disk-templates
1048

1049
  """
1050
  # consistency between vg name and enabled disk templates
1051
  vg_name = None
1052
  if opts.vg_name is not None:
1053
    vg_name = opts.vg_name
1054
  if enabled_disk_templates:
1055
    if vg_name and not utils.IsLvmEnabled(enabled_disk_templates):
1056
      ToStdout("You specified a volume group with --vg-name, but you did not"
1057
               " enable any of the following lvm-based disk templates: %s" %
1058
               utils.CommaJoin(constants.DTS_LVM))
1059
  return vg_name
1060

    
1061

    
1062
def _GetDrbdHelper(opts, enabled_disk_templates):
1063
  """Determine the DRBD usermode helper.
1064

1065
  """
1066
  drbd_helper = opts.drbd_helper
1067
  if enabled_disk_templates:
1068
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
1069
    if not drbd_enabled and opts.drbd_helper:
1070
      ToStdout("You specified a DRBD usermode helper with "
1071
               " --drbd-usermode-helper while DRBD is not enabled.")
1072
  return drbd_helper
1073

    
1074

    
1075
def SetClusterParams(opts, args):
1076
  """Modify the cluster.
1077

1078
  @param opts: the command line options selected by the user
1079
  @type args: list
1080
  @param args: should be an empty list
1081
  @rtype: int
1082
  @return: the desired exit code
1083

1084
  """
1085
  if not (opts.vg_name is not None or
1086
          opts.drbd_helper is not None or
1087
          opts.enabled_hypervisors or opts.hvparams or
1088
          opts.beparams or opts.nicparams or
1089
          opts.ndparams or opts.diskparams or
1090
          opts.candidate_pool_size is not None or
1091
          opts.max_running_jobs is not None or
1092
          opts.uid_pool is not None or
1093
          opts.maintain_node_health is not None or
1094
          opts.add_uids is not None or
1095
          opts.remove_uids is not None or
1096
          opts.default_iallocator is not None or
1097
          opts.default_iallocator_params or
1098
          opts.reserved_lvs is not None or
1099
          opts.master_netdev is not None or
1100
          opts.master_netmask is not None or
1101
          opts.use_external_mip_script is not None or
1102
          opts.prealloc_wipe_disks is not None or
1103
          opts.hv_state or
1104
          opts.enabled_disk_templates or
1105
          opts.disk_state or
1106
          opts.ipolicy_bounds_specs is not None or
1107
          opts.ipolicy_std_specs is not None or
1108
          opts.ipolicy_disk_templates is not None or
1109
          opts.ipolicy_vcpu_ratio is not None or
1110
          opts.ipolicy_spindle_ratio is not None or
1111
          opts.modify_etc_hosts is not None or
1112
          opts.file_storage_dir is not None):
1113
    ToStderr("Please give at least one of the parameters.")
1114
    return 1
1115

    
1116
  enabled_disk_templates = _GetEnabledDiskTemplates(opts)
1117
  vg_name = _GetVgName(opts, enabled_disk_templates)
1118

    
1119
  try:
1120
    drbd_helper = _GetDrbdHelper(opts, enabled_disk_templates)
1121
  except errors.OpPrereqError, e:
1122
    ToStderr(str(e))
1123
    return 1
1124

    
1125
  hvlist = opts.enabled_hypervisors
1126
  if hvlist is not None:
1127
    hvlist = hvlist.split(",")
1128

    
1129
  # a list of (name, dict) we can pass directly to dict() (or [])
1130
  hvparams = dict(opts.hvparams)
1131
  for hv_params in hvparams.values():
1132
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1133

    
1134
  diskparams = dict(opts.diskparams)
1135

    
1136
  for dt_params in diskparams.values():
1137
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1138

    
1139
  beparams = opts.beparams
1140
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1141

    
1142
  nicparams = opts.nicparams
1143
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1144

    
1145
  ndparams = opts.ndparams
1146
  if ndparams is not None:
1147
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1148

    
1149
  ipolicy = CreateIPolicyFromOpts(
1150
    minmax_ispecs=opts.ipolicy_bounds_specs,
1151
    std_ispecs=opts.ipolicy_std_specs,
1152
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
1153
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1154
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1155
    )
1156

    
1157
  mnh = opts.maintain_node_health
1158

    
1159
  uid_pool = opts.uid_pool
1160
  if uid_pool is not None:
1161
    uid_pool = uidpool.ParseUidPool(uid_pool)
1162

    
1163
  add_uids = opts.add_uids
1164
  if add_uids is not None:
1165
    add_uids = uidpool.ParseUidPool(add_uids)
1166

    
1167
  remove_uids = opts.remove_uids
1168
  if remove_uids is not None:
1169
    remove_uids = uidpool.ParseUidPool(remove_uids)
1170

    
1171
  if opts.reserved_lvs is not None:
1172
    if opts.reserved_lvs == "":
1173
      opts.reserved_lvs = []
1174
    else:
1175
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1176

    
1177
  if opts.master_netmask is not None:
1178
    try:
1179
      opts.master_netmask = int(opts.master_netmask)
1180
    except ValueError:
1181
      ToStderr("The --master-netmask option expects an int parameter.")
1182
      return 1
1183

    
1184
  ext_ip_script = opts.use_external_mip_script
1185

    
1186
  if opts.disk_state:
1187
    disk_state = utils.FlatToDict(opts.disk_state)
1188
  else:
1189
    disk_state = {}
1190

    
1191
  hv_state = dict(opts.hv_state)
1192

    
1193
  op = opcodes.OpClusterSetParams(
1194
    vg_name=vg_name,
1195
    drbd_helper=drbd_helper,
1196
    enabled_hypervisors=hvlist,
1197
    hvparams=hvparams,
1198
    os_hvp=None,
1199
    beparams=beparams,
1200
    nicparams=nicparams,
1201
    ndparams=ndparams,
1202
    diskparams=diskparams,
1203
    ipolicy=ipolicy,
1204
    candidate_pool_size=opts.candidate_pool_size,
1205
    max_running_jobs=opts.max_running_jobs,
1206
    maintain_node_health=mnh,
1207
    modify_etc_hosts=opts.modify_etc_hosts,
1208
    uid_pool=uid_pool,
1209
    add_uids=add_uids,
1210
    remove_uids=remove_uids,
1211
    default_iallocator=opts.default_iallocator,
1212
    default_iallocator_params=opts.default_iallocator_params,
1213
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
1214
    master_netdev=opts.master_netdev,
1215
    master_netmask=opts.master_netmask,
1216
    reserved_lvs=opts.reserved_lvs,
1217
    use_external_mip_script=ext_ip_script,
1218
    hv_state=hv_state,
1219
    disk_state=disk_state,
1220
    enabled_disk_templates=enabled_disk_templates,
1221
    force=opts.force,
1222
    file_storage_dir=opts.file_storage_dir,
1223
    )
1224
  SubmitOrSend(op, opts)
1225
  return 0
1226

    
1227

    
1228
def QueueOps(opts, args):
1229
  """Queue operations.
1230

1231
  @param opts: the command line options selected by the user
1232
  @type args: list
1233
  @param args: should contain only one element, the subcommand
1234
  @rtype: int
1235
  @return: the desired exit code
1236

1237
  """
1238
  command = args[0]
1239
  client = GetClient()
1240
  if command in ("drain", "undrain"):
1241
    drain_flag = command == "drain"
1242
    client.SetQueueDrainFlag(drain_flag)
1243
  elif command == "info":
1244
    result = client.QueryConfigValues(["drain_flag"])
1245
    if result[0]:
1246
      val = "set"
1247
    else:
1248
      val = "unset"
1249
    ToStdout("The drain flag is %s" % val)
1250
  else:
1251
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1252
                               errors.ECODE_INVAL)
1253

    
1254
  return 0
1255

    
1256

    
1257
def _ShowWatcherPause(until):
1258
  if until is None or until < time.time():
1259
    ToStdout("The watcher is not paused.")
1260
  else:
1261
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1262

    
1263

    
1264
def WatcherOps(opts, args):
1265
  """Watcher operations.
1266

1267
  @param opts: the command line options selected by the user
1268
  @type args: list
1269
  @param args: should contain only one element, the subcommand
1270
  @rtype: int
1271
  @return: the desired exit code
1272

1273
  """
1274
  command = args[0]
1275
  client = GetClient()
1276

    
1277
  if command == "continue":
1278
    client.SetWatcherPause(None)
1279
    ToStdout("The watcher is no longer paused.")
1280

    
1281
  elif command == "pause":
1282
    if len(args) < 2:
1283
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1284

    
1285
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1286
    _ShowWatcherPause(result)
1287

    
1288
  elif command == "info":
1289
    result = client.QueryConfigValues(["watcher_pause"])
1290
    _ShowWatcherPause(result[0])
1291

    
1292
  else:
1293
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1294
                               errors.ECODE_INVAL)
1295

    
1296
  return 0
1297

    
1298

    
1299
def _OobPower(opts, node_list, power):
1300
  """Puts the node in the list to desired power state.
1301

1302
  @param opts: The command line options selected by the user
1303
  @param node_list: The list of nodes to operate on
1304
  @param power: True if they should be powered on, False otherwise
1305
  @return: The success of the operation (none failed)
1306

1307
  """
1308
  if power:
1309
    command = constants.OOB_POWER_ON
1310
  else:
1311
    command = constants.OOB_POWER_OFF
1312

    
1313
  op = opcodes.OpOobCommand(node_names=node_list,
1314
                            command=command,
1315
                            ignore_status=True,
1316
                            timeout=opts.oob_timeout,
1317
                            power_delay=opts.power_delay)
1318
  result = SubmitOpCode(op, opts=opts)
1319
  errs = 0
1320
  for node_result in result:
1321
    (node_tuple, data_tuple) = node_result
1322
    (_, node_name) = node_tuple
1323
    (data_status, _) = data_tuple
1324
    if data_status != constants.RS_NORMAL:
1325
      assert data_status != constants.RS_UNAVAIL
1326
      errs += 1
1327
      ToStderr("There was a problem changing power for %s, please investigate",
1328
               node_name)
1329

    
1330
  if errs > 0:
1331
    return False
1332

    
1333
  return True
1334

    
1335

    
1336
def _InstanceStart(opts, inst_list, start, no_remember=False):
1337
  """Puts the instances in the list to desired state.
1338

1339
  @param opts: The command line options selected by the user
1340
  @param inst_list: The list of instances to operate on
1341
  @param start: True if they should be started, False for shutdown
1342
  @param no_remember: If the instance state should be remembered
1343
  @return: The success of the operation (none failed)
1344

1345
  """
1346
  if start:
1347
    opcls = opcodes.OpInstanceStartup
1348
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1349
  else:
1350
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1351
                           timeout=opts.shutdown_timeout,
1352
                           no_remember=no_remember)
1353
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1354

    
1355
  jex = JobExecutor(opts=opts)
1356

    
1357
  for inst in inst_list:
1358
    ToStdout("Submit %s of instance %s", text_submit, inst)
1359
    op = opcls(instance_name=inst)
1360
    jex.QueueJob(inst, op)
1361

    
1362
  results = jex.GetResults()
1363
  bad_cnt = len([1 for (success, _) in results if not success])
1364

    
1365
  if bad_cnt == 0:
1366
    ToStdout("All instances have been %s successfully", text_success)
1367
  else:
1368
    ToStderr("There were errors while %s instances:\n"
1369
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1370
             len(results))
1371
    return False
1372

    
1373
  return True
1374

    
1375

    
1376
class _RunWhenNodesReachableHelper:
1377
  """Helper class to make shared internal state sharing easier.
1378

1379
  @ivar success: Indicates if all action_cb calls were successful
1380

1381
  """
1382
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1383
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1384
    """Init the object.
1385

1386
    @param node_list: The list of nodes to be reachable
1387
    @param action_cb: Callback called when a new host is reachable
1388
    @type node2ip: dict
1389
    @param node2ip: Node to ip mapping
1390
    @param port: The port to use for the TCP ping
1391
    @param feedback_fn: The function used for feedback
1392
    @param _ping_fn: Function to check reachabilty (for unittest use only)
1393
    @param _sleep_fn: Function to sleep (for unittest use only)
1394

1395
    """
1396
    self.down = set(node_list)
1397
    self.up = set()
1398
    self.node2ip = node2ip
1399
    self.success = True
1400
    self.action_cb = action_cb
1401
    self.port = port
1402
    self.feedback_fn = feedback_fn
1403
    self._ping_fn = _ping_fn
1404
    self._sleep_fn = _sleep_fn
1405

    
1406
  def __call__(self):
1407
    """When called we run action_cb.
1408

1409
    @raises utils.RetryAgain: When there are still down nodes
1410

1411
    """
1412
    if not self.action_cb(self.up):
1413
      self.success = False
1414

    
1415
    if self.down:
1416
      raise utils.RetryAgain()
1417
    else:
1418
      return self.success
1419

    
1420
  def Wait(self, secs):
1421
    """Checks if a host is up or waits remaining seconds.
1422

1423
    @param secs: The secs remaining
1424

1425
    """
1426
    start = time.time()
1427
    for node in self.down:
1428
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1429
                       live_port_needed=True):
1430
        self.feedback_fn("Node %s became available" % node)
1431
        self.up.add(node)
1432
        self.down -= self.up
1433
        # If we have a node available there is the possibility to run the
1434
        # action callback successfully, therefore we don't wait and return
1435
        return
1436

    
1437
    self._sleep_fn(max(0.0, start + secs - time.time()))
1438

    
1439

    
1440
def _RunWhenNodesReachable(node_list, action_cb, interval):
1441
  """Run action_cb when nodes become reachable.
1442

1443
  @param node_list: The list of nodes to be reachable
1444
  @param action_cb: Callback called when a new host is reachable
1445
  @param interval: The earliest time to retry
1446

1447
  """
1448
  client = GetClient()
1449
  cluster_info = client.QueryClusterInfo()
1450
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1451
    family = netutils.IPAddress.family
1452
  else:
1453
    family = netutils.IP6Address.family
1454

    
1455
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1456
                 for node in node_list)
1457

    
1458
  port = netutils.GetDaemonPort(constants.NODED)
1459
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1460
                                        ToStdout)
1461

    
1462
  try:
1463
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1464
                       wait_fn=helper.Wait)
1465
  except utils.RetryTimeout:
1466
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1467
             " again:\n  - %s", "  - ".join(helper.down))
1468
    return False
1469

    
1470

    
1471
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1472
                          _instance_start_fn=_InstanceStart):
1473
  """Start the instances conditional based on node_states.
1474

1475
  @param opts: The command line options selected by the user
1476
  @param inst_map: A dict of inst -> nodes mapping
1477
  @param nodes_online: A list of nodes online
1478
  @param _instance_start_fn: Callback to start instances (unittest use only)
1479
  @return: Success of the operation on all instances
1480

1481
  """
1482
  start_inst_list = []
1483
  for (inst, nodes) in inst_map.items():
1484
    if not (nodes - nodes_online):
1485
      # All nodes the instance lives on are back online
1486
      start_inst_list.append(inst)
1487

    
1488
  for inst in start_inst_list:
1489
    del inst_map[inst]
1490

    
1491
  if start_inst_list:
1492
    return _instance_start_fn(opts, start_inst_list, True)
1493

    
1494
  return True
1495

    
1496

    
1497
def _EpoOn(opts, full_node_list, node_list, inst_map):
1498
  """Does the actual power on.
1499

1500
  @param opts: The command line options selected by the user
1501
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1502
                         OOB)
1503
  @param node_list: The list of nodes to operate on (all need to support OOB)
1504
  @param inst_map: A dict of inst -> nodes mapping
1505
  @return: The desired exit status
1506

1507
  """
1508
  if node_list and not _OobPower(opts, node_list, False):
1509
    ToStderr("Not all nodes seem to get back up, investigate and start"
1510
             " manually if needed")
1511

    
1512
  # Wait for the nodes to be back up
1513
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1514

    
1515
  ToStdout("Waiting until all nodes are available again")
1516
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1517
    ToStderr("Please investigate and start stopped instances manually")
1518
    return constants.EXIT_FAILURE
1519

    
1520
  return constants.EXIT_SUCCESS
1521

    
1522

    
1523
def _EpoOff(opts, node_list, inst_map):
1524
  """Does the actual power off.
1525

1526
  @param opts: The command line options selected by the user
1527
  @param node_list: The list of nodes to operate on (all need to support OOB)
1528
  @param inst_map: A dict of inst -> nodes mapping
1529
  @return: The desired exit status
1530

1531
  """
1532
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
1533
    ToStderr("Please investigate and stop instances manually before continuing")
1534
    return constants.EXIT_FAILURE
1535

    
1536
  if not node_list:
1537
    return constants.EXIT_SUCCESS
1538

    
1539
  if _OobPower(opts, node_list, False):
1540
    return constants.EXIT_SUCCESS
1541
  else:
1542
    return constants.EXIT_FAILURE
1543

    
1544

    
1545
def Epo(opts, args, qcl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
1546
        _confirm_fn=ConfirmOperation,
1547
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
1548
  """EPO operations.
1549

1550
  @param opts: the command line options selected by the user
1551
  @type args: list
1552
  @param args: should contain only one element, the subcommand
1553
  @rtype: int
1554
  @return: the desired exit code
1555

1556
  """
1557
  if opts.groups and opts.show_all:
1558
    _stderr_fn("Only one of --groups or --all are allowed")
1559
    return constants.EXIT_FAILURE
1560
  elif args and opts.show_all:
1561
    _stderr_fn("Arguments in combination with --all are not allowed")
1562
    return constants.EXIT_FAILURE
1563

    
1564
  if qcl is None:
1565
    # Query client
1566
    qcl = GetClient(query=True)
1567

    
1568
  if opts.groups:
1569
    node_query_list = \
1570
      itertools.chain(*qcl.QueryGroups(args, ["node_list"], False))
1571
  else:
1572
    node_query_list = args
1573

    
1574
  result = qcl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
1575
                                            "sinst_list", "powered", "offline"],
1576
                          False)
1577

    
1578
  all_nodes = map(compat.fst, result)
1579
  node_list = []
1580
  inst_map = {}
1581
  for (node, master, pinsts, sinsts, powered, offline) in result:
1582
    if not offline:
1583
      for inst in (pinsts + sinsts):
1584
        if inst in inst_map:
1585
          if not master:
1586
            inst_map[inst].add(node)
1587
        elif master:
1588
          inst_map[inst] = set()
1589
        else:
1590
          inst_map[inst] = set([node])
1591

    
1592
    if master and opts.on:
1593
      # We ignore the master for turning on the machines, in fact we are
1594
      # already operating on the master at this point :)
1595
      continue
1596
    elif master and not opts.show_all:
1597
      _stderr_fn("%s is the master node, please do a master-failover to another"
1598
                 " node not affected by the EPO or use --all if you intend to"
1599
                 " shutdown the whole cluster", node)
1600
      return constants.EXIT_FAILURE
1601
    elif powered is None:
1602
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
1603
                 " handled in a fully automated manner", node)
1604
    elif powered == opts.on:
1605
      _stdout_fn("Node %s is already in desired power state, skipping", node)
1606
    elif not offline or (offline and powered):
1607
      node_list.append(node)
1608

    
1609
  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
1610
    return constants.EXIT_FAILURE
1611

    
1612
  if opts.on:
1613
    return _on_fn(opts, all_nodes, node_list, inst_map)
1614
  else:
1615
    return _off_fn(opts, node_list, inst_map)
1616

    
1617

    
1618
def _GetCreateCommand(info):
1619
  buf = StringIO()
1620
  buf.write("gnt-cluster init")
1621
  PrintIPolicyCommand(buf, info["ipolicy"], False)
1622
  buf.write(" ")
1623
  buf.write(info["name"])
1624
  return buf.getvalue()
1625

    
1626

    
1627
def ShowCreateCommand(opts, args):
1628
  """Shows the command that can be used to re-create the cluster.
1629

1630
  Currently it works only for ipolicy specs.
1631

1632
  """
1633
  cl = GetClient(query=True)
1634
  result = cl.QueryClusterInfo()
1635
  ToStdout(_GetCreateCommand(result))
1636

    
1637

    
1638
def _RunCommandAndReport(cmd):
1639
  """Run a command and report its output, iff it failed.
1640

1641
  @param cmd: the command to execute
1642
  @type cmd: list
1643
  @rtype: bool
1644
  @return: False, if the execution failed.
1645

1646
  """
1647
  result = utils.RunCmd(cmd)
1648
  if result.failed:
1649
    ToStderr("Command %s failed: %s; Output %s" %
1650
             (cmd, result.fail_reason, result.output))
1651
    return False
1652
  return True
1653

    
1654

    
1655
def _VerifyCommand(cmd):
1656
  """Verify that a given command succeeds on all online nodes.
1657

1658
  As this function is intended to run during upgrades, it
1659
  is implemented in such a way that it still works, if all Ganeti
1660
  daemons are down.
1661

1662
  @param cmd: the command to execute
1663
  @type cmd: list
1664
  @rtype: list
1665
  @return: the list of node names that are online where
1666
      the command failed.
1667

1668
  """
1669
  command = utils.text.ShellQuoteArgs([str(val) for val in cmd])
1670

    
1671
  nodes = ssconf.SimpleStore().GetOnlineNodeList()
1672
  master_node = ssconf.SimpleStore().GetMasterNode()
1673
  cluster_name = ssconf.SimpleStore().GetClusterName()
1674

    
1675
  # If master node is in 'nodes', make sure master node is at list end
1676
  if master_node in nodes:
1677
    nodes.remove(master_node)
1678
    nodes.append(master_node)
1679

    
1680
  failed = []
1681

    
1682
  srun = ssh.SshRunner(cluster_name=cluster_name)
1683
  for name in nodes:
1684
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
1685
    if result.exit_code != 0:
1686
      failed.append(name)
1687

    
1688
  return failed
1689

    
1690

    
1691
def _VerifyVersionInstalled(versionstring):
1692
  """Verify that the given version of ganeti is installed on all online nodes.
1693

1694
  Do nothing, if this is the case, otherwise print an appropriate
1695
  message to stderr.
1696

1697
  @param versionstring: the version to check for
1698
  @type versionstring: string
1699
  @rtype: bool
1700
  @return: True, if the version is installed on all online nodes
1701

1702
  """
1703
  badnodes = _VerifyCommand(["test", "-d",
1704
                             os.path.join(pathutils.PKGLIBDIR, versionstring)])
1705
  if badnodes:
1706
    ToStderr("Ganeti version %s not installed on nodes %s"
1707
             % (versionstring, ", ".join(badnodes)))
1708
    return False
1709

    
1710
  return True
1711

    
1712

    
1713
def _GetRunning():
1714
  """Determine the list of running jobs.
1715

1716
  @rtype: list
1717
  @return: the number of jobs still running
1718

1719
  """
1720
  cl = GetClient()
1721
  qfilter = qlang.MakeSimpleFilter("status",
1722
                                   frozenset([constants.JOB_STATUS_RUNNING]))
1723
  return len(cl.Query(constants.QR_JOB, [], qfilter).data)
1724

    
1725

    
1726
def _SetGanetiVersion(versionstring):
1727
  """Set the active version of ganeti to the given versionstring
1728

1729
  @type versionstring: string
1730
  @rtype: list
1731
  @return: the list of nodes where the version change failed
1732

1733
  """
1734
  failed = []
1735
  if constants.HAS_GNU_LN:
1736
    failed.extend(_VerifyCommand(
1737
        ["ln", "-s", "-f", "-T",
1738
         os.path.join(pathutils.PKGLIBDIR, versionstring),
1739
         os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
1740
    failed.extend(_VerifyCommand(
1741
        ["ln", "-s", "-f", "-T",
1742
         os.path.join(pathutils.SHAREDIR, versionstring),
1743
         os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
1744
  else:
1745
    failed.extend(_VerifyCommand(
1746
        ["rm", "-f", os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
1747
    failed.extend(_VerifyCommand(
1748
        ["ln", "-s", "-f", os.path.join(pathutils.PKGLIBDIR, versionstring),
1749
         os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
1750
    failed.extend(_VerifyCommand(
1751
        ["rm", "-f", os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
1752
    failed.extend(_VerifyCommand(
1753
        ["ln", "-s", "-f", os.path.join(pathutils.SHAREDIR, versionstring),
1754
         os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
1755
  return list(set(failed))
1756

    
1757

    
1758
def _ExecuteCommands(fns):
1759
  """Execute a list of functions, in reverse order.
1760

1761
  @type fns: list of functions.
1762
  @param fns: the functions to be executed.
1763

1764
  """
1765
  for fn in reversed(fns):
1766
    fn()
1767

    
1768

    
1769
def _GetConfigVersion():
1770
  """Determine the version the configuration file currently has.
1771

1772
  @rtype: tuple or None
1773
  @return: (major, minor, revision) if the version can be determined,
1774
      None otherwise
1775

1776
  """
1777
  config_data = serializer.LoadJson(utils.ReadFile(pathutils.CLUSTER_CONF_FILE))
1778
  try:
1779
    config_version = config_data["version"]
1780
  except KeyError:
1781
    return None
1782
  return utils.SplitVersion(config_version)
1783

    
1784

    
1785
def _ReadIntentToUpgrade():
1786
  """Read the file documenting the intent to upgrade the cluster.
1787

1788
  @rtype: (string, string) or (None, None)
1789
  @return: (old version, version to upgrade to), if the file exists,
1790
      and (None, None) otherwise.
1791

1792
  """
1793
  if not os.path.isfile(pathutils.INTENT_TO_UPGRADE):
1794
    return (None, None)
1795

    
1796
  contentstring = utils.ReadFile(pathutils.INTENT_TO_UPGRADE)
1797
  contents = utils.UnescapeAndSplit(contentstring)
1798
  if len(contents) != 3:
1799
    # file syntactically mal-formed
1800
    return (None, None)
1801
  return (contents[0], contents[1])
1802

    
1803

    
1804
def _WriteIntentToUpgrade(version):
1805
  """Write file documenting the intent to upgrade the cluster.
1806

1807
  @type version: string
1808
  @param version: the version we intent to upgrade to
1809

1810
  """
1811
  utils.WriteFile(pathutils.INTENT_TO_UPGRADE,
1812
                  data=utils.EscapeAndJoin([constants.RELEASE_VERSION, version,
1813
                                            "%d" % os.getpid()]))
1814

    
1815

    
1816
def _UpgradeBeforeConfigurationChange(versionstring):
1817
  """
1818
  Carry out all the tasks necessary for an upgrade that happen before
1819
  the configuration file, or Ganeti version, changes.
1820

1821
  @type versionstring: string
1822
  @param versionstring: the version to upgrade to
1823
  @rtype: (bool, list)
1824
  @return: tuple of a bool indicating success and a list of rollback tasks
1825

1826
  """
1827
  rollback = []
1828

    
1829
  if not _VerifyVersionInstalled(versionstring):
1830
    return (False, rollback)
1831

    
1832
  _WriteIntentToUpgrade(versionstring)
1833
  rollback.append(
1834
    lambda: utils.RunCmd(["rm", "-f", pathutils.INTENT_TO_UPGRADE]))
1835

    
1836
  ToStdout("Draining queue")
1837
  client = GetClient()
1838
  client.SetQueueDrainFlag(True)
1839

    
1840
  rollback.append(lambda: GetClient().SetQueueDrainFlag(False))
1841

    
1842
  if utils.SimpleRetry(0, _GetRunning,
1843
                       constants.UPGRADE_QUEUE_POLL_INTERVAL,
1844
                       constants.UPGRADE_QUEUE_DRAIN_TIMEOUT):
1845
    ToStderr("Failed to completely empty the queue.")
1846
    return (False, rollback)
1847

    
1848
  ToStdout("Stopping daemons on master node.")
1849
  if not _RunCommandAndReport([pathutils.DAEMON_UTIL, "stop-all"]):
1850
    return (False, rollback)
1851

    
1852
  if not _VerifyVersionInstalled(versionstring):
1853
    utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"])
1854
    return (False, rollback)
1855

    
1856
  ToStdout("Stopping daemons everywhere.")
1857
  rollback.append(lambda: _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
1858
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
1859
  if badnodes:
1860
    ToStderr("Failed to stop daemons on %s." % (", ".join(badnodes),))
1861
    return (False, rollback)
1862

    
1863
  backuptar = os.path.join(pathutils.LOCALSTATEDIR,
1864
                           "lib/ganeti%d.tar" % time.time())
1865
  ToStdout("Backing up configuration as %s" % backuptar)
1866
  if not _RunCommandAndReport(["tar", "cf", backuptar,
1867
                               pathutils.DATA_DIR]):
1868
    return (False, rollback)
1869

    
1870
  return (True, rollback)
1871

    
1872

    
1873
def _VersionSpecificDowngrade():
1874
  """
1875
  Perform any additional downrade tasks that are version specific
1876
  and need to be done just after the configuration downgrade. This
1877
  function needs to be idempotent, so that it can be redone if the
1878
  downgrade procedure gets interrupted after changing the
1879
  configuration.
1880

1881
  Note that this function has to be reset with every version bump.
1882

1883
  @return: True upon success
1884
  """
1885
  ToStdout("Performing version-specific downgrade tasks.")
1886

    
1887
  ToStdout("...removing client certificates ssconf file")
1888
  ssconffile = ssconf.SimpleStore().KeyToFilename(
1889
    constants.SS_MASTER_CANDIDATES_CERTS)
1890
  badnodes = _VerifyCommand(["rm", "-f", ssconffile])
1891
  if badnodes:
1892
    ToStderr("Warning: failed to clean up ssconf on %s."
1893
             % (", ".join(badnodes),))
1894
    return False
1895

    
1896
  ToStdout("...removing client certificates")
1897
  badnodes = _VerifyCommand(["rm", "-f", pathutils.NODED_CLIENT_CERT_FILE])
1898
  if badnodes:
1899
    ToStderr("Warning: failed to clean up certificates on %s."
1900
             % (", ".join(badnodes),))
1901
    return False
1902

    
1903
  return True
1904

    
1905

    
1906
def _SwitchVersionAndConfig(versionstring, downgrade):
1907
  """
1908
  Switch to the new Ganeti version and change the configuration,
1909
  in correct order.
1910

1911
  @type versionstring: string
1912
  @param versionstring: the version to change to
1913
  @type downgrade: bool
1914
  @param downgrade: True, if the configuration should be downgraded
1915
  @rtype: (bool, list)
1916
  @return: tupe of a bool indicating success, and a list of
1917
      additional rollback tasks
1918

1919
  """
1920
  rollback = []
1921
  if downgrade:
1922
    ToStdout("Downgrading configuration")
1923
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "--downgrade", "-f"]):
1924
      return (False, rollback)
1925
    # Note: version specific downgrades need to be done before switching
1926
    # binaries, so that we still have the knowledgeable binary if the downgrade
1927
    # process gets interrupted at this point.
1928
    if not _VersionSpecificDowngrade():
1929
      return (False, rollback)
1930

    
1931
  # Configuration change is the point of no return. From then onwards, it is
1932
  # safer to push through the up/dowgrade than to try to roll it back.
1933

    
1934
  ToStdout("Switching to version %s on all nodes" % versionstring)
1935
  rollback.append(lambda: _SetGanetiVersion(constants.DIR_VERSION))
1936
  badnodes = _SetGanetiVersion(versionstring)
1937
  if badnodes:
1938
    ToStderr("Failed to switch to Ganeti version %s on nodes %s"
1939
             % (versionstring, ", ".join(badnodes)))
1940
    if not downgrade:
1941
      return (False, rollback)
1942

    
1943
  # Now that we have changed to the new version of Ganeti we should
1944
  # not communicate over luxi any more, as luxi might have changed in
1945
  # incompatible ways. Therefore, manually call the corresponding ganeti
1946
  # commands using their canonical (version independent) path.
1947

    
1948
  if not downgrade:
1949
    ToStdout("Upgrading configuration")
1950
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "-f"]):
1951
      return (False, rollback)
1952

    
1953
  return (True, rollback)
1954

    
1955

    
1956
def _UpgradeAfterConfigurationChange(oldversion):
1957
  """
1958
  Carry out the upgrade actions necessary after switching to the new
1959
  Ganeti version and updating the configuration.
1960

1961
  As this part is run at a time where the new version of Ganeti is already
1962
  running, no communication should happen via luxi, as this is not a stable
1963
  interface. Also, as the configuration change is the point of no return,
1964
  all actions are pushed trough, even if some of them fail.
1965

1966
  @param oldversion: the version the upgrade started from
1967
  @type oldversion: string
1968
  @rtype: int
1969
  @return: the intended return value
1970

1971
  """
1972
  returnvalue = 0
1973

    
1974
  ToStdout("Starting daemons everywhere.")
1975
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
1976
  if badnodes:
1977
    ToStderr("Warning: failed to start daemons on %s." % (", ".join(badnodes),))
1978
    returnvalue = 1
1979

    
1980
  ToStdout("Ensuring directories everywhere.")
1981
  badnodes = _VerifyCommand([pathutils.ENSURE_DIRS])
1982
  if badnodes:
1983
    ToStderr("Warning: failed to ensure directories on %s." %
1984
             (", ".join(badnodes)))
1985
    returnvalue = 1
1986

    
1987
  ToStdout("Redistributing the configuration.")
1988
  if not _RunCommandAndReport(["gnt-cluster", "redist-conf", "--yes-do-it"]):
1989
    returnvalue = 1
1990

    
1991
  ToStdout("Restarting daemons everywhere.")
1992
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
1993
  badnodes.extend(_VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
1994
  if badnodes:
1995
    ToStderr("Warning: failed to start daemons on %s." %
1996
             (", ".join(list(set(badnodes))),))
1997
    returnvalue = 1
1998

    
1999
  ToStdout("Undraining the queue.")
2000
  if not _RunCommandAndReport(["gnt-cluster", "queue", "undrain"]):
2001
    returnvalue = 1
2002

    
2003
  _RunCommandAndReport(["rm", "-f", pathutils.INTENT_TO_UPGRADE])
2004

    
2005
  ToStdout("Running post-upgrade hooks")
2006
  if not _RunCommandAndReport([pathutils.POST_UPGRADE, oldversion]):
2007
    returnvalue = 1
2008

    
2009
  ToStdout("Verifying cluster.")
2010
  if not _RunCommandAndReport(["gnt-cluster", "verify"]):
2011
    returnvalue = 1
2012

    
2013
  return returnvalue
2014

    
2015

    
2016
def UpgradeGanetiCommand(opts, args):
2017
  """Upgrade a cluster to a new ganeti version.
2018

2019
  @param opts: the command line options selected by the user
2020
  @type args: list
2021
  @param args: should be an empty list
2022
  @rtype: int
2023
  @return: the desired exit code
2024

2025
  """
2026
  if ((not opts.resume and opts.to is None)
2027
      or (opts.resume and opts.to is not None)):
2028
    ToStderr("Precisely one of the options --to and --resume"
2029
             " has to be given")
2030
    return 1
2031

    
2032
  oldversion = constants.RELEASE_VERSION
2033

    
2034
  if opts.resume:
2035
    ssconf.CheckMaster(False)
2036
    oldversion, versionstring = _ReadIntentToUpgrade()
2037
    if versionstring is None:
2038
      return 0
2039
    version = utils.version.ParseVersion(versionstring)
2040
    if version is None:
2041
      return 1
2042
    configversion = _GetConfigVersion()
2043
    if configversion is None:
2044
      return 1
2045
    # If the upgrade we resume was an upgrade between compatible
2046
    # versions (like 2.10.0 to 2.10.1), the correct configversion
2047
    # does not guarantee that the config has been updated.
2048
    # However, in the case of a compatible update with the configuration
2049
    # not touched, we are running a different dirversion with the same
2050
    # config version.
2051
    config_already_modified = \
2052
      (utils.IsCorrectConfigVersion(version, configversion) and
2053
       not (versionstring != constants.DIR_VERSION and
2054
            configversion == (constants.CONFIG_MAJOR, constants.CONFIG_MINOR,
2055
                              constants.CONFIG_REVISION)))
2056
    if not config_already_modified:
2057
      # We have to start from the beginning; however, some daemons might have
2058
      # already been stopped, so the only way to get into a well-defined state
2059
      # is by starting all daemons again.
2060
      _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
2061
  else:
2062
    versionstring = opts.to
2063
    config_already_modified = False
2064
    version = utils.version.ParseVersion(versionstring)
2065
    if version is None:
2066
      ToStderr("Could not parse version string %s" % versionstring)
2067
      return 1
2068

    
2069
  msg = utils.version.UpgradeRange(version)
2070
  if msg is not None:
2071
    ToStderr("Cannot upgrade to %s: %s" % (versionstring, msg))
2072
    return 1
2073

    
2074
  if not config_already_modified:
2075
    success, rollback = _UpgradeBeforeConfigurationChange(versionstring)
2076
    if not success:
2077
      _ExecuteCommands(rollback)
2078
      return 1
2079
  else:
2080
    rollback = []
2081

    
2082
  downgrade = utils.version.ShouldCfgdowngrade(version)
2083

    
2084
  success, additionalrollback =  \
2085
      _SwitchVersionAndConfig(versionstring, downgrade)
2086
  if not success:
2087
    rollback.extend(additionalrollback)
2088
    _ExecuteCommands(rollback)
2089
    return 1
2090

    
2091
  return _UpgradeAfterConfigurationChange(oldversion)
2092

    
2093

    
2094
commands = {
2095
  "init": (
2096
    InitCluster, [ArgHost(min=1, max=1)],
2097
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
2098
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
2099
     NIC_PARAMS_OPT, NOMODIFY_ETCHOSTS_OPT, NOMODIFY_SSH_SETUP_OPT,
2100
     SECONDARY_IP_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT,
2101
     DRBD_HELPER_OPT, DEFAULT_IALLOCATOR_OPT, DEFAULT_IALLOCATOR_PARAMS_OPT,
2102
     PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT,
2103
     GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT,
2104
     HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
2105
     IPOLICY_STD_SPECS_OPT, GLOBAL_GLUSTER_FILEDIR_OPT]
2106
     + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
2107
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
2108
  "destroy": (
2109
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
2110
    "", "Destroy cluster"),
2111
  "rename": (
2112
    RenameCluster, [ArgHost(min=1, max=1)],
2113
    [FORCE_OPT, DRY_RUN_OPT],
2114
    "<new_name>",
2115
    "Renames the cluster"),
2116
  "redist-conf": (
2117
    RedistributeConfig, ARGS_NONE, SUBMIT_OPTS +
2118
    [DRY_RUN_OPT, PRIORITY_OPT, FORCE_DISTRIBUTION],
2119
    "", "Forces a push of the configuration file and ssconf files"
2120
    " to the nodes in the cluster"),
2121
  "verify": (
2122
    VerifyCluster, ARGS_NONE,
2123
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
2124
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
2125
    "", "Does a check on the cluster configuration"),
2126
  "verify-disks": (
2127
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
2128
    "", "Does a check on the cluster disk status"),
2129
  "repair-disk-sizes": (
2130
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
2131
    "[instance...]", "Updates mismatches in recorded disk sizes"),
2132
  "master-failover": (
2133
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
2134
    "", "Makes the current node the master"),
2135
  "master-ping": (
2136
    MasterPing, ARGS_NONE, [],
2137
    "", "Checks if the master is alive"),
2138
  "version": (
2139
    ShowClusterVersion, ARGS_NONE, [],
2140
    "", "Shows the cluster version"),
2141
  "getmaster": (
2142
    ShowClusterMaster, ARGS_NONE, [],
2143
    "", "Shows the cluster master"),
2144
  "copyfile": (
2145
    ClusterCopyFile, [ArgFile(min=1, max=1)],
2146
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
2147
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
2148
  "command": (
2149
    RunClusterCommand, [ArgCommand(min=1)],
2150
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
2151
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
2152
  "info": (
2153
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
2154
    "[--roman]", "Show cluster configuration"),
2155
  "list-tags": (
2156
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
2157
  "add-tags": (
2158
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
2159
    "tag...", "Add tags to the cluster"),
2160
  "remove-tags": (
2161
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
2162
    "tag...", "Remove tags from the cluster"),
2163
  "search-tags": (
2164
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
2165
    "Searches the tags on all objects on"
2166
    " the cluster for a given pattern (regex)"),
2167
  "queue": (
2168
    QueueOps,
2169
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
2170
    [], "drain|undrain|info", "Change queue properties"),
2171
  "watcher": (
2172
    WatcherOps,
2173
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
2174
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
2175
    [],
2176
    "{pause <timespec>|continue|info}", "Change watcher properties"),
2177
  "modify": (
2178
    SetClusterParams, ARGS_NONE,
2179
    [FORCE_OPT,
2180
     BACKEND_OPT, CP_SIZE_OPT, RQL_OPT,
2181
     ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
2182
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT,
2183
     UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT, DRBD_HELPER_OPT,
2184
     DEFAULT_IALLOCATOR_OPT, DEFAULT_IALLOCATOR_PARAMS_OPT, RESERVED_LVS_OPT,
2185
     DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT,
2186
     USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] +
2187
     SUBMIT_OPTS +
2188
     [ENABLED_DISK_TEMPLATES_OPT, IPOLICY_STD_SPECS_OPT, MODIFY_ETCHOSTS_OPT] +
2189
     INSTANCE_POLICY_OPTS + [GLOBAL_FILEDIR_OPT],
2190
    "[opts...]",
2191
    "Alters the parameters of the cluster"),
2192
  "renew-crypto": (
2193
    RenewCrypto, ARGS_NONE,
2194
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
2195
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
2196
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
2197
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT,
2198
     NEW_NODE_CERT_OPT],
2199
    "[opts...]",
2200
    "Renews cluster certificates, keys and secrets"),
2201
  "epo": (
2202
    Epo, [ArgUnknown()],
2203
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
2204
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
2205
    "[opts...] [args]",
2206
    "Performs an emergency power-off on given args"),
2207
  "activate-master-ip": (
2208
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
2209
  "deactivate-master-ip": (
2210
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
2211
    "Deactivates the master IP"),
2212
  "show-ispecs-cmd": (
2213
    ShowCreateCommand, ARGS_NONE, [], "",
2214
    "Show the command line to re-create the cluster"),
2215
  "upgrade": (
2216
    UpgradeGanetiCommand, ARGS_NONE, [TO_OPT, RESUME_OPT], "",
2217
    "Upgrade (or downgrade) to a new Ganeti version"),
2218
  }
2219

    
2220

    
2221
#: dictionary with aliases for commands
2222
aliases = {
2223
  "masterfailover": "master-failover",
2224
  "show": "info",
2225
}
2226

    
2227

    
2228
def Main():
2229
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
2230
                     aliases=aliases)