Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ 0cffcdb1

History | View | Annotate | Download (71.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013, 2014 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
from cStringIO import StringIO
30
import os
31
import time
32
import OpenSSL
33
import itertools
34

    
35
from ganeti.cli import *
36
from ganeti import bootstrap
37
from ganeti import compat
38
from ganeti import constants
39
from ganeti import errors
40
from ganeti import netutils
41
from ganeti import objects
42
from ganeti import opcodes
43
from ganeti import pathutils
44
from ganeti import qlang
45
from ganeti import serializer
46
from ganeti import ssconf
47
from ganeti import ssh
48
from ganeti import uidpool
49
from ganeti import utils
50
from ganeti.client import base
51

    
52

    
53
ON_OPT = cli_option("--on", default=False,
54
                    action="store_true", dest="on",
55
                    help="Recover from an EPO")
56

    
57
GROUPS_OPT = cli_option("--groups", default=False,
58
                        action="store_true", dest="groups",
59
                        help="Arguments are node groups instead of nodes")
60

    
61
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
62
                            help="Override interactive check for --no-voting",
63
                            default=False, action="store_true")
64

    
65
FORCE_DISTRIBUTION = cli_option("--yes-do-it", dest="yes_do_it",
66
                                help="Unconditionally distribute the"
67
                                " configuration, even if the queue"
68
                                " is drained",
69
                                default=False, action="store_true")
70

    
71
TO_OPT = cli_option("--to", default=None, type="string",
72
                    help="The Ganeti version to upgrade to")
73

    
74
RESUME_OPT = cli_option("--resume", default=False, action="store_true",
75
                        help="Resume any pending Ganeti upgrades")
76

    
77
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
78
_EPO_PING_TIMEOUT = 1 # 1 second
79
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
80

    
81

    
82
def _InitEnabledDiskTemplates(opts):
83
  """Initialize the list of enabled disk templates.
84

85
  """
86
  if opts.enabled_disk_templates:
87
    return opts.enabled_disk_templates.split(",")
88
  else:
89
    return constants.DEFAULT_ENABLED_DISK_TEMPLATES
90

    
91

    
92
def _InitVgName(opts, enabled_disk_templates):
93
  """Initialize the volume group name.
94

95
  @type enabled_disk_templates: list of strings
96
  @param enabled_disk_templates: cluster-wide enabled disk templates
97

98
  """
99
  vg_name = None
100
  if opts.vg_name is not None:
101
    vg_name = opts.vg_name
102
    if vg_name:
103
      if not utils.IsLvmEnabled(enabled_disk_templates):
104
        ToStdout("You specified a volume group with --vg-name, but you did not"
105
                 " enable any disk template that uses lvm.")
106
    elif utils.IsLvmEnabled(enabled_disk_templates):
107
      raise errors.OpPrereqError(
108
          "LVM disk templates are enabled, but vg name not set.")
109
  elif utils.IsLvmEnabled(enabled_disk_templates):
110
    vg_name = constants.DEFAULT_VG
111
  return vg_name
112

    
113

    
114
def _InitDrbdHelper(opts, enabled_disk_templates):
115
  """Initialize the DRBD usermode helper.
116

117
  """
118
  drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
119

    
120
  if not drbd_enabled and opts.drbd_helper is not None:
121
    ToStdout("Note: You specified a DRBD usermode helper, while DRBD storage"
122
             " is not enabled.")
123

    
124
  if drbd_enabled:
125
    if opts.drbd_helper is None:
126
      return constants.DEFAULT_DRBD_HELPER
127
    if opts.drbd_helper == '':
128
      raise errors.OpPrereqError(
129
          "Unsetting the drbd usermode helper while enabling DRBD is not"
130
          " allowed.")
131

    
132
  return opts.drbd_helper
133

    
134

    
135
@UsesRPC
136
def InitCluster(opts, args):
137
  """Initialize the cluster.
138

139
  @param opts: the command line options selected by the user
140
  @type args: list
141
  @param args: should contain only one element, the desired
142
      cluster name
143
  @rtype: int
144
  @return: the desired exit code
145

146
  """
147
  enabled_disk_templates = _InitEnabledDiskTemplates(opts)
148

    
149
  try:
150
    vg_name = _InitVgName(opts, enabled_disk_templates)
151
    drbd_helper = _InitDrbdHelper(opts, enabled_disk_templates)
152
  except errors.OpPrereqError, e:
153
    ToStderr(str(e))
154
    return 1
155

    
156
  master_netdev = opts.master_netdev
157
  if master_netdev is None:
158
    nic_mode = opts.nicparams.get(constants.NIC_MODE, None)
159
    if not nic_mode:
160
      # default case, use bridging
161
      master_netdev = constants.DEFAULT_BRIDGE
162
    elif nic_mode == constants.NIC_MODE_OVS:
163
      # default ovs is different from default bridge
164
      master_netdev = constants.DEFAULT_OVS
165
      opts.nicparams[constants.NIC_LINK] = constants.DEFAULT_OVS
166

    
167
  hvlist = opts.enabled_hypervisors
168
  if hvlist is None:
169
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
170
  hvlist = hvlist.split(",")
171

    
172
  hvparams = dict(opts.hvparams)
173
  beparams = opts.beparams
174
  nicparams = opts.nicparams
175

    
176
  diskparams = dict(opts.diskparams)
177

    
178
  # check the disk template types here, as we cannot rely on the type check done
179
  # by the opcode parameter types
180
  diskparams_keys = set(diskparams.keys())
181
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
182
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
183
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
184
    return 1
185

    
186
  # prepare beparams dict
187
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
188
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
189

    
190
  # prepare nicparams dict
191
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
192
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
193

    
194
  # prepare ndparams dict
195
  if opts.ndparams is None:
196
    ndparams = dict(constants.NDC_DEFAULTS)
197
  else:
198
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
199
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
200

    
201
  # prepare hvparams dict
202
  for hv in constants.HYPER_TYPES:
203
    if hv not in hvparams:
204
      hvparams[hv] = {}
205
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
206
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
207

    
208
  # prepare diskparams dict
209
  for templ in constants.DISK_TEMPLATES:
210
    if templ not in diskparams:
211
      diskparams[templ] = {}
212
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
213
                                         diskparams[templ])
214
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
215

    
216
  # prepare ipolicy dict
217
  ipolicy = CreateIPolicyFromOpts(
218
    ispecs_mem_size=opts.ispecs_mem_size,
219
    ispecs_cpu_count=opts.ispecs_cpu_count,
220
    ispecs_disk_count=opts.ispecs_disk_count,
221
    ispecs_disk_size=opts.ispecs_disk_size,
222
    ispecs_nic_count=opts.ispecs_nic_count,
223
    minmax_ispecs=opts.ipolicy_bounds_specs,
224
    std_ispecs=opts.ipolicy_std_specs,
225
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
226
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
227
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
228
    fill_all=True)
229

    
230
  if opts.candidate_pool_size is None:
231
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
232

    
233
  if opts.mac_prefix is None:
234
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
235

    
236
  uid_pool = opts.uid_pool
237
  if uid_pool is not None:
238
    uid_pool = uidpool.ParseUidPool(uid_pool)
239

    
240
  if opts.prealloc_wipe_disks is None:
241
    opts.prealloc_wipe_disks = False
242

    
243
  external_ip_setup_script = opts.use_external_mip_script
244
  if external_ip_setup_script is None:
245
    external_ip_setup_script = False
246

    
247
  try:
248
    primary_ip_version = int(opts.primary_ip_version)
249
  except (ValueError, TypeError), err:
250
    ToStderr("Invalid primary ip version value: %s" % str(err))
251
    return 1
252

    
253
  master_netmask = opts.master_netmask
254
  try:
255
    if master_netmask is not None:
256
      master_netmask = int(master_netmask)
257
  except (ValueError, TypeError), err:
258
    ToStderr("Invalid master netmask value: %s" % str(err))
259
    return 1
260

    
261
  if opts.disk_state:
262
    disk_state = utils.FlatToDict(opts.disk_state)
263
  else:
264
    disk_state = {}
265

    
266
  hv_state = dict(opts.hv_state)
267

    
268
  default_ialloc_params = opts.default_iallocator_params
269
  bootstrap.InitCluster(cluster_name=args[0],
270
                        secondary_ip=opts.secondary_ip,
271
                        vg_name=vg_name,
272
                        mac_prefix=opts.mac_prefix,
273
                        master_netmask=master_netmask,
274
                        master_netdev=master_netdev,
275
                        file_storage_dir=opts.file_storage_dir,
276
                        shared_file_storage_dir=opts.shared_file_storage_dir,
277
                        gluster_storage_dir=opts.gluster_storage_dir,
278
                        enabled_hypervisors=hvlist,
279
                        hvparams=hvparams,
280
                        beparams=beparams,
281
                        nicparams=nicparams,
282
                        ndparams=ndparams,
283
                        diskparams=diskparams,
284
                        ipolicy=ipolicy,
285
                        candidate_pool_size=opts.candidate_pool_size,
286
                        modify_etc_hosts=opts.modify_etc_hosts,
287
                        modify_ssh_setup=opts.modify_ssh_setup,
288
                        maintain_node_health=opts.maintain_node_health,
289
                        drbd_helper=drbd_helper,
290
                        uid_pool=uid_pool,
291
                        default_iallocator=opts.default_iallocator,
292
                        default_iallocator_params=default_ialloc_params,
293
                        primary_ip_version=primary_ip_version,
294
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
295
                        use_external_mip_script=external_ip_setup_script,
296
                        hv_state=hv_state,
297
                        disk_state=disk_state,
298
                        enabled_disk_templates=enabled_disk_templates,
299
                        )
300
  op = opcodes.OpClusterPostInit()
301
  SubmitOpCode(op, opts=opts)
302
  return 0
303

    
304

    
305
@UsesRPC
306
def DestroyCluster(opts, args):
307
  """Destroy the cluster.
308

309
  @param opts: the command line options selected by the user
310
  @type args: list
311
  @param args: should be an empty list
312
  @rtype: int
313
  @return: the desired exit code
314

315
  """
316
  if not opts.yes_do_it:
317
    ToStderr("Destroying a cluster is irreversible. If you really want"
318
             " destroy this cluster, supply the --yes-do-it option.")
319
    return 1
320

    
321
  op = opcodes.OpClusterDestroy()
322
  master_uuid = SubmitOpCode(op, opts=opts)
323
  # if we reached this, the opcode didn't fail; we can proceed to
324
  # shutdown all the daemons
325
  bootstrap.FinalizeClusterDestroy(master_uuid)
326
  return 0
327

    
328

    
329
def RenameCluster(opts, args):
330
  """Rename the cluster.
331

332
  @param opts: the command line options selected by the user
333
  @type args: list
334
  @param args: should contain only one element, the new cluster name
335
  @rtype: int
336
  @return: the desired exit code
337

338
  """
339
  cl = GetClient()
340

    
341
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
342

    
343
  new_name = args[0]
344
  if not opts.force:
345
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
346
                " connected over the network to the cluster name, the"
347
                " operation is very dangerous as the IP address will be"
348
                " removed from the node and the change may not go through."
349
                " Continue?") % (cluster_name, new_name)
350
    if not AskUser(usertext):
351
      return 1
352

    
353
  op = opcodes.OpClusterRename(name=new_name)
354
  result = SubmitOpCode(op, opts=opts, cl=cl)
355

    
356
  if result:
357
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
358

    
359
  return 0
360

    
361

    
362
def ActivateMasterIp(opts, args):
363
  """Activates the master IP.
364

365
  """
366
  op = opcodes.OpClusterActivateMasterIp()
367
  SubmitOpCode(op)
368
  return 0
369

    
370

    
371
def DeactivateMasterIp(opts, args):
372
  """Deactivates the master IP.
373

374
  """
375
  if not opts.confirm:
376
    usertext = ("This will disable the master IP. All the open connections to"
377
                " the master IP will be closed. To reach the master you will"
378
                " need to use its node IP."
379
                " Continue?")
380
    if not AskUser(usertext):
381
      return 1
382

    
383
  op = opcodes.OpClusterDeactivateMasterIp()
384
  SubmitOpCode(op)
385
  return 0
386

    
387

    
388
def RedistributeConfig(opts, args):
389
  """Forces push of the cluster configuration.
390

391
  @param opts: the command line options selected by the user
392
  @type args: list
393
  @param args: empty list
394
  @rtype: int
395
  @return: the desired exit code
396

397
  """
398
  op = opcodes.OpClusterRedistConf()
399
  if opts.yes_do_it:
400
    SubmitOpCodeToDrainedQueue(op)
401
  else:
402
    SubmitOrSend(op, opts)
403
  return 0
404

    
405

    
406
def ShowClusterVersion(opts, args):
407
  """Write version of ganeti software to the standard output.
408

409
  @param opts: the command line options selected by the user
410
  @type args: list
411
  @param args: should be an empty list
412
  @rtype: int
413
  @return: the desired exit code
414

415
  """
416
  cl = GetClient()
417
  result = cl.QueryClusterInfo()
418
  ToStdout("Software version: %s", result["software_version"])
419
  ToStdout("Internode protocol: %s", result["protocol_version"])
420
  ToStdout("Configuration format: %s", result["config_version"])
421
  ToStdout("OS api version: %s", result["os_api_version"])
422
  ToStdout("Export interface: %s", result["export_version"])
423
  ToStdout("VCS version: %s", result["vcs_version"])
424
  return 0
425

    
426

    
427
def ShowClusterMaster(opts, args):
428
  """Write name of master node to the standard output.
429

430
  @param opts: the command line options selected by the user
431
  @type args: list
432
  @param args: should be an empty list
433
  @rtype: int
434
  @return: the desired exit code
435

436
  """
437
  master = bootstrap.GetMaster()
438
  ToStdout(master)
439
  return 0
440

    
441

    
442
def _FormatGroupedParams(paramsdict, roman=False):
443
  """Format Grouped parameters (be, nic, disk) by group.
444

445
  @type paramsdict: dict of dicts
446
  @param paramsdict: {group: {param: value, ...}, ...}
447
  @rtype: dict of dicts
448
  @return: copy of the input dictionaries with strings as values
449

450
  """
451
  ret = {}
452
  for (item, val) in paramsdict.items():
453
    if isinstance(val, dict):
454
      ret[item] = _FormatGroupedParams(val, roman=roman)
455
    elif roman and isinstance(val, int):
456
      ret[item] = compat.TryToRoman(val)
457
    else:
458
      ret[item] = str(val)
459
  return ret
460

    
461

    
462
def ShowClusterConfig(opts, args):
463
  """Shows cluster information.
464

465
  @param opts: the command line options selected by the user
466
  @type args: list
467
  @param args: should be an empty list
468
  @rtype: int
469
  @return: the desired exit code
470

471
  """
472
  cl = GetClient()
473
  result = cl.QueryClusterInfo()
474

    
475
  if result["tags"]:
476
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
477
  else:
478
    tags = "(none)"
479
  if result["reserved_lvs"]:
480
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
481
  else:
482
    reserved_lvs = "(none)"
483

    
484
  enabled_hv = result["enabled_hypervisors"]
485
  hvparams = dict((k, v) for k, v in result["hvparams"].iteritems()
486
                  if k in enabled_hv)
487

    
488
  info = [
489
    ("Cluster name", result["name"]),
490
    ("Cluster UUID", result["uuid"]),
491

    
492
    ("Creation time", utils.FormatTime(result["ctime"])),
493
    ("Modification time", utils.FormatTime(result["mtime"])),
494

    
495
    ("Master node", result["master"]),
496

    
497
    ("Architecture (this node)",
498
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),
499

    
500
    ("Tags", tags),
501

    
502
    ("Default hypervisor", result["default_hypervisor"]),
503
    ("Enabled hypervisors", utils.CommaJoin(enabled_hv)),
504

    
505
    ("Hypervisor parameters", _FormatGroupedParams(hvparams)),
506

    
507
    ("OS-specific hypervisor parameters",
508
     _FormatGroupedParams(result["os_hvp"])),
509

    
510
    ("OS parameters", _FormatGroupedParams(result["osparams"])),
511

    
512
    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
513
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),
514

    
515
    ("Cluster parameters", [
516
      ("candidate pool size",
517
       compat.TryToRoman(result["candidate_pool_size"],
518
                         convert=opts.roman_integers)),
519
      ("maximal number of jobs running simultaneously",
520
       compat.TryToRoman(result["max_running_jobs"],
521
                         convert=opts.roman_integers)),
522
      ("mac prefix", result["mac_prefix"]),
523
      ("master netdev", result["master_netdev"]),
524
      ("master netmask", result["master_netmask"]),
525
      ("use external master IP address setup script",
526
       result["use_external_mip_script"]),
527
      ("lvm volume group", result["volume_group_name"]),
528
      ("lvm reserved volumes", reserved_lvs),
529
      ("drbd usermode helper", result["drbd_usermode_helper"]),
530
      ("file storage path", result["file_storage_dir"]),
531
      ("shared file storage path", result["shared_file_storage_dir"]),
532
      ("gluster storage path", result["gluster_storage_dir"]),
533
      ("maintenance of node health", result["maintain_node_health"]),
534
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
535
      ("default instance allocator", result["default_iallocator"]),
536
      ("default instance allocator parameters",
537
       result["default_iallocator_params"]),
538
      ("primary ip version", result["primary_ip_version"]),
539
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
540
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
541
      ("ExtStorage Providers search path",
542
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
543
      ("enabled disk templates",
544
       utils.CommaJoin(result["enabled_disk_templates"])),
545
      ("instance communication network",
546
       result["instance_communication_network"]),
547
      ]),
548

    
549
    ("Default node parameters",
550
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),
551

    
552
    ("Default instance parameters",
553
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),
554

    
555
    ("Default nic parameters",
556
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),
557

    
558
    ("Default disk parameters",
559
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),
560

    
561
    ("Instance policy - limits for instances",
562
     FormatPolicyInfo(result["ipolicy"], None, True)),
563
    ]
564

    
565
  PrintGenericInfo(info)
566
  return 0
567

    
568

    
569
def ClusterCopyFile(opts, args):
570
  """Copy a file from master to some nodes.
571

572
  @param opts: the command line options selected by the user
573
  @type args: list
574
  @param args: should contain only one element, the path of
575
      the file to be copied
576
  @rtype: int
577
  @return: the desired exit code
578

579
  """
580
  filename = args[0]
581
  if not os.path.exists(filename):
582
    raise errors.OpPrereqError("No such filename '%s'" % filename,
583
                               errors.ECODE_INVAL)
584

    
585
  cl = GetClient()
586
  qcl = GetClient()
587
  try:
588
    cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
589

    
590
    results = GetOnlineNodes(nodes=opts.nodes, cl=qcl, filter_master=True,
591
                             secondary_ips=opts.use_replication_network,
592
                             nodegroup=opts.nodegroup)
593
    ports = GetNodesSshPorts(opts.nodes, qcl)
594
  finally:
595
    cl.Close()
596
    qcl.Close()
597

    
598
  srun = ssh.SshRunner(cluster_name)
599
  for (node, port) in zip(results, ports):
600
    if not srun.CopyFileToNode(node, port, filename):
601
      ToStderr("Copy of file %s to node %s:%d failed", filename, node, port)
602

    
603
  return 0
604

    
605

    
606
def RunClusterCommand(opts, args):
607
  """Run a command on some nodes.
608

609
  @param opts: the command line options selected by the user
610
  @type args: list
611
  @param args: should contain the command to be run and its arguments
612
  @rtype: int
613
  @return: the desired exit code
614

615
  """
616
  cl = GetClient()
617
  qcl = GetClient()
618

    
619
  command = " ".join(args)
620

    
621
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=qcl, nodegroup=opts.nodegroup)
622
  ports = GetNodesSshPorts(nodes, qcl)
623

    
624
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
625
                                                    "master_node"])
626

    
627
  srun = ssh.SshRunner(cluster_name=cluster_name)
628

    
629
  # Make sure master node is at list end
630
  if master_node in nodes:
631
    nodes.remove(master_node)
632
    nodes.append(master_node)
633

    
634
  for (name, port) in zip(nodes, ports):
635
    result = srun.Run(name, constants.SSH_LOGIN_USER, command, port=port)
636

    
637
    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
638
      # Do not output anything for successful commands
639
      continue
640

    
641
    ToStdout("------------------------------------------------")
642
    if opts.show_machine_names:
643
      for line in result.output.splitlines():
644
        ToStdout("%s: %s", name, line)
645
    else:
646
      ToStdout("node: %s", name)
647
      ToStdout("%s", result.output)
648
    ToStdout("return code = %s", result.exit_code)
649

    
650
  return 0
651

    
652

    
653
def VerifyCluster(opts, args):
654
  """Verify integrity of cluster, performing various test on nodes.
655

656
  @param opts: the command line options selected by the user
657
  @type args: list
658
  @param args: should be an empty list
659
  @rtype: int
660
  @return: the desired exit code
661

662
  """
663
  skip_checks = []
664

    
665
  if opts.skip_nplusone_mem:
666
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
667

    
668
  cl = GetClient()
669

    
670
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
671
                               error_codes=opts.error_codes,
672
                               debug_simulate_errors=opts.simulate_errors,
673
                               skip_checks=skip_checks,
674
                               ignore_errors=opts.ignore_errors,
675
                               group_name=opts.nodegroup)
676
  result = SubmitOpCode(op, cl=cl, opts=opts)
677

    
678
  # Keep track of submitted jobs
679
  jex = JobExecutor(cl=cl, opts=opts)
680

    
681
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
682
    jex.AddJobId(None, status, job_id)
683

    
684
  results = jex.GetResults()
685

    
686
  (bad_jobs, bad_results) = \
687
    map(len,
688
        # Convert iterators to lists
689
        map(list,
690
            # Count errors
691
            map(compat.partial(itertools.ifilterfalse, bool),
692
                # Convert result to booleans in a tuple
693
                zip(*((job_success, len(op_results) == 1 and op_results[0])
694
                      for (job_success, op_results) in results)))))
695

    
696
  if bad_jobs == 0 and bad_results == 0:
697
    rcode = constants.EXIT_SUCCESS
698
  else:
699
    rcode = constants.EXIT_FAILURE
700
    if bad_jobs > 0:
701
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
702

    
703
  return rcode
704

    
705

    
706
def VerifyDisks(opts, args):
707
  """Verify integrity of cluster disks.
708

709
  @param opts: the command line options selected by the user
710
  @type args: list
711
  @param args: should be an empty list
712
  @rtype: int
713
  @return: the desired exit code
714

715
  """
716
  cl = GetClient()
717

    
718
  op = opcodes.OpClusterVerifyDisks()
719

    
720
  result = SubmitOpCode(op, cl=cl, opts=opts)
721

    
722
  # Keep track of submitted jobs
723
  jex = JobExecutor(cl=cl, opts=opts)
724

    
725
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
726
    jex.AddJobId(None, status, job_id)
727

    
728
  retcode = constants.EXIT_SUCCESS
729

    
730
  for (status, result) in jex.GetResults():
731
    if not status:
732
      ToStdout("Job failed: %s", result)
733
      continue
734

    
735
    ((bad_nodes, instances, missing), ) = result
736

    
737
    for node, text in bad_nodes.items():
738
      ToStdout("Error gathering data on node %s: %s",
739
               node, utils.SafeEncode(text[-400:]))
740
      retcode = constants.EXIT_FAILURE
741
      ToStdout("You need to fix these nodes first before fixing instances")
742

    
743
    for iname in instances:
744
      if iname in missing:
745
        continue
746
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
747
      try:
748
        ToStdout("Activating disks for instance '%s'", iname)
749
        SubmitOpCode(op, opts=opts, cl=cl)
750
      except errors.GenericError, err:
751
        nret, msg = FormatError(err)
752
        retcode |= nret
753
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
754

    
755
    if missing:
756
      for iname, ival in missing.iteritems():
757
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
758
        if all_missing:
759
          ToStdout("Instance %s cannot be verified as it lives on"
760
                   " broken nodes", iname)
761
        else:
762
          ToStdout("Instance %s has missing logical volumes:", iname)
763
          ival.sort()
764
          for node, vol in ival:
765
            if node in bad_nodes:
766
              ToStdout("\tbroken node %s /dev/%s", node, vol)
767
            else:
768
              ToStdout("\t%s /dev/%s", node, vol)
769

    
770
      ToStdout("You need to replace or recreate disks for all the above"
771
               " instances if this message persists after fixing broken nodes.")
772
      retcode = constants.EXIT_FAILURE
773
    elif not instances:
774
      ToStdout("No disks need to be activated.")
775

    
776
  return retcode
777

    
778

    
779
def RepairDiskSizes(opts, args):
780
  """Verify sizes of cluster disks.
781

782
  @param opts: the command line options selected by the user
783
  @type args: list
784
  @param args: optional list of instances to restrict check to
785
  @rtype: int
786
  @return: the desired exit code
787

788
  """
789
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
790
  SubmitOpCode(op, opts=opts)
791

    
792

    
793
@UsesRPC
794
def MasterFailover(opts, args):
795
  """Failover the master node.
796

797
  This command, when run on a non-master node, will cause the current
798
  master to cease being master, and the non-master to become new
799
  master.
800

801
  @param opts: the command line options selected by the user
802
  @type args: list
803
  @param args: should be an empty list
804
  @rtype: int
805
  @return: the desired exit code
806

807
  """
808
  if opts.no_voting and not opts.yes_do_it:
809
    usertext = ("This will perform the failover even if most other nodes"
810
                " are down, or if this node is outdated. This is dangerous"
811
                " as it can lead to a non-consistent cluster. Check the"
812
                " gnt-cluster(8) man page before proceeding. Continue?")
813
    if not AskUser(usertext):
814
      return 1
815

    
816
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
817

    
818

    
819
def MasterPing(opts, args):
820
  """Checks if the master is alive.
821

822
  @param opts: the command line options selected by the user
823
  @type args: list
824
  @param args: should be an empty list
825
  @rtype: int
826
  @return: the desired exit code
827

828
  """
829
  try:
830
    cl = GetClient()
831
    cl.QueryClusterInfo()
832
    return 0
833
  except Exception: # pylint: disable=W0703
834
    return 1
835

    
836

    
837
def SearchTags(opts, args):
838
  """Searches the tags on all the cluster.
839

840
  @param opts: the command line options selected by the user
841
  @type args: list
842
  @param args: should contain only one element, the tag pattern
843
  @rtype: int
844
  @return: the desired exit code
845

846
  """
847
  op = opcodes.OpTagsSearch(pattern=args[0])
848
  result = SubmitOpCode(op, opts=opts)
849
  if not result:
850
    return 1
851
  result = list(result)
852
  result.sort()
853
  for path, tag in result:
854
    ToStdout("%s %s", path, tag)
855

    
856

    
857
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
858
  """Reads and verifies an X509 certificate.
859

860
  @type cert_filename: string
861
  @param cert_filename: the path of the file containing the certificate to
862
                        verify encoded in PEM format
863
  @type verify_private_key: bool
864
  @param verify_private_key: whether to verify the private key in addition to
865
                             the public certificate
866
  @rtype: string
867
  @return: a string containing the PEM-encoded certificate.
868

869
  """
870
  try:
871
    pem = utils.ReadFile(cert_filename)
872
  except IOError, err:
873
    raise errors.X509CertError(cert_filename,
874
                               "Unable to read certificate: %s" % str(err))
875

    
876
  try:
877
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
878
  except Exception, err:
879
    raise errors.X509CertError(cert_filename,
880
                               "Unable to load certificate: %s" % str(err))
881

    
882
  if verify_private_key:
883
    try:
884
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
885
    except Exception, err:
886
      raise errors.X509CertError(cert_filename,
887
                                 "Unable to load private key: %s" % str(err))
888

    
889
  return pem
890

    
891

    
892
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
893
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
894
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
895
                 cds_filename, force, new_node_cert):
896
  """Renews cluster certificates, keys and secrets.
897

898
  @type new_cluster_cert: bool
899
  @param new_cluster_cert: Whether to generate a new cluster certificate
900
  @type new_rapi_cert: bool
901
  @param new_rapi_cert: Whether to generate a new RAPI certificate
902
  @type rapi_cert_filename: string
903
  @param rapi_cert_filename: Path to file containing new RAPI certificate
904
  @type new_spice_cert: bool
905
  @param new_spice_cert: Whether to generate a new SPICE certificate
906
  @type spice_cert_filename: string
907
  @param spice_cert_filename: Path to file containing new SPICE certificate
908
  @type spice_cacert_filename: string
909
  @param spice_cacert_filename: Path to file containing the certificate of the
910
                                CA that signed the SPICE certificate
911
  @type new_confd_hmac_key: bool
912
  @param new_confd_hmac_key: Whether to generate a new HMAC key
913
  @type new_cds: bool
914
  @param new_cds: Whether to generate a new cluster domain secret
915
  @type cds_filename: string
916
  @param cds_filename: Path to file containing new cluster domain secret
917
  @type force: bool
918
  @param force: Whether to ask user for confirmation
919
  @type new_node_cert: string
920
  @param new_node_cert: Whether to generate new node certificates
921

922
  """
923
  if new_rapi_cert and rapi_cert_filename:
924
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
925
             " options can be specified at the same time.")
926
    return 1
927

    
928
  if new_cds and cds_filename:
929
    ToStderr("Only one of the --new-cluster-domain-secret and"
930
             " --cluster-domain-secret options can be specified at"
931
             " the same time.")
932
    return 1
933

    
934
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
935
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
936
             " and --spice-ca-certificate must not be used.")
937
    return 1
938

    
939
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
940
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
941
             " specified.")
942
    return 1
943

    
944
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
945
  try:
946
    if rapi_cert_filename:
947
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
948
    if spice_cert_filename:
949
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
950
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
951
  except errors.X509CertError, err:
952
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
953
    return 1
954

    
955
  if cds_filename:
956
    try:
957
      cds = utils.ReadFile(cds_filename)
958
    except Exception, err: # pylint: disable=W0703
959
      ToStderr("Can't load new cluster domain secret from %s: %s" %
960
               (cds_filename, str(err)))
961
      return 1
962
  else:
963
    cds = None
964

    
965
  if not force:
966
    usertext = ("This requires all daemons on all nodes to be restarted and"
967
                " may take some time. Continue?")
968
    if not AskUser(usertext):
969
      return 1
970

    
971
  def _RenewCryptoInner(ctx):
972
    ctx.feedback_fn("Updating certificates and keys")
973
    # Note: the node certificate will be generated in the LU
974
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
975
                                    new_rapi_cert,
976
                                    new_spice_cert,
977
                                    new_confd_hmac_key,
978
                                    new_cds,
979
                                    rapi_cert_pem=rapi_cert_pem,
980
                                    spice_cert_pem=spice_cert_pem,
981
                                    spice_cacert_pem=spice_cacert_pem,
982
                                    cds=cds)
983

    
984
    files_to_copy = []
985

    
986
    if new_cluster_cert:
987
      files_to_copy.append(pathutils.NODED_CERT_FILE)
988

    
989
    if new_rapi_cert or rapi_cert_pem:
990
      files_to_copy.append(pathutils.RAPI_CERT_FILE)
991

    
992
    if new_spice_cert or spice_cert_pem:
993
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
994
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)
995

    
996
    if new_confd_hmac_key:
997
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)
998

    
999
    if new_cds or cds:
1000
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)
1001

    
1002
    if files_to_copy:
1003
      for node_name in ctx.nonmaster_nodes:
1004
        port = ctx.ssh_ports[node_name]
1005
        ctx.feedback_fn("Copying %s to %s:%d" %
1006
                        (", ".join(files_to_copy), node_name, port))
1007
        for file_name in files_to_copy:
1008
          ctx.ssh.CopyFileToNode(node_name, port, file_name)
1009

    
1010
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
1011

    
1012
  ToStdout("All requested certificates and keys have been replaced."
1013
           " Running \"gnt-cluster verify\" now is recommended.")
1014

    
1015
  if new_node_cert:
1016
    cl = GetClient()
1017
    renew_op = opcodes.OpClusterRenewCrypto()
1018
    SubmitOpCode(renew_op, cl=cl)
1019

    
1020
  return 0
1021

    
1022

    
1023
def RenewCrypto(opts, args):
1024
  """Renews cluster certificates, keys and secrets.
1025

1026
  """
1027
  return _RenewCrypto(opts.new_cluster_cert,
1028
                      opts.new_rapi_cert,
1029
                      opts.rapi_cert,
1030
                      opts.new_spice_cert,
1031
                      opts.spice_cert,
1032
                      opts.spice_cacert,
1033
                      opts.new_confd_hmac_key,
1034
                      opts.new_cluster_domain_secret,
1035
                      opts.cluster_domain_secret,
1036
                      opts.force,
1037
                      opts.new_node_cert)
1038

    
1039

    
1040
def _GetEnabledDiskTemplates(opts):
1041
  """Determine the list of enabled disk templates.
1042

1043
  """
1044
  if opts.enabled_disk_templates:
1045
    return opts.enabled_disk_templates.split(",")
1046
  else:
1047
    return None
1048

    
1049

    
1050
def _GetVgName(opts, enabled_disk_templates):
1051
  """Determine the volume group name.
1052

1053
  @type enabled_disk_templates: list of strings
1054
  @param enabled_disk_templates: cluster-wide enabled disk-templates
1055

1056
  """
1057
  # consistency between vg name and enabled disk templates
1058
  vg_name = None
1059
  if opts.vg_name is not None:
1060
    vg_name = opts.vg_name
1061
  if enabled_disk_templates:
1062
    if vg_name and not utils.IsLvmEnabled(enabled_disk_templates):
1063
      ToStdout("You specified a volume group with --vg-name, but you did not"
1064
               " enable any of the following lvm-based disk templates: %s" %
1065
               utils.CommaJoin(constants.DTS_LVM))
1066
  return vg_name
1067

    
1068

    
1069
def _GetDrbdHelper(opts, enabled_disk_templates):
1070
  """Determine the DRBD usermode helper.
1071

1072
  """
1073
  drbd_helper = opts.drbd_helper
1074
  if enabled_disk_templates:
1075
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
1076
    if not drbd_enabled and opts.drbd_helper:
1077
      ToStdout("You specified a DRBD usermode helper with "
1078
               " --drbd-usermode-helper while DRBD is not enabled.")
1079
  return drbd_helper
1080

    
1081

    
1082
def SetClusterParams(opts, args):
1083
  """Modify the cluster.
1084

1085
  @param opts: the command line options selected by the user
1086
  @type args: list
1087
  @param args: should be an empty list
1088
  @rtype: int
1089
  @return: the desired exit code
1090

1091
  """
1092
  if not (opts.vg_name is not None or
1093
          opts.drbd_helper is not None or
1094
          opts.enabled_hypervisors or opts.hvparams or
1095
          opts.beparams or opts.nicparams or
1096
          opts.ndparams or opts.diskparams or
1097
          opts.candidate_pool_size is not None or
1098
          opts.max_running_jobs is not None or
1099
          opts.uid_pool is not None or
1100
          opts.maintain_node_health is not None or
1101
          opts.add_uids is not None or
1102
          opts.remove_uids is not None or
1103
          opts.default_iallocator is not None or
1104
          opts.default_iallocator_params or
1105
          opts.reserved_lvs is not None or
1106
          opts.mac_prefix is not None or
1107
          opts.master_netdev is not None or
1108
          opts.master_netmask is not None or
1109
          opts.use_external_mip_script is not None or
1110
          opts.prealloc_wipe_disks is not None or
1111
          opts.hv_state or
1112
          opts.enabled_disk_templates or
1113
          opts.disk_state or
1114
          opts.ipolicy_bounds_specs is not None or
1115
          opts.ipolicy_std_specs is not None or
1116
          opts.ipolicy_disk_templates is not None or
1117
          opts.ipolicy_vcpu_ratio is not None or
1118
          opts.ipolicy_spindle_ratio is not None or
1119
          opts.modify_etc_hosts is not None or
1120
          opts.file_storage_dir is not None or
1121
          opts.instance_communication_network is not None):
1122
    ToStderr("Please give at least one of the parameters.")
1123
    return 1
1124

    
1125
  enabled_disk_templates = _GetEnabledDiskTemplates(opts)
1126
  vg_name = _GetVgName(opts, enabled_disk_templates)
1127

    
1128
  try:
1129
    drbd_helper = _GetDrbdHelper(opts, enabled_disk_templates)
1130
  except errors.OpPrereqError, e:
1131
    ToStderr(str(e))
1132
    return 1
1133

    
1134
  hvlist = opts.enabled_hypervisors
1135
  if hvlist is not None:
1136
    hvlist = hvlist.split(",")
1137

    
1138
  # a list of (name, dict) we can pass directly to dict() (or [])
1139
  hvparams = dict(opts.hvparams)
1140
  for hv_params in hvparams.values():
1141
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1142

    
1143
  diskparams = dict(opts.diskparams)
1144

    
1145
  for dt_params in diskparams.values():
1146
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1147

    
1148
  beparams = opts.beparams
1149
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1150

    
1151
  nicparams = opts.nicparams
1152
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1153

    
1154
  ndparams = opts.ndparams
1155
  if ndparams is not None:
1156
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1157

    
1158
  ipolicy = CreateIPolicyFromOpts(
1159
    minmax_ispecs=opts.ipolicy_bounds_specs,
1160
    std_ispecs=opts.ipolicy_std_specs,
1161
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
1162
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1163
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1164
    )
1165

    
1166
  mnh = opts.maintain_node_health
1167

    
1168
  uid_pool = opts.uid_pool
1169
  if uid_pool is not None:
1170
    uid_pool = uidpool.ParseUidPool(uid_pool)
1171

    
1172
  add_uids = opts.add_uids
1173
  if add_uids is not None:
1174
    add_uids = uidpool.ParseUidPool(add_uids)
1175

    
1176
  remove_uids = opts.remove_uids
1177
  if remove_uids is not None:
1178
    remove_uids = uidpool.ParseUidPool(remove_uids)
1179

    
1180
  if opts.reserved_lvs is not None:
1181
    if opts.reserved_lvs == "":
1182
      opts.reserved_lvs = []
1183
    else:
1184
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1185

    
1186
  if opts.master_netmask is not None:
1187
    try:
1188
      opts.master_netmask = int(opts.master_netmask)
1189
    except ValueError:
1190
      ToStderr("The --master-netmask option expects an int parameter.")
1191
      return 1
1192

    
1193
  ext_ip_script = opts.use_external_mip_script
1194

    
1195
  if opts.disk_state:
1196
    disk_state = utils.FlatToDict(opts.disk_state)
1197
  else:
1198
    disk_state = {}
1199

    
1200
  hv_state = dict(opts.hv_state)
1201

    
1202
  op = opcodes.OpClusterSetParams(
1203
    vg_name=vg_name,
1204
    drbd_helper=drbd_helper,
1205
    enabled_hypervisors=hvlist,
1206
    hvparams=hvparams,
1207
    os_hvp=None,
1208
    beparams=beparams,
1209
    nicparams=nicparams,
1210
    ndparams=ndparams,
1211
    diskparams=diskparams,
1212
    ipolicy=ipolicy,
1213
    candidate_pool_size=opts.candidate_pool_size,
1214
    max_running_jobs=opts.max_running_jobs,
1215
    maintain_node_health=mnh,
1216
    modify_etc_hosts=opts.modify_etc_hosts,
1217
    uid_pool=uid_pool,
1218
    add_uids=add_uids,
1219
    remove_uids=remove_uids,
1220
    default_iallocator=opts.default_iallocator,
1221
    default_iallocator_params=opts.default_iallocator_params,
1222
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
1223
    mac_prefix=opts.mac_prefix,
1224
    master_netdev=opts.master_netdev,
1225
    master_netmask=opts.master_netmask,
1226
    reserved_lvs=opts.reserved_lvs,
1227
    use_external_mip_script=ext_ip_script,
1228
    hv_state=hv_state,
1229
    disk_state=disk_state,
1230
    enabled_disk_templates=enabled_disk_templates,
1231
    force=opts.force,
1232
    file_storage_dir=opts.file_storage_dir,
1233
    instance_communication_network=opts.instance_communication_network
1234
    )
1235
  return base.GetResult(None, opts, SubmitOrSend(op, opts))
1236

    
1237

    
1238
def QueueOps(opts, args):
1239
  """Queue operations.
1240

1241
  @param opts: the command line options selected by the user
1242
  @type args: list
1243
  @param args: should contain only one element, the subcommand
1244
  @rtype: int
1245
  @return: the desired exit code
1246

1247
  """
1248
  command = args[0]
1249
  client = GetClient()
1250
  if command in ("drain", "undrain"):
1251
    drain_flag = command == "drain"
1252
    client.SetQueueDrainFlag(drain_flag)
1253
  elif command == "info":
1254
    result = client.QueryConfigValues(["drain_flag"])
1255
    if result[0]:
1256
      val = "set"
1257
    else:
1258
      val = "unset"
1259
    ToStdout("The drain flag is %s" % val)
1260
  else:
1261
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1262
                               errors.ECODE_INVAL)
1263

    
1264
  return 0
1265

    
1266

    
1267
def _ShowWatcherPause(until):
1268
  if until is None or until < time.time():
1269
    ToStdout("The watcher is not paused.")
1270
  else:
1271
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1272

    
1273

    
1274
def WatcherOps(opts, args):
1275
  """Watcher operations.
1276

1277
  @param opts: the command line options selected by the user
1278
  @type args: list
1279
  @param args: should contain only one element, the subcommand
1280
  @rtype: int
1281
  @return: the desired exit code
1282

1283
  """
1284
  command = args[0]
1285
  client = GetClient()
1286

    
1287
  if command == "continue":
1288
    client.SetWatcherPause(None)
1289
    ToStdout("The watcher is no longer paused.")
1290

    
1291
  elif command == "pause":
1292
    if len(args) < 2:
1293
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1294

    
1295
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1296
    _ShowWatcherPause(result)
1297

    
1298
  elif command == "info":
1299
    result = client.QueryConfigValues(["watcher_pause"])
1300
    _ShowWatcherPause(result[0])
1301

    
1302
  else:
1303
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1304
                               errors.ECODE_INVAL)
1305

    
1306
  return 0
1307

    
1308

    
1309
def _OobPower(opts, node_list, power):
1310
  """Puts the node in the list to desired power state.
1311

1312
  @param opts: The command line options selected by the user
1313
  @param node_list: The list of nodes to operate on
1314
  @param power: True if they should be powered on, False otherwise
1315
  @return: The success of the operation (none failed)
1316

1317
  """
1318
  if power:
1319
    command = constants.OOB_POWER_ON
1320
  else:
1321
    command = constants.OOB_POWER_OFF
1322

    
1323
  op = opcodes.OpOobCommand(node_names=node_list,
1324
                            command=command,
1325
                            ignore_status=True,
1326
                            timeout=opts.oob_timeout,
1327
                            power_delay=opts.power_delay)
1328
  result = SubmitOpCode(op, opts=opts)
1329
  errs = 0
1330
  for node_result in result:
1331
    (node_tuple, data_tuple) = node_result
1332
    (_, node_name) = node_tuple
1333
    (data_status, _) = data_tuple
1334
    if data_status != constants.RS_NORMAL:
1335
      assert data_status != constants.RS_UNAVAIL
1336
      errs += 1
1337
      ToStderr("There was a problem changing power for %s, please investigate",
1338
               node_name)
1339

    
1340
  if errs > 0:
1341
    return False
1342

    
1343
  return True
1344

    
1345

    
1346
def _InstanceStart(opts, inst_list, start, no_remember=False):
1347
  """Puts the instances in the list to desired state.
1348

1349
  @param opts: The command line options selected by the user
1350
  @param inst_list: The list of instances to operate on
1351
  @param start: True if they should be started, False for shutdown
1352
  @param no_remember: If the instance state should be remembered
1353
  @return: The success of the operation (none failed)
1354

1355
  """
1356
  if start:
1357
    opcls = opcodes.OpInstanceStartup
1358
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1359
  else:
1360
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1361
                           timeout=opts.shutdown_timeout,
1362
                           no_remember=no_remember)
1363
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1364

    
1365
  jex = JobExecutor(opts=opts)
1366

    
1367
  for inst in inst_list:
1368
    ToStdout("Submit %s of instance %s", text_submit, inst)
1369
    op = opcls(instance_name=inst)
1370
    jex.QueueJob(inst, op)
1371

    
1372
  results = jex.GetResults()
1373
  bad_cnt = len([1 for (success, _) in results if not success])
1374

    
1375
  if bad_cnt == 0:
1376
    ToStdout("All instances have been %s successfully", text_success)
1377
  else:
1378
    ToStderr("There were errors while %s instances:\n"
1379
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1380
             len(results))
1381
    return False
1382

    
1383
  return True
1384

    
1385

    
1386
class _RunWhenNodesReachableHelper:
1387
  """Helper class to make shared internal state sharing easier.
1388

1389
  @ivar success: Indicates if all action_cb calls were successful
1390

1391
  """
1392
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1393
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1394
    """Init the object.
1395

1396
    @param node_list: The list of nodes to be reachable
1397
    @param action_cb: Callback called when a new host is reachable
1398
    @type node2ip: dict
1399
    @param node2ip: Node to ip mapping
1400
    @param port: The port to use for the TCP ping
1401
    @param feedback_fn: The function used for feedback
1402
    @param _ping_fn: Function to check reachabilty (for unittest use only)
1403
    @param _sleep_fn: Function to sleep (for unittest use only)
1404

1405
    """
1406
    self.down = set(node_list)
1407
    self.up = set()
1408
    self.node2ip = node2ip
1409
    self.success = True
1410
    self.action_cb = action_cb
1411
    self.port = port
1412
    self.feedback_fn = feedback_fn
1413
    self._ping_fn = _ping_fn
1414
    self._sleep_fn = _sleep_fn
1415

    
1416
  def __call__(self):
1417
    """When called we run action_cb.
1418

1419
    @raises utils.RetryAgain: When there are still down nodes
1420

1421
    """
1422
    if not self.action_cb(self.up):
1423
      self.success = False
1424

    
1425
    if self.down:
1426
      raise utils.RetryAgain()
1427
    else:
1428
      return self.success
1429

    
1430
  def Wait(self, secs):
1431
    """Checks if a host is up or waits remaining seconds.
1432

1433
    @param secs: The secs remaining
1434

1435
    """
1436
    start = time.time()
1437
    for node in self.down:
1438
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1439
                       live_port_needed=True):
1440
        self.feedback_fn("Node %s became available" % node)
1441
        self.up.add(node)
1442
        self.down -= self.up
1443
        # If we have a node available there is the possibility to run the
1444
        # action callback successfully, therefore we don't wait and return
1445
        return
1446

    
1447
    self._sleep_fn(max(0.0, start + secs - time.time()))
1448

    
1449

    
1450
def _RunWhenNodesReachable(node_list, action_cb, interval):
1451
  """Run action_cb when nodes become reachable.
1452

1453
  @param node_list: The list of nodes to be reachable
1454
  @param action_cb: Callback called when a new host is reachable
1455
  @param interval: The earliest time to retry
1456

1457
  """
1458
  client = GetClient()
1459
  cluster_info = client.QueryClusterInfo()
1460
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1461
    family = netutils.IPAddress.family
1462
  else:
1463
    family = netutils.IP6Address.family
1464

    
1465
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1466
                 for node in node_list)
1467

    
1468
  port = netutils.GetDaemonPort(constants.NODED)
1469
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1470
                                        ToStdout)
1471

    
1472
  try:
1473
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1474
                       wait_fn=helper.Wait)
1475
  except utils.RetryTimeout:
1476
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1477
             " again:\n  - %s", "  - ".join(helper.down))
1478
    return False
1479

    
1480

    
1481
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1482
                          _instance_start_fn=_InstanceStart):
1483
  """Start the instances conditional based on node_states.
1484

1485
  @param opts: The command line options selected by the user
1486
  @param inst_map: A dict of inst -> nodes mapping
1487
  @param nodes_online: A list of nodes online
1488
  @param _instance_start_fn: Callback to start instances (unittest use only)
1489
  @return: Success of the operation on all instances
1490

1491
  """
1492
  start_inst_list = []
1493
  for (inst, nodes) in inst_map.items():
1494
    if not (nodes - nodes_online):
1495
      # All nodes the instance lives on are back online
1496
      start_inst_list.append(inst)
1497

    
1498
  for inst in start_inst_list:
1499
    del inst_map[inst]
1500

    
1501
  if start_inst_list:
1502
    return _instance_start_fn(opts, start_inst_list, True)
1503

    
1504
  return True
1505

    
1506

    
1507
def _EpoOn(opts, full_node_list, node_list, inst_map):
1508
  """Does the actual power on.
1509

1510
  @param opts: The command line options selected by the user
1511
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1512
                         OOB)
1513
  @param node_list: The list of nodes to operate on (all need to support OOB)
1514
  @param inst_map: A dict of inst -> nodes mapping
1515
  @return: The desired exit status
1516

1517
  """
1518
  if node_list and not _OobPower(opts, node_list, False):
1519
    ToStderr("Not all nodes seem to get back up, investigate and start"
1520
             " manually if needed")
1521

    
1522
  # Wait for the nodes to be back up
1523
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1524

    
1525
  ToStdout("Waiting until all nodes are available again")
1526
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1527
    ToStderr("Please investigate and start stopped instances manually")
1528
    return constants.EXIT_FAILURE
1529

    
1530
  return constants.EXIT_SUCCESS
1531

    
1532

    
1533
def _EpoOff(opts, node_list, inst_map):
1534
  """Does the actual power off.
1535

1536
  @param opts: The command line options selected by the user
1537
  @param node_list: The list of nodes to operate on (all need to support OOB)
1538
  @param inst_map: A dict of inst -> nodes mapping
1539
  @return: The desired exit status
1540

1541
  """
1542
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
1543
    ToStderr("Please investigate and stop instances manually before continuing")
1544
    return constants.EXIT_FAILURE
1545

    
1546
  if not node_list:
1547
    return constants.EXIT_SUCCESS
1548

    
1549
  if _OobPower(opts, node_list, False):
1550
    return constants.EXIT_SUCCESS
1551
  else:
1552
    return constants.EXIT_FAILURE
1553

    
1554

    
1555
def Epo(opts, args, qcl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
1556
        _confirm_fn=ConfirmOperation,
1557
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
1558
  """EPO operations.
1559

1560
  @param opts: the command line options selected by the user
1561
  @type args: list
1562
  @param args: should contain only one element, the subcommand
1563
  @rtype: int
1564
  @return: the desired exit code
1565

1566
  """
1567
  if opts.groups and opts.show_all:
1568
    _stderr_fn("Only one of --groups or --all are allowed")
1569
    return constants.EXIT_FAILURE
1570
  elif args and opts.show_all:
1571
    _stderr_fn("Arguments in combination with --all are not allowed")
1572
    return constants.EXIT_FAILURE
1573

    
1574
  if qcl is None:
1575
    # Query client
1576
    qcl = GetClient()
1577

    
1578
  if opts.groups:
1579
    node_query_list = \
1580
      itertools.chain(*qcl.QueryGroups(args, ["node_list"], False))
1581
  else:
1582
    node_query_list = args
1583

    
1584
  result = qcl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
1585
                                            "sinst_list", "powered", "offline"],
1586
                          False)
1587

    
1588
  all_nodes = map(compat.fst, result)
1589
  node_list = []
1590
  inst_map = {}
1591
  for (node, master, pinsts, sinsts, powered, offline) in result:
1592
    if not offline:
1593
      for inst in (pinsts + sinsts):
1594
        if inst in inst_map:
1595
          if not master:
1596
            inst_map[inst].add(node)
1597
        elif master:
1598
          inst_map[inst] = set()
1599
        else:
1600
          inst_map[inst] = set([node])
1601

    
1602
    if master and opts.on:
1603
      # We ignore the master for turning on the machines, in fact we are
1604
      # already operating on the master at this point :)
1605
      continue
1606
    elif master and not opts.show_all:
1607
      _stderr_fn("%s is the master node, please do a master-failover to another"
1608
                 " node not affected by the EPO or use --all if you intend to"
1609
                 " shutdown the whole cluster", node)
1610
      return constants.EXIT_FAILURE
1611
    elif powered is None:
1612
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
1613
                 " handled in a fully automated manner", node)
1614
    elif powered == opts.on:
1615
      _stdout_fn("Node %s is already in desired power state, skipping", node)
1616
    elif not offline or (offline and powered):
1617
      node_list.append(node)
1618

    
1619
  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
1620
    return constants.EXIT_FAILURE
1621

    
1622
  if opts.on:
1623
    return _on_fn(opts, all_nodes, node_list, inst_map)
1624
  else:
1625
    return _off_fn(opts, node_list, inst_map)
1626

    
1627

    
1628
def _GetCreateCommand(info):
1629
  buf = StringIO()
1630
  buf.write("gnt-cluster init")
1631
  PrintIPolicyCommand(buf, info["ipolicy"], False)
1632
  buf.write(" ")
1633
  buf.write(info["name"])
1634
  return buf.getvalue()
1635

    
1636

    
1637
def ShowCreateCommand(opts, args):
1638
  """Shows the command that can be used to re-create the cluster.
1639

1640
  Currently it works only for ipolicy specs.
1641

1642
  """
1643
  cl = GetClient()
1644
  result = cl.QueryClusterInfo()
1645
  ToStdout(_GetCreateCommand(result))
1646

    
1647

    
1648
def _RunCommandAndReport(cmd):
1649
  """Run a command and report its output, iff it failed.
1650

1651
  @param cmd: the command to execute
1652
  @type cmd: list
1653
  @rtype: bool
1654
  @return: False, if the execution failed.
1655

1656
  """
1657
  result = utils.RunCmd(cmd)
1658
  if result.failed:
1659
    ToStderr("Command %s failed: %s; Output %s" %
1660
             (cmd, result.fail_reason, result.output))
1661
    return False
1662
  return True
1663

    
1664

    
1665
def _VerifyCommand(cmd):
1666
  """Verify that a given command succeeds on all online nodes.
1667

1668
  As this function is intended to run during upgrades, it
1669
  is implemented in such a way that it still works, if all Ganeti
1670
  daemons are down.
1671

1672
  @param cmd: the command to execute
1673
  @type cmd: list
1674
  @rtype: list
1675
  @return: the list of node names that are online where
1676
      the command failed.
1677

1678
  """
1679
  command = utils.text.ShellQuoteArgs([str(val) for val in cmd])
1680

    
1681
  nodes = ssconf.SimpleStore().GetOnlineNodeList()
1682
  master_node = ssconf.SimpleStore().GetMasterNode()
1683
  cluster_name = ssconf.SimpleStore().GetClusterName()
1684

    
1685
  # If master node is in 'nodes', make sure master node is at list end
1686
  if master_node in nodes:
1687
    nodes.remove(master_node)
1688
    nodes.append(master_node)
1689

    
1690
  failed = []
1691

    
1692
  srun = ssh.SshRunner(cluster_name=cluster_name)
1693
  for name in nodes:
1694
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
1695
    if result.exit_code != 0:
1696
      failed.append(name)
1697

    
1698
  return failed
1699

    
1700

    
1701
def _VerifyVersionInstalled(versionstring):
1702
  """Verify that the given version of ganeti is installed on all online nodes.
1703

1704
  Do nothing, if this is the case, otherwise print an appropriate
1705
  message to stderr.
1706

1707
  @param versionstring: the version to check for
1708
  @type versionstring: string
1709
  @rtype: bool
1710
  @return: True, if the version is installed on all online nodes
1711

1712
  """
1713
  badnodes = _VerifyCommand(["test", "-d",
1714
                             os.path.join(pathutils.PKGLIBDIR, versionstring)])
1715
  if badnodes:
1716
    ToStderr("Ganeti version %s not installed on nodes %s"
1717
             % (versionstring, ", ".join(badnodes)))
1718
    return False
1719

    
1720
  return True
1721

    
1722

    
1723
def _GetRunning():
1724
  """Determine the list of running jobs.
1725

1726
  @rtype: list
1727
  @return: the number of jobs still running
1728

1729
  """
1730
  cl = GetClient()
1731
  qfilter = qlang.MakeSimpleFilter("status",
1732
                                   frozenset([constants.JOB_STATUS_RUNNING]))
1733
  return len(cl.Query(constants.QR_JOB, [], qfilter).data)
1734

    
1735

    
1736
def _SetGanetiVersion(versionstring):
1737
  """Set the active version of ganeti to the given versionstring
1738

1739
  @type versionstring: string
1740
  @rtype: list
1741
  @return: the list of nodes where the version change failed
1742

1743
  """
1744
  failed = []
1745
  if constants.HAS_GNU_LN:
1746
    failed.extend(_VerifyCommand(
1747
        ["ln", "-s", "-f", "-T",
1748
         os.path.join(pathutils.PKGLIBDIR, versionstring),
1749
         os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
1750
    failed.extend(_VerifyCommand(
1751
        ["ln", "-s", "-f", "-T",
1752
         os.path.join(pathutils.SHAREDIR, versionstring),
1753
         os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
1754
  else:
1755
    failed.extend(_VerifyCommand(
1756
        ["rm", "-f", os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
1757
    failed.extend(_VerifyCommand(
1758
        ["ln", "-s", "-f", os.path.join(pathutils.PKGLIBDIR, versionstring),
1759
         os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
1760
    failed.extend(_VerifyCommand(
1761
        ["rm", "-f", os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
1762
    failed.extend(_VerifyCommand(
1763
        ["ln", "-s", "-f", os.path.join(pathutils.SHAREDIR, versionstring),
1764
         os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
1765
  return list(set(failed))
1766

    
1767

    
1768
def _ExecuteCommands(fns):
1769
  """Execute a list of functions, in reverse order.
1770

1771
  @type fns: list of functions.
1772
  @param fns: the functions to be executed.
1773

1774
  """
1775
  for fn in reversed(fns):
1776
    fn()
1777

    
1778

    
1779
def _GetConfigVersion():
1780
  """Determine the version the configuration file currently has.
1781

1782
  @rtype: tuple or None
1783
  @return: (major, minor, revision) if the version can be determined,
1784
      None otherwise
1785

1786
  """
1787
  config_data = serializer.LoadJson(utils.ReadFile(pathutils.CLUSTER_CONF_FILE))
1788
  try:
1789
    config_version = config_data["version"]
1790
  except KeyError:
1791
    return None
1792
  return utils.SplitVersion(config_version)
1793

    
1794

    
1795
def _ReadIntentToUpgrade():
1796
  """Read the file documenting the intent to upgrade the cluster.
1797

1798
  @rtype: (string, string) or (None, None)
1799
  @return: (old version, version to upgrade to), if the file exists,
1800
      and (None, None) otherwise.
1801

1802
  """
1803
  if not os.path.isfile(pathutils.INTENT_TO_UPGRADE):
1804
    return (None, None)
1805

    
1806
  contentstring = utils.ReadFile(pathutils.INTENT_TO_UPGRADE)
1807
  contents = utils.UnescapeAndSplit(contentstring)
1808
  if len(contents) != 3:
1809
    # file syntactically mal-formed
1810
    return (None, None)
1811
  return (contents[0], contents[1])
1812

    
1813

    
1814
def _WriteIntentToUpgrade(version):
1815
  """Write file documenting the intent to upgrade the cluster.
1816

1817
  @type version: string
1818
  @param version: the version we intent to upgrade to
1819

1820
  """
1821
  utils.WriteFile(pathutils.INTENT_TO_UPGRADE,
1822
                  data=utils.EscapeAndJoin([constants.RELEASE_VERSION, version,
1823
                                            "%d" % os.getpid()]))
1824

    
1825

    
1826
def _UpgradeBeforeConfigurationChange(versionstring):
1827
  """
1828
  Carry out all the tasks necessary for an upgrade that happen before
1829
  the configuration file, or Ganeti version, changes.
1830

1831
  @type versionstring: string
1832
  @param versionstring: the version to upgrade to
1833
  @rtype: (bool, list)
1834
  @return: tuple of a bool indicating success and a list of rollback tasks
1835

1836
  """
1837
  rollback = []
1838

    
1839
  if not _VerifyVersionInstalled(versionstring):
1840
    return (False, rollback)
1841

    
1842
  _WriteIntentToUpgrade(versionstring)
1843
  rollback.append(
1844
    lambda: utils.RunCmd(["rm", "-f", pathutils.INTENT_TO_UPGRADE]))
1845

    
1846
  ToStdout("Draining queue")
1847
  client = GetClient()
1848
  client.SetQueueDrainFlag(True)
1849

    
1850
  rollback.append(lambda: GetClient().SetQueueDrainFlag(False))
1851

    
1852
  if utils.SimpleRetry(0, _GetRunning,
1853
                       constants.UPGRADE_QUEUE_POLL_INTERVAL,
1854
                       constants.UPGRADE_QUEUE_DRAIN_TIMEOUT):
1855
    ToStderr("Failed to completely empty the queue.")
1856
    return (False, rollback)
1857

    
1858
  ToStdout("Stopping daemons on master node.")
1859
  if not _RunCommandAndReport([pathutils.DAEMON_UTIL, "stop-all"]):
1860
    return (False, rollback)
1861

    
1862
  if not _VerifyVersionInstalled(versionstring):
1863
    utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"])
1864
    return (False, rollback)
1865

    
1866
  ToStdout("Stopping daemons everywhere.")
1867
  rollback.append(lambda: _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
1868
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
1869
  if badnodes:
1870
    ToStderr("Failed to stop daemons on %s." % (", ".join(badnodes),))
1871
    return (False, rollback)
1872

    
1873
  backuptar = os.path.join(pathutils.LOCALSTATEDIR,
1874
                           "lib/ganeti%d.tar" % time.time())
1875
  ToStdout("Backing up configuration as %s" % backuptar)
1876
  if not _RunCommandAndReport(["tar", "cf", backuptar,
1877
                               pathutils.DATA_DIR]):
1878
    return (False, rollback)
1879

    
1880
  return (True, rollback)
1881

    
1882

    
1883
def _VersionSpecificDowngrade():
1884
  """
1885
  Perform any additional downrade tasks that are version specific
1886
  and need to be done just after the configuration downgrade. This
1887
  function needs to be idempotent, so that it can be redone if the
1888
  downgrade procedure gets interrupted after changing the
1889
  configuration.
1890

1891
  Note that this function has to be reset with every version bump.
1892

1893
  @return: True upon success
1894
  """
1895
  ToStdout("Performing version-specific downgrade tasks.")
1896
  return True
1897

    
1898

    
1899
def _SwitchVersionAndConfig(versionstring, downgrade):
1900
  """
1901
  Switch to the new Ganeti version and change the configuration,
1902
  in correct order.
1903

1904
  @type versionstring: string
1905
  @param versionstring: the version to change to
1906
  @type downgrade: bool
1907
  @param downgrade: True, if the configuration should be downgraded
1908
  @rtype: (bool, list)
1909
  @return: tupe of a bool indicating success, and a list of
1910
      additional rollback tasks
1911

1912
  """
1913
  rollback = []
1914
  if downgrade:
1915
    ToStdout("Downgrading configuration")
1916
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "--downgrade", "-f"]):
1917
      return (False, rollback)
1918
    # Note: version specific downgrades need to be done before switching
1919
    # binaries, so that we still have the knowledgeable binary if the downgrade
1920
    # process gets interrupted at this point.
1921
    if not _VersionSpecificDowngrade():
1922
      return (False, rollback)
1923

    
1924
  # Configuration change is the point of no return. From then onwards, it is
1925
  # safer to push through the up/dowgrade than to try to roll it back.
1926

    
1927
  ToStdout("Switching to version %s on all nodes" % versionstring)
1928
  rollback.append(lambda: _SetGanetiVersion(constants.DIR_VERSION))
1929
  badnodes = _SetGanetiVersion(versionstring)
1930
  if badnodes:
1931
    ToStderr("Failed to switch to Ganeti version %s on nodes %s"
1932
             % (versionstring, ", ".join(badnodes)))
1933
    if not downgrade:
1934
      return (False, rollback)
1935

    
1936
  # Now that we have changed to the new version of Ganeti we should
1937
  # not communicate over luxi any more, as luxi might have changed in
1938
  # incompatible ways. Therefore, manually call the corresponding ganeti
1939
  # commands using their canonical (version independent) path.
1940

    
1941
  if not downgrade:
1942
    ToStdout("Upgrading configuration")
1943
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "-f"]):
1944
      return (False, rollback)
1945

    
1946
  return (True, rollback)
1947

    
1948

    
1949
def _UpgradeAfterConfigurationChange(oldversion):
1950
  """
1951
  Carry out the upgrade actions necessary after switching to the new
1952
  Ganeti version and updating the configuration.
1953

1954
  As this part is run at a time where the new version of Ganeti is already
1955
  running, no communication should happen via luxi, as this is not a stable
1956
  interface. Also, as the configuration change is the point of no return,
1957
  all actions are pushed trough, even if some of them fail.
1958

1959
  @param oldversion: the version the upgrade started from
1960
  @type oldversion: string
1961
  @rtype: int
1962
  @return: the intended return value
1963

1964
  """
1965
  returnvalue = 0
1966

    
1967
  ToStdout("Ensuring directories everywhere.")
1968
  badnodes = _VerifyCommand([pathutils.ENSURE_DIRS])
1969
  if badnodes:
1970
    ToStderr("Warning: failed to ensure directories on %s." %
1971
             (", ".join(badnodes)))
1972
    returnvalue = 1
1973

    
1974
  ToStdout("Starting daemons everywhere.")
1975
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
1976
  if badnodes:
1977
    ToStderr("Warning: failed to start daemons on %s." % (", ".join(badnodes),))
1978
    returnvalue = 1
1979

    
1980
  ToStdout("Redistributing the configuration.")
1981
  if not _RunCommandAndReport(["gnt-cluster", "redist-conf", "--yes-do-it"]):
1982
    returnvalue = 1
1983

    
1984
  ToStdout("Restarting daemons everywhere.")
1985
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
1986
  badnodes.extend(_VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
1987
  if badnodes:
1988
    ToStderr("Warning: failed to start daemons on %s." %
1989
             (", ".join(list(set(badnodes))),))
1990
    returnvalue = 1
1991

    
1992
  ToStdout("Undraining the queue.")
1993
  if not _RunCommandAndReport(["gnt-cluster", "queue", "undrain"]):
1994
    returnvalue = 1
1995

    
1996
  _RunCommandAndReport(["rm", "-f", pathutils.INTENT_TO_UPGRADE])
1997

    
1998
  ToStdout("Running post-upgrade hooks")
1999
  if not _RunCommandAndReport([pathutils.POST_UPGRADE, oldversion]):
2000
    returnvalue = 1
2001

    
2002
  ToStdout("Verifying cluster.")
2003
  if not _RunCommandAndReport(["gnt-cluster", "verify"]):
2004
    returnvalue = 1
2005

    
2006
  return returnvalue
2007

    
2008

    
2009
def UpgradeGanetiCommand(opts, args):
2010
  """Upgrade a cluster to a new ganeti version.
2011

2012
  @param opts: the command line options selected by the user
2013
  @type args: list
2014
  @param args: should be an empty list
2015
  @rtype: int
2016
  @return: the desired exit code
2017

2018
  """
2019
  if ((not opts.resume and opts.to is None)
2020
      or (opts.resume and opts.to is not None)):
2021
    ToStderr("Precisely one of the options --to and --resume"
2022
             " has to be given")
2023
    return 1
2024

    
2025
  oldversion = constants.RELEASE_VERSION
2026

    
2027
  if opts.resume:
2028
    ssconf.CheckMaster(False)
2029
    oldversion, versionstring = _ReadIntentToUpgrade()
2030
    if versionstring is None:
2031
      return 0
2032
    version = utils.version.ParseVersion(versionstring)
2033
    if version is None:
2034
      return 1
2035
    configversion = _GetConfigVersion()
2036
    if configversion is None:
2037
      return 1
2038
    # If the upgrade we resume was an upgrade between compatible
2039
    # versions (like 2.10.0 to 2.10.1), the correct configversion
2040
    # does not guarantee that the config has been updated.
2041
    # However, in the case of a compatible update with the configuration
2042
    # not touched, we are running a different dirversion with the same
2043
    # config version.
2044
    config_already_modified = \
2045
      (utils.IsCorrectConfigVersion(version, configversion) and
2046
       not (versionstring != constants.DIR_VERSION and
2047
            configversion == (constants.CONFIG_MAJOR, constants.CONFIG_MINOR,
2048
                              constants.CONFIG_REVISION)))
2049
    if not config_already_modified:
2050
      # We have to start from the beginning; however, some daemons might have
2051
      # already been stopped, so the only way to get into a well-defined state
2052
      # is by starting all daemons again.
2053
      _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
2054
  else:
2055
    versionstring = opts.to
2056
    config_already_modified = False
2057
    version = utils.version.ParseVersion(versionstring)
2058
    if version is None:
2059
      ToStderr("Could not parse version string %s" % versionstring)
2060
      return 1
2061

    
2062
  msg = utils.version.UpgradeRange(version)
2063
  if msg is not None:
2064
    ToStderr("Cannot upgrade to %s: %s" % (versionstring, msg))
2065
    return 1
2066

    
2067
  if not config_already_modified:
2068
    success, rollback = _UpgradeBeforeConfigurationChange(versionstring)
2069
    if not success:
2070
      _ExecuteCommands(rollback)
2071
      return 1
2072
  else:
2073
    rollback = []
2074

    
2075
  downgrade = utils.version.ShouldCfgdowngrade(version)
2076

    
2077
  success, additionalrollback =  \
2078
      _SwitchVersionAndConfig(versionstring, downgrade)
2079
  if not success:
2080
    rollback.extend(additionalrollback)
2081
    _ExecuteCommands(rollback)
2082
    return 1
2083

    
2084
  return _UpgradeAfterConfigurationChange(oldversion)
2085

    
2086

    
2087
commands = {
2088
  "init": (
2089
    InitCluster, [ArgHost(min=1, max=1)],
2090
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
2091
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
2092
     NIC_PARAMS_OPT, NOMODIFY_ETCHOSTS_OPT, NOMODIFY_SSH_SETUP_OPT,
2093
     SECONDARY_IP_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT,
2094
     DRBD_HELPER_OPT, DEFAULT_IALLOCATOR_OPT, DEFAULT_IALLOCATOR_PARAMS_OPT,
2095
     PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT,
2096
     GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT,
2097
     HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
2098
     IPOLICY_STD_SPECS_OPT, GLOBAL_GLUSTER_FILEDIR_OPT]
2099
     + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
2100
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
2101
  "destroy": (
2102
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
2103
    "", "Destroy cluster"),
2104
  "rename": (
2105
    RenameCluster, [ArgHost(min=1, max=1)],
2106
    [FORCE_OPT, DRY_RUN_OPT],
2107
    "<new_name>",
2108
    "Renames the cluster"),
2109
  "redist-conf": (
2110
    RedistributeConfig, ARGS_NONE, SUBMIT_OPTS +
2111
    [DRY_RUN_OPT, PRIORITY_OPT, FORCE_DISTRIBUTION],
2112
    "", "Forces a push of the configuration file and ssconf files"
2113
    " to the nodes in the cluster"),
2114
  "verify": (
2115
    VerifyCluster, ARGS_NONE,
2116
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
2117
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
2118
    "", "Does a check on the cluster configuration"),
2119
  "verify-disks": (
2120
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
2121
    "", "Does a check on the cluster disk status"),
2122
  "repair-disk-sizes": (
2123
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
2124
    "[instance...]", "Updates mismatches in recorded disk sizes"),
2125
  "master-failover": (
2126
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
2127
    "", "Makes the current node the master"),
2128
  "master-ping": (
2129
    MasterPing, ARGS_NONE, [],
2130
    "", "Checks if the master is alive"),
2131
  "version": (
2132
    ShowClusterVersion, ARGS_NONE, [],
2133
    "", "Shows the cluster version"),
2134
  "getmaster": (
2135
    ShowClusterMaster, ARGS_NONE, [],
2136
    "", "Shows the cluster master"),
2137
  "copyfile": (
2138
    ClusterCopyFile, [ArgFile(min=1, max=1)],
2139
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
2140
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
2141
  "command": (
2142
    RunClusterCommand, [ArgCommand(min=1)],
2143
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
2144
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
2145
  "info": (
2146
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
2147
    "[--roman]", "Show cluster configuration"),
2148
  "list-tags": (
2149
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
2150
  "add-tags": (
2151
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
2152
    "tag...", "Add tags to the cluster"),
2153
  "remove-tags": (
2154
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
2155
    "tag...", "Remove tags from the cluster"),
2156
  "search-tags": (
2157
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
2158
    "Searches the tags on all objects on"
2159
    " the cluster for a given pattern (regex)"),
2160
  "queue": (
2161
    QueueOps,
2162
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
2163
    [], "drain|undrain|info", "Change queue properties"),
2164
  "watcher": (
2165
    WatcherOps,
2166
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
2167
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
2168
    [],
2169
    "{pause <timespec>|continue|info}", "Change watcher properties"),
2170
  "modify": (
2171
    SetClusterParams, ARGS_NONE,
2172
    [FORCE_OPT,
2173
     BACKEND_OPT, CP_SIZE_OPT, RQL_OPT, INSTANCE_COMMUNICATION_NETWORK_OPT,
2174
     ENABLED_HV_OPT, HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT,
2175
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT,
2176
     UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT, DRBD_HELPER_OPT,
2177
     DEFAULT_IALLOCATOR_OPT, DEFAULT_IALLOCATOR_PARAMS_OPT, RESERVED_LVS_OPT,
2178
     DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT,
2179
     USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] +
2180
     SUBMIT_OPTS +
2181
     [ENABLED_DISK_TEMPLATES_OPT, IPOLICY_STD_SPECS_OPT, MODIFY_ETCHOSTS_OPT] +
2182
     INSTANCE_POLICY_OPTS + [GLOBAL_FILEDIR_OPT],
2183
    "[opts...]",
2184
    "Alters the parameters of the cluster"),
2185
  "renew-crypto": (
2186
    RenewCrypto, ARGS_NONE,
2187
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
2188
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
2189
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
2190
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT,
2191
     NEW_NODE_CERT_OPT],
2192
    "[opts...]",
2193
    "Renews cluster certificates, keys and secrets"),
2194
  "epo": (
2195
    Epo, [ArgUnknown()],
2196
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
2197
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
2198
    "[opts...] [args]",
2199
    "Performs an emergency power-off on given args"),
2200
  "activate-master-ip": (
2201
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
2202
  "deactivate-master-ip": (
2203
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
2204
    "Deactivates the master IP"),
2205
  "show-ispecs-cmd": (
2206
    ShowCreateCommand, ARGS_NONE, [], "",
2207
    "Show the command line to re-create the cluster"),
2208
  "upgrade": (
2209
    UpgradeGanetiCommand, ARGS_NONE, [TO_OPT, RESUME_OPT], "",
2210
    "Upgrade (or downgrade) to a new Ganeti version"),
2211
  }
2212

    
2213

    
2214
#: dictionary with aliases for commands
2215
aliases = {
2216
  "masterfailover": "master-failover",
2217
  "show": "info",
2218
}
2219

    
2220

    
2221
def Main():
2222
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
2223
                     aliases=aliases)