#
#

# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013, 2014 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Cluster related commands"""

# pylint: disable=W0401,W0613,W0614,C0103
# W0401: Wildcard import ganeti.cli
# W0613: Unused argument, since all functions follow the same API
# W0614: Unused import %s from wildcard import (since we need cli)
# C0103: Invalid name gnt-cluster

from cStringIO import StringIO
import os
import time
import OpenSSL
import itertools

from ganeti.cli import *
from ganeti import bootstrap
from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import qlang
from ganeti import serializer
from ganeti import ssconf
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti.client import base


ON_OPT = cli_option("--on", default=False,
                    action="store_true", dest="on",
                    help="Recover from an EPO")

GROUPS_OPT = cli_option("--groups", default=False,
                        action="store_true", dest="groups",
                        help="Arguments are node groups instead of nodes")

FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
                            help="Override interactive check for --no-voting",
                            default=False, action="store_true")

FORCE_DISTRIBUTION = cli_option("--yes-do-it", dest="yes_do_it",
                                help="Unconditionally distribute the"
                                " configuration, even if the queue"
                                " is drained",
                                default=False, action="store_true")

TO_OPT = cli_option("--to", default=None, type="string",
                    help="The Ganeti version to upgrade to")

RESUME_OPT = cli_option("--resume", default=False, action="store_true",
                        help="Resume any pending Ganeti upgrades")

_EPO_PING_INTERVAL = 30 # 30 seconds between pings
_EPO_PING_TIMEOUT = 1 # 1 second
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes


def _InitEnabledDiskTemplates(opts):
  """Initialize the list of enabled disk templates.

  """
  if opts.enabled_disk_templates:
    return opts.enabled_disk_templates.split(",")
  else:
    return constants.DEFAULT_ENABLED_DISK_TEMPLATES


def _InitVgName(opts, enabled_disk_templates):
  """Initialize the volume group name.

  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk templates

  """
  vg_name = None
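  # Three cases are handled below: a non-empty --vg-name without any lvm-based
  # disk template enabled only triggers a warning; an explicitly empty
  # --vg-name while lvm templates are enabled is an error; lvm templates
  # without --vg-name fall back to the default volume group.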
  if opts.vg_name is not None:
    vg_name = opts.vg_name
    if vg_name:
      if not utils.IsLvmEnabled(enabled_disk_templates):
        ToStdout("You specified a volume group with --vg-name, but you did not"
                 " enable any disk template that uses lvm.")
    elif utils.IsLvmEnabled(enabled_disk_templates):
      raise errors.OpPrereqError(
          "LVM disk templates are enabled, but vg name not set.")
  elif utils.IsLvmEnabled(enabled_disk_templates):
    vg_name = constants.DEFAULT_VG
  return vg_name


def _InitDrbdHelper(opts, enabled_disk_templates):
  """Initialize the DRBD usermode helper.

  """
  drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates

  if not drbd_enabled and opts.drbd_helper is not None:
    ToStdout("Note: You specified a DRBD usermode helper, while DRBD storage"
             " is not enabled.")

  if drbd_enabled:
    if opts.drbd_helper is None:
      return constants.DEFAULT_DRBD_HELPER
    if opts.drbd_helper == '':
      raise errors.OpPrereqError(
          "Unsetting the drbd usermode helper while enabling DRBD is not"
          " allowed.")

  return opts.drbd_helper


@UsesRPC
def InitCluster(opts, args):
  """Initialize the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the desired
      cluster name
  @rtype: int
  @return: the desired exit code

  """
  enabled_disk_templates = _InitEnabledDiskTemplates(opts)

  try:
    vg_name = _InitVgName(opts, enabled_disk_templates)
    drbd_helper = _InitDrbdHelper(opts, enabled_disk_templates)
  except errors.OpPrereqError, e:
    ToStderr(str(e))
    return 1

  master_netdev = opts.master_netdev
  if master_netdev is None:
    nic_mode = opts.nicparams.get(constants.NIC_MODE, None)
    if not nic_mode:
      # default case, use bridging
      master_netdev = constants.DEFAULT_BRIDGE
    elif nic_mode == constants.NIC_MODE_OVS:
      # default ovs is different from default bridge
      master_netdev = constants.DEFAULT_OVS
      opts.nicparams[constants.NIC_LINK] = constants.DEFAULT_OVS

  hvlist = opts.enabled_hypervisors
  if hvlist is None:
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
  hvlist = hvlist.split(",")

  hvparams = dict(opts.hvparams)
  beparams = opts.beparams
  nicparams = opts.nicparams

  diskparams = dict(opts.diskparams)

  # check the disk template types here, as we cannot rely on the type check done
  # by the opcode parameter types
  diskparams_keys = set(diskparams.keys())
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
    return 1

  # prepare beparams dict
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)

  # prepare nicparams dict
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  # prepare ndparams dict
  if opts.ndparams is None:
    ndparams = dict(constants.NDC_DEFAULTS)
  else:
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)

  # prepare hvparams dict
  for hv in constants.HYPER_TYPES:
    if hv not in hvparams:
      hvparams[hv] = {}
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)

  # prepare diskparams dict
  for templ in constants.DISK_TEMPLATES:
    if templ not in diskparams:
      diskparams[templ] = {}
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
                                         diskparams[templ])
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)

  # prepare ipolicy dict
  ipolicy = CreateIPolicyFromOpts(
    ispecs_mem_size=opts.ispecs_mem_size,
    ispecs_cpu_count=opts.ispecs_cpu_count,
    ispecs_disk_count=opts.ispecs_disk_count,
    ispecs_disk_size=opts.ispecs_disk_size,
    ispecs_nic_count=opts.ispecs_nic_count,
    minmax_ispecs=opts.ipolicy_bounds_specs,
    std_ispecs=opts.ipolicy_std_specs,
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
    fill_all=True)

  if opts.candidate_pool_size is None:
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT

  if opts.mac_prefix is None:
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX

  uid_pool = opts.uid_pool
  if uid_pool is not None:
    uid_pool = uidpool.ParseUidPool(uid_pool)

  if opts.prealloc_wipe_disks is None:
    opts.prealloc_wipe_disks = False

  external_ip_setup_script = opts.use_external_mip_script
  if external_ip_setup_script is None:
    external_ip_setup_script = False

  try:
    primary_ip_version = int(opts.primary_ip_version)
  except (ValueError, TypeError), err:
    ToStderr("Invalid primary ip version value: %s" % str(err))
    return 1

  master_netmask = opts.master_netmask
  try:
    if master_netmask is not None:
      master_netmask = int(master_netmask)
  except (ValueError, TypeError), err:
    ToStderr("Invalid master netmask value: %s" % str(err))
    return 1

  if opts.disk_state:
    disk_state = utils.FlatToDict(opts.disk_state)
  else:
    disk_state = {}

  hv_state = dict(opts.hv_state)

  default_ialloc_params = opts.default_iallocator_params
  bootstrap.InitCluster(cluster_name=args[0],
                        secondary_ip=opts.secondary_ip,
                        vg_name=vg_name,
                        mac_prefix=opts.mac_prefix,
                        master_netmask=master_netmask,
                        master_netdev=master_netdev,
                        file_storage_dir=opts.file_storage_dir,
                        shared_file_storage_dir=opts.shared_file_storage_dir,
                        gluster_storage_dir=opts.gluster_storage_dir,
                        enabled_hypervisors=hvlist,
                        hvparams=hvparams,
                        beparams=beparams,
                        nicparams=nicparams,
                        ndparams=ndparams,
                        diskparams=diskparams,
                        ipolicy=ipolicy,
                        candidate_pool_size=opts.candidate_pool_size,
                        modify_etc_hosts=opts.modify_etc_hosts,
                        modify_ssh_setup=opts.modify_ssh_setup,
                        maintain_node_health=opts.maintain_node_health,
                        drbd_helper=drbd_helper,
                        uid_pool=uid_pool,
                        default_iallocator=opts.default_iallocator,
                        default_iallocator_params=default_ialloc_params,
                        primary_ip_version=primary_ip_version,
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
                        use_external_mip_script=external_ip_setup_script,
                        hv_state=hv_state,
                        disk_state=disk_state,
                        enabled_disk_templates=enabled_disk_templates,
                        )
  op = opcodes.OpClusterPostInit()
  SubmitOpCode(op, opts=opts)
  return 0


@UsesRPC
def DestroyCluster(opts, args):
  """Destroy the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if not opts.yes_do_it:
    ToStderr("Destroying a cluster is irreversible. If you really want to"
             " destroy this cluster, supply the --yes-do-it option.")
    return 1

  op = opcodes.OpClusterDestroy()
  master_uuid = SubmitOpCode(op, opts=opts)
  # if we reached this, the opcode didn't fail; we can proceed to
  # shutdown all the daemons
  bootstrap.FinalizeClusterDestroy(master_uuid)
  return 0


def RenameCluster(opts, args):
  """Rename the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new cluster name
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])

  new_name = args[0]
  if not opts.force:
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
                " connected over the network to the cluster name, the"
                " operation is very dangerous as the IP address will be"
                " removed from the node and the change may not go through."
                " Continue?") % (cluster_name, new_name)
    if not AskUser(usertext):
      return 1

  op = opcodes.OpClusterRename(name=new_name)
  result = SubmitOpCode(op, opts=opts, cl=cl)

  if result:
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)

  return 0


def ActivateMasterIp(opts, args):
  """Activates the master IP.

  """
  op = opcodes.OpClusterActivateMasterIp()
  SubmitOpCode(op)
  return 0


def DeactivateMasterIp(opts, args):
  """Deactivates the master IP.

  """
  if not opts.confirm:
    usertext = ("This will disable the master IP. All the open connections to"
                " the master IP will be closed. To reach the master you will"
                " need to use its node IP."
                " Continue?")
    if not AskUser(usertext):
      return 1

  op = opcodes.OpClusterDeactivateMasterIp()
  SubmitOpCode(op)
  return 0


def RedistributeConfig(opts, args):
  """Forces push of the cluster configuration.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: empty list
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRedistConf()
  if opts.yes_do_it:
    SubmitOpCodeToDrainedQueue(op)
  else:
    SubmitOrSend(op, opts)
  return 0


def ShowClusterVersion(opts, args):
  """Write version of ganeti software to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()
  result = cl.QueryClusterInfo()
  ToStdout("Software version: %s", result["software_version"])
  ToStdout("Internode protocol: %s", result["protocol_version"])
  ToStdout("Configuration format: %s", result["config_version"])
  ToStdout("OS api version: %s", result["os_api_version"])
  ToStdout("Export interface: %s", result["export_version"])
  ToStdout("VCS version: %s", result["vcs_version"])
  return 0


def ShowClusterMaster(opts, args):
  """Write name of master node to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  master = bootstrap.GetMaster()
  ToStdout(master)
  return 0


def _FormatGroupedParams(paramsdict, roman=False):
  """Format Grouped parameters (be, nic, disk) by group.

  @type paramsdict: dict of dicts
  @param paramsdict: {group: {param: value, ...}, ...}
  @rtype: dict of dicts
  @return: copy of the input dictionaries with strings as values

  """
  ret = {}
  for (item, val) in paramsdict.items():
    if isinstance(val, dict):
      ret[item] = _FormatGroupedParams(val, roman=roman)
    elif roman and isinstance(val, int):
      ret[item] = compat.TryToRoman(val)
    else:
      ret[item] = str(val)
  return ret


def ShowClusterConfig(opts, args):
  """Shows cluster information.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()
  result = cl.QueryClusterInfo()

  if result["tags"]:
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
  else:
    tags = "(none)"
  if result["reserved_lvs"]:
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
  else:
    reserved_lvs = "(none)"

  enabled_hv = result["enabled_hypervisors"]
  hvparams = dict((k, v) for k, v in result["hvparams"].iteritems()
                  if k in enabled_hv)

  info = [
    ("Cluster name", result["name"]),
    ("Cluster UUID", result["uuid"]),

    ("Creation time", utils.FormatTime(result["ctime"])),
    ("Modification time", utils.FormatTime(result["mtime"])),

    ("Master node", result["master"]),

    ("Architecture (this node)",
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),

    ("Tags", tags),

    ("Default hypervisor", result["default_hypervisor"]),
    ("Enabled hypervisors", utils.CommaJoin(enabled_hv)),

    ("Hypervisor parameters", _FormatGroupedParams(hvparams)),

    ("OS-specific hypervisor parameters",
     _FormatGroupedParams(result["os_hvp"])),

    ("OS parameters", _FormatGroupedParams(result["osparams"])),

    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),

    ("Cluster parameters", [
      ("candidate pool size",
       compat.TryToRoman(result["candidate_pool_size"],
                         convert=opts.roman_integers)),
      ("maximal number of jobs running simultaneously",
       compat.TryToRoman(result["max_running_jobs"],
                         convert=opts.roman_integers)),
      ("mac prefix", result["mac_prefix"]),
      ("master netdev", result["master_netdev"]),
      ("master netmask", result["master_netmask"]),
      ("use external master IP address setup script",
       result["use_external_mip_script"]),
      ("lvm volume group", result["volume_group_name"]),
      ("lvm reserved volumes", reserved_lvs),
      ("drbd usermode helper", result["drbd_usermode_helper"]),
      ("file storage path", result["file_storage_dir"]),
      ("shared file storage path", result["shared_file_storage_dir"]),
      ("gluster storage path", result["gluster_storage_dir"]),
      ("maintenance of node health", result["maintain_node_health"]),
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
      ("default instance allocator", result["default_iallocator"]),
      ("default instance allocator parameters",
       result["default_iallocator_params"]),
      ("primary ip version", result["primary_ip_version"]),
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
      ("ExtStorage Providers search path",
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
      ("enabled disk templates",
       utils.CommaJoin(result["enabled_disk_templates"])),
      ("instance communication network",
       result["instance_communication_network"]),
      ]),

    ("Default node parameters",
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),

    ("Default instance parameters",
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),

    ("Default nic parameters",
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),

    ("Default disk parameters",
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),

    ("Instance policy - limits for instances",
     FormatPolicyInfo(result["ipolicy"], None, True)),
    ]

  PrintGenericInfo(info)
  return 0


def ClusterCopyFile(opts, args):
  """Copy a file from master to some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the path of
      the file to be copied
  @rtype: int
  @return: the desired exit code

  """
  filename = args[0]
  if not os.path.exists(filename):
    raise errors.OpPrereqError("No such filename '%s'" % filename,
                               errors.ECODE_INVAL)

  cl = GetClient()
  qcl = GetClient()
  try:
    cluster_name = cl.QueryConfigValues(["cluster_name"])[0]

    results = GetOnlineNodes(nodes=opts.nodes, cl=qcl, filter_master=True,
                             secondary_ips=opts.use_replication_network,
                             nodegroup=opts.nodegroup)
    ports = GetNodesSshPorts(opts.nodes, qcl)
  finally:
    cl.Close()
    qcl.Close()

  srun = ssh.SshRunner(cluster_name)
  for (node, port) in zip(results, ports):
    if not srun.CopyFileToNode(node, port, filename):
      ToStderr("Copy of file %s to node %s:%d failed", filename, node, port)

  return 0


def RunClusterCommand(opts, args):
  """Run a command on some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain the command to be run and its arguments
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()
  qcl = GetClient()

  command = " ".join(args)

  nodes = GetOnlineNodes(nodes=opts.nodes, cl=qcl, nodegroup=opts.nodegroup)
  ports = GetNodesSshPorts(nodes, qcl)

  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
                                                    "master_node"])

  srun = ssh.SshRunner(cluster_name=cluster_name)

  # Make sure master node is at list end
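  # (so that a command which disrupts connectivity, e.g. one that restarts
  # daemons, only hits the node we are running from last)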
  if master_node in nodes:
    nodes.remove(master_node)
    nodes.append(master_node)

  for (name, port) in zip(nodes, ports):
    result = srun.Run(name, constants.SSH_LOGIN_USER, command, port=port)

    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
      # Do not output anything for successful commands
      continue

    ToStdout("------------------------------------------------")
    if opts.show_machine_names:
      for line in result.output.splitlines():
        ToStdout("%s: %s", name, line)
    else:
      ToStdout("node: %s", name)
      ToStdout("%s", result.output)
    ToStdout("return code = %s", result.exit_code)

  return 0


def VerifyCluster(opts, args):
  """Verify integrity of cluster, performing various tests on nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  skip_checks = []

  if opts.skip_nplusone_mem:
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)

  cl = GetClient()

  op = opcodes.OpClusterVerify(verbose=opts.verbose,
                               error_codes=opts.error_codes,
                               debug_simulate_errors=opts.simulate_errors,
                               skip_checks=skip_checks,
                               ignore_errors=opts.ignore_errors,
                               group_name=opts.nodegroup)
  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  results = jex.GetResults()

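  # Each entry in 'results' is a (job_success, op_results) pair; the zip below
  # transposes these pairs into two columns and the ifilterfalse/len pipeline
  # counts, per column, how many entries are falsy, i.e. how many jobs and how
  # many opcode results failed.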
  (bad_jobs, bad_results) = \
    map(len,
        # Convert iterators to lists
        map(list,
            # Count errors
            map(compat.partial(itertools.ifilterfalse, bool),
                # Convert result to booleans in a tuple
                zip(*((job_success, len(op_results) == 1 and op_results[0])
                      for (job_success, op_results) in results)))))

  if bad_jobs == 0 and bad_results == 0:
    rcode = constants.EXIT_SUCCESS
  else:
    rcode = constants.EXIT_FAILURE
    if bad_jobs > 0:
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)

  return rcode


def VerifyDisks(opts, args):
  """Verify integrity of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  op = opcodes.OpClusterVerifyDisks()

  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  retcode = constants.EXIT_SUCCESS

  for (status, result) in jex.GetResults():
    if not status:
      ToStdout("Job failed: %s", result)
      continue

    ((bad_nodes, instances, missing), ) = result

    for node, text in bad_nodes.items():
      ToStdout("Error gathering data on node %s: %s",
               node, utils.SafeEncode(text[-400:]))
      retcode = constants.EXIT_FAILURE
      ToStdout("You need to fix these nodes first before fixing instances")

    for iname in instances:
      if iname in missing:
        continue
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
      try:
        ToStdout("Activating disks for instance '%s'", iname)
        SubmitOpCode(op, opts=opts, cl=cl)
      except errors.GenericError, err:
        nret, msg = FormatError(err)
        retcode |= nret
        ToStderr("Error activating disks for instance %s: %s", iname, msg)

    if missing:
      for iname, ival in missing.iteritems():
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
        if all_missing:
          ToStdout("Instance %s cannot be verified as it lives on"
                   " broken nodes", iname)
        else:
          ToStdout("Instance %s has missing logical volumes:", iname)
          ival.sort()
          for node, vol in ival:
            if node in bad_nodes:
              ToStdout("\tbroken node %s /dev/%s", node, vol)
            else:
              ToStdout("\t%s /dev/%s", node, vol)

      ToStdout("You need to replace or recreate disks for all the above"
               " instances if this message persists after fixing broken nodes.")
      retcode = constants.EXIT_FAILURE
    elif not instances:
      ToStdout("No disks need to be activated.")

  return retcode


def RepairDiskSizes(opts, args):
  """Verify sizes of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: optional list of instances to restrict check to
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
  SubmitOpCode(op, opts=opts)


@UsesRPC
def MasterFailover(opts, args):
  """Failover the master node.

  This command, when run on a non-master node, will cause the current
  master to cease being master, and the non-master to become new
  master.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if opts.no_voting and not opts.yes_do_it:
    usertext = ("This will perform the failover even if most other nodes"
                " are down, or if this node is outdated. This is dangerous"
                " as it can lead to a non-consistent cluster. Check the"
                " gnt-cluster(8) man page before proceeding. Continue?")
    if not AskUser(usertext):
      return 1

  return bootstrap.MasterFailover(no_voting=opts.no_voting)


def MasterPing(opts, args):
  """Checks if the master is alive.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  try:
    cl = GetClient()
    cl.QueryClusterInfo()
    return 0
  except Exception: # pylint: disable=W0703
    return 1


def SearchTags(opts, args):
  """Searches the tags on all the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the tag pattern
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpTagsSearch(pattern=args[0])
  result = SubmitOpCode(op, opts=opts)
  if not result:
    return 1
  result = list(result)
  result.sort()
  for path, tag in result:
    ToStdout("%s %s", path, tag)


def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
  """Reads and verifies an X509 certificate.

  @type cert_filename: string
  @param cert_filename: the path of the file containing the certificate to
                        verify encoded in PEM format
  @type verify_private_key: bool
  @param verify_private_key: whether to verify the private key in addition to
                             the public certificate
  @rtype: string
  @return: a string containing the PEM-encoded certificate.

  """
  try:
    pem = utils.ReadFile(cert_filename)
  except IOError, err:
    raise errors.X509CertError(cert_filename,
                               "Unable to read certificate: %s" % str(err))

  try:
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
  except Exception, err:
    raise errors.X509CertError(cert_filename,
                               "Unable to load certificate: %s" % str(err))

  if verify_private_key:
    try:
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
    except Exception, err:
      raise errors.X509CertError(cert_filename,
                                 "Unable to load private key: %s" % str(err))

  return pem


def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
                 cds_filename, force, new_node_cert):
  """Renews cluster certificates, keys and secrets.

  @type new_cluster_cert: bool
  @param new_cluster_cert: Whether to generate a new cluster certificate
  @type new_rapi_cert: bool
  @param new_rapi_cert: Whether to generate a new RAPI certificate
  @type rapi_cert_filename: string
  @param rapi_cert_filename: Path to file containing new RAPI certificate
  @type new_spice_cert: bool
  @param new_spice_cert: Whether to generate a new SPICE certificate
  @type spice_cert_filename: string
  @param spice_cert_filename: Path to file containing new SPICE certificate
  @type spice_cacert_filename: string
  @param spice_cacert_filename: Path to file containing the certificate of the
                                CA that signed the SPICE certificate
  @type new_confd_hmac_key: bool
  @param new_confd_hmac_key: Whether to generate a new HMAC key
  @type new_cds: bool
  @param new_cds: Whether to generate a new cluster domain secret
  @type cds_filename: string
  @param cds_filename: Path to file containing new cluster domain secret
  @type force: bool
  @param force: Whether to ask user for confirmation
  @type new_node_cert: bool
  @param new_node_cert: Whether to generate new node certificates

  """
  if new_rapi_cert and rapi_cert_filename:
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
             " options can be specified at the same time.")
    return 1

  if new_cds and cds_filename:
    ToStderr("Only one of the --new-cluster-domain-secret and"
             " --cluster-domain-secret options can be specified at"
             " the same time.")
    return 1

  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
             " and --spice-ca-certificate must not be used.")
    return 1

  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
             " specified.")
    return 1

  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
  try:
    if rapi_cert_filename:
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
    if spice_cert_filename:
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
  except errors.X509CertError, err:
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
    return 1

  if cds_filename:
    try:
      cds = utils.ReadFile(cds_filename)
    except Exception, err: # pylint: disable=W0703
      ToStderr("Can't load new cluster domain secret from %s: %s" %
               (cds_filename, str(err)))
      return 1
  else:
    cds = None

  if not force:
    usertext = ("This requires all daemons on all nodes to be restarted and"
                " may take some time. Continue?")
    if not AskUser(usertext):
      return 1

  def _RenewCryptoInner(ctx):
    ctx.feedback_fn("Updating certificates and keys")
    # Note: the node certificate will be generated in the LU
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
                                    new_rapi_cert,
                                    new_spice_cert,
                                    new_confd_hmac_key,
                                    new_cds,
                                    rapi_cert_pem=rapi_cert_pem,
                                    spice_cert_pem=spice_cert_pem,
                                    spice_cacert_pem=spice_cacert_pem,
                                    cds=cds)

    files_to_copy = []

    if new_cluster_cert:
      files_to_copy.append(pathutils.NODED_CERT_FILE)

    if new_rapi_cert or rapi_cert_pem:
      files_to_copy.append(pathutils.RAPI_CERT_FILE)

    if new_spice_cert or spice_cert_pem:
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)

    if new_confd_hmac_key:
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)

    if new_cds or cds:
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)

    if files_to_copy:
      for node_name in ctx.nonmaster_nodes:
        port = ctx.ssh_ports[node_name]
        ctx.feedback_fn("Copying %s to %s:%d" %
                        (", ".join(files_to_copy), node_name, port))
        for file_name in files_to_copy:
          ctx.ssh.CopyFileToNode(node_name, port, file_name)

  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)

  ToStdout("All requested certificates and keys have been replaced."
           " Running \"gnt-cluster verify\" now is recommended.")

  if new_node_cert:
    cl = GetClient()
    renew_op = opcodes.OpClusterRenewCrypto()
    SubmitOpCode(renew_op, cl=cl)

  return 0


def RenewCrypto(opts, args):
  """Renews cluster certificates, keys and secrets.

  """
  return _RenewCrypto(opts.new_cluster_cert,
                      opts.new_rapi_cert,
                      opts.rapi_cert,
                      opts.new_spice_cert,
                      opts.spice_cert,
                      opts.spice_cacert,
                      opts.new_confd_hmac_key,
                      opts.new_cluster_domain_secret,
                      opts.cluster_domain_secret,
                      opts.force,
                      opts.new_node_cert)


def _GetEnabledDiskTemplates(opts):
  """Determine the list of enabled disk templates.

  """
  if opts.enabled_disk_templates:
    return opts.enabled_disk_templates.split(",")
  else:
    return None


def _GetVgName(opts, enabled_disk_templates):
  """Determine the volume group name.

  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk templates

  """
  # consistency between vg name and enabled disk templates
  vg_name = None
  if opts.vg_name is not None:
    vg_name = opts.vg_name
  if enabled_disk_templates:
    if vg_name and not utils.IsLvmEnabled(enabled_disk_templates):
      ToStdout("You specified a volume group with --vg-name, but you did not"
               " enable any of the following lvm-based disk templates: %s" %
               utils.CommaJoin(constants.DTS_LVM))
  return vg_name


def _GetDrbdHelper(opts, enabled_disk_templates):
  """Determine the DRBD usermode helper.

  """
  drbd_helper = opts.drbd_helper
  if enabled_disk_templates:
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
    if not drbd_enabled and opts.drbd_helper:
      ToStdout("You specified a DRBD usermode helper with"
               " --drbd-usermode-helper while DRBD is not enabled.")
  return drbd_helper


def SetClusterParams(opts, args):
  """Modify the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if not (opts.vg_name is not None or
          opts.drbd_helper is not None or
          opts.enabled_hypervisors or opts.hvparams or
          opts.beparams or opts.nicparams or
          opts.ndparams or opts.diskparams or
          opts.candidate_pool_size is not None or
          opts.max_running_jobs is not None or
          opts.uid_pool is not None or
          opts.maintain_node_health is not None or
          opts.add_uids is not None or
          opts.remove_uids is not None or
          opts.default_iallocator is not None or
          opts.default_iallocator_params or
          opts.reserved_lvs is not None or
          opts.master_netdev is not None or
          opts.master_netmask is not None or
          opts.use_external_mip_script is not None or
          opts.prealloc_wipe_disks is not None or
          opts.hv_state or
          opts.enabled_disk_templates or
          opts.disk_state or
          opts.ipolicy_bounds_specs is not None or
          opts.ipolicy_std_specs is not None or
          opts.ipolicy_disk_templates is not None or
          opts.ipolicy_vcpu_ratio is not None or
          opts.ipolicy_spindle_ratio is not None or
          opts.modify_etc_hosts is not None or
          opts.file_storage_dir is not None or
          opts.instance_communication_network is not None):
    ToStderr("Please give at least one of the parameters.")
    return 1

  enabled_disk_templates = _GetEnabledDiskTemplates(opts)
  vg_name = _GetVgName(opts, enabled_disk_templates)

  try:
    drbd_helper = _GetDrbdHelper(opts, enabled_disk_templates)
  except errors.OpPrereqError, e:
    ToStderr(str(e))
    return 1

  hvlist = opts.enabled_hypervisors
  if hvlist is not None:
    hvlist = hvlist.split(",")

  # a list of (name, dict) we can pass directly to dict() (or [])
  hvparams = dict(opts.hvparams)
  for hv_params in hvparams.values():
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)

  diskparams = dict(opts.diskparams)

  for dt_params in diskparams.values():
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)

  beparams = opts.beparams
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)

  nicparams = opts.nicparams
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  ndparams = opts.ndparams
  if ndparams is not None:
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)

  ipolicy = CreateIPolicyFromOpts(
    minmax_ispecs=opts.ipolicy_bounds_specs,
    std_ispecs=opts.ipolicy_std_specs,
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
    )

  mnh = opts.maintain_node_health

  uid_pool = opts.uid_pool
  if uid_pool is not None:
    uid_pool = uidpool.ParseUidPool(uid_pool)

  add_uids = opts.add_uids
  if add_uids is not None:
    add_uids = uidpool.ParseUidPool(add_uids)

  remove_uids = opts.remove_uids
  if remove_uids is not None:
    remove_uids = uidpool.ParseUidPool(remove_uids)

  if opts.reserved_lvs is not None:
    if opts.reserved_lvs == "":
      opts.reserved_lvs = []
    else:
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")

  if opts.master_netmask is not None:
    try:
      opts.master_netmask = int(opts.master_netmask)
    except ValueError:
      ToStderr("The --master-netmask option expects an int parameter.")
      return 1

  ext_ip_script = opts.use_external_mip_script

  if opts.disk_state:
    disk_state = utils.FlatToDict(opts.disk_state)
  else:
    disk_state = {}

  hv_state = dict(opts.hv_state)

  op = opcodes.OpClusterSetParams(
    vg_name=vg_name,
    drbd_helper=drbd_helper,
    enabled_hypervisors=hvlist,
    hvparams=hvparams,
    os_hvp=None,
    beparams=beparams,
    nicparams=nicparams,
    ndparams=ndparams,
    diskparams=diskparams,
    ipolicy=ipolicy,
    candidate_pool_size=opts.candidate_pool_size,
    max_running_jobs=opts.max_running_jobs,
    maintain_node_health=mnh,
    modify_etc_hosts=opts.modify_etc_hosts,
    uid_pool=uid_pool,
    add_uids=add_uids,
    remove_uids=remove_uids,
    default_iallocator=opts.default_iallocator,
    default_iallocator_params=opts.default_iallocator_params,
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
    master_netdev=opts.master_netdev,
    master_netmask=opts.master_netmask,
    reserved_lvs=opts.reserved_lvs,
    use_external_mip_script=ext_ip_script,
    hv_state=hv_state,
    disk_state=disk_state,
    enabled_disk_templates=enabled_disk_templates,
    force=opts.force,
    file_storage_dir=opts.file_storage_dir,
    instance_communication_network=opts.instance_communication_network
    )
  return base.GetResult(None, opts, SubmitOrSend(op, opts))


def QueueOps(opts, args):
  """Queue operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  command = args[0]
  client = GetClient()
  if command in ("drain", "undrain"):
    drain_flag = command == "drain"
    client.SetQueueDrainFlag(drain_flag)
  elif command == "info":
    result = client.QueryConfigValues(["drain_flag"])
    if result[0]:
      val = "set"
    else:
      val = "unset"
    ToStdout("The drain flag is %s" % val)
  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
                               errors.ECODE_INVAL)

  return 0


def _ShowWatcherPause(until):
  if until is None or until < time.time():
    ToStdout("The watcher is not paused.")
  else:
    ToStdout("The watcher is paused until %s.", time.ctime(until))


def WatcherOps(opts, args):
  """Watcher operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  command = args[0]
  client = GetClient()

  if command == "continue":
    client.SetWatcherPause(None)
    ToStdout("The watcher is no longer paused.")

  elif command == "pause":
    if len(args) < 2:
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)

    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
    _ShowWatcherPause(result)

  elif command == "info":
    result = client.QueryConfigValues(["watcher_pause"])
    _ShowWatcherPause(result[0])

  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
                               errors.ECODE_INVAL)

  return 0


def _OobPower(opts, node_list, power):
  """Puts the node in the list to desired power state.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on
  @param power: True if they should be powered on, False otherwise
  @return: The success of the operation (none failed)

  """
  if power:
    command = constants.OOB_POWER_ON
  else:
    command = constants.OOB_POWER_OFF

  op = opcodes.OpOobCommand(node_names=node_list,
                            command=command,
                            ignore_status=True,
                            timeout=opts.oob_timeout,
                            power_delay=opts.power_delay)
  result = SubmitOpCode(op, opts=opts)
  errs = 0
  for node_result in result:
    (node_tuple, data_tuple) = node_result
    (_, node_name) = node_tuple
    (data_status, _) = data_tuple
    if data_status != constants.RS_NORMAL:
      assert data_status != constants.RS_UNAVAIL
      errs += 1
      ToStderr("There was a problem changing power for %s, please investigate",
               node_name)

  if errs > 0:
    return False

  return True


def _InstanceStart(opts, inst_list, start, no_remember=False):
  """Puts the instances in the list to desired state.

  @param opts: The command line options selected by the user
  @param inst_list: The list of instances to operate on
  @param start: True if they should be started, False for shutdown
  @param no_remember: If the instance state should be remembered
  @return: The success of the operation (none failed)

  """
  if start:
    opcls = opcodes.OpInstanceStartup
    text_submit, text_success, text_failed = ("startup", "started", "starting")
  else:
    opcls = compat.partial(opcodes.OpInstanceShutdown,
                           timeout=opts.shutdown_timeout,
                           no_remember=no_remember)
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")

  jex = JobExecutor(opts=opts)

  for inst in inst_list:
    ToStdout("Submit %s of instance %s", text_submit, inst)
    op = opcls(instance_name=inst)
    jex.QueueJob(inst, op)

  results = jex.GetResults()
  bad_cnt = len([1 for (success, _) in results if not success])

  if bad_cnt == 0:
    ToStdout("All instances have been %s successfully", text_success)
  else:
    ToStderr("There were errors while %s instances:\n"
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
             len(results))
    return False

  return True


class _RunWhenNodesReachableHelper:
1385
  """Helper class to make shared internal state sharing easier.
1386

1387
  @ivar success: Indicates if all action_cb calls were successful
1388

1389
  """
1390
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1391
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1392
    """Init the object.
1393

1394
    @param node_list: The list of nodes to be reachable
1395
    @param action_cb: Callback called when a new host is reachable
1396
    @type node2ip: dict
1397
    @param node2ip: Node to ip mapping
1398
    @param port: The port to use for the TCP ping
1399
    @param feedback_fn: The function used for feedback
1400
    @param _ping_fn: Function to check reachabilty (for unittest use only)
1401
    @param _sleep_fn: Function to sleep (for unittest use only)
1402

1403
    """
1404
    self.down = set(node_list)
1405
    self.up = set()
1406
    self.node2ip = node2ip
1407
    self.success = True
1408
    self.action_cb = action_cb
1409
    self.port = port
1410
    self.feedback_fn = feedback_fn
1411
    self._ping_fn = _ping_fn
1412
    self._sleep_fn = _sleep_fn
1413

    
1414
  def __call__(self):
1415
    """When called we run action_cb.
1416

1417
    @raises utils.RetryAgain: When there are still down nodes
1418

1419
    """
1420
    if not self.action_cb(self.up):
1421
      self.success = False
1422

    
1423
    if self.down:
1424
      raise utils.RetryAgain()
1425
    else:
1426
      return self.success
1427

    
1428
  def Wait(self, secs):
1429
    """Checks if a host is up or waits remaining seconds.
1430

1431
    @param secs: The secs remaining
1432

1433
    """
1434
    start = time.time()
1435
    for node in self.down:
1436
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1437
                       live_port_needed=True):
1438
        self.feedback_fn("Node %s became available" % node)
1439
        self.up.add(node)
1440
        self.down -= self.up
1441
        # If we have a node available there is the possibility to run the
1442
        # action callback successfully, therefore we don't wait and return
1443
        return
1444

    
1445
    self._sleep_fn(max(0.0, start + secs - time.time()))
1446

    
1447

    
1448
def _RunWhenNodesReachable(node_list, action_cb, interval):
1449
  """Run action_cb when nodes become reachable.
1450

1451
  @param node_list: The list of nodes to be reachable
1452
  @param action_cb: Callback called when a new host is reachable
1453
  @param interval: The earliest time to retry
1454

1455
  """
1456
  client = GetClient()
1457
  cluster_info = client.QueryClusterInfo()
1458
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1459
    family = netutils.IPAddress.family
1460
  else:
1461
    family = netutils.IP6Address.family
1462

    
1463
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1464
                 for node in node_list)
1465

    
1466
  port = netutils.GetDaemonPort(constants.NODED)
1467
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1468
                                        ToStdout)
1469

    
1470
  try:
1471
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1472
                       wait_fn=helper.Wait)
1473
  except utils.RetryTimeout:
1474
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1475
             " again:\n  - %s", "  - ".join(helper.down))
1476
    return False
1477

    
1478

    
1479
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1480
                          _instance_start_fn=_InstanceStart):
1481
  """Start the instances conditional based on node_states.
1482

1483
  @param opts: The command line options selected by the user
1484
  @param inst_map: A dict of inst -> nodes mapping
1485
  @param nodes_online: A list of nodes online
1486
  @param _instance_start_fn: Callback to start instances (unittest use only)
1487
  @return: Success of the operation on all instances
1488

1489
  """
1490
  start_inst_list = []
1491
  for (inst, nodes) in inst_map.items():
1492
    if not (nodes - nodes_online):
1493
      # All nodes the instance lives on are back online
1494
      start_inst_list.append(inst)
1495

    
1496
  for inst in start_inst_list:
1497
    del inst_map[inst]
1498

    
1499
  if start_inst_list:
1500
    return _instance_start_fn(opts, start_inst_list, True)
1501

    
1502
  return True
1503

    
1504

    
1505
def _EpoOn(opts, full_node_list, node_list, inst_map):
  """Does the actual power on.

  @param opts: The command line options selected by the user
  @param full_node_list: All nodes to operate on (includes nodes not supporting
                         OOB)
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  if node_list and not _OobPower(opts, node_list, False):
    ToStderr("Not all nodes seem to get back up, investigate and start"
             " manually if needed")

  # Wait for the nodes to be back up
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))

  ToStdout("Waiting until all nodes are available again")
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
    ToStderr("Please investigate and start stopped instances manually")
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS


def _EpoOff(opts, node_list, inst_map):
  """Does the actual power off.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
    ToStderr("Please investigate and stop instances manually before continuing")
    return constants.EXIT_FAILURE

  if not node_list:
    return constants.EXIT_SUCCESS

  if _OobPower(opts, node_list, False):
    return constants.EXIT_SUCCESS
  else:
    return constants.EXIT_FAILURE


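# Note on _EpoOff: _InstanceStart is called with start=False, i.e. it shuts
# the instances down; no_remember=True presumably keeps the shutdown from
# being recorded as the instances' desired state, so they are started again
# normally when the cluster is powered back on.
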
def Epo(opts, args, qcl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
        _confirm_fn=ConfirmOperation,
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
  """EPO operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: the names of the nodes (or, with --groups, node groups) to
      operate on; must be empty if --all is given
  @rtype: int
  @return: the desired exit code

  """
  if opts.groups and opts.show_all:
    _stderr_fn("Only one of --groups or --all is allowed")
    return constants.EXIT_FAILURE
  elif args and opts.show_all:
    _stderr_fn("Arguments in combination with --all are not allowed")
    return constants.EXIT_FAILURE

  if qcl is None:
    # Query client
    qcl = GetClient()

  if opts.groups:
    node_query_list = \
      itertools.chain(*qcl.QueryGroups(args, ["node_list"], False))
  else:
    node_query_list = args

  result = qcl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
                                            "sinst_list", "powered", "offline"],
                          False)

  all_nodes = map(compat.fst, result)
  node_list = []
  inst_map = {}
  for (node, master, pinsts, sinsts, powered, offline) in result:
    if not offline:
      for inst in (pinsts + sinsts):
        if inst in inst_map:
          if not master:
            inst_map[inst].add(node)
        elif master:
          inst_map[inst] = set()
        else:
          inst_map[inst] = set([node])

    if master and opts.on:
      # We ignore the master for turning on the machines, in fact we are
      # already operating on the master at this point :)
      continue
    elif master and not opts.show_all:
      _stderr_fn("%s is the master node, please do a master-failover to another"
                 " node not affected by the EPO or use --all if you intend to"
                 " shut down the whole cluster", node)
      return constants.EXIT_FAILURE
    elif powered is None:
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
                 " handled in a fully automated manner", node)
    elif powered == opts.on:
      _stdout_fn("Node %s is already in desired power state, skipping", node)
    elif not offline or (offline and powered):
      node_list.append(node)

  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
    return constants.EXIT_FAILURE

  if opts.on:
    return _on_fn(opts, all_nodes, node_list, inst_map)
  else:
    return _off_fn(opts, node_list, inst_map)


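# Note on the inst_map built in Epo: the master node is never added to an
# instance's node set (we are already running on it, so it is up by
# definition). An instance living only on the master thus ends up with an
# empty set and can be started right away, while an instance with a node
# elsewhere still waits for that node to become reachable again.
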
def _GetCreateCommand(info):
  """Compose the "gnt-cluster init" command re-creating the current ipolicy.

  @param info: the result of the cluster info query
  @rtype: string
  @return: the command line for re-creating the cluster's name and ipolicy

  """
  buf = StringIO()
  buf.write("gnt-cluster init")
  PrintIPolicyCommand(buf, info["ipolicy"], False)
  buf.write(" ")
  buf.write(info["name"])
  return buf.getvalue()


def ShowCreateCommand(opts, args):
  """Shows the command that can be used to re-create the cluster.

  Currently it works only for ipolicy specs.

  """
  cl = GetClient()
  result = cl.QueryClusterInfo()
  ToStdout(_GetCreateCommand(result))


def _RunCommandAndReport(cmd):
  """Run a command, reporting its output only if it failed.

  @param cmd: the command to execute
  @type cmd: list
  @rtype: bool
  @return: False, if the execution failed.

  """
  result = utils.RunCmd(cmd)
  if result.failed:
    ToStderr("Command %s failed: %s; Output %s" %
             (cmd, result.fail_reason, result.output))
    return False
  return True


def _VerifyCommand(cmd):
  """Verify that a given command succeeds on all online nodes.

  As this function is intended to run during upgrades, it is implemented
  in such a way that it still works even if all Ganeti daemons are down.

  @param cmd: the command to execute
  @type cmd: list
  @rtype: list
  @return: the list of node names that are online where
      the command failed.

  """
  command = utils.text.ShellQuoteArgs([str(val) for val in cmd])

  nodes = ssconf.SimpleStore().GetOnlineNodeList()
  master_node = ssconf.SimpleStore().GetMasterNode()
  cluster_name = ssconf.SimpleStore().GetClusterName()

  # If master node is in 'nodes', make sure master node is at list end
  if master_node in nodes:
    nodes.remove(master_node)
    nodes.append(master_node)

  failed = []

  srun = ssh.SshRunner(cluster_name=cluster_name)
  for name in nodes:
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
    if result.exit_code != 0:
      failed.append(name)

  return failed


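# Typical use of _VerifyCommand during an upgrade (see below), e.g.
#   badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
# returns the names of the online nodes on which the command failed. The
# master is moved to the end of the list, presumably so that it keeps
# functioning for as long as possible while the other nodes are handled.
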
def _VerifyVersionInstalled(versionstring):
  """Verify that the given version of ganeti is installed on all online nodes.

  Do nothing if this is the case, otherwise print an appropriate
  message to stderr.

  @param versionstring: the version to check for
  @type versionstring: string
  @rtype: bool
  @return: True, if the version is installed on all online nodes

  """
  badnodes = _VerifyCommand(["test", "-d",
                             os.path.join(pathutils.PKGLIBDIR, versionstring)])
  if badnodes:
    ToStderr("Ganeti version %s not installed on nodes %s"
             % (versionstring, ", ".join(badnodes)))
    return False

  return True


def _GetRunning():
  """Determine the number of currently running jobs.

  @rtype: int
  @return: the number of jobs still running

  """
  cl = GetClient()
  qfilter = qlang.MakeSimpleFilter("status",
                                   frozenset([constants.JOB_STATUS_RUNNING]))
  return len(cl.Query(constants.QR_JOB, [], qfilter).data)


def _SetGanetiVersion(versionstring):
  """Set the active version of ganeti to the given versionstring.

  @type versionstring: string
  @param versionstring: the version to switch to
  @rtype: list
  @return: the list of nodes where the version change failed

  """
  failed = []
  if constants.HAS_GNU_LN:
    failed.extend(_VerifyCommand(
        ["ln", "-s", "-f", "-T",
         os.path.join(pathutils.PKGLIBDIR, versionstring),
         os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
    failed.extend(_VerifyCommand(
        ["ln", "-s", "-f", "-T",
         os.path.join(pathutils.SHAREDIR, versionstring),
         os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
  else:
    failed.extend(_VerifyCommand(
        ["rm", "-f", os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
    failed.extend(_VerifyCommand(
        ["ln", "-s", "-f", os.path.join(pathutils.PKGLIBDIR, versionstring),
         os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
    failed.extend(_VerifyCommand(
        ["rm", "-f", os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
    failed.extend(_VerifyCommand(
        ["ln", "-s", "-f", os.path.join(pathutils.SHAREDIR, versionstring),
         os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
  return list(set(failed))


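# For illustration (the actual paths depend on the installation): after
# _SetGanetiVersion("2.11.0") the version-independent entry points would be
# symlinks along the lines of
#   <SYSCONFDIR>/ganeti/lib   -> <PKGLIBDIR>/2.11.0
#   <SYSCONFDIR>/ganeti/share -> <SHAREDIR>/2.11.0
# where "2.11.0" is just an example version string.
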
def _ExecuteCommands(fns):
  """Execute a list of functions, in reverse order.

  @type fns: list of functions
  @param fns: the functions to be executed

  """
  for fn in reversed(fns):
    fn()


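# The rollback lists used by the upgrade code below are filled in the order
# the corresponding actions are carried out and are undone in reverse,
# stack-like, by _ExecuteCommands: a list [undo_a, undo_b] runs undo_b first,
# then undo_a.
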
def _GetConfigVersion():
  """Determine the version the configuration file currently has.

  @rtype: tuple or None
  @return: (major, minor, revision) if the version can be determined,
      None otherwise

  """
  config_data = serializer.LoadJson(utils.ReadFile(pathutils.CLUSTER_CONF_FILE))
  try:
    config_version = config_data["version"]
  except KeyError:
    return None
  return utils.SplitVersion(config_version)


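# Illustrative example (assuming the packed encoding handled by
# utils.SplitVersion): a config.data "version" value of 2110000 would be
# returned by _GetConfigVersion as the tuple (2, 11, 0).
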
def _ReadIntentToUpgrade():
  """Read the file documenting the intent to upgrade the cluster.

  @rtype: (string, string) or (None, None)
  @return: (old version, version to upgrade to), if the file exists,
      and (None, None) otherwise.

  """
  if not os.path.isfile(pathutils.INTENT_TO_UPGRADE):
    return (None, None)

  contentstring = utils.ReadFile(pathutils.INTENT_TO_UPGRADE)
  contents = utils.UnescapeAndSplit(contentstring)
  if len(contents) != 3:
    # file syntactically malformed
    return (None, None)
  return (contents[0], contents[1])


def _WriteIntentToUpgrade(version):
  """Write the file documenting the intent to upgrade the cluster.

  @type version: string
  @param version: the version we intend to upgrade to

  """
  utils.WriteFile(pathutils.INTENT_TO_UPGRADE,
                  data=utils.EscapeAndJoin([constants.RELEASE_VERSION, version,
                                            "%d" % os.getpid()]))


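# The intent-to-upgrade file therefore contains three escaped-and-joined
# fields: the currently installed release, the target version and the PID of
# the upgrading process, e.g. something like "2.10.0,2.11.0,12345" (the exact
# separator is whatever utils.EscapeAndJoin uses); _ReadIntentToUpgrade above
# only evaluates the first two fields.
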
def _UpgradeBeforeConfigurationChange(versionstring):
  """
  Carry out all the tasks necessary for an upgrade that happen before
  the configuration file, or Ganeti version, changes.

  @type versionstring: string
  @param versionstring: the version to upgrade to
  @rtype: (bool, list)
  @return: tuple of a bool indicating success and a list of rollback tasks

  """
  rollback = []

  if not _VerifyVersionInstalled(versionstring):
    return (False, rollback)

  _WriteIntentToUpgrade(versionstring)
  rollback.append(
    lambda: utils.RunCmd(["rm", "-f", pathutils.INTENT_TO_UPGRADE]))

  ToStdout("Draining queue")
  client = GetClient()
  client.SetQueueDrainFlag(True)

  rollback.append(lambda: GetClient().SetQueueDrainFlag(False))

  if utils.SimpleRetry(0, _GetRunning,
                       constants.UPGRADE_QUEUE_POLL_INTERVAL,
                       constants.UPGRADE_QUEUE_DRAIN_TIMEOUT):
    ToStderr("Failed to completely empty the queue.")
    return (False, rollback)

  ToStdout("Stopping daemons on master node.")
  if not _RunCommandAndReport([pathutils.DAEMON_UTIL, "stop-all"]):
    return (False, rollback)

  if not _VerifyVersionInstalled(versionstring):
    utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"])
    return (False, rollback)

  ToStdout("Stopping daemons everywhere.")
  rollback.append(lambda: _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
  if badnodes:
    ToStderr("Failed to stop daemons on %s." % (", ".join(badnodes),))
    return (False, rollback)

  backuptar = os.path.join(pathutils.LOCALSTATEDIR,
                           "lib/ganeti%d.tar" % time.time())
  ToStdout("Backing up configuration as %s" % backuptar)
  if not _RunCommandAndReport(["tar", "cf", backuptar,
                               pathutils.DATA_DIR]):
    return (False, rollback)

  return (True, rollback)


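# Note on the queue-drain wait in _UpgradeBeforeConfigurationChange above:
# utils.SimpleRetry polls _GetRunning until it returns the expected value 0
# (no more running jobs); if jobs are still running when
# UPGRADE_QUEUE_DRAIN_TIMEOUT expires, the last non-zero count is returned
# and the upgrade is aborted before any daemon is stopped.
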
def _VersionSpecificDowngrade():
  """
  Perform any additional downgrade tasks that are version specific
  and need to be done just after the configuration downgrade. This
  function needs to be idempotent, so that it can be redone if the
  downgrade procedure gets interrupted after changing the
  configuration.

  Note that this function has to be reset with every version bump.

  @return: True upon success
  """
  ToStdout("Performing version-specific downgrade tasks.")
  return True


def _SwitchVersionAndConfig(versionstring, downgrade):
  """
  Switch to the new Ganeti version and change the configuration,
  in correct order.

  @type versionstring: string
  @param versionstring: the version to change to
  @type downgrade: bool
  @param downgrade: True, if the configuration should be downgraded
  @rtype: (bool, list)
  @return: tuple of a bool indicating success, and a list of
      additional rollback tasks

  """
  rollback = []
  if downgrade:
    ToStdout("Downgrading configuration")
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "--downgrade", "-f"]):
      return (False, rollback)
    # Note: version specific downgrades need to be done before switching
    # binaries, so that we still have the knowledgeable binary if the downgrade
    # process gets interrupted at this point.
    if not _VersionSpecificDowngrade():
      return (False, rollback)

  # Configuration change is the point of no return. From then onwards, it is
  # safer to push through the up/downgrade than to try to roll it back.

  ToStdout("Switching to version %s on all nodes" % versionstring)
  rollback.append(lambda: _SetGanetiVersion(constants.DIR_VERSION))
  badnodes = _SetGanetiVersion(versionstring)
  if badnodes:
    ToStderr("Failed to switch to Ganeti version %s on nodes %s"
             % (versionstring, ", ".join(badnodes)))
    if not downgrade:
      return (False, rollback)

  # Now that we have changed to the new version of Ganeti we should
  # not communicate over luxi any more, as luxi might have changed in
  # incompatible ways. Therefore, manually call the corresponding ganeti
  # commands using their canonical (version independent) path.

  if not downgrade:
    ToStdout("Upgrading configuration")
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "-f"]):
      return (False, rollback)

  return (True, rollback)


def _UpgradeAfterConfigurationChange(oldversion):
  """
  Carry out the upgrade actions necessary after switching to the new
  Ganeti version and updating the configuration.

  As this part is run at a time where the new version of Ganeti is already
  running, no communication should happen via luxi, as this is not a stable
  interface. Also, as the configuration change is the point of no return,
  all actions are pushed through, even if some of them fail.

  @param oldversion: the version the upgrade started from
  @type oldversion: string
  @rtype: int
  @return: the intended return value

  """
  returnvalue = 0

  ToStdout("Ensuring directories everywhere.")
  badnodes = _VerifyCommand([pathutils.ENSURE_DIRS])
  if badnodes:
    ToStderr("Warning: failed to ensure directories on %s." %
             (", ".join(badnodes)))
    returnvalue = 1

  ToStdout("Starting daemons everywhere.")
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
  if badnodes:
    ToStderr("Warning: failed to start daemons on %s." % (", ".join(badnodes),))
    returnvalue = 1

  ToStdout("Redistributing the configuration.")
  if not _RunCommandAndReport(["gnt-cluster", "redist-conf", "--yes-do-it"]):
    returnvalue = 1

  ToStdout("Restarting daemons everywhere.")
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
  badnodes.extend(_VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
  if badnodes:
    ToStderr("Warning: failed to restart daemons on %s." %
             (", ".join(list(set(badnodes))),))
    returnvalue = 1

  ToStdout("Undraining the queue.")
  if not _RunCommandAndReport(["gnt-cluster", "queue", "undrain"]):
    returnvalue = 1

  _RunCommandAndReport(["rm", "-f", pathutils.INTENT_TO_UPGRADE])

  ToStdout("Running post-upgrade hooks")
  if not _RunCommandAndReport([pathutils.POST_UPGRADE, oldversion]):
    returnvalue = 1

  ToStdout("Verifying cluster.")
  if not _RunCommandAndReport(["gnt-cluster", "verify"]):
    returnvalue = 1

  return returnvalue


def UpgradeGanetiCommand(opts, args):
  """Upgrade a cluster to a new ganeti version.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if ((not opts.resume and opts.to is None)
      or (opts.resume and opts.to is not None)):
    ToStderr("Precisely one of the options --to and --resume"
             " has to be given")
    return 1

  oldversion = constants.RELEASE_VERSION

  if opts.resume:
    ssconf.CheckMaster(False)
    oldversion, versionstring = _ReadIntentToUpgrade()
    if versionstring is None:
      return 0
    version = utils.version.ParseVersion(versionstring)
    if version is None:
      return 1
    configversion = _GetConfigVersion()
    if configversion is None:
      return 1
    # If the upgrade we resume was an upgrade between compatible
    # versions (like 2.10.0 to 2.10.1), the correct configversion
    # does not guarantee that the config has been updated.
    # However, in the case of a compatible update with the configuration
    # not touched, we are running a different dirversion with the same
    # config version.
    config_already_modified = \
      (utils.IsCorrectConfigVersion(version, configversion) and
       not (versionstring != constants.DIR_VERSION and
            configversion == (constants.CONFIG_MAJOR, constants.CONFIG_MINOR,
                              constants.CONFIG_REVISION)))
    if not config_already_modified:
      # We have to start from the beginning; however, some daemons might have
      # already been stopped, so the only way to get into a well-defined state
      # is by starting all daemons again.
      _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
  else:
    versionstring = opts.to
    config_already_modified = False
    version = utils.version.ParseVersion(versionstring)
    if version is None:
      ToStderr("Could not parse version string %s" % versionstring)
      return 1

  msg = utils.version.UpgradeRange(version)
  if msg is not None:
    ToStderr("Cannot upgrade to %s: %s" % (versionstring, msg))
    return 1

  if not config_already_modified:
    success, rollback = _UpgradeBeforeConfigurationChange(versionstring)
    if not success:
      _ExecuteCommands(rollback)
      return 1
  else:
    rollback = []

  downgrade = utils.version.ShouldCfgdowngrade(version)

  success, additionalrollback = \
      _SwitchVersionAndConfig(versionstring, downgrade)
  if not success:
    rollback.extend(additionalrollback)
    _ExecuteCommands(rollback)
    return 1

  return _UpgradeAfterConfigurationChange(oldversion)


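# Typical invocations of the command implemented above (the version number is
# only an example):
#   gnt-cluster upgrade --to 2.11
#   gnt-cluster upgrade --resume
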
commands = {
  "init": (
    InitCluster, [ArgHost(min=1, max=1)],
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
     NIC_PARAMS_OPT, NOMODIFY_ETCHOSTS_OPT, NOMODIFY_SSH_SETUP_OPT,
     SECONDARY_IP_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT,
     DRBD_HELPER_OPT, DEFAULT_IALLOCATOR_OPT, DEFAULT_IALLOCATOR_PARAMS_OPT,
     PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT,
     GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT,
     HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
     IPOLICY_STD_SPECS_OPT, GLOBAL_GLUSTER_FILEDIR_OPT]
     + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
  "destroy": (
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
    "", "Destroy cluster"),
  "rename": (
    RenameCluster, [ArgHost(min=1, max=1)],
    [FORCE_OPT, DRY_RUN_OPT],
    "<new_name>",
    "Renames the cluster"),
  "redist-conf": (
    RedistributeConfig, ARGS_NONE, SUBMIT_OPTS +
    [DRY_RUN_OPT, PRIORITY_OPT, FORCE_DISTRIBUTION],
    "", "Forces a push of the configuration file and ssconf files"
    " to the nodes in the cluster"),
  "verify": (
    VerifyCluster, ARGS_NONE,
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
    "", "Does a check on the cluster configuration"),
  "verify-disks": (
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
    "", "Does a check on the cluster disk status"),
  "repair-disk-sizes": (
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
    "[instance...]", "Updates mismatches in recorded disk sizes"),
  "master-failover": (
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
    "", "Makes the current node the master"),
  "master-ping": (
    MasterPing, ARGS_NONE, [],
    "", "Checks if the master is alive"),
  "version": (
    ShowClusterVersion, ARGS_NONE, [],
    "", "Shows the cluster version"),
  "getmaster": (
    ShowClusterMaster, ARGS_NONE, [],
    "", "Shows the cluster master"),
  "copyfile": (
    ClusterCopyFile, [ArgFile(min=1, max=1)],
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
  "command": (
    RunClusterCommand, [ArgCommand(min=1)],
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
  "info": (
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
    "[--roman]", "Show cluster configuration"),
  "list-tags": (
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
  "add-tags": (
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Add tags to the cluster"),
  "remove-tags": (
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Remove tags from the cluster"),
  "search-tags": (
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
    "Searches the tags on all objects on"
    " the cluster for a given pattern (regex)"),
  "queue": (
    QueueOps,
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
    [], "drain|undrain|info", "Change queue properties"),
  "watcher": (
    WatcherOps,
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
    [],
    "{pause <timespec>|continue|info}", "Change watcher properties"),
  "modify": (
    SetClusterParams, ARGS_NONE,
    [FORCE_OPT,
     BACKEND_OPT, CP_SIZE_OPT, RQL_OPT, INSTANCE_COMMUNICATION_NETWORK_OPT,
     ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT,
     UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT, DRBD_HELPER_OPT,
     DEFAULT_IALLOCATOR_OPT, DEFAULT_IALLOCATOR_PARAMS_OPT, RESERVED_LVS_OPT,
     DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT,
     USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] +
     SUBMIT_OPTS +
     [ENABLED_DISK_TEMPLATES_OPT, IPOLICY_STD_SPECS_OPT, MODIFY_ETCHOSTS_OPT] +
     INSTANCE_POLICY_OPTS + [GLOBAL_FILEDIR_OPT],
    "[opts...]",
    "Alters the parameters of the cluster"),
  "renew-crypto": (
    RenewCrypto, ARGS_NONE,
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT,
     NEW_NODE_CERT_OPT],
    "[opts...]",
    "Renews cluster certificates, keys and secrets"),
  "epo": (
    Epo, [ArgUnknown()],
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
    "[opts...] [args]",
    "Performs an emergency power-off on given args"),
  "activate-master-ip": (
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
  "deactivate-master-ip": (
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
    "Deactivates the master IP"),
  "show-ispecs-cmd": (
    ShowCreateCommand, ARGS_NONE, [], "",
    "Show the command line to re-create the cluster"),
  "upgrade": (
    UpgradeGanetiCommand, ARGS_NONE, [TO_OPT, RESUME_OPT], "",
    "Upgrade (or downgrade) to a new Ganeti version"),
  }


#: dictionary with aliases for commands
aliases = {
  "masterfailover": "master-failover",
  "show": "info",
}


def Main():
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
                     aliases=aliases)