Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ 8a5d326f

History | View | Annotate | Download (70.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013, 2014 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
from cStringIO import StringIO
30
import os
31
import time
32
import OpenSSL
33
import itertools
34

    
35
from ganeti.cli import *
36
from ganeti import opcodes
37
from ganeti import constants
38
from ganeti import errors
39
from ganeti import utils
40
from ganeti import bootstrap
41
from ganeti import ssh
42
from ganeti import objects
43
from ganeti import uidpool
44
from ganeti import compat
45
from ganeti import netutils
46
from ganeti import ssconf
47
from ganeti import pathutils
48
from ganeti import serializer
49
from ganeti import qlang
50

    
51

    
52
ON_OPT = cli_option("--on", default=False,
53
                    action="store_true", dest="on",
54
                    help="Recover from an EPO")
55

    
56
GROUPS_OPT = cli_option("--groups", default=False,
57
                        action="store_true", dest="groups",
58
                        help="Arguments are node groups instead of nodes")
59

    
60
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
61
                            help="Override interactive check for --no-voting",
62
                            default=False, action="store_true")
63

    
64
FORCE_DISTRIBUTION = cli_option("--yes-do-it", dest="yes_do_it",
65
                                help="Unconditionally distribute the"
66
                                " configuration, even if the queue"
67
                                " is drained",
68
                                default=False, action="store_true")
69

    
70
TO_OPT = cli_option("--to", default=None, type="string",
71
                    help="The Ganeti version to upgrade to")
72

    
73
RESUME_OPT = cli_option("--resume", default=False, action="store_true",
74
                        help="Resume any pending Ganeti upgrades")
75

    
76
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
77
_EPO_PING_TIMEOUT = 1 # 1 second
78
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
79

    
80

    
81
def _InitEnabledDiskTemplates(opts):
82
  """Initialize the list of enabled disk templates.
83

84
  """
85
  if opts.enabled_disk_templates:
86
    return opts.enabled_disk_templates.split(",")
87
  else:
88
    return constants.DEFAULT_ENABLED_DISK_TEMPLATES
89

    
90

    
91
def _InitVgName(opts, enabled_disk_templates):
92
  """Initialize the volume group name.
93

94
  @type enabled_disk_templates: list of strings
95
  @param enabled_disk_templates: cluster-wide enabled disk templates
96

97
  """
98
  vg_name = None
99
  if opts.vg_name is not None:
100
    vg_name = opts.vg_name
101
    if vg_name:
102
      if not utils.IsLvmEnabled(enabled_disk_templates):
103
        ToStdout("You specified a volume group with --vg-name, but you did not"
104
                 " enable any disk template that uses lvm.")
105
    elif utils.IsLvmEnabled(enabled_disk_templates):
106
      raise errors.OpPrereqError(
107
          "LVM disk templates are enabled, but vg name not set.")
108
  elif utils.IsLvmEnabled(enabled_disk_templates):
109
    vg_name = constants.DEFAULT_VG
110
  return vg_name
111

    
112

    
113
def _InitDrbdHelper(opts, enabled_disk_templates):
114
  """Initialize the DRBD usermode helper.
115

116
  """
117
  drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
118

    
119
  if not drbd_enabled and opts.drbd_helper is not None:
120
    ToStdout("Note: You specified a DRBD usermode helper, while DRBD storage"
121
             " is not enabled.")
122

    
123
  if drbd_enabled:
124
    if opts.drbd_helper is None:
125
      return constants.DEFAULT_DRBD_HELPER
126
    if opts.drbd_helper == '':
127
      raise errors.OpPrereqError(
128
          "Unsetting the drbd usermode helper while enabling DRBD is not"
129
          " allowed.")
130

    
131
  return opts.drbd_helper
132

    
133

    
134
@UsesRPC
135
def InitCluster(opts, args):
136
  """Initialize the cluster.
137

138
  @param opts: the command line options selected by the user
139
  @type args: list
140
  @param args: should contain only one element, the desired
141
      cluster name
142
  @rtype: int
143
  @return: the desired exit code
144

145
  """
146
  enabled_disk_templates = _InitEnabledDiskTemplates(opts)
147

    
148
  try:
149
    vg_name = _InitVgName(opts, enabled_disk_templates)
150
    drbd_helper = _InitDrbdHelper(opts, enabled_disk_templates)
151
  except errors.OpPrereqError, e:
152
    ToStderr(str(e))
153
    return 1
154

    
155
  master_netdev = opts.master_netdev
156
  if master_netdev is None:
157
    nic_mode = opts.nicparams.get(constants.NIC_MODE, None)
158
    if not nic_mode:
159
      # default case, use bridging
160
      master_netdev = constants.DEFAULT_BRIDGE
161
    elif nic_mode == constants.NIC_MODE_OVS:
162
      # default ovs is different from default bridge
163
      master_netdev = constants.DEFAULT_OVS
164
      opts.nicparams[constants.NIC_LINK] = constants.DEFAULT_OVS
165

    
166
  hvlist = opts.enabled_hypervisors
167
  if hvlist is None:
168
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
169
  hvlist = hvlist.split(",")
170

    
171
  hvparams = dict(opts.hvparams)
172
  beparams = opts.beparams
173
  nicparams = opts.nicparams
174

    
175
  diskparams = dict(opts.diskparams)
176

    
177
  # check the disk template types here, as we cannot rely on the type check done
178
  # by the opcode parameter types
179
  diskparams_keys = set(diskparams.keys())
180
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
181
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
182
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
183
    return 1
184

    
185
  # prepare beparams dict
186
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
187
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
188

    
189
  # prepare nicparams dict
190
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
191
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
192

    
193
  # prepare ndparams dict
194
  if opts.ndparams is None:
195
    ndparams = dict(constants.NDC_DEFAULTS)
196
  else:
197
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
198
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
199

    
200
  # prepare hvparams dict
201
  for hv in constants.HYPER_TYPES:
202
    if hv not in hvparams:
203
      hvparams[hv] = {}
204
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
205
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
206

    
207
  # prepare diskparams dict
208
  for templ in constants.DISK_TEMPLATES:
209
    if templ not in diskparams:
210
      diskparams[templ] = {}
211
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
212
                                         diskparams[templ])
213
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
214

    
215
  # prepare ipolicy dict
216
  ipolicy = CreateIPolicyFromOpts(
217
    ispecs_mem_size=opts.ispecs_mem_size,
218
    ispecs_cpu_count=opts.ispecs_cpu_count,
219
    ispecs_disk_count=opts.ispecs_disk_count,
220
    ispecs_disk_size=opts.ispecs_disk_size,
221
    ispecs_nic_count=opts.ispecs_nic_count,
222
    minmax_ispecs=opts.ipolicy_bounds_specs,
223
    std_ispecs=opts.ipolicy_std_specs,
224
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
225
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
226
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
227
    fill_all=True)
228

    
229
  if opts.candidate_pool_size is None:
230
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
231

    
232
  if opts.mac_prefix is None:
233
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
234

    
235
  uid_pool = opts.uid_pool
236
  if uid_pool is not None:
237
    uid_pool = uidpool.ParseUidPool(uid_pool)
238

    
239
  if opts.prealloc_wipe_disks is None:
240
    opts.prealloc_wipe_disks = False
241

    
242
  external_ip_setup_script = opts.use_external_mip_script
243
  if external_ip_setup_script is None:
244
    external_ip_setup_script = False
245

    
246
  try:
247
    primary_ip_version = int(opts.primary_ip_version)
248
  except (ValueError, TypeError), err:
249
    ToStderr("Invalid primary ip version value: %s" % str(err))
250
    return 1
251

    
252
  master_netmask = opts.master_netmask
253
  try:
254
    if master_netmask is not None:
255
      master_netmask = int(master_netmask)
256
  except (ValueError, TypeError), err:
257
    ToStderr("Invalid master netmask value: %s" % str(err))
258
    return 1
259

    
260
  if opts.disk_state:
261
    disk_state = utils.FlatToDict(opts.disk_state)
262
  else:
263
    disk_state = {}
264

    
265
  hv_state = dict(opts.hv_state)
266

    
267
  default_ialloc_params = opts.default_iallocator_params
268
  bootstrap.InitCluster(cluster_name=args[0],
269
                        secondary_ip=opts.secondary_ip,
270
                        vg_name=vg_name,
271
                        mac_prefix=opts.mac_prefix,
272
                        master_netmask=master_netmask,
273
                        master_netdev=master_netdev,
274
                        file_storage_dir=opts.file_storage_dir,
275
                        shared_file_storage_dir=opts.shared_file_storage_dir,
276
                        gluster_storage_dir=opts.gluster_storage_dir,
277
                        enabled_hypervisors=hvlist,
278
                        hvparams=hvparams,
279
                        beparams=beparams,
280
                        nicparams=nicparams,
281
                        ndparams=ndparams,
282
                        diskparams=diskparams,
283
                        ipolicy=ipolicy,
284
                        candidate_pool_size=opts.candidate_pool_size,
285
                        modify_etc_hosts=opts.modify_etc_hosts,
286
                        modify_ssh_setup=opts.modify_ssh_setup,
287
                        maintain_node_health=opts.maintain_node_health,
288
                        drbd_helper=drbd_helper,
289
                        uid_pool=uid_pool,
290
                        default_iallocator=opts.default_iallocator,
291
                        default_iallocator_params=default_ialloc_params,
292
                        primary_ip_version=primary_ip_version,
293
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
294
                        use_external_mip_script=external_ip_setup_script,
295
                        hv_state=hv_state,
296
                        disk_state=disk_state,
297
                        enabled_disk_templates=enabled_disk_templates,
298
                        )
299
  op = opcodes.OpClusterPostInit()
300
  SubmitOpCode(op, opts=opts)
301
  return 0
302

    
303

    
304
@UsesRPC
305
def DestroyCluster(opts, args):
306
  """Destroy the cluster.
307

308
  @param opts: the command line options selected by the user
309
  @type args: list
310
  @param args: should be an empty list
311
  @rtype: int
312
  @return: the desired exit code
313

314
  """
315
  if not opts.yes_do_it:
316
    ToStderr("Destroying a cluster is irreversible. If you really want"
317
             " destroy this cluster, supply the --yes-do-it option.")
318
    return 1
319

    
320
  op = opcodes.OpClusterDestroy()
321
  master_uuid = SubmitOpCode(op, opts=opts)
322
  # if we reached this, the opcode didn't fail; we can proceed to
323
  # shutdown all the daemons
324
  bootstrap.FinalizeClusterDestroy(master_uuid)
325
  return 0
326

    
327

    
328
def RenameCluster(opts, args):
329
  """Rename the cluster.
330

331
  @param opts: the command line options selected by the user
332
  @type args: list
333
  @param args: should contain only one element, the new cluster name
334
  @rtype: int
335
  @return: the desired exit code
336

337
  """
338
  cl = GetClient()
339

    
340
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
341

    
342
  new_name = args[0]
343
  if not opts.force:
344
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
345
                " connected over the network to the cluster name, the"
346
                " operation is very dangerous as the IP address will be"
347
                " removed from the node and the change may not go through."
348
                " Continue?") % (cluster_name, new_name)
349
    if not AskUser(usertext):
350
      return 1
351

    
352
  op = opcodes.OpClusterRename(name=new_name)
353
  result = SubmitOpCode(op, opts=opts, cl=cl)
354

    
355
  if result:
356
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
357

    
358
  return 0
359

    
360

    
361
def ActivateMasterIp(opts, args):
362
  """Activates the master IP.
363

364
  """
365
  op = opcodes.OpClusterActivateMasterIp()
366
  SubmitOpCode(op)
367
  return 0
368

    
369

    
370
def DeactivateMasterIp(opts, args):
371
  """Deactivates the master IP.
372

373
  """
374
  if not opts.confirm:
375
    usertext = ("This will disable the master IP. All the open connections to"
376
                " the master IP will be closed. To reach the master you will"
377
                " need to use its node IP."
378
                " Continue?")
379
    if not AskUser(usertext):
380
      return 1
381

    
382
  op = opcodes.OpClusterDeactivateMasterIp()
383
  SubmitOpCode(op)
384
  return 0
385

    
386

    
387
def RedistributeConfig(opts, args):
388
  """Forces push of the cluster configuration.
389

390
  @param opts: the command line options selected by the user
391
  @type args: list
392
  @param args: empty list
393
  @rtype: int
394
  @return: the desired exit code
395

396
  """
397
  op = opcodes.OpClusterRedistConf()
398
  if opts.yes_do_it:
399
    SubmitOpCodeToDrainedQueue(op)
400
  else:
401
    SubmitOrSend(op, opts)
402
  return 0
403

    
404

    
405
def ShowClusterVersion(opts, args):
406
  """Write version of ganeti software to the standard output.
407

408
  @param opts: the command line options selected by the user
409
  @type args: list
410
  @param args: should be an empty list
411
  @rtype: int
412
  @return: the desired exit code
413

414
  """
415
  cl = GetClient()
416
  result = cl.QueryClusterInfo()
417
  ToStdout("Software version: %s", result["software_version"])
418
  ToStdout("Internode protocol: %s", result["protocol_version"])
419
  ToStdout("Configuration format: %s", result["config_version"])
420
  ToStdout("OS api version: %s", result["os_api_version"])
421
  ToStdout("Export interface: %s", result["export_version"])
422
  ToStdout("VCS version: %s", result["vcs_version"])
423
  return 0
424

    
425

    
426
def ShowClusterMaster(opts, args):
427
  """Write name of master node to the standard output.
428

429
  @param opts: the command line options selected by the user
430
  @type args: list
431
  @param args: should be an empty list
432
  @rtype: int
433
  @return: the desired exit code
434

435
  """
436
  master = bootstrap.GetMaster()
437
  ToStdout(master)
438
  return 0
439

    
440

    
441
def _FormatGroupedParams(paramsdict, roman=False):
442
  """Format Grouped parameters (be, nic, disk) by group.
443

444
  @type paramsdict: dict of dicts
445
  @param paramsdict: {group: {param: value, ...}, ...}
446
  @rtype: dict of dicts
447
  @return: copy of the input dictionaries with strings as values
448

449
  """
450
  ret = {}
451
  for (item, val) in paramsdict.items():
452
    if isinstance(val, dict):
453
      ret[item] = _FormatGroupedParams(val, roman=roman)
454
    elif roman and isinstance(val, int):
455
      ret[item] = compat.TryToRoman(val)
456
    else:
457
      ret[item] = str(val)
458
  return ret
459

    
460

    
461
def ShowClusterConfig(opts, args):
462
  """Shows cluster information.
463

464
  @param opts: the command line options selected by the user
465
  @type args: list
466
  @param args: should be an empty list
467
  @rtype: int
468
  @return: the desired exit code
469

470
  """
471
  cl = GetClient()
472
  result = cl.QueryClusterInfo()
473

    
474
  if result["tags"]:
475
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
476
  else:
477
    tags = "(none)"
478
  if result["reserved_lvs"]:
479
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
480
  else:
481
    reserved_lvs = "(none)"
482

    
483
  enabled_hv = result["enabled_hypervisors"]
484
  hvparams = dict((k, v) for k, v in result["hvparams"].iteritems()
485
                  if k in enabled_hv)
486

    
487
  info = [
488
    ("Cluster name", result["name"]),
489
    ("Cluster UUID", result["uuid"]),
490

    
491
    ("Creation time", utils.FormatTime(result["ctime"])),
492
    ("Modification time", utils.FormatTime(result["mtime"])),
493

    
494
    ("Master node", result["master"]),
495

    
496
    ("Architecture (this node)",
497
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),
498

    
499
    ("Tags", tags),
500

    
501
    ("Default hypervisor", result["default_hypervisor"]),
502
    ("Enabled hypervisors", utils.CommaJoin(enabled_hv)),
503

    
504
    ("Hypervisor parameters", _FormatGroupedParams(hvparams)),
505

    
506
    ("OS-specific hypervisor parameters",
507
     _FormatGroupedParams(result["os_hvp"])),
508

    
509
    ("OS parameters", _FormatGroupedParams(result["osparams"])),
510

    
511
    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
512
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),
513

    
514
    ("Cluster parameters", [
515
      ("candidate pool size",
516
       compat.TryToRoman(result["candidate_pool_size"],
517
                         convert=opts.roman_integers)),
518
      ("maximal number of jobs running simultaneously",
519
       compat.TryToRoman(result["max_running_jobs"],
520
                         convert=opts.roman_integers)),
521
      ("master netdev", result["master_netdev"]),
522
      ("master netmask", result["master_netmask"]),
523
      ("use external master IP address setup script",
524
       result["use_external_mip_script"]),
525
      ("lvm volume group", result["volume_group_name"]),
526
      ("lvm reserved volumes", reserved_lvs),
527
      ("drbd usermode helper", result["drbd_usermode_helper"]),
528
      ("file storage path", result["file_storage_dir"]),
529
      ("shared file storage path", result["shared_file_storage_dir"]),
530
      ("gluster storage path", result["gluster_storage_dir"]),
531
      ("maintenance of node health", result["maintain_node_health"]),
532
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
533
      ("default instance allocator", result["default_iallocator"]),
534
      ("default instance allocator parameters",
535
       result["default_iallocator_params"]),
536
      ("primary ip version", result["primary_ip_version"]),
537
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
538
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
539
      ("ExtStorage Providers search path",
540
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
541
      ("enabled disk templates",
542
       utils.CommaJoin(result["enabled_disk_templates"])),
543
      ("instance communication network",
544
       result["instance_communication_network"]),
545
      ]),
546

    
547
    ("Default node parameters",
548
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),
549

    
550
    ("Default instance parameters",
551
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),
552

    
553
    ("Default nic parameters",
554
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),
555

    
556
    ("Default disk parameters",
557
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),
558

    
559
    ("Instance policy - limits for instances",
560
     FormatPolicyInfo(result["ipolicy"], None, True)),
561
    ]
562

    
563
  PrintGenericInfo(info)
564
  return 0
565

    
566

    
567
def ClusterCopyFile(opts, args):
568
  """Copy a file from master to some nodes.
569

570
  @param opts: the command line options selected by the user
571
  @type args: list
572
  @param args: should contain only one element, the path of
573
      the file to be copied
574
  @rtype: int
575
  @return: the desired exit code
576

577
  """
578
  filename = args[0]
579
  if not os.path.exists(filename):
580
    raise errors.OpPrereqError("No such filename '%s'" % filename,
581
                               errors.ECODE_INVAL)
582

    
583
  cl = GetClient()
584
  qcl = GetClient()
585
  try:
586
    cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
587

    
588
    results = GetOnlineNodes(nodes=opts.nodes, cl=qcl, filter_master=True,
589
                             secondary_ips=opts.use_replication_network,
590
                             nodegroup=opts.nodegroup)
591
    ports = GetNodesSshPorts(opts.nodes, qcl)
592
  finally:
593
    cl.Close()
594
    qcl.Close()
595

    
596
  srun = ssh.SshRunner(cluster_name)
597
  for (node, port) in zip(results, ports):
598
    if not srun.CopyFileToNode(node, port, filename):
599
      ToStderr("Copy of file %s to node %s:%d failed", filename, node, port)
600

    
601
  return 0
602

    
603

    
604
def RunClusterCommand(opts, args):
605
  """Run a command on some nodes.
606

607
  @param opts: the command line options selected by the user
608
  @type args: list
609
  @param args: should contain the command to be run and its arguments
610
  @rtype: int
611
  @return: the desired exit code
612

613
  """
614
  cl = GetClient()
615
  qcl = GetClient()
616

    
617
  command = " ".join(args)
618

    
619
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=qcl, nodegroup=opts.nodegroup)
620
  ports = GetNodesSshPorts(nodes, qcl)
621

    
622
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
623
                                                    "master_node"])
624

    
625
  srun = ssh.SshRunner(cluster_name=cluster_name)
626

    
627
  # Make sure master node is at list end
628
  if master_node in nodes:
629
    nodes.remove(master_node)
630
    nodes.append(master_node)
631

    
632
  for (name, port) in zip(nodes, ports):
633
    result = srun.Run(name, constants.SSH_LOGIN_USER, command, port=port)
634

    
635
    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
636
      # Do not output anything for successful commands
637
      continue
638

    
639
    ToStdout("------------------------------------------------")
640
    if opts.show_machine_names:
641
      for line in result.output.splitlines():
642
        ToStdout("%s: %s", name, line)
643
    else:
644
      ToStdout("node: %s", name)
645
      ToStdout("%s", result.output)
646
    ToStdout("return code = %s", result.exit_code)
647

    
648
  return 0
649

    
650

    
651
def VerifyCluster(opts, args):
652
  """Verify integrity of cluster, performing various test on nodes.
653

654
  @param opts: the command line options selected by the user
655
  @type args: list
656
  @param args: should be an empty list
657
  @rtype: int
658
  @return: the desired exit code
659

660
  """
661
  skip_checks = []
662

    
663
  if opts.skip_nplusone_mem:
664
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
665

    
666
  cl = GetClient()
667

    
668
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
669
                               error_codes=opts.error_codes,
670
                               debug_simulate_errors=opts.simulate_errors,
671
                               skip_checks=skip_checks,
672
                               ignore_errors=opts.ignore_errors,
673
                               group_name=opts.nodegroup)
674
  result = SubmitOpCode(op, cl=cl, opts=opts)
675

    
676
  # Keep track of submitted jobs
677
  jex = JobExecutor(cl=cl, opts=opts)
678

    
679
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
680
    jex.AddJobId(None, status, job_id)
681

    
682
  results = jex.GetResults()
683

    
684
  (bad_jobs, bad_results) = \
685
    map(len,
686
        # Convert iterators to lists
687
        map(list,
688
            # Count errors
689
            map(compat.partial(itertools.ifilterfalse, bool),
690
                # Convert result to booleans in a tuple
691
                zip(*((job_success, len(op_results) == 1 and op_results[0])
692
                      for (job_success, op_results) in results)))))
693

    
694
  if bad_jobs == 0 and bad_results == 0:
695
    rcode = constants.EXIT_SUCCESS
696
  else:
697
    rcode = constants.EXIT_FAILURE
698
    if bad_jobs > 0:
699
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
700

    
701
  return rcode
702

    
703

    
704
def VerifyDisks(opts, args):
705
  """Verify integrity of cluster disks.
706

707
  @param opts: the command line options selected by the user
708
  @type args: list
709
  @param args: should be an empty list
710
  @rtype: int
711
  @return: the desired exit code
712

713
  """
714
  cl = GetClient()
715

    
716
  op = opcodes.OpClusterVerifyDisks()
717

    
718
  result = SubmitOpCode(op, cl=cl, opts=opts)
719

    
720
  # Keep track of submitted jobs
721
  jex = JobExecutor(cl=cl, opts=opts)
722

    
723
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
724
    jex.AddJobId(None, status, job_id)
725

    
726
  retcode = constants.EXIT_SUCCESS
727

    
728
  for (status, result) in jex.GetResults():
729
    if not status:
730
      ToStdout("Job failed: %s", result)
731
      continue
732

    
733
    ((bad_nodes, instances, missing), ) = result
734

    
735
    for node, text in bad_nodes.items():
736
      ToStdout("Error gathering data on node %s: %s",
737
               node, utils.SafeEncode(text[-400:]))
738
      retcode = constants.EXIT_FAILURE
739
      ToStdout("You need to fix these nodes first before fixing instances")
740

    
741
    for iname in instances:
742
      if iname in missing:
743
        continue
744
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
745
      try:
746
        ToStdout("Activating disks for instance '%s'", iname)
747
        SubmitOpCode(op, opts=opts, cl=cl)
748
      except errors.GenericError, err:
749
        nret, msg = FormatError(err)
750
        retcode |= nret
751
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
752

    
753
    if missing:
754
      for iname, ival in missing.iteritems():
755
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
756
        if all_missing:
757
          ToStdout("Instance %s cannot be verified as it lives on"
758
                   " broken nodes", iname)
759
        else:
760
          ToStdout("Instance %s has missing logical volumes:", iname)
761
          ival.sort()
762
          for node, vol in ival:
763
            if node in bad_nodes:
764
              ToStdout("\tbroken node %s /dev/%s", node, vol)
765
            else:
766
              ToStdout("\t%s /dev/%s", node, vol)
767

    
768
      ToStdout("You need to replace or recreate disks for all the above"
769
               " instances if this message persists after fixing broken nodes.")
770
      retcode = constants.EXIT_FAILURE
771
    elif not instances:
772
      ToStdout("No disks need to be activated.")
773

    
774
  return retcode
775

    
776

    
777
def RepairDiskSizes(opts, args):
778
  """Verify sizes of cluster disks.
779

780
  @param opts: the command line options selected by the user
781
  @type args: list
782
  @param args: optional list of instances to restrict check to
783
  @rtype: int
784
  @return: the desired exit code
785

786
  """
787
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
788
  SubmitOpCode(op, opts=opts)
789

    
790

    
791
@UsesRPC
792
def MasterFailover(opts, args):
793
  """Failover the master node.
794

795
  This command, when run on a non-master node, will cause the current
796
  master to cease being master, and the non-master to become new
797
  master.
798

799
  @param opts: the command line options selected by the user
800
  @type args: list
801
  @param args: should be an empty list
802
  @rtype: int
803
  @return: the desired exit code
804

805
  """
806
  if opts.no_voting and not opts.yes_do_it:
807
    usertext = ("This will perform the failover even if most other nodes"
808
                " are down, or if this node is outdated. This is dangerous"
809
                " as it can lead to a non-consistent cluster. Check the"
810
                " gnt-cluster(8) man page before proceeding. Continue?")
811
    if not AskUser(usertext):
812
      return 1
813

    
814
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
815

    
816

    
817
def MasterPing(opts, args):
818
  """Checks if the master is alive.
819

820
  @param opts: the command line options selected by the user
821
  @type args: list
822
  @param args: should be an empty list
823
  @rtype: int
824
  @return: the desired exit code
825

826
  """
827
  try:
828
    cl = GetClient()
829
    cl.QueryClusterInfo()
830
    return 0
831
  except Exception: # pylint: disable=W0703
832
    return 1
833

    
834

    
835
def SearchTags(opts, args):
836
  """Searches the tags on all the cluster.
837

838
  @param opts: the command line options selected by the user
839
  @type args: list
840
  @param args: should contain only one element, the tag pattern
841
  @rtype: int
842
  @return: the desired exit code
843

844
  """
845
  op = opcodes.OpTagsSearch(pattern=args[0])
846
  result = SubmitOpCode(op, opts=opts)
847
  if not result:
848
    return 1
849
  result = list(result)
850
  result.sort()
851
  for path, tag in result:
852
    ToStdout("%s %s", path, tag)
853

    
854

    
855
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
856
  """Reads and verifies an X509 certificate.
857

858
  @type cert_filename: string
859
  @param cert_filename: the path of the file containing the certificate to
860
                        verify encoded in PEM format
861
  @type verify_private_key: bool
862
  @param verify_private_key: whether to verify the private key in addition to
863
                             the public certificate
864
  @rtype: string
865
  @return: a string containing the PEM-encoded certificate.
866

867
  """
868
  try:
869
    pem = utils.ReadFile(cert_filename)
870
  except IOError, err:
871
    raise errors.X509CertError(cert_filename,
872
                               "Unable to read certificate: %s" % str(err))
873

    
874
  try:
875
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
876
  except Exception, err:
877
    raise errors.X509CertError(cert_filename,
878
                               "Unable to load certificate: %s" % str(err))
879

    
880
  if verify_private_key:
881
    try:
882
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
883
    except Exception, err:
884
      raise errors.X509CertError(cert_filename,
885
                                 "Unable to load private key: %s" % str(err))
886

    
887
  return pem
888

    
889

    
890
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
891
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
892
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
893
                 cds_filename, force, new_node_cert):
894
  """Renews cluster certificates, keys and secrets.
895

896
  @type new_cluster_cert: bool
897
  @param new_cluster_cert: Whether to generate a new cluster certificate
898
  @type new_rapi_cert: bool
899
  @param new_rapi_cert: Whether to generate a new RAPI certificate
900
  @type rapi_cert_filename: string
901
  @param rapi_cert_filename: Path to file containing new RAPI certificate
902
  @type new_spice_cert: bool
903
  @param new_spice_cert: Whether to generate a new SPICE certificate
904
  @type spice_cert_filename: string
905
  @param spice_cert_filename: Path to file containing new SPICE certificate
906
  @type spice_cacert_filename: string
907
  @param spice_cacert_filename: Path to file containing the certificate of the
908
                                CA that signed the SPICE certificate
909
  @type new_confd_hmac_key: bool
910
  @param new_confd_hmac_key: Whether to generate a new HMAC key
911
  @type new_cds: bool
912
  @param new_cds: Whether to generate a new cluster domain secret
913
  @type cds_filename: string
914
  @param cds_filename: Path to file containing new cluster domain secret
915
  @type force: bool
916
  @param force: Whether to ask user for confirmation
917
  @type new_node_cert: string
918
  @param new_node_cert: Whether to generate new node certificates
919

920
  """
921
  if new_rapi_cert and rapi_cert_filename:
922
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
923
             " options can be specified at the same time.")
924
    return 1
925

    
926
  if new_cds and cds_filename:
927
    ToStderr("Only one of the --new-cluster-domain-secret and"
928
             " --cluster-domain-secret options can be specified at"
929
             " the same time.")
930
    return 1
931

    
932
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
933
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
934
             " and --spice-ca-certificate must not be used.")
935
    return 1
936

    
937
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
938
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
939
             " specified.")
940
    return 1
941

    
942
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
943
  try:
944
    if rapi_cert_filename:
945
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
946
    if spice_cert_filename:
947
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
948
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
949
  except errors.X509CertError, err:
950
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
951
    return 1
952

    
953
  if cds_filename:
954
    try:
955
      cds = utils.ReadFile(cds_filename)
956
    except Exception, err: # pylint: disable=W0703
957
      ToStderr("Can't load new cluster domain secret from %s: %s" %
958
               (cds_filename, str(err)))
959
      return 1
960
  else:
961
    cds = None
962

    
963
  if not force:
964
    usertext = ("This requires all daemons on all nodes to be restarted and"
965
                " may take some time. Continue?")
966
    if not AskUser(usertext):
967
      return 1
968

    
969
  def _RenewCryptoInner(ctx):
970
    ctx.feedback_fn("Updating certificates and keys")
971
    # Note: the node certificate will be generated in the LU
972
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
973
                                    new_rapi_cert,
974
                                    new_spice_cert,
975
                                    new_confd_hmac_key,
976
                                    new_cds,
977
                                    rapi_cert_pem=rapi_cert_pem,
978
                                    spice_cert_pem=spice_cert_pem,
979
                                    spice_cacert_pem=spice_cacert_pem,
980
                                    cds=cds)
981

    
982
    files_to_copy = []
983

    
984
    if new_cluster_cert:
985
      files_to_copy.append(pathutils.NODED_CERT_FILE)
986

    
987
    if new_rapi_cert or rapi_cert_pem:
988
      files_to_copy.append(pathutils.RAPI_CERT_FILE)
989

    
990
    if new_spice_cert or spice_cert_pem:
991
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
992
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)
993

    
994
    if new_confd_hmac_key:
995
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)
996

    
997
    if new_cds or cds:
998
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)
999

    
1000
    if files_to_copy:
1001
      for node_name in ctx.nonmaster_nodes:
1002
        port = ctx.ssh_ports[node_name]
1003
        ctx.feedback_fn("Copying %s to %s:%d" %
1004
                        (", ".join(files_to_copy), node_name, port))
1005
        for file_name in files_to_copy:
1006
          ctx.ssh.CopyFileToNode(node_name, port, file_name)
1007

    
1008
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
1009

    
1010
  ToStdout("All requested certificates and keys have been replaced."
1011
           " Running \"gnt-cluster verify\" now is recommended.")
1012

    
1013
  if new_node_cert:
1014
    cl = GetClient()
1015
    renew_op = opcodes.OpClusterRenewCrypto()
1016
    SubmitOpCode(renew_op, cl=cl)
1017

    
1018
  return 0
1019

    
1020

    
1021
def RenewCrypto(opts, args):
1022
  """Renews cluster certificates, keys and secrets.
1023

1024
  """
1025
  return _RenewCrypto(opts.new_cluster_cert,
1026
                      opts.new_rapi_cert,
1027
                      opts.rapi_cert,
1028
                      opts.new_spice_cert,
1029
                      opts.spice_cert,
1030
                      opts.spice_cacert,
1031
                      opts.new_confd_hmac_key,
1032
                      opts.new_cluster_domain_secret,
1033
                      opts.cluster_domain_secret,
1034
                      opts.force,
1035
                      opts.new_node_cert)
1036

    
1037

    
1038
def _GetEnabledDiskTemplates(opts):
1039
  """Determine the list of enabled disk templates.
1040

1041
  """
1042
  if opts.enabled_disk_templates:
1043
    return opts.enabled_disk_templates.split(",")
1044
  else:
1045
    return None
1046

    
1047

    
1048
def _GetVgName(opts, enabled_disk_templates):
1049
  """Determine the volume group name.
1050

1051
  @type enabled_disk_templates: list of strings
1052
  @param enabled_disk_templates: cluster-wide enabled disk-templates
1053

1054
  """
1055
  # consistency between vg name and enabled disk templates
1056
  vg_name = None
1057
  if opts.vg_name is not None:
1058
    vg_name = opts.vg_name
1059
  if enabled_disk_templates:
1060
    if vg_name and not utils.IsLvmEnabled(enabled_disk_templates):
1061
      ToStdout("You specified a volume group with --vg-name, but you did not"
1062
               " enable any of the following lvm-based disk templates: %s" %
1063
               utils.CommaJoin(constants.DTS_LVM))
1064
  return vg_name
1065

    
1066

    
1067
def _GetDrbdHelper(opts, enabled_disk_templates):
1068
  """Determine the DRBD usermode helper.
1069

1070
  """
1071
  drbd_helper = opts.drbd_helper
1072
  if enabled_disk_templates:
1073
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
1074
    if not drbd_enabled and opts.drbd_helper:
1075
      ToStdout("You specified a DRBD usermode helper with "
1076
               " --drbd-usermode-helper while DRBD is not enabled.")
1077
  return drbd_helper
1078

    
1079

    
1080
def SetClusterParams(opts, args):
1081
  """Modify the cluster.
1082

1083
  @param opts: the command line options selected by the user
1084
  @type args: list
1085
  @param args: should be an empty list
1086
  @rtype: int
1087
  @return: the desired exit code
1088

1089
  """
1090
  if not (opts.vg_name is not None or
1091
          opts.drbd_helper is not None or
1092
          opts.enabled_hypervisors or opts.hvparams or
1093
          opts.beparams or opts.nicparams or
1094
          opts.ndparams or opts.diskparams or
1095
          opts.candidate_pool_size is not None or
1096
          opts.max_running_jobs is not None or
1097
          opts.uid_pool is not None or
1098
          opts.maintain_node_health is not None or
1099
          opts.add_uids is not None or
1100
          opts.remove_uids is not None or
1101
          opts.default_iallocator is not None or
1102
          opts.default_iallocator_params or
1103
          opts.reserved_lvs is not None or
1104
          opts.master_netdev is not None or
1105
          opts.master_netmask is not None or
1106
          opts.use_external_mip_script is not None or
1107
          opts.prealloc_wipe_disks is not None or
1108
          opts.hv_state or
1109
          opts.enabled_disk_templates or
1110
          opts.disk_state or
1111
          opts.ipolicy_bounds_specs is not None or
1112
          opts.ipolicy_std_specs is not None or
1113
          opts.ipolicy_disk_templates is not None or
1114
          opts.ipolicy_vcpu_ratio is not None or
1115
          opts.ipolicy_spindle_ratio is not None or
1116
          opts.modify_etc_hosts is not None or
1117
          opts.file_storage_dir is not None):
1118
    ToStderr("Please give at least one of the parameters.")
1119
    return 1
1120

    
1121
  enabled_disk_templates = _GetEnabledDiskTemplates(opts)
1122
  vg_name = _GetVgName(opts, enabled_disk_templates)
1123

    
1124
  try:
1125
    drbd_helper = _GetDrbdHelper(opts, enabled_disk_templates)
1126
  except errors.OpPrereqError, e:
1127
    ToStderr(str(e))
1128
    return 1
1129

    
1130
  hvlist = opts.enabled_hypervisors
1131
  if hvlist is not None:
1132
    hvlist = hvlist.split(",")
1133

    
1134
  # a list of (name, dict) we can pass directly to dict() (or [])
1135
  hvparams = dict(opts.hvparams)
1136
  for hv_params in hvparams.values():
1137
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1138

    
1139
  diskparams = dict(opts.diskparams)
1140

    
1141
  for dt_params in diskparams.values():
1142
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1143

    
1144
  beparams = opts.beparams
1145
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1146

    
1147
  nicparams = opts.nicparams
1148
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1149

    
1150
  ndparams = opts.ndparams
1151
  if ndparams is not None:
1152
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1153

    
1154
  ipolicy = CreateIPolicyFromOpts(
1155
    minmax_ispecs=opts.ipolicy_bounds_specs,
1156
    std_ispecs=opts.ipolicy_std_specs,
1157
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
1158
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1159
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1160
    )
1161

    
1162
  mnh = opts.maintain_node_health
1163

    
1164
  uid_pool = opts.uid_pool
1165
  if uid_pool is not None:
1166
    uid_pool = uidpool.ParseUidPool(uid_pool)
1167

    
1168
  add_uids = opts.add_uids
1169
  if add_uids is not None:
1170
    add_uids = uidpool.ParseUidPool(add_uids)
1171

    
1172
  remove_uids = opts.remove_uids
1173
  if remove_uids is not None:
1174
    remove_uids = uidpool.ParseUidPool(remove_uids)
1175

    
1176
  if opts.reserved_lvs is not None:
1177
    if opts.reserved_lvs == "":
1178
      opts.reserved_lvs = []
1179
    else:
1180
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1181

    
1182
  if opts.master_netmask is not None:
1183
    try:
1184
      opts.master_netmask = int(opts.master_netmask)
1185
    except ValueError:
1186
      ToStderr("The --master-netmask option expects an int parameter.")
1187
      return 1
1188

    
1189
  ext_ip_script = opts.use_external_mip_script
1190

    
1191
  if opts.disk_state:
1192
    disk_state = utils.FlatToDict(opts.disk_state)
1193
  else:
1194
    disk_state = {}
1195

    
1196
  hv_state = dict(opts.hv_state)
1197

    
1198
  op = opcodes.OpClusterSetParams(
1199
    vg_name=vg_name,
1200
    drbd_helper=drbd_helper,
1201
    enabled_hypervisors=hvlist,
1202
    hvparams=hvparams,
1203
    os_hvp=None,
1204
    beparams=beparams,
1205
    nicparams=nicparams,
1206
    ndparams=ndparams,
1207
    diskparams=diskparams,
1208
    ipolicy=ipolicy,
1209
    candidate_pool_size=opts.candidate_pool_size,
1210
    max_running_jobs=opts.max_running_jobs,
1211
    maintain_node_health=mnh,
1212
    modify_etc_hosts=opts.modify_etc_hosts,
1213
    uid_pool=uid_pool,
1214
    add_uids=add_uids,
1215
    remove_uids=remove_uids,
1216
    default_iallocator=opts.default_iallocator,
1217
    default_iallocator_params=opts.default_iallocator_params,
1218
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
1219
    master_netdev=opts.master_netdev,
1220
    master_netmask=opts.master_netmask,
1221
    reserved_lvs=opts.reserved_lvs,
1222
    use_external_mip_script=ext_ip_script,
1223
    hv_state=hv_state,
1224
    disk_state=disk_state,
1225
    enabled_disk_templates=enabled_disk_templates,
1226
    force=opts.force,
1227
    file_storage_dir=opts.file_storage_dir,
1228
    )
1229
  SubmitOrSend(op, opts)
1230
  return 0
1231

    
1232

    
1233
def QueueOps(opts, args):
1234
  """Queue operations.
1235

1236
  @param opts: the command line options selected by the user
1237
  @type args: list
1238
  @param args: should contain only one element, the subcommand
1239
  @rtype: int
1240
  @return: the desired exit code
1241

1242
  """
1243
  command = args[0]
1244
  client = GetClient()
1245
  if command in ("drain", "undrain"):
1246
    drain_flag = command == "drain"
1247
    client.SetQueueDrainFlag(drain_flag)
1248
  elif command == "info":
1249
    result = client.QueryConfigValues(["drain_flag"])
1250
    if result[0]:
1251
      val = "set"
1252
    else:
1253
      val = "unset"
1254
    ToStdout("The drain flag is %s" % val)
1255
  else:
1256
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1257
                               errors.ECODE_INVAL)
1258

    
1259
  return 0
1260

    
1261

    
1262
def _ShowWatcherPause(until):
1263
  if until is None or until < time.time():
1264
    ToStdout("The watcher is not paused.")
1265
  else:
1266
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1267

    
1268

    
1269
def WatcherOps(opts, args):
1270
  """Watcher operations.
1271

1272
  @param opts: the command line options selected by the user
1273
  @type args: list
1274
  @param args: should contain only one element, the subcommand
1275
  @rtype: int
1276
  @return: the desired exit code
1277

1278
  """
1279
  command = args[0]
1280
  client = GetClient()
1281

    
1282
  if command == "continue":
1283
    client.SetWatcherPause(None)
1284
    ToStdout("The watcher is no longer paused.")
1285

    
1286
  elif command == "pause":
1287
    if len(args) < 2:
1288
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1289

    
1290
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1291
    _ShowWatcherPause(result)
1292

    
1293
  elif command == "info":
1294
    result = client.QueryConfigValues(["watcher_pause"])
1295
    _ShowWatcherPause(result[0])
1296

    
1297
  else:
1298
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1299
                               errors.ECODE_INVAL)
1300

    
1301
  return 0
1302

    
1303

    
1304
def _OobPower(opts, node_list, power):
1305
  """Puts the node in the list to desired power state.
1306

1307
  @param opts: The command line options selected by the user
1308
  @param node_list: The list of nodes to operate on
1309
  @param power: True if they should be powered on, False otherwise
1310
  @return: The success of the operation (none failed)
1311

1312
  """
1313
  if power:
1314
    command = constants.OOB_POWER_ON
1315
  else:
1316
    command = constants.OOB_POWER_OFF
1317

    
1318
  op = opcodes.OpOobCommand(node_names=node_list,
1319
                            command=command,
1320
                            ignore_status=True,
1321
                            timeout=opts.oob_timeout,
1322
                            power_delay=opts.power_delay)
1323
  result = SubmitOpCode(op, opts=opts)
1324
  errs = 0
1325
  for node_result in result:
1326
    (node_tuple, data_tuple) = node_result
1327
    (_, node_name) = node_tuple
1328
    (data_status, _) = data_tuple
1329
    if data_status != constants.RS_NORMAL:
1330
      assert data_status != constants.RS_UNAVAIL
1331
      errs += 1
1332
      ToStderr("There was a problem changing power for %s, please investigate",
1333
               node_name)
1334

    
1335
  if errs > 0:
1336
    return False
1337

    
1338
  return True
1339

    
1340

    
1341
def _InstanceStart(opts, inst_list, start, no_remember=False):
1342
  """Puts the instances in the list to desired state.
1343

1344
  @param opts: The command line options selected by the user
1345
  @param inst_list: The list of instances to operate on
1346
  @param start: True if they should be started, False for shutdown
1347
  @param no_remember: If the instance state should be remembered
1348
  @return: The success of the operation (none failed)
1349

1350
  """
1351
  if start:
1352
    opcls = opcodes.OpInstanceStartup
1353
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1354
  else:
1355
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1356
                           timeout=opts.shutdown_timeout,
1357
                           no_remember=no_remember)
1358
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1359

    
1360
  jex = JobExecutor(opts=opts)
1361

    
1362
  for inst in inst_list:
1363
    ToStdout("Submit %s of instance %s", text_submit, inst)
1364
    op = opcls(instance_name=inst)
1365
    jex.QueueJob(inst, op)
1366

    
1367
  results = jex.GetResults()
1368
  bad_cnt = len([1 for (success, _) in results if not success])
1369

    
1370
  if bad_cnt == 0:
1371
    ToStdout("All instances have been %s successfully", text_success)
1372
  else:
1373
    ToStderr("There were errors while %s instances:\n"
1374
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1375
             len(results))
1376
    return False
1377

    
1378
  return True
1379

    
1380

    
1381
class _RunWhenNodesReachableHelper:
1382
  """Helper class to make shared internal state sharing easier.
1383

1384
  @ivar success: Indicates if all action_cb calls were successful
1385

1386
  """
1387
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1388
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1389
    """Init the object.
1390

1391
    @param node_list: The list of nodes to be reachable
1392
    @param action_cb: Callback called when a new host is reachable
1393
    @type node2ip: dict
1394
    @param node2ip: Node to ip mapping
1395
    @param port: The port to use for the TCP ping
1396
    @param feedback_fn: The function used for feedback
1397
    @param _ping_fn: Function to check reachabilty (for unittest use only)
1398
    @param _sleep_fn: Function to sleep (for unittest use only)
1399

1400
    """
1401
    self.down = set(node_list)
1402
    self.up = set()
1403
    self.node2ip = node2ip
1404
    self.success = True
1405
    self.action_cb = action_cb
1406
    self.port = port
1407
    self.feedback_fn = feedback_fn
1408
    self._ping_fn = _ping_fn
1409
    self._sleep_fn = _sleep_fn
1410

    
1411
  def __call__(self):
1412
    """When called we run action_cb.
1413

1414
    @raises utils.RetryAgain: When there are still down nodes
1415

1416
    """
1417
    if not self.action_cb(self.up):
1418
      self.success = False
1419

    
1420
    if self.down:
1421
      raise utils.RetryAgain()
1422
    else:
1423
      return self.success
1424

    
1425
  def Wait(self, secs):
1426
    """Checks if a host is up or waits remaining seconds.
1427

1428
    @param secs: The secs remaining
1429

1430
    """
1431
    start = time.time()
1432
    for node in self.down:
1433
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1434
                       live_port_needed=True):
1435
        self.feedback_fn("Node %s became available" % node)
1436
        self.up.add(node)
1437
        self.down -= self.up
1438
        # If we have a node available there is the possibility to run the
1439
        # action callback successfully, therefore we don't wait and return
1440
        return
1441

    
1442
    self._sleep_fn(max(0.0, start + secs - time.time()))
1443

    
1444

    
1445
def _RunWhenNodesReachable(node_list, action_cb, interval):
1446
  """Run action_cb when nodes become reachable.
1447

1448
  @param node_list: The list of nodes to be reachable
1449
  @param action_cb: Callback called when a new host is reachable
1450
  @param interval: The earliest time to retry
1451

1452
  """
1453
  client = GetClient()
1454
  cluster_info = client.QueryClusterInfo()
1455
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1456
    family = netutils.IPAddress.family
1457
  else:
1458
    family = netutils.IP6Address.family
1459

    
1460
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1461
                 for node in node_list)
1462

    
1463
  port = netutils.GetDaemonPort(constants.NODED)
1464
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1465
                                        ToStdout)
1466

    
1467
  try:
1468
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1469
                       wait_fn=helper.Wait)
1470
  except utils.RetryTimeout:
1471
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1472
             " again:\n  - %s", "  - ".join(helper.down))
1473
    return False
1474

    
1475

    
1476
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1477
                          _instance_start_fn=_InstanceStart):
1478
  """Start the instances conditional based on node_states.
1479

1480
  @param opts: The command line options selected by the user
1481
  @param inst_map: A dict of inst -> nodes mapping
1482
  @param nodes_online: A list of nodes online
1483
  @param _instance_start_fn: Callback to start instances (unittest use only)
1484
  @return: Success of the operation on all instances
1485

1486
  """
1487
  start_inst_list = []
1488
  for (inst, nodes) in inst_map.items():
1489
    if not (nodes - nodes_online):
1490
      # All nodes the instance lives on are back online
1491
      start_inst_list.append(inst)
1492

    
1493
  for inst in start_inst_list:
1494
    del inst_map[inst]
1495

    
1496
  if start_inst_list:
1497
    return _instance_start_fn(opts, start_inst_list, True)
1498

    
1499
  return True
1500

    
1501

    
1502
def _EpoOn(opts, full_node_list, node_list, inst_map):
1503
  """Does the actual power on.
1504

1505
  @param opts: The command line options selected by the user
1506
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1507
                         OOB)
1508
  @param node_list: The list of nodes to operate on (all need to support OOB)
1509
  @param inst_map: A dict of inst -> nodes mapping
1510
  @return: The desired exit status
1511

1512
  """
1513
  if node_list and not _OobPower(opts, node_list, False):
1514
    ToStderr("Not all nodes seem to get back up, investigate and start"
1515
             " manually if needed")
1516

    
1517
  # Wait for the nodes to be back up
1518
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))

  ToStdout("Waiting until all nodes are available again")
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
    ToStderr("Please investigate and start stopped instances manually")
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS


def _EpoOff(opts, node_list, inst_map):
  """Does the actual power off.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
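  # Shut the instances down with no_remember=True, i.e. without recording the
  # state change in the configuration, so they remain marked as instances
  # that should be running once the EPO is reverted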
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
    ToStderr("Please investigate and stop instances manually before continuing")
    return constants.EXIT_FAILURE

  if not node_list:
    return constants.EXIT_SUCCESS

  if _OobPower(opts, node_list, False):
    return constants.EXIT_SUCCESS
  else:
    return constants.EXIT_FAILURE


def Epo(opts, args, qcl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
        _confirm_fn=ConfirmOperation,
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
  """EPO operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: the names of the nodes (or node groups, if --groups is used)
      to operate on; must be empty when --all is given
  @rtype: int
  @return: the desired exit code

  """
  if opts.groups and opts.show_all:
    _stderr_fn("Only one of --groups or --all are allowed")
    return constants.EXIT_FAILURE
  elif args and opts.show_all:
    _stderr_fn("Arguments in combination with --all are not allowed")
    return constants.EXIT_FAILURE

  if qcl is None:
    # Query client
    qcl = GetClient()

  if opts.groups:
    node_query_list = \
      itertools.chain(*qcl.QueryGroups(args, ["node_list"], False))
  else:
    node_query_list = args

  result = qcl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
                                            "sinst_list", "powered", "offline"],
                          False)

  all_nodes = map(compat.fst, result)
  node_list = []
  inst_map = {}
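  # For every affected instance, collect the set of nodes that still have to
  # come back before it may be started.  The master node is deliberately kept
  # out of these sets: the EPO is run from the master, so it is never powered
  # off and never needs to be waited for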
  for (node, master, pinsts, sinsts, powered, offline) in result:
    if not offline:
      for inst in (pinsts + sinsts):
        if inst in inst_map:
          if not master:
            inst_map[inst].add(node)
        elif master:
          inst_map[inst] = set()
        else:
          inst_map[inst] = set([node])

    if master and opts.on:
      # We ignore the master for turning on the machines, in fact we are
      # already operating on the master at this point :)
      continue
    elif master and not opts.show_all:
      _stderr_fn("%s is the master node, please do a master-failover to another"
                 " node not affected by the EPO or use --all if you intend to"
                 " shutdown the whole cluster", node)
      return constants.EXIT_FAILURE
    elif powered is None:
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
                 " handled in a fully automated manner", node)
    elif powered == opts.on:
      _stdout_fn("Node %s is already in desired power state, skipping", node)
    elif not offline or (offline and powered):
      node_list.append(node)

  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
    return constants.EXIT_FAILURE

  if opts.on:
    return _on_fn(opts, all_nodes, node_list, inst_map)
  else:
    return _off_fn(opts, node_list, inst_map)


def _GetCreateCommand(info):
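  # Only the instance policy settings and the cluster name are reproduced
  # here; all other "gnt-cluster init" options are omitted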
  buf = StringIO()
  buf.write("gnt-cluster init")
  PrintIPolicyCommand(buf, info["ipolicy"], False)
  buf.write(" ")
  buf.write(info["name"])
  return buf.getvalue()


def ShowCreateCommand(opts, args):
  """Shows the command that can be used to re-create the cluster.

  Currently it works only for ipolicy specs.

  """
  cl = GetClient()
  result = cl.QueryClusterInfo()
  ToStdout(_GetCreateCommand(result))


def _RunCommandAndReport(cmd):
  """Run a command and report its output, iff it failed.

  @param cmd: the command to execute
  @type cmd: list
  @rtype: bool
  @return: False, if the execution failed.

  """
  result = utils.RunCmd(cmd)
  if result.failed:
    ToStderr("Command %s failed: %s; Output %s" %
             (cmd, result.fail_reason, result.output))
    return False
  return True


def _VerifyCommand(cmd):
  """Verify that a given command succeeds on all online nodes.

  As this function is intended to run during upgrades, it
  is implemented in such a way that it still works, if all Ganeti
  daemons are down.

  @param cmd: the command to execute
  @type cmd: list
  @rtype: list
  @return: the list of node names that are online where
      the command failed.

  """
  command = utils.text.ShellQuoteArgs([str(val) for val in cmd])

  nodes = ssconf.SimpleStore().GetOnlineNodeList()
  master_node = ssconf.SimpleStore().GetMasterNode()
  cluster_name = ssconf.SimpleStore().GetClusterName()

  # If master node is in 'nodes', make sure master node is at list end
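  # (e.g. a command taking services down is then run on the node we are
  # executing from only after all other nodes have been handled)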
  if master_node in nodes:
    nodes.remove(master_node)
    nodes.append(master_node)

  failed = []

  srun = ssh.SshRunner(cluster_name=cluster_name)
  for name in nodes:
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
    if result.exit_code != 0:
      failed.append(name)

  return failed


def _VerifyVersionInstalled(versionstring):
  """Verify that the given version of ganeti is installed on all online nodes.

  Do nothing, if this is the case, otherwise print an appropriate
  message to stderr.

  @param versionstring: the version to check for
  @type versionstring: string
  @rtype: bool
  @return: True, if the version is installed on all online nodes

  """
  badnodes = _VerifyCommand(["test", "-d",
                             os.path.join(pathutils.PKGLIBDIR, versionstring)])
  if badnodes:
    ToStderr("Ganeti version %s not installed on nodes %s"
             % (versionstring, ", ".join(badnodes)))
    return False

  return True


def _GetRunning():
  """Determine the number of jobs still running.

  @rtype: int
  @return: the number of jobs still running

  """
  cl = GetClient()
  qfilter = qlang.MakeSimpleFilter("status",
                                   frozenset([constants.JOB_STATUS_RUNNING]))
  return len(cl.Query(constants.QR_JOB, [], qfilter).data)


def _SetGanetiVersion(versionstring):
  """Set the active version of ganeti to the given versionstring

  @type versionstring: string
  @rtype: list
  @return: the list of nodes where the version change failed

  """
  failed = []
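  # With GNU ln, "-T" allows replacing the existing symlink in a single
  # command; otherwise fall back to removing the old link and re-creating it
  # in two separate steps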
  if constants.HAS_GNU_LN:
    failed.extend(_VerifyCommand(
        ["ln", "-s", "-f", "-T",
         os.path.join(pathutils.PKGLIBDIR, versionstring),
         os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
    failed.extend(_VerifyCommand(
        ["ln", "-s", "-f", "-T",
         os.path.join(pathutils.SHAREDIR, versionstring),
         os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
  else:
    failed.extend(_VerifyCommand(
        ["rm", "-f", os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
    failed.extend(_VerifyCommand(
        ["ln", "-s", "-f", os.path.join(pathutils.PKGLIBDIR, versionstring),
         os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
    failed.extend(_VerifyCommand(
        ["rm", "-f", os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
    failed.extend(_VerifyCommand(
        ["ln", "-s", "-f", os.path.join(pathutils.SHAREDIR, versionstring),
         os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
  return list(set(failed))


def _ExecuteCommands(fns):
  """Execute a list of functions, in reverse order.

  @type fns: list of functions.
  @param fns: the functions to be executed.

  """
  for fn in reversed(fns):
    fn()


def _GetConfigVersion():
  """Determine the version the configuration file currently has.

  @rtype: tuple or None
  @return: (major, minor, revision) if the version can be determined,
      None otherwise

  """
  config_data = serializer.LoadJson(utils.ReadFile(pathutils.CLUSTER_CONF_FILE))
  try:
    config_version = config_data["version"]
  except KeyError:
    return None
  return utils.SplitVersion(config_version)


def _ReadIntentToUpgrade():
  """Read the file documenting the intent to upgrade the cluster.

  @rtype: (string, string) or (None, None)
  @return: (old version, version to upgrade to), if the file exists,
      and (None, None) otherwise.

  """
  if not os.path.isfile(pathutils.INTENT_TO_UPGRADE):
    return (None, None)

  contentstring = utils.ReadFile(pathutils.INTENT_TO_UPGRADE)
  contents = utils.UnescapeAndSplit(contentstring)
  if len(contents) != 3:
    # file syntactically malformed
    return (None, None)
  return (contents[0], contents[1])


def _WriteIntentToUpgrade(version):
  """Write file documenting the intent to upgrade the cluster.

  @type version: string
  @param version: the version we intend to upgrade to

  """
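  # Store the currently installed version, the target version and our PID;
  # the PID is informational only, _ReadIntentToUpgrade ignores it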
  utils.WriteFile(pathutils.INTENT_TO_UPGRADE,
                  data=utils.EscapeAndJoin([constants.RELEASE_VERSION, version,
                                            "%d" % os.getpid()]))


def _UpgradeBeforeConfigurationChange(versionstring):
  """
  Carry out all the tasks necessary for an upgrade that happen before
  the configuration file, or Ganeti version, changes.

  @type versionstring: string
  @param versionstring: the version to upgrade to
  @rtype: (bool, list)
  @return: tuple of a bool indicating success and a list of rollback tasks

  """
  rollback = []

  if not _VerifyVersionInstalled(versionstring):
    return (False, rollback)

  _WriteIntentToUpgrade(versionstring)
  rollback.append(
    lambda: utils.RunCmd(["rm", "-f", pathutils.INTENT_TO_UPGRADE]))

  ToStdout("Draining queue")
  client = GetClient()
  client.SetQueueDrainFlag(True)

  rollback.append(lambda: GetClient().SetQueueDrainFlag(False))

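  # Poll _GetRunning until it reports zero running jobs or the drain timeout
  # expires; a non-zero result here means jobs were still running when we
  # gave up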
  if utils.SimpleRetry(0, _GetRunning,
                       constants.UPGRADE_QUEUE_POLL_INTERVAL,
                       constants.UPGRADE_QUEUE_DRAIN_TIMEOUT):
    ToStderr("Failed to completely empty the queue.")
    return (False, rollback)

  ToStdout("Stopping daemons on master node.")
  if not _RunCommandAndReport([pathutils.DAEMON_UTIL, "stop-all"]):
    return (False, rollback)

  if not _VerifyVersionInstalled(versionstring):
    utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"])
    return (False, rollback)

  ToStdout("Stopping daemons everywhere.")
  rollback.append(lambda: _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
  if badnodes:
    ToStderr("Failed to stop daemons on %s." % (", ".join(badnodes),))
    return (False, rollback)

  backuptar = os.path.join(pathutils.LOCALSTATEDIR,
                           "lib/ganeti%d.tar" % time.time())
  ToStdout("Backing up configuration as %s" % backuptar)
  if not _RunCommandAndReport(["tar", "cf", backuptar,
                               pathutils.DATA_DIR]):
    return (False, rollback)

  return (True, rollback)


def _VersionSpecificDowngrade():
  """
  Perform any additional downgrade tasks that are version specific
  and need to be done just after the configuration downgrade. This
  function needs to be idempotent, so that it can be redone if the
  downgrade procedure gets interrupted after changing the
  configuration.

  Note that this function has to be reset with every version bump.

  @return: True upon success
  """
  ToStdout("Performing version-specific downgrade tasks.")
  return True


def _SwitchVersionAndConfig(versionstring, downgrade):
  """
  Switch to the new Ganeti version and change the configuration,
  in correct order.

  @type versionstring: string
  @param versionstring: the version to change to
  @type downgrade: bool
  @param downgrade: True, if the configuration should be downgraded
  @rtype: (bool, list)
  @return: tuple of a bool indicating success, and a list of
      additional rollback tasks

  """
  rollback = []
  if downgrade:
    ToStdout("Downgrading configuration")
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "--downgrade", "-f"]):
      return (False, rollback)
    # Note: version specific downgrades need to be done before switching
    # binaries, so that we still have the knowledgeable binary if the downgrade
    # process gets interrupted at this point.
    if not _VersionSpecificDowngrade():
      return (False, rollback)

  # Configuration change is the point of no return. From then onwards, it is
  # safer to push through the up/downgrade than to try to roll it back.

  ToStdout("Switching to version %s on all nodes" % versionstring)
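  # Record how to point the symlinks back at the currently running version
  # (DIR_VERSION) before switching them over to the new one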
  rollback.append(lambda: _SetGanetiVersion(constants.DIR_VERSION))
  badnodes = _SetGanetiVersion(versionstring)
  if badnodes:
    ToStderr("Failed to switch to Ganeti version %s on nodes %s"
             % (versionstring, ", ".join(badnodes)))
    if not downgrade:
      return (False, rollback)

  # Now that we have changed to the new version of Ganeti we should
  # not communicate over luxi any more, as luxi might have changed in
  # incompatible ways. Therefore, manually call the corresponding ganeti
  # commands using their canonical (version independent) path.

  if not downgrade:
    ToStdout("Upgrading configuration")
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "-f"]):
      return (False, rollback)

  return (True, rollback)


def _UpgradeAfterConfigurationChange(oldversion):
  """
  Carry out the upgrade actions necessary after switching to the new
  Ganeti version and updating the configuration.

  As this part is run at a time where the new version of Ganeti is already
  running, no communication should happen via luxi, as this is not a stable
  interface. Also, as the configuration change is the point of no return,
  all actions are pushed through, even if some of them fail.

  @param oldversion: the version the upgrade started from
  @type oldversion: string
  @rtype: int
  @return: the intended return value

  """
  returnvalue = 0

  ToStdout("Ensuring directories everywhere.")
  badnodes = _VerifyCommand([pathutils.ENSURE_DIRS])
  if badnodes:
    ToStderr("Warning: failed to ensure directories on %s." %
             (", ".join(badnodes)))
    returnvalue = 1

  ToStdout("Starting daemons everywhere.")
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
  if badnodes:
    ToStderr("Warning: failed to start daemons on %s." % (", ".join(badnodes),))
    returnvalue = 1

  ToStdout("Redistributing the configuration.")
  if not _RunCommandAndReport(["gnt-cluster", "redist-conf", "--yes-do-it"]):
    returnvalue = 1

  ToStdout("Restarting daemons everywhere.")
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
  badnodes.extend(_VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
  if badnodes:
    ToStderr("Warning: failed to start daemons on %s." %
             (", ".join(list(set(badnodes))),))
    returnvalue = 1

  ToStdout("Undraining the queue.")
  if not _RunCommandAndReport(["gnt-cluster", "queue", "undrain"]):
    returnvalue = 1

  _RunCommandAndReport(["rm", "-f", pathutils.INTENT_TO_UPGRADE])

  ToStdout("Running post-upgrade hooks")
  if not _RunCommandAndReport([pathutils.POST_UPGRADE, oldversion]):
    returnvalue = 1

  ToStdout("Verifying cluster.")
  if not _RunCommandAndReport(["gnt-cluster", "verify"]):
    returnvalue = 1

  return returnvalue


def UpgradeGanetiCommand(opts, args):
  """Upgrade a cluster to a new ganeti version.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if ((not opts.resume and opts.to is None)
      or (opts.resume and opts.to is not None)):
    ToStderr("Precisely one of the options --to and --resume"
             " has to be given")
    return 1

  oldversion = constants.RELEASE_VERSION

  if opts.resume:
    ssconf.CheckMaster(False)
    oldversion, versionstring = _ReadIntentToUpgrade()
    if versionstring is None:
      return 0
    version = utils.version.ParseVersion(versionstring)
    if version is None:
      return 1
    configversion = _GetConfigVersion()
    if configversion is None:
      return 1
    # If the upgrade we resume was an upgrade between compatible
    # versions (like 2.10.0 to 2.10.1), the correct configversion
    # does not guarantee that the config has been updated.
    # However, in the case of a compatible update with the configuration
    # not touched, we are running a different dirversion with the same
    # config version.
    config_already_modified = \
      (utils.IsCorrectConfigVersion(version, configversion) and
       not (versionstring != constants.DIR_VERSION and
            configversion == (constants.CONFIG_MAJOR, constants.CONFIG_MINOR,
                              constants.CONFIG_REVISION)))
    if not config_already_modified:
      # We have to start from the beginning; however, some daemons might have
      # already been stopped, so the only way to get into a well-defined state
      # is by starting all daemons again.
      _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
  else:
    versionstring = opts.to
    config_already_modified = False
    version = utils.version.ParseVersion(versionstring)
    if version is None:
      ToStderr("Could not parse version string %s" % versionstring)
      return 1

  msg = utils.version.UpgradeRange(version)
  if msg is not None:
    ToStderr("Cannot upgrade to %s: %s" % (versionstring, msg))
    return 1

  if not config_already_modified:
    success, rollback = _UpgradeBeforeConfigurationChange(versionstring)
    if not success:
      _ExecuteCommands(rollback)
      return 1
  else:
    rollback = []

  downgrade = utils.version.ShouldCfgdowngrade(version)

  success, additionalrollback = \
      _SwitchVersionAndConfig(versionstring, downgrade)
  if not success:
    rollback.extend(additionalrollback)
    _ExecuteCommands(rollback)
    return 1

  return _UpgradeAfterConfigurationChange(oldversion)


commands = {
  "init": (
    InitCluster, [ArgHost(min=1, max=1)],
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
     NIC_PARAMS_OPT, NOMODIFY_ETCHOSTS_OPT, NOMODIFY_SSH_SETUP_OPT,
     SECONDARY_IP_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT,
     DRBD_HELPER_OPT, DEFAULT_IALLOCATOR_OPT, DEFAULT_IALLOCATOR_PARAMS_OPT,
     PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT,
     GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT,
     HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
     IPOLICY_STD_SPECS_OPT, GLOBAL_GLUSTER_FILEDIR_OPT]
     + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
  "destroy": (
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
    "", "Destroy cluster"),
  "rename": (
    RenameCluster, [ArgHost(min=1, max=1)],
    [FORCE_OPT, DRY_RUN_OPT],
    "<new_name>",
    "Renames the cluster"),
  "redist-conf": (
    RedistributeConfig, ARGS_NONE, SUBMIT_OPTS +
    [DRY_RUN_OPT, PRIORITY_OPT, FORCE_DISTRIBUTION],
    "", "Forces a push of the configuration file and ssconf files"
    " to the nodes in the cluster"),
  "verify": (
    VerifyCluster, ARGS_NONE,
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
    "", "Does a check on the cluster configuration"),
  "verify-disks": (
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
    "", "Does a check on the cluster disk status"),
  "repair-disk-sizes": (
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
    "[instance...]", "Updates mismatches in recorded disk sizes"),
  "master-failover": (
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
    "", "Makes the current node the master"),
  "master-ping": (
    MasterPing, ARGS_NONE, [],
    "", "Checks if the master is alive"),
  "version": (
    ShowClusterVersion, ARGS_NONE, [],
    "", "Shows the cluster version"),
  "getmaster": (
    ShowClusterMaster, ARGS_NONE, [],
    "", "Shows the cluster master"),
  "copyfile": (
    ClusterCopyFile, [ArgFile(min=1, max=1)],
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
  "command": (
    RunClusterCommand, [ArgCommand(min=1)],
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
  "info": (
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
    "[--roman]", "Show cluster configuration"),
  "list-tags": (
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
  "add-tags": (
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Add tags to the cluster"),
  "remove-tags": (
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Remove tags from the cluster"),
  "search-tags": (
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
    "Searches the tags on all objects on"
    " the cluster for a given pattern (regex)"),
  "queue": (
    QueueOps,
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
    [], "drain|undrain|info", "Change queue properties"),
  "watcher": (
    WatcherOps,
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
    [],
    "{pause <timespec>|continue|info}", "Change watcher properties"),
  "modify": (
    SetClusterParams, ARGS_NONE,
    [FORCE_OPT,
     BACKEND_OPT, CP_SIZE_OPT, RQL_OPT,
     ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT,
     UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT, DRBD_HELPER_OPT,
     DEFAULT_IALLOCATOR_OPT, DEFAULT_IALLOCATOR_PARAMS_OPT, RESERVED_LVS_OPT,
     DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT,
     USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] +
     SUBMIT_OPTS +
     [ENABLED_DISK_TEMPLATES_OPT, IPOLICY_STD_SPECS_OPT, MODIFY_ETCHOSTS_OPT] +
     INSTANCE_POLICY_OPTS + [GLOBAL_FILEDIR_OPT],
    "[opts...]",
    "Alters the parameters of the cluster"),
  "renew-crypto": (
    RenewCrypto, ARGS_NONE,
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT,
     NEW_NODE_CERT_OPT],
    "[opts...]",
    "Renews cluster certificates, keys and secrets"),
  "epo": (
    Epo, [ArgUnknown()],
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
    "[opts...] [args]",
    "Performs an emergency power-off on given args"),
  "activate-master-ip": (
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
  "deactivate-master-ip": (
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
    "Deactivates the master IP"),
  "show-ispecs-cmd": (
    ShowCreateCommand, ARGS_NONE, [], "",
    "Show the command line to re-create the cluster"),
  "upgrade": (
    UpgradeGanetiCommand, ARGS_NONE, [TO_OPT, RESUME_OPT], "",
    "Upgrade (or downgrade) to a new Ganeti version"),
  }


#: dictionary with aliases for commands
aliases = {
  "masterfailover": "master-failover",
  "show": "info",
}


def Main():
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
                     aliases=aliases)