Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ 178ad717

History | View | Annotate | Download (71.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
from cStringIO import StringIO
30
import os
31
import time
32
import OpenSSL
33
import itertools
34

    
35
from ganeti.cli import *
36
from ganeti import opcodes
37
from ganeti import constants
38
from ganeti import errors
39
from ganeti import utils
40
from ganeti import bootstrap
41
from ganeti import ssh
42
from ganeti import objects
43
from ganeti import uidpool
44
from ganeti import compat
45
from ganeti import netutils
46
from ganeti import ssconf
47
from ganeti import pathutils
48
from ganeti import serializer
49
from ganeti import qlang
50

    
51

    
52
ON_OPT = cli_option("--on", default=False,
53
                    action="store_true", dest="on",
54
                    help="Recover from an EPO")
55

    
56
GROUPS_OPT = cli_option("--groups", default=False,
57
                        action="store_true", dest="groups",
58
                        help="Arguments are node groups instead of nodes")
59

    
60
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
61
                            help="Override interactive check for --no-voting",
62
                            default=False, action="store_true")
63

    
64
FORCE_DISTRIBUTION = cli_option("--yes-do-it", dest="yes_do_it",
65
                                help="Unconditionally distribute the"
66
                                " configuration, even if the queue"
67
                                " is drained",
68
                                default=False, action="store_true")
69

    
70
TO_OPT = cli_option("--to", default=None, type="string",
71
                    help="The Ganeti version to upgrade to")
72

    
73
RESUME_OPT = cli_option("--resume", default=False, action="store_true",
74
                        help="Resume any pending Ganeti upgrades")
75

    
76
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
77
_EPO_PING_TIMEOUT = 1 # 1 second
78
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
79

    
80

    
81
def _InitEnabledDiskTemplates(opts):
82
  """Initialize the list of enabled disk templates.
83

84
  """
85
  if opts.enabled_disk_templates:
86
    return opts.enabled_disk_templates.split(",")
87
  else:
88
    return constants.DEFAULT_ENABLED_DISK_TEMPLATES
89

    
90

    
91
def _InitVgName(opts, enabled_disk_templates):
92
  """Initialize the volume group name.
93

94
  @type enabled_disk_templates: list of strings
95
  @param enabled_disk_templates: cluster-wide enabled disk templates
96

97
  """
98
  vg_name = None
99
  if opts.vg_name is not None:
100
    vg_name = opts.vg_name
101
    if vg_name:
102
      if not utils.IsLvmEnabled(enabled_disk_templates):
103
        ToStdout("You specified a volume group with --vg-name, but you did not"
104
                 " enable any disk template that uses lvm.")
105
    elif utils.IsLvmEnabled(enabled_disk_templates):
106
      raise errors.OpPrereqError(
107
          "LVM disk templates are enabled, but vg name not set.")
108
  elif utils.IsLvmEnabled(enabled_disk_templates):
109
    vg_name = constants.DEFAULT_VG
110
  return vg_name
111

    
112

    
113
def _InitDrbdHelper(opts, enabled_disk_templates):
114
  """Initialize the DRBD usermode helper.
115

116
  """
117
  drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
118

    
119
  if not drbd_enabled and opts.drbd_helper is not None:
120
    ToStdout("Note: You specified a DRBD usermode helper, while DRBD storage"
121
             " is not enabled.")
122

    
123
  if drbd_enabled:
124
    if opts.drbd_helper is None:
125
      return constants.DEFAULT_DRBD_HELPER
126
    if opts.drbd_helper == '':
127
      raise errors.OpPrereqError(
128
          "Unsetting the drbd usermode helper while enabling DRBD is not"
129
          " allowed.")
130

    
131
  return opts.drbd_helper
132

    
133

    
134
@UsesRPC
135
def InitCluster(opts, args):
136
  """Initialize the cluster.
137

138
  @param opts: the command line options selected by the user
139
  @type args: list
140
  @param args: should contain only one element, the desired
141
      cluster name
142
  @rtype: int
143
  @return: the desired exit code
144

145
  """
146
  enabled_disk_templates = _InitEnabledDiskTemplates(opts)
147

    
148
  try:
149
    vg_name = _InitVgName(opts, enabled_disk_templates)
150
    drbd_helper = _InitDrbdHelper(opts, enabled_disk_templates)
151
  except errors.OpPrereqError, e:
152
    ToStderr(str(e))
153
    return 1
154

    
155
  master_netdev = opts.master_netdev
156
  if master_netdev is None:
157
    nic_mode = opts.nicparams.get(constants.NIC_MODE, None)
158
    if not nic_mode:
159
      # default case, use bridging
160
      master_netdev = constants.DEFAULT_BRIDGE
161
    elif nic_mode == constants.NIC_MODE_OVS:
162
      # default ovs is different from default bridge
163
      master_netdev = constants.DEFAULT_OVS
164
      opts.nicparams[constants.NIC_LINK] = constants.DEFAULT_OVS
165

    
166
  hvlist = opts.enabled_hypervisors
167
  if hvlist is None:
168
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
169
  hvlist = hvlist.split(",")
170

    
171
  hvparams = dict(opts.hvparams)
172
  beparams = opts.beparams
173
  nicparams = opts.nicparams
174

    
175
  diskparams = dict(opts.diskparams)
176

    
177
  # check the disk template types here, as we cannot rely on the type check done
178
  # by the opcode parameter types
179
  diskparams_keys = set(diskparams.keys())
180
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
181
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
182
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
183
    return 1
184

    
185
  # prepare beparams dict
186
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
187
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
188

    
189
  # prepare nicparams dict
190
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
191
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
192

    
193
  # prepare ndparams dict
194
  if opts.ndparams is None:
195
    ndparams = dict(constants.NDC_DEFAULTS)
196
  else:
197
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
198
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
199

    
200
  # prepare hvparams dict
201
  for hv in constants.HYPER_TYPES:
202
    if hv not in hvparams:
203
      hvparams[hv] = {}
204
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
205
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
206

    
207
  # prepare diskparams dict
208
  for templ in constants.DISK_TEMPLATES:
209
    if templ not in diskparams:
210
      diskparams[templ] = {}
211
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
212
                                         diskparams[templ])
213
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
214

    
215
  # prepare ipolicy dict
216
  ipolicy = CreateIPolicyFromOpts(
217
    ispecs_mem_size=opts.ispecs_mem_size,
218
    ispecs_cpu_count=opts.ispecs_cpu_count,
219
    ispecs_disk_count=opts.ispecs_disk_count,
220
    ispecs_disk_size=opts.ispecs_disk_size,
221
    ispecs_nic_count=opts.ispecs_nic_count,
222
    minmax_ispecs=opts.ipolicy_bounds_specs,
223
    std_ispecs=opts.ipolicy_std_specs,
224
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
225
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
226
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
227
    fill_all=True)
228

    
229
  if opts.candidate_pool_size is None:
230
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
231

    
232
  if opts.mac_prefix is None:
233
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
234

    
235
  uid_pool = opts.uid_pool
236
  if uid_pool is not None:
237
    uid_pool = uidpool.ParseUidPool(uid_pool)
238

    
239
  if opts.prealloc_wipe_disks is None:
240
    opts.prealloc_wipe_disks = False
241

    
242
  external_ip_setup_script = opts.use_external_mip_script
243
  if external_ip_setup_script is None:
244
    external_ip_setup_script = False
245

    
246
  try:
247
    primary_ip_version = int(opts.primary_ip_version)
248
  except (ValueError, TypeError), err:
249
    ToStderr("Invalid primary ip version value: %s" % str(err))
250
    return 1
251

    
252
  master_netmask = opts.master_netmask
253
  try:
254
    if master_netmask is not None:
255
      master_netmask = int(master_netmask)
256
  except (ValueError, TypeError), err:
257
    ToStderr("Invalid master netmask value: %s" % str(err))
258
    return 1
259

    
260
  if opts.disk_state:
261
    disk_state = utils.FlatToDict(opts.disk_state)
262
  else:
263
    disk_state = {}
264

    
265
  hv_state = dict(opts.hv_state)
266

    
267
  default_ialloc_params = opts.default_iallocator_params
268
  bootstrap.InitCluster(cluster_name=args[0],
269
                        secondary_ip=opts.secondary_ip,
270
                        vg_name=vg_name,
271
                        mac_prefix=opts.mac_prefix,
272
                        master_netmask=master_netmask,
273
                        master_netdev=master_netdev,
274
                        file_storage_dir=opts.file_storage_dir,
275
                        shared_file_storage_dir=opts.shared_file_storage_dir,
276
                        gluster_storage_dir=opts.gluster_storage_dir,
277
                        enabled_hypervisors=hvlist,
278
                        hvparams=hvparams,
279
                        beparams=beparams,
280
                        nicparams=nicparams,
281
                        ndparams=ndparams,
282
                        diskparams=diskparams,
283
                        ipolicy=ipolicy,
284
                        candidate_pool_size=opts.candidate_pool_size,
285
                        modify_etc_hosts=opts.modify_etc_hosts,
286
                        modify_ssh_setup=opts.modify_ssh_setup,
287
                        maintain_node_health=opts.maintain_node_health,
288
                        drbd_helper=drbd_helper,
289
                        uid_pool=uid_pool,
290
                        default_iallocator=opts.default_iallocator,
291
                        default_iallocator_params=default_ialloc_params,
292
                        primary_ip_version=primary_ip_version,
293
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
294
                        use_external_mip_script=external_ip_setup_script,
295
                        hv_state=hv_state,
296
                        disk_state=disk_state,
297
                        enabled_disk_templates=enabled_disk_templates,
298
                        )
299
  op = opcodes.OpClusterPostInit()
300
  SubmitOpCode(op, opts=opts)
301
  return 0
302

    
303

    
304
@UsesRPC
305
def DestroyCluster(opts, args):
306
  """Destroy the cluster.
307

308
  @param opts: the command line options selected by the user
309
  @type args: list
310
  @param args: should be an empty list
311
  @rtype: int
312
  @return: the desired exit code
313

314
  """
315
  if not opts.yes_do_it:
316
    ToStderr("Destroying a cluster is irreversible. If you really want"
317
             " destroy this cluster, supply the --yes-do-it option.")
318
    return 1
319

    
320
  op = opcodes.OpClusterDestroy()
321
  master_uuid = SubmitOpCode(op, opts=opts)
322
  # if we reached this, the opcode didn't fail; we can proceed to
323
  # shutdown all the daemons
324
  bootstrap.FinalizeClusterDestroy(master_uuid)
325
  return 0
326

    
327

    
328
def RenameCluster(opts, args):
329
  """Rename the cluster.
330

331
  @param opts: the command line options selected by the user
332
  @type args: list
333
  @param args: should contain only one element, the new cluster name
334
  @rtype: int
335
  @return: the desired exit code
336

337
  """
338
  cl = GetClient()
339

    
340
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
341

    
342
  new_name = args[0]
343
  if not opts.force:
344
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
345
                " connected over the network to the cluster name, the"
346
                " operation is very dangerous as the IP address will be"
347
                " removed from the node and the change may not go through."
348
                " Continue?") % (cluster_name, new_name)
349
    if not AskUser(usertext):
350
      return 1
351

    
352
  op = opcodes.OpClusterRename(name=new_name)
353
  result = SubmitOpCode(op, opts=opts, cl=cl)
354

    
355
  if result:
356
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
357

    
358
  return 0
359

    
360

    
361
def ActivateMasterIp(opts, args):
362
  """Activates the master IP.
363

364
  """
365
  op = opcodes.OpClusterActivateMasterIp()
366
  SubmitOpCode(op)
367
  return 0
368

    
369

    
370
def DeactivateMasterIp(opts, args):
371
  """Deactivates the master IP.
372

373
  """
374
  if not opts.confirm:
375
    usertext = ("This will disable the master IP. All the open connections to"
376
                " the master IP will be closed. To reach the master you will"
377
                " need to use its node IP."
378
                " Continue?")
379
    if not AskUser(usertext):
380
      return 1
381

    
382
  op = opcodes.OpClusterDeactivateMasterIp()
383
  SubmitOpCode(op)
384
  return 0
385

    
386

    
387
def RedistributeConfig(opts, args):
388
  """Forces push of the cluster configuration.
389

390
  @param opts: the command line options selected by the user
391
  @type args: list
392
  @param args: empty list
393
  @rtype: int
394
  @return: the desired exit code
395

396
  """
397
  op = opcodes.OpClusterRedistConf()
398
  if opts.yes_do_it:
399
    SubmitOpCodeToDrainedQueue(op)
400
  else:
401
    SubmitOrSend(op, opts)
402
  return 0
403

    
404

    
405
def ShowClusterVersion(opts, args):
406
  """Write version of ganeti software to the standard output.
407

408
  @param opts: the command line options selected by the user
409
  @type args: list
410
  @param args: should be an empty list
411
  @rtype: int
412
  @return: the desired exit code
413

414
  """
415
  cl = GetClient(query=True)
416
  result = cl.QueryClusterInfo()
417
  ToStdout("Software version: %s", result["software_version"])
418
  ToStdout("Internode protocol: %s", result["protocol_version"])
419
  ToStdout("Configuration format: %s", result["config_version"])
420
  ToStdout("OS api version: %s", result["os_api_version"])
421
  ToStdout("Export interface: %s", result["export_version"])
422
  ToStdout("VCS version: %s", result["vcs_version"])
423
  return 0
424

    
425

    
426
def ShowClusterMaster(opts, args):
427
  """Write name of master node to the standard output.
428

429
  @param opts: the command line options selected by the user
430
  @type args: list
431
  @param args: should be an empty list
432
  @rtype: int
433
  @return: the desired exit code
434

435
  """
436
  master = bootstrap.GetMaster()
437
  ToStdout(master)
438
  return 0
439

    
440

    
441
def _FormatGroupedParams(paramsdict, roman=False):
442
  """Format Grouped parameters (be, nic, disk) by group.
443

444
  @type paramsdict: dict of dicts
445
  @param paramsdict: {group: {param: value, ...}, ...}
446
  @rtype: dict of dicts
447
  @return: copy of the input dictionaries with strings as values
448

449
  """
450
  ret = {}
451
  for (item, val) in paramsdict.items():
452
    if isinstance(val, dict):
453
      ret[item] = _FormatGroupedParams(val, roman=roman)
454
    elif roman and isinstance(val, int):
455
      ret[item] = compat.TryToRoman(val)
456
    else:
457
      ret[item] = str(val)
458
  return ret
459

    
460

    
461
def ShowClusterConfig(opts, args):
462
  """Shows cluster information.
463

464
  @param opts: the command line options selected by the user
465
  @type args: list
466
  @param args: should be an empty list
467
  @rtype: int
468
  @return: the desired exit code
469

470
  """
471
  cl = GetClient(query=True)
472
  result = cl.QueryClusterInfo()
473

    
474
  if result["tags"]:
475
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
476
  else:
477
    tags = "(none)"
478
  if result["reserved_lvs"]:
479
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
480
  else:
481
    reserved_lvs = "(none)"
482

    
483
  enabled_hv = result["enabled_hypervisors"]
484
  hvparams = dict((k, v) for k, v in result["hvparams"].iteritems()
485
                  if k in enabled_hv)
486

    
487
  info = [
488
    ("Cluster name", result["name"]),
489
    ("Cluster UUID", result["uuid"]),
490

    
491
    ("Creation time", utils.FormatTime(result["ctime"])),
492
    ("Modification time", utils.FormatTime(result["mtime"])),
493

    
494
    ("Master node", result["master"]),
495

    
496
    ("Architecture (this node)",
497
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),
498

    
499
    ("Tags", tags),
500

    
501
    ("Default hypervisor", result["default_hypervisor"]),
502
    ("Enabled hypervisors", utils.CommaJoin(enabled_hv)),
503

    
504
    ("Hypervisor parameters", _FormatGroupedParams(hvparams)),
505

    
506
    ("OS-specific hypervisor parameters",
507
     _FormatGroupedParams(result["os_hvp"])),
508

    
509
    ("OS parameters", _FormatGroupedParams(result["osparams"])),
510

    
511
    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
512
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),
513

    
514
    ("Cluster parameters", [
515
      ("candidate pool size",
516
       compat.TryToRoman(result["candidate_pool_size"],
517
                         convert=opts.roman_integers)),
518
      ("maximal number of jobs running simultaneously",
519
       compat.TryToRoman(result["max_running_jobs"],
520
                         convert=opts.roman_integers)),
521
      ("master netdev", result["master_netdev"]),
522
      ("master netmask", result["master_netmask"]),
523
      ("use external master IP address setup script",
524
       result["use_external_mip_script"]),
525
      ("lvm volume group", result["volume_group_name"]),
526
      ("lvm reserved volumes", reserved_lvs),
527
      ("drbd usermode helper", result["drbd_usermode_helper"]),
528
      ("file storage path", result["file_storage_dir"]),
529
      ("shared file storage path", result["shared_file_storage_dir"]),
530
      ("gluster storage path", result["gluster_storage_dir"]),
531
      ("maintenance of node health", result["maintain_node_health"]),
532
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
533
      ("default instance allocator", result["default_iallocator"]),
534
      ("default instance allocator parameters",
535
       result["default_iallocator_params"]),
536
      ("primary ip version", result["primary_ip_version"]),
537
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
538
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
539
      ("ExtStorage Providers search path",
540
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
541
      ("enabled disk templates",
542
       utils.CommaJoin(result["enabled_disk_templates"])),
543
      ]),
544

    
545
    ("Default node parameters",
546
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),
547

    
548
    ("Default instance parameters",
549
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),
550

    
551
    ("Default nic parameters",
552
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),
553

    
554
    ("Default disk parameters",
555
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),
556

    
557
    ("Instance policy - limits for instances",
558
     FormatPolicyInfo(result["ipolicy"], None, True)),
559
    ]
560

    
561
  PrintGenericInfo(info)
562
  return 0
563

    
564

    
565
def ClusterCopyFile(opts, args):
566
  """Copy a file from master to some nodes.
567

568
  @param opts: the command line options selected by the user
569
  @type args: list
570
  @param args: should contain only one element, the path of
571
      the file to be copied
572
  @rtype: int
573
  @return: the desired exit code
574

575
  """
576
  filename = args[0]
577
  if not os.path.exists(filename):
578
    raise errors.OpPrereqError("No such filename '%s'" % filename,
579
                               errors.ECODE_INVAL)
580

    
581
  cl = GetClient()
582
  qcl = GetClient(query=True)
583
  try:
584
    cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
585

    
586
    results = GetOnlineNodes(nodes=opts.nodes, cl=qcl, filter_master=True,
587
                             secondary_ips=opts.use_replication_network,
588
                             nodegroup=opts.nodegroup)
589
    ports = GetNodesSshPorts(opts.nodes, qcl)
590
  finally:
591
    cl.Close()
592
    qcl.Close()
593

    
594
  srun = ssh.SshRunner(cluster_name)
595
  for (node, port) in zip(results, ports):
596
    if not srun.CopyFileToNode(node, port, filename):
597
      ToStderr("Copy of file %s to node %s:%d failed", filename, node, port)
598

    
599
  return 0
600

    
601

    
602
def RunClusterCommand(opts, args):
603
  """Run a command on some nodes.
604

605
  @param opts: the command line options selected by the user
606
  @type args: list
607
  @param args: should contain the command to be run and its arguments
608
  @rtype: int
609
  @return: the desired exit code
610

611
  """
612
  cl = GetClient()
613
  qcl = GetClient(query=True)
614

    
615
  command = " ".join(args)
616

    
617
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=qcl, nodegroup=opts.nodegroup)
618
  ports = GetNodesSshPorts(nodes, qcl)
619

    
620
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
621
                                                    "master_node"])
622

    
623
  srun = ssh.SshRunner(cluster_name=cluster_name)
624

    
625
  # Make sure master node is at list end
626
  if master_node in nodes:
627
    nodes.remove(master_node)
628
    nodes.append(master_node)
629

    
630
  for (name, port) in zip(nodes, ports):
631
    result = srun.Run(name, constants.SSH_LOGIN_USER, command, port=port)
632

    
633
    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
634
      # Do not output anything for successful commands
635
      continue
636

    
637
    ToStdout("------------------------------------------------")
638
    if opts.show_machine_names:
639
      for line in result.output.splitlines():
640
        ToStdout("%s: %s", name, line)
641
    else:
642
      ToStdout("node: %s", name)
643
      ToStdout("%s", result.output)
644
    ToStdout("return code = %s", result.exit_code)
645

    
646
  return 0
647

    
648

    
649
def VerifyCluster(opts, args):
650
  """Verify integrity of cluster, performing various test on nodes.
651

652
  @param opts: the command line options selected by the user
653
  @type args: list
654
  @param args: should be an empty list
655
  @rtype: int
656
  @return: the desired exit code
657

658
  """
659
  skip_checks = []
660

    
661
  if opts.skip_nplusone_mem:
662
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
663

    
664
  cl = GetClient()
665

    
666
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
667
                               error_codes=opts.error_codes,
668
                               debug_simulate_errors=opts.simulate_errors,
669
                               skip_checks=skip_checks,
670
                               ignore_errors=opts.ignore_errors,
671
                               group_name=opts.nodegroup)
672
  result = SubmitOpCode(op, cl=cl, opts=opts)
673

    
674
  # Keep track of submitted jobs
675
  jex = JobExecutor(cl=cl, opts=opts)
676

    
677
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
678
    jex.AddJobId(None, status, job_id)
679

    
680
  results = jex.GetResults()
681

    
682
  (bad_jobs, bad_results) = \
683
    map(len,
684
        # Convert iterators to lists
685
        map(list,
686
            # Count errors
687
            map(compat.partial(itertools.ifilterfalse, bool),
688
                # Convert result to booleans in a tuple
689
                zip(*((job_success, len(op_results) == 1 and op_results[0])
690
                      for (job_success, op_results) in results)))))
691

    
692
  if bad_jobs == 0 and bad_results == 0:
693
    rcode = constants.EXIT_SUCCESS
694
  else:
695
    rcode = constants.EXIT_FAILURE
696
    if bad_jobs > 0:
697
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
698

    
699
  return rcode
700

    
701

    
702
def VerifyDisks(opts, args):
703
  """Verify integrity of cluster disks.
704

705
  @param opts: the command line options selected by the user
706
  @type args: list
707
  @param args: should be an empty list
708
  @rtype: int
709
  @return: the desired exit code
710

711
  """
712
  cl = GetClient()
713

    
714
  op = opcodes.OpClusterVerifyDisks()
715

    
716
  result = SubmitOpCode(op, cl=cl, opts=opts)
717

    
718
  # Keep track of submitted jobs
719
  jex = JobExecutor(cl=cl, opts=opts)
720

    
721
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
722
    jex.AddJobId(None, status, job_id)
723

    
724
  retcode = constants.EXIT_SUCCESS
725

    
726
  for (status, result) in jex.GetResults():
727
    if not status:
728
      ToStdout("Job failed: %s", result)
729
      continue
730

    
731
    ((bad_nodes, instances, missing), ) = result
732

    
733
    for node, text in bad_nodes.items():
734
      ToStdout("Error gathering data on node %s: %s",
735
               node, utils.SafeEncode(text[-400:]))
736
      retcode = constants.EXIT_FAILURE
737
      ToStdout("You need to fix these nodes first before fixing instances")
738

    
739
    for iname in instances:
740
      if iname in missing:
741
        continue
742
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
743
      try:
744
        ToStdout("Activating disks for instance '%s'", iname)
745
        SubmitOpCode(op, opts=opts, cl=cl)
746
      except errors.GenericError, err:
747
        nret, msg = FormatError(err)
748
        retcode |= nret
749
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
750

    
751
    if missing:
752
      for iname, ival in missing.iteritems():
753
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
754
        if all_missing:
755
          ToStdout("Instance %s cannot be verified as it lives on"
756
                   " broken nodes", iname)
757
        else:
758
          ToStdout("Instance %s has missing logical volumes:", iname)
759
          ival.sort()
760
          for node, vol in ival:
761
            if node in bad_nodes:
762
              ToStdout("\tbroken node %s /dev/%s", node, vol)
763
            else:
764
              ToStdout("\t%s /dev/%s", node, vol)
765

    
766
      ToStdout("You need to replace or recreate disks for all the above"
767
               " instances if this message persists after fixing broken nodes.")
768
      retcode = constants.EXIT_FAILURE
769
    elif not instances:
770
      ToStdout("No disks need to be activated.")
771

    
772
  return retcode
773

    
774

    
775
def RepairDiskSizes(opts, args):
776
  """Verify sizes of cluster disks.
777

778
  @param opts: the command line options selected by the user
779
  @type args: list
780
  @param args: optional list of instances to restrict check to
781
  @rtype: int
782
  @return: the desired exit code
783

784
  """
785
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
786
  SubmitOpCode(op, opts=opts)
787

    
788

    
789
@UsesRPC
790
def MasterFailover(opts, args):
791
  """Failover the master node.
792

793
  This command, when run on a non-master node, will cause the current
794
  master to cease being master, and the non-master to become new
795
  master.
796

797
  @param opts: the command line options selected by the user
798
  @type args: list
799
  @param args: should be an empty list
800
  @rtype: int
801
  @return: the desired exit code
802

803
  """
804
  if opts.no_voting and not opts.yes_do_it:
805
    usertext = ("This will perform the failover even if most other nodes"
806
                " are down, or if this node is outdated. This is dangerous"
807
                " as it can lead to a non-consistent cluster. Check the"
808
                " gnt-cluster(8) man page before proceeding. Continue?")
809
    if not AskUser(usertext):
810
      return 1
811

    
812
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
813

    
814

    
815
def MasterPing(opts, args):
816
  """Checks if the master is alive.
817

818
  @param opts: the command line options selected by the user
819
  @type args: list
820
  @param args: should be an empty list
821
  @rtype: int
822
  @return: the desired exit code
823

824
  """
825
  try:
826
    cl = GetClient()
827
    cl.QueryClusterInfo()
828
    return 0
829
  except Exception: # pylint: disable=W0703
830
    return 1
831

    
832

    
833
def SearchTags(opts, args):
834
  """Searches the tags on all the cluster.
835

836
  @param opts: the command line options selected by the user
837
  @type args: list
838
  @param args: should contain only one element, the tag pattern
839
  @rtype: int
840
  @return: the desired exit code
841

842
  """
843
  op = opcodes.OpTagsSearch(pattern=args[0])
844
  result = SubmitOpCode(op, opts=opts)
845
  if not result:
846
    return 1
847
  result = list(result)
848
  result.sort()
849
  for path, tag in result:
850
    ToStdout("%s %s", path, tag)
851

    
852

    
853
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
854
  """Reads and verifies an X509 certificate.
855

856
  @type cert_filename: string
857
  @param cert_filename: the path of the file containing the certificate to
858
                        verify encoded in PEM format
859
  @type verify_private_key: bool
860
  @param verify_private_key: whether to verify the private key in addition to
861
                             the public certificate
862
  @rtype: string
863
  @return: a string containing the PEM-encoded certificate.
864

865
  """
866
  try:
867
    pem = utils.ReadFile(cert_filename)
868
  except IOError, err:
869
    raise errors.X509CertError(cert_filename,
870
                               "Unable to read certificate: %s" % str(err))
871

    
872
  try:
873
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
874
  except Exception, err:
875
    raise errors.X509CertError(cert_filename,
876
                               "Unable to load certificate: %s" % str(err))
877

    
878
  if verify_private_key:
879
    try:
880
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
881
    except Exception, err:
882
      raise errors.X509CertError(cert_filename,
883
                                 "Unable to load private key: %s" % str(err))
884

    
885
  return pem
886

    
887

    
888
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
889
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
890
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
891
                 cds_filename, force, new_node_cert):
892
  """Renews cluster certificates, keys and secrets.
893

894
  @type new_cluster_cert: bool
895
  @param new_cluster_cert: Whether to generate a new cluster certificate
896
  @type new_rapi_cert: bool
897
  @param new_rapi_cert: Whether to generate a new RAPI certificate
898
  @type rapi_cert_filename: string
899
  @param rapi_cert_filename: Path to file containing new RAPI certificate
900
  @type new_spice_cert: bool
901
  @param new_spice_cert: Whether to generate a new SPICE certificate
902
  @type spice_cert_filename: string
903
  @param spice_cert_filename: Path to file containing new SPICE certificate
904
  @type spice_cacert_filename: string
905
  @param spice_cacert_filename: Path to file containing the certificate of the
906
                                CA that signed the SPICE certificate
907
  @type new_confd_hmac_key: bool
908
  @param new_confd_hmac_key: Whether to generate a new HMAC key
909
  @type new_cds: bool
910
  @param new_cds: Whether to generate a new cluster domain secret
911
  @type cds_filename: string
912
  @param cds_filename: Path to file containing new cluster domain secret
913
  @type force: bool
914
  @param force: Whether to ask user for confirmation
915
  @type new_node_cert: string
916
  @param new_node_cert: Whether to generate new node certificates
917

918
  """
919
  if new_rapi_cert and rapi_cert_filename:
920
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
921
             " options can be specified at the same time.")
922
    return 1
923

    
924
  if new_cds and cds_filename:
925
    ToStderr("Only one of the --new-cluster-domain-secret and"
926
             " --cluster-domain-secret options can be specified at"
927
             " the same time.")
928
    return 1
929

    
930
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
931
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
932
             " and --spice-ca-certificate must not be used.")
933
    return 1
934

    
935
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
936
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
937
             " specified.")
938
    return 1
939

    
940
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
941
  try:
942
    if rapi_cert_filename:
943
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
944
    if spice_cert_filename:
945
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
946
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
947
  except errors.X509CertError, err:
948
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
949
    return 1
950

    
951
  if cds_filename:
952
    try:
953
      cds = utils.ReadFile(cds_filename)
954
    except Exception, err: # pylint: disable=W0703
955
      ToStderr("Can't load new cluster domain secret from %s: %s" %
956
               (cds_filename, str(err)))
957
      return 1
958
  else:
959
    cds = None
960

    
961
  if not force:
962
    usertext = ("This requires all daemons on all nodes to be restarted and"
963
                " may take some time. Continue?")
964
    if not AskUser(usertext):
965
      return 1
966

    
967
  def _RenewCryptoInner(ctx):
968
    ctx.feedback_fn("Updating certificates and keys")
969
    # Note: the node certificate will be generated in the LU
970
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
971
                                    new_rapi_cert,
972
                                    new_spice_cert,
973
                                    new_confd_hmac_key,
974
                                    new_cds,
975
                                    rapi_cert_pem=rapi_cert_pem,
976
                                    spice_cert_pem=spice_cert_pem,
977
                                    spice_cacert_pem=spice_cacert_pem,
978
                                    cds=cds)
979

    
980
    files_to_copy = []
981

    
982
    if new_cluster_cert:
983
      files_to_copy.append(pathutils.NODED_CERT_FILE)
984

    
985
    if new_rapi_cert or rapi_cert_pem:
986
      files_to_copy.append(pathutils.RAPI_CERT_FILE)
987

    
988
    if new_spice_cert or spice_cert_pem:
989
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
990
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)
991

    
992
    if new_confd_hmac_key:
993
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)
994

    
995
    if new_cds or cds:
996
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)
997

    
998
    if files_to_copy:
999
      for node_name in ctx.nonmaster_nodes:
1000
        port = ctx.ssh_ports[node_name]
1001
        ctx.feedback_fn("Copying %s to %s:%d" %
1002
                        (", ".join(files_to_copy), node_name, port))
1003
        for file_name in files_to_copy:
1004
          ctx.ssh.CopyFileToNode(node_name, port, file_name)
1005

    
1006
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
1007

    
1008
  ToStdout("All requested certificates and keys have been replaced."
1009
           " Running \"gnt-cluster verify\" now is recommended.")
1010

    
1011
  if new_node_cert:
1012
    cl = GetClient()
1013
    renew_op = opcodes.OpClusterRenewCrypto()
1014
    SubmitOpCode(renew_op, cl=cl)
1015

    
1016
  return 0
1017

    
1018

    
1019
def RenewCrypto(opts, args):
1020
  """Renews cluster certificates, keys and secrets.
1021

1022
  """
1023
  return _RenewCrypto(opts.new_cluster_cert,
1024
                      opts.new_rapi_cert,
1025
                      opts.rapi_cert,
1026
                      opts.new_spice_cert,
1027
                      opts.spice_cert,
1028
                      opts.spice_cacert,
1029
                      opts.new_confd_hmac_key,
1030
                      opts.new_cluster_domain_secret,
1031
                      opts.cluster_domain_secret,
1032
                      opts.force,
1033
                      opts.new_node_cert)
1034

    
1035

    
1036
def _GetEnabledDiskTemplates(opts):
1037
  """Determine the list of enabled disk templates.
1038

1039
  """
1040
  if opts.enabled_disk_templates:
1041
    return opts.enabled_disk_templates.split(",")
1042
  else:
1043
    return None
1044

    
1045

    
1046
def _GetVgName(opts, enabled_disk_templates):
1047
  """Determine the volume group name.
1048

1049
  @type enabled_disk_templates: list of strings
1050
  @param enabled_disk_templates: cluster-wide enabled disk-templates
1051

1052
  """
1053
  # consistency between vg name and enabled disk templates
1054
  vg_name = None
1055
  if opts.vg_name is not None:
1056
    vg_name = opts.vg_name
1057
  if enabled_disk_templates:
1058
    if vg_name and not utils.IsLvmEnabled(enabled_disk_templates):
1059
      ToStdout("You specified a volume group with --vg-name, but you did not"
1060
               " enable any of the following lvm-based disk templates: %s" %
1061
               utils.CommaJoin(constants.DTS_LVM))
1062
  return vg_name
1063

    
1064

    
1065
def _GetDrbdHelper(opts, enabled_disk_templates):
1066
  """Determine the DRBD usermode helper.
1067

1068
  """
1069
  drbd_helper = opts.drbd_helper
1070
  if enabled_disk_templates:
1071
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
1072
    if not drbd_enabled and opts.drbd_helper:
1073
      ToStdout("You specified a DRBD usermode helper with "
1074
               " --drbd-usermode-helper while DRBD is not enabled.")
1075
  return drbd_helper
1076

    
1077

    
1078
def SetClusterParams(opts, args):
1079
  """Modify the cluster.
1080

1081
  @param opts: the command line options selected by the user
1082
  @type args: list
1083
  @param args: should be an empty list
1084
  @rtype: int
1085
  @return: the desired exit code
1086

1087
  """
1088
  if not (opts.vg_name is not None or
1089
          opts.drbd_helper is not None or
1090
          opts.enabled_hypervisors or opts.hvparams or
1091
          opts.beparams or opts.nicparams or
1092
          opts.ndparams or opts.diskparams or
1093
          opts.candidate_pool_size is not None or
1094
          opts.max_running_jobs is not None or
1095
          opts.uid_pool is not None or
1096
          opts.maintain_node_health is not None or
1097
          opts.add_uids is not None or
1098
          opts.remove_uids is not None or
1099
          opts.default_iallocator is not None or
1100
          opts.default_iallocator_params or
1101
          opts.reserved_lvs is not None or
1102
          opts.master_netdev is not None or
1103
          opts.master_netmask is not None or
1104
          opts.use_external_mip_script is not None or
1105
          opts.prealloc_wipe_disks is not None or
1106
          opts.hv_state or
1107
          opts.enabled_disk_templates or
1108
          opts.disk_state or
1109
          opts.ipolicy_bounds_specs is not None or
1110
          opts.ipolicy_std_specs is not None or
1111
          opts.ipolicy_disk_templates is not None or
1112
          opts.ipolicy_vcpu_ratio is not None or
1113
          opts.ipolicy_spindle_ratio is not None or
1114
          opts.modify_etc_hosts is not None or
1115
          opts.file_storage_dir is not None):
1116
    ToStderr("Please give at least one of the parameters.")
1117
    return 1
1118

    
1119
  enabled_disk_templates = _GetEnabledDiskTemplates(opts)
1120
  vg_name = _GetVgName(opts, enabled_disk_templates)
1121

    
1122
  try:
1123
    drbd_helper = _GetDrbdHelper(opts, enabled_disk_templates)
1124
  except errors.OpPrereqError, e:
1125
    ToStderr(str(e))
1126
    return 1
1127

    
1128
  hvlist = opts.enabled_hypervisors
1129
  if hvlist is not None:
1130
    hvlist = hvlist.split(",")
1131

    
1132
  # a list of (name, dict) we can pass directly to dict() (or [])
1133
  hvparams = dict(opts.hvparams)
1134
  for hv_params in hvparams.values():
1135
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1136

    
1137
  diskparams = dict(opts.diskparams)
1138

    
1139
  for dt_params in diskparams.values():
1140
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1141

    
1142
  beparams = opts.beparams
1143
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1144

    
1145
  nicparams = opts.nicparams
1146
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1147

    
1148
  ndparams = opts.ndparams
1149
  if ndparams is not None:
1150
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1151

    
1152
  ipolicy = CreateIPolicyFromOpts(
1153
    minmax_ispecs=opts.ipolicy_bounds_specs,
1154
    std_ispecs=opts.ipolicy_std_specs,
1155
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
1156
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1157
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1158
    )
1159

    
1160
  mnh = opts.maintain_node_health
1161

    
1162
  uid_pool = opts.uid_pool
1163
  if uid_pool is not None:
1164
    uid_pool = uidpool.ParseUidPool(uid_pool)
1165

    
1166
  add_uids = opts.add_uids
1167
  if add_uids is not None:
1168
    add_uids = uidpool.ParseUidPool(add_uids)
1169

    
1170
  remove_uids = opts.remove_uids
1171
  if remove_uids is not None:
1172
    remove_uids = uidpool.ParseUidPool(remove_uids)
1173

    
1174
  if opts.reserved_lvs is not None:
1175
    if opts.reserved_lvs == "":
1176
      opts.reserved_lvs = []
1177
    else:
1178
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1179

    
1180
  if opts.master_netmask is not None:
1181
    try:
1182
      opts.master_netmask = int(opts.master_netmask)
1183
    except ValueError:
1184
      ToStderr("The --master-netmask option expects an int parameter.")
1185
      return 1
1186

    
1187
  ext_ip_script = opts.use_external_mip_script
1188

    
1189
  if opts.disk_state:
1190
    disk_state = utils.FlatToDict(opts.disk_state)
1191
  else:
1192
    disk_state = {}
1193

    
1194
  hv_state = dict(opts.hv_state)
1195

    
1196
  op = opcodes.OpClusterSetParams(
1197
    vg_name=vg_name,
1198
    drbd_helper=drbd_helper,
1199
    enabled_hypervisors=hvlist,
1200
    hvparams=hvparams,
1201
    os_hvp=None,
1202
    beparams=beparams,
1203
    nicparams=nicparams,
1204
    ndparams=ndparams,
1205
    diskparams=diskparams,
1206
    ipolicy=ipolicy,
1207
    candidate_pool_size=opts.candidate_pool_size,
1208
    max_running_jobs=opts.max_running_jobs,
1209
    maintain_node_health=mnh,
1210
    modify_etc_hosts=opts.modify_etc_hosts,
1211
    uid_pool=uid_pool,
1212
    add_uids=add_uids,
1213
    remove_uids=remove_uids,
1214
    default_iallocator=opts.default_iallocator,
1215
    default_iallocator_params=opts.default_iallocator_params,
1216
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
1217
    master_netdev=opts.master_netdev,
1218
    master_netmask=opts.master_netmask,
1219
    reserved_lvs=opts.reserved_lvs,
1220
    use_external_mip_script=ext_ip_script,
1221
    hv_state=hv_state,
1222
    disk_state=disk_state,
1223
    enabled_disk_templates=enabled_disk_templates,
1224
    force=opts.force,
1225
    file_storage_dir=opts.file_storage_dir,
1226
    )
1227
  SubmitOrSend(op, opts)
1228
  return 0
1229

    
1230

    
1231
def QueueOps(opts, args):
1232
  """Queue operations.
1233

1234
  @param opts: the command line options selected by the user
1235
  @type args: list
1236
  @param args: should contain only one element, the subcommand
1237
  @rtype: int
1238
  @return: the desired exit code
1239

1240
  """
1241
  command = args[0]
1242
  client = GetClient()
1243
  if command in ("drain", "undrain"):
1244
    drain_flag = command == "drain"
1245
    client.SetQueueDrainFlag(drain_flag)
1246
  elif command == "info":
1247
    result = client.QueryConfigValues(["drain_flag"])
1248
    if result[0]:
1249
      val = "set"
1250
    else:
1251
      val = "unset"
1252
    ToStdout("The drain flag is %s" % val)
1253
  else:
1254
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1255
                               errors.ECODE_INVAL)
1256

    
1257
  return 0
1258

    
1259

    
1260
def _ShowWatcherPause(until):
1261
  if until is None or until < time.time():
1262
    ToStdout("The watcher is not paused.")
1263
  else:
1264
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1265

    
1266

    
1267
def WatcherOps(opts, args):
1268
  """Watcher operations.
1269

1270
  @param opts: the command line options selected by the user
1271
  @type args: list
1272
  @param args: should contain only one element, the subcommand
1273
  @rtype: int
1274
  @return: the desired exit code
1275

1276
  """
1277
  command = args[0]
1278
  client = GetClient()
1279

    
1280
  if command == "continue":
1281
    client.SetWatcherPause(None)
1282
    ToStdout("The watcher is no longer paused.")
1283

    
1284
  elif command == "pause":
1285
    if len(args) < 2:
1286
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1287

    
1288
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1289
    _ShowWatcherPause(result)
1290

    
1291
  elif command == "info":
1292
    result = client.QueryConfigValues(["watcher_pause"])
1293
    _ShowWatcherPause(result[0])
1294

    
1295
  else:
1296
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1297
                               errors.ECODE_INVAL)
1298

    
1299
  return 0
1300

    
1301

    
1302
def _OobPower(opts, node_list, power):
1303
  """Puts the node in the list to desired power state.
1304

1305
  @param opts: The command line options selected by the user
1306
  @param node_list: The list of nodes to operate on
1307
  @param power: True if they should be powered on, False otherwise
1308
  @return: The success of the operation (none failed)
1309

1310
  """
1311
  if power:
1312
    command = constants.OOB_POWER_ON
1313
  else:
1314
    command = constants.OOB_POWER_OFF
1315

    
1316
  op = opcodes.OpOobCommand(node_names=node_list,
1317
                            command=command,
1318
                            ignore_status=True,
1319
                            timeout=opts.oob_timeout,
1320
                            power_delay=opts.power_delay)
1321
  result = SubmitOpCode(op, opts=opts)
1322
  errs = 0
1323
  for node_result in result:
1324
    (node_tuple, data_tuple) = node_result
1325
    (_, node_name) = node_tuple
1326
    (data_status, _) = data_tuple
1327
    if data_status != constants.RS_NORMAL:
1328
      assert data_status != constants.RS_UNAVAIL
1329
      errs += 1
1330
      ToStderr("There was a problem changing power for %s, please investigate",
1331
               node_name)
1332

    
1333
  if errs > 0:
1334
    return False
1335

    
1336
  return True
1337

    
1338

    
1339
def _InstanceStart(opts, inst_list, start, no_remember=False):
1340
  """Puts the instances in the list to desired state.
1341

1342
  @param opts: The command line options selected by the user
1343
  @param inst_list: The list of instances to operate on
1344
  @param start: True if they should be started, False for shutdown
1345
  @param no_remember: If the instance state should be remembered
1346
  @return: The success of the operation (none failed)
1347

1348
  """
1349
  if start:
1350
    opcls = opcodes.OpInstanceStartup
1351
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1352
  else:
1353
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1354
                           timeout=opts.shutdown_timeout,
1355
                           no_remember=no_remember)
1356
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1357

    
1358
  jex = JobExecutor(opts=opts)
1359

    
1360
  for inst in inst_list:
1361
    ToStdout("Submit %s of instance %s", text_submit, inst)
1362
    op = opcls(instance_name=inst)
1363
    jex.QueueJob(inst, op)
1364

    
1365
  results = jex.GetResults()
1366
  bad_cnt = len([1 for (success, _) in results if not success])
1367

    
1368
  if bad_cnt == 0:
1369
    ToStdout("All instances have been %s successfully", text_success)
1370
  else:
1371
    ToStderr("There were errors while %s instances:\n"
1372
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1373
             len(results))
1374
    return False
1375

    
1376
  return True
1377

    
1378

    
1379
class _RunWhenNodesReachableHelper:
1380
  """Helper class to make shared internal state sharing easier.
1381

1382
  @ivar success: Indicates if all action_cb calls were successful
1383

1384
  """
1385
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1386
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1387
    """Init the object.
1388

1389
    @param node_list: The list of nodes to be reachable
1390
    @param action_cb: Callback called when a new host is reachable
1391
    @type node2ip: dict
1392
    @param node2ip: Node to ip mapping
1393
    @param port: The port to use for the TCP ping
1394
    @param feedback_fn: The function used for feedback
1395
    @param _ping_fn: Function to check reachabilty (for unittest use only)
1396
    @param _sleep_fn: Function to sleep (for unittest use only)
1397

1398
    """
1399
    self.down = set(node_list)
1400
    self.up = set()
1401
    self.node2ip = node2ip
1402
    self.success = True
1403
    self.action_cb = action_cb
1404
    self.port = port
1405
    self.feedback_fn = feedback_fn
1406
    self._ping_fn = _ping_fn
1407
    self._sleep_fn = _sleep_fn
1408

    
1409
  def __call__(self):
1410
    """When called we run action_cb.
1411

1412
    @raises utils.RetryAgain: When there are still down nodes
1413

1414
    """
1415
    if not self.action_cb(self.up):
1416
      self.success = False
1417

    
1418
    if self.down:
1419
      raise utils.RetryAgain()
1420
    else:
1421
      return self.success
1422

    
1423
  def Wait(self, secs):
1424
    """Checks if a host is up or waits remaining seconds.
1425

1426
    @param secs: The secs remaining
1427

1428
    """
1429
    start = time.time()
1430
    for node in self.down:
1431
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1432
                       live_port_needed=True):
1433
        self.feedback_fn("Node %s became available" % node)
1434
        self.up.add(node)
1435
        self.down -= self.up
1436
        # If we have a node available there is the possibility to run the
1437
        # action callback successfully, therefore we don't wait and return
1438
        return
1439

    
1440
    self._sleep_fn(max(0.0, start + secs - time.time()))
1441

    
1442

    
1443
def _RunWhenNodesReachable(node_list, action_cb, interval):
1444
  """Run action_cb when nodes become reachable.
1445

1446
  @param node_list: The list of nodes to be reachable
1447
  @param action_cb: Callback called when a new host is reachable
1448
  @param interval: The earliest time to retry
1449

1450
  """
1451
  client = GetClient()
1452
  cluster_info = client.QueryClusterInfo()
1453
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1454
    family = netutils.IPAddress.family
1455
  else:
1456
    family = netutils.IP6Address.family
1457

    
1458
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1459
                 for node in node_list)
1460

    
1461
  port = netutils.GetDaemonPort(constants.NODED)
1462
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1463
                                        ToStdout)
1464

    
1465
  try:
1466
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1467
                       wait_fn=helper.Wait)
1468
  except utils.RetryTimeout:
1469
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1470
             " again:\n  - %s", "  - ".join(helper.down))
1471
    return False
1472

    
1473

    
1474
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1475
                          _instance_start_fn=_InstanceStart):
1476
  """Start the instances conditional based on node_states.
1477

1478
  @param opts: The command line options selected by the user
1479
  @param inst_map: A dict of inst -> nodes mapping
1480
  @param nodes_online: A list of nodes online
1481
  @param _instance_start_fn: Callback to start instances (unittest use only)
1482
  @return: Success of the operation on all instances
1483

1484
  """
1485
  start_inst_list = []
1486
  for (inst, nodes) in inst_map.items():
1487
    if not (nodes - nodes_online):
1488
      # All nodes the instance lives on are back online
1489
      start_inst_list.append(inst)
1490

    
1491
  for inst in start_inst_list:
1492
    del inst_map[inst]
1493

    
1494
  if start_inst_list:
1495
    return _instance_start_fn(opts, start_inst_list, True)
1496

    
1497
  return True
1498

    
1499

    
1500
def _EpoOn(opts, full_node_list, node_list, inst_map):
1501
  """Does the actual power on.
1502

1503
  @param opts: The command line options selected by the user
1504
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1505
                         OOB)
1506
  @param node_list: The list of nodes to operate on (all need to support OOB)
1507
  @param inst_map: A dict of inst -> nodes mapping
1508
  @return: The desired exit status
1509

1510
  """
1511
  if node_list and not _OobPower(opts, node_list, False):
1512
    ToStderr("Not all nodes seem to get back up, investigate and start"
1513
             " manually if needed")
1514

    
1515
  # Wait for the nodes to be back up
1516
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1517

    
1518
  ToStdout("Waiting until all nodes are available again")
1519
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1520
    ToStderr("Please investigate and start stopped instances manually")
1521
    return constants.EXIT_FAILURE
1522

    
1523
  return constants.EXIT_SUCCESS
1524

    
1525

    
1526
def _EpoOff(opts, node_list, inst_map):
1527
  """Does the actual power off.
1528

1529
  @param opts: The command line options selected by the user
1530
  @param node_list: The list of nodes to operate on (all need to support OOB)
1531
  @param inst_map: A dict of inst -> nodes mapping
1532
  @return: The desired exit status
1533

1534
  """
1535
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
1536
    ToStderr("Please investigate and stop instances manually before continuing")
1537
    return constants.EXIT_FAILURE
1538

    
1539
  if not node_list:
1540
    return constants.EXIT_SUCCESS
1541

    
1542
  if _OobPower(opts, node_list, False):
1543
    return constants.EXIT_SUCCESS
1544
  else:
1545
    return constants.EXIT_FAILURE
1546

    
1547

    
1548
def Epo(opts, args, qcl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
1549
        _confirm_fn=ConfirmOperation,
1550
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
1551
  """EPO operations.
1552

1553
  @param opts: the command line options selected by the user
1554
  @type args: list
1555
  @param args: should contain only one element, the subcommand
1556
  @rtype: int
1557
  @return: the desired exit code
1558

1559
  """
1560
  if opts.groups and opts.show_all:
1561
    _stderr_fn("Only one of --groups or --all are allowed")
1562
    return constants.EXIT_FAILURE
1563
  elif args and opts.show_all:
1564
    _stderr_fn("Arguments in combination with --all are not allowed")
1565
    return constants.EXIT_FAILURE
1566

    
1567
  if qcl is None:
1568
    # Query client
1569
    qcl = GetClient(query=True)
1570

    
1571
  if opts.groups:
1572
    node_query_list = \
1573
      itertools.chain(*qcl.QueryGroups(args, ["node_list"], False))
1574
  else:
1575
    node_query_list = args
1576

    
1577
  result = qcl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
1578
                                            "sinst_list", "powered", "offline"],
1579
                          False)
1580

    
1581
  all_nodes = map(compat.fst, result)
1582
  node_list = []
1583
  inst_map = {}
1584
  for (node, master, pinsts, sinsts, powered, offline) in result:
1585
    if not offline:
1586
      for inst in (pinsts + sinsts):
1587
        if inst in inst_map:
1588
          if not master:
1589
            inst_map[inst].add(node)
1590
        elif master:
1591
          inst_map[inst] = set()
1592
        else:
1593
          inst_map[inst] = set([node])
1594

    
1595
    if master and opts.on:
1596
      # We ignore the master for turning on the machines, in fact we are
1597
      # already operating on the master at this point :)
1598
      continue
1599
    elif master and not opts.show_all:
1600
      _stderr_fn("%s is the master node, please do a master-failover to another"
1601
                 " node not affected by the EPO or use --all if you intend to"
1602
                 " shutdown the whole cluster", node)
1603
      return constants.EXIT_FAILURE
1604
    elif powered is None:
1605
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
1606
                 " handled in a fully automated manner", node)
1607
    elif powered == opts.on:
1608
      _stdout_fn("Node %s is already in desired power state, skipping", node)
1609
    elif not offline or (offline and powered):
1610
      node_list.append(node)
1611

    
1612
  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
1613
    return constants.EXIT_FAILURE
1614

    
1615
  if opts.on:
1616
    return _on_fn(opts, all_nodes, node_list, inst_map)
1617
  else:
1618
    return _off_fn(opts, node_list, inst_map)
1619

    
1620

    
1621
def _GetCreateCommand(info):
1622
  buf = StringIO()
1623
  buf.write("gnt-cluster init")
1624
  PrintIPolicyCommand(buf, info["ipolicy"], False)
1625
  buf.write(" ")
1626
  buf.write(info["name"])
1627
  return buf.getvalue()
1628

    
1629

    
1630
def ShowCreateCommand(opts, args):
1631
  """Shows the command that can be used to re-create the cluster.
1632

1633
  Currently it works only for ipolicy specs.
1634

1635
  """
1636
  cl = GetClient(query=True)
1637
  result = cl.QueryClusterInfo()
1638
  ToStdout(_GetCreateCommand(result))
1639

    
1640

    
1641
def _RunCommandAndReport(cmd):
1642
  """Run a command and report its output, iff it failed.
1643

1644
  @param cmd: the command to execute
1645
  @type cmd: list
1646
  @rtype: bool
1647
  @return: False, if the execution failed.
1648

1649
  """
1650
  result = utils.RunCmd(cmd)
1651
  if result.failed:
1652
    ToStderr("Command %s failed: %s; Output %s" %
1653
             (cmd, result.fail_reason, result.output))
1654
    return False
1655
  return True
1656

    
1657

    
1658
def _VerifyCommand(cmd):
1659
  """Verify that a given command succeeds on all online nodes.
1660

1661
  As this function is intended to run during upgrades, it
1662
  is implemented in such a way that it still works, if all Ganeti
1663
  daemons are down.
1664

1665
  @param cmd: the command to execute
1666
  @type cmd: list
1667
  @rtype: list
1668
  @return: the list of node names that are online where
1669
      the command failed.
1670

1671
  """
1672
  command = utils.text.ShellQuoteArgs([str(val) for val in cmd])
1673

    
1674
  nodes = ssconf.SimpleStore().GetOnlineNodeList()
1675
  master_node = ssconf.SimpleStore().GetMasterNode()
1676
  cluster_name = ssconf.SimpleStore().GetClusterName()
1677

    
1678
  # If master node is in 'nodes', make sure master node is at list end
1679
  if master_node in nodes:
1680
    nodes.remove(master_node)
1681
    nodes.append(master_node)
1682

    
1683
  failed = []
1684

    
1685
  srun = ssh.SshRunner(cluster_name=cluster_name)
1686
  for name in nodes:
1687
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
1688
    if result.exit_code != 0:
1689
      failed.append(name)
1690

    
1691
  return failed
1692

    
1693

    
1694
def _VerifyVersionInstalled(versionstring):
1695
  """Verify that the given version of ganeti is installed on all online nodes.
1696

1697
  Do nothing, if this is the case, otherwise print an appropriate
1698
  message to stderr.
1699

1700
  @param versionstring: the version to check for
1701
  @type versionstring: string
1702
  @rtype: bool
1703
  @return: True, if the version is installed on all online nodes
1704

1705
  """
1706
  badnodes = _VerifyCommand(["test", "-d",
1707
                             os.path.join(pathutils.PKGLIBDIR, versionstring)])
1708
  if badnodes:
1709
    ToStderr("Ganeti version %s not installed on nodes %s"
1710
             % (versionstring, ", ".join(badnodes)))
1711
    return False
1712

    
1713
  return True
1714

    
1715

    
1716
def _GetRunning():
1717
  """Determine the list of running jobs.
1718

1719
  @rtype: list
1720
  @return: the number of jobs still running
1721

1722
  """
1723
  cl = GetClient()
1724
  qfilter = qlang.MakeSimpleFilter("status",
1725
                                   frozenset([constants.JOB_STATUS_RUNNING]))
1726
  return len(cl.Query(constants.QR_JOB, [], qfilter).data)
1727

    
1728

    
1729
def _SetGanetiVersion(versionstring):
1730
  """Set the active version of ganeti to the given versionstring
1731

1732
  @type versionstring: string
1733
  @rtype: list
1734
  @return: the list of nodes where the version change failed
1735

1736
  """
1737
  failed = []
1738
  if constants.HAS_GNU_LN:
1739
    failed.extend(_VerifyCommand(
1740
        ["ln", "-s", "-f", "-T",
1741
         os.path.join(pathutils.PKGLIBDIR, versionstring),
1742
         os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
1743
    failed.extend(_VerifyCommand(
1744
        ["ln", "-s", "-f", "-T",
1745
         os.path.join(pathutils.SHAREDIR, versionstring),
1746
         os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
1747
  else:
1748
    failed.extend(_VerifyCommand(
1749
        ["rm", "-f", os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
1750
    failed.extend(_VerifyCommand(
1751
        ["ln", "-s", "-f", os.path.join(pathutils.PKGLIBDIR, versionstring),
1752
         os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
1753
    failed.extend(_VerifyCommand(
1754
        ["rm", "-f", os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
1755
    failed.extend(_VerifyCommand(
1756
        ["ln", "-s", "-f", os.path.join(pathutils.SHAREDIR, versionstring),
1757
         os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
1758
  return list(set(failed))
1759

    
1760

    
1761
def _ExecuteCommands(fns):
1762
  """Execute a list of functions, in reverse order.
1763

1764
  @type fns: list of functions.
1765
  @param fns: the functions to be executed.
1766

1767
  """
1768
  for fn in reversed(fns):
1769
    fn()
1770

    
1771

    
1772
def _GetConfigVersion():
1773
  """Determine the version the configuration file currently has.
1774

1775
  @rtype: tuple or None
1776
  @return: (major, minor, revision) if the version can be determined,
1777
      None otherwise
1778

1779
  """
1780
  config_data = serializer.LoadJson(utils.ReadFile(pathutils.CLUSTER_CONF_FILE))
1781
  try:
1782
    config_version = config_data["version"]
1783
  except KeyError:
1784
    return None
1785
  return utils.SplitVersion(config_version)
1786

    
1787

    
1788
def _ReadIntentToUpgrade():
1789
  """Read the file documenting the intent to upgrade the cluster.
1790

1791
  @rtype: (string, string) or (None, None)
1792
  @return: (old version, version to upgrade to), if the file exists,
1793
      and (None, None) otherwise.
1794

1795
  """
1796
  if not os.path.isfile(pathutils.INTENT_TO_UPGRADE):
1797
    return (None, None)
1798

    
1799
  contentstring = utils.ReadFile(pathutils.INTENT_TO_UPGRADE)
1800
  contents = utils.UnescapeAndSplit(contentstring)
1801
  if len(contents) != 3:
1802
    # file syntactically mal-formed
1803
    return (None, None)
1804
  return (contents[0], contents[1])
1805

    
1806

    
1807
def _WriteIntentToUpgrade(version):
1808
  """Write file documenting the intent to upgrade the cluster.
1809

1810
  @type version: string
1811
  @param version: the version we intent to upgrade to
1812

1813
  """
1814
  utils.WriteFile(pathutils.INTENT_TO_UPGRADE,
1815
                  data=utils.EscapeAndJoin([constants.RELEASE_VERSION, version,
1816
                                            "%d" % os.getpid()]))
1817

    
1818

    
1819
def _UpgradeBeforeConfigurationChange(versionstring):
1820
  """
1821
  Carry out all the tasks necessary for an upgrade that happen before
1822
  the configuration file, or Ganeti version, changes.
1823

1824
  @type versionstring: string
1825
  @param versionstring: the version to upgrade to
1826
  @rtype: (bool, list)
1827
  @return: tuple of a bool indicating success and a list of rollback tasks
1828

1829
  """
1830
  rollback = []
1831

    
1832
  if not _VerifyVersionInstalled(versionstring):
1833
    return (False, rollback)
1834

    
1835
  _WriteIntentToUpgrade(versionstring)
1836
  rollback.append(
1837
    lambda: utils.RunCmd(["rm", "-f", pathutils.INTENT_TO_UPGRADE]))
1838

    
1839
  ToStdout("Draining queue")
1840
  client = GetClient()
1841
  client.SetQueueDrainFlag(True)
1842

    
1843
  rollback.append(lambda: GetClient().SetQueueDrainFlag(False))
1844

    
1845
  if utils.SimpleRetry(0, _GetRunning,
1846
                       constants.UPGRADE_QUEUE_POLL_INTERVAL,
1847
                       constants.UPGRADE_QUEUE_DRAIN_TIMEOUT):
1848
    ToStderr("Failed to completely empty the queue.")
1849
    return (False, rollback)
1850

    
1851
  ToStdout("Stopping daemons on master node.")
1852
  if not _RunCommandAndReport([pathutils.DAEMON_UTIL, "stop-all"]):
1853
    return (False, rollback)
1854

    
1855
  if not _VerifyVersionInstalled(versionstring):
1856
    utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"])
1857
    return (False, rollback)
1858

    
1859
  ToStdout("Stopping daemons everywhere.")
1860
  rollback.append(lambda: _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
1861
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
1862
  if badnodes:
1863
    ToStderr("Failed to stop daemons on %s." % (", ".join(badnodes),))
1864
    return (False, rollback)
1865

    
1866
  backuptar = os.path.join(pathutils.LOCALSTATEDIR,
1867
                           "lib/ganeti%d.tar" % time.time())
1868
  ToStdout("Backing up configuration as %s" % backuptar)
1869
  if not _RunCommandAndReport(["tar", "cf", backuptar,
1870
                               pathutils.DATA_DIR]):
1871
    return (False, rollback)
1872

    
1873
  return (True, rollback)
1874

    
1875

    
1876
def _VersionSpecificDowngrade():
1877
  """
1878
  Perform any additional downrade tasks that are version specific
1879
  and need to be done just after the configuration downgrade. This
1880
  function needs to be idempotent, so that it can be redone if the
1881
  downgrade procedure gets interrupted after changing the
1882
  configuration.
1883

1884
  Note that this function has to be reset with every version bump.
1885

1886
  @return: True upon success
1887
  """
1888
  ToStdout("Performing version-specific downgrade tasks.")
1889

    
1890
  ToStdout("...removing client certificates ssconf file")
1891
  ssconffile = ssconf.SimpleStore().KeyToFilename(
1892
    constants.SS_MASTER_CANDIDATES_CERTS)
1893
  badnodes = _VerifyCommand(["rm", "-f", ssconffile])
1894
  if badnodes:
1895
    ToStderr("Warning: failed to clean up ssconf on %s."
1896
             % (", ".join(badnodes),))
1897
    return False
1898

    
1899
  ToStdout("...removing client certificates")
1900
  badnodes = _VerifyCommand(["rm", "-f", pathutils.NODED_CLIENT_CERT_FILE])
1901
  if badnodes:
1902
    ToStderr("Warning: failed to clean up certificates on %s."
1903
             % (", ".join(badnodes),))
1904
    return False
1905

    
1906
  return True
1907

    
1908

    
1909
def _SwitchVersionAndConfig(versionstring, downgrade):
1910
  """
1911
  Switch to the new Ganeti version and change the configuration,
1912
  in correct order.
1913

1914
  @type versionstring: string
1915
  @param versionstring: the version to change to
1916
  @type downgrade: bool
1917
  @param downgrade: True, if the configuration should be downgraded
1918
  @rtype: (bool, list)
1919
  @return: tupe of a bool indicating success, and a list of
1920
      additional rollback tasks
1921

1922
  """
1923
  rollback = []
1924
  if downgrade:
1925
    ToStdout("Downgrading configuration")
1926
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "--downgrade", "-f"]):
1927
      return (False, rollback)
1928
    # Note: version specific downgrades need to be done before switching
1929
    # binaries, so that we still have the knowledgeable binary if the downgrade
1930
    # process gets interrupted at this point.
1931
    if not _VersionSpecificDowngrade():
1932
      return (False, rollback)
1933

    
1934
  # Configuration change is the point of no return. From then onwards, it is
1935
  # safer to push through the up/dowgrade than to try to roll it back.
1936

    
1937
  ToStdout("Switching to version %s on all nodes" % versionstring)
1938
  rollback.append(lambda: _SetGanetiVersion(constants.DIR_VERSION))
1939
  badnodes = _SetGanetiVersion(versionstring)
1940
  if badnodes:
1941
    ToStderr("Failed to switch to Ganeti version %s on nodes %s"
1942
             % (versionstring, ", ".join(badnodes)))
1943
    if not downgrade:
1944
      return (False, rollback)
1945

    
1946
  # Now that we have changed to the new version of Ganeti we should
1947
  # not communicate over luxi any more, as luxi might have changed in
1948
  # incompatible ways. Therefore, manually call the corresponding ganeti
1949
  # commands using their canonical (version independent) path.
1950

    
1951
  if not downgrade:
1952
    ToStdout("Upgrading configuration")
1953
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "-f"]):
1954
      return (False, rollback)
1955

    
1956
  return (True, rollback)
1957

    
1958

    
1959
def _UpgradeAfterConfigurationChange(oldversion):
1960
  """
1961
  Carry out the upgrade actions necessary after switching to the new
1962
  Ganeti version and updating the configuration.
1963

1964
  As this part is run at a time where the new version of Ganeti is already
1965
  running, no communication should happen via luxi, as this is not a stable
1966
  interface. Also, as the configuration change is the point of no return,
1967
  all actions are pushed trough, even if some of them fail.
1968

1969
  @param oldversion: the version the upgrade started from
1970
  @type oldversion: string
1971
  @rtype: int
1972
  @return: the intended return value
1973

1974
  """
1975
  returnvalue = 0
1976

    
1977
  ToStdout("Starting daemons everywhere.")
1978
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
1979
  if badnodes:
1980
    ToStderr("Warning: failed to start daemons on %s." % (", ".join(badnodes),))
1981
    returnvalue = 1
1982

    
1983
  ToStdout("Ensuring directories everywhere.")
1984
  badnodes = _VerifyCommand([pathutils.ENSURE_DIRS])
1985
  if badnodes:
1986
    ToStderr("Warning: failed to ensure directories on %s." %
1987
             (", ".join(badnodes)))
1988
    returnvalue = 1
1989

    
1990
  ToStdout("Redistributing the configuration.")
1991
  if not _RunCommandAndReport(["gnt-cluster", "redist-conf", "--yes-do-it"]):
1992
    returnvalue = 1
1993

    
1994
  ToStdout("Restarting daemons everywhere.")
1995
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
1996
  badnodes.extend(_VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
1997
  if badnodes:
1998
    ToStderr("Warning: failed to start daemons on %s." %
1999
             (", ".join(list(set(badnodes))),))
2000
    returnvalue = 1
2001

    
2002
  ToStdout("Undraining the queue.")
2003
  if not _RunCommandAndReport(["gnt-cluster", "queue", "undrain"]):
2004
    returnvalue = 1
2005

    
2006
  _RunCommandAndReport(["rm", "-f", pathutils.INTENT_TO_UPGRADE])
2007

    
2008
  ToStdout("Running post-upgrade hooks")
2009
  if not _RunCommandAndReport([pathutils.POST_UPGRADE, oldversion]):
2010
    returnvalue = 1
2011

    
2012
  ToStdout("Verifying cluster.")
2013
  if not _RunCommandAndReport(["gnt-cluster", "verify"]):
2014
    returnvalue = 1
2015

    
2016
  return returnvalue
2017

    
2018

    
2019
def UpgradeGanetiCommand(opts, args):
2020
  """Upgrade a cluster to a new ganeti version.
2021

2022
  @param opts: the command line options selected by the user
2023
  @type args: list
2024
  @param args: should be an empty list
2025
  @rtype: int
2026
  @return: the desired exit code
2027

2028
  """
2029
  if ((not opts.resume and opts.to is None)
2030
      or (opts.resume and opts.to is not None)):
2031
    ToStderr("Precisely one of the options --to and --resume"
2032
             " has to be given")
2033
    return 1
2034

    
2035
  oldversion = constants.RELEASE_VERSION
2036

    
2037
  if opts.resume:
2038
    ssconf.CheckMaster(False)
2039
    oldversion, versionstring = _ReadIntentToUpgrade()
2040
    if versionstring is None:
2041
      return 0
2042
    version = utils.version.ParseVersion(versionstring)
2043
    if version is None:
2044
      return 1
2045
    configversion = _GetConfigVersion()
2046
    if configversion is None:
2047
      return 1
2048
    # If the upgrade we resume was an upgrade between compatible
2049
    # versions (like 2.10.0 to 2.10.1), the correct configversion
2050
    # does not guarantee that the config has been updated.
2051
    # However, in the case of a compatible update with the configuration
2052
    # not touched, we are running a different dirversion with the same
2053
    # config version.
2054
    config_already_modified = \
2055
      (utils.IsCorrectConfigVersion(version, configversion) and
2056
       not (versionstring != constants.DIR_VERSION and
2057
            configversion == (constants.CONFIG_MAJOR, constants.CONFIG_MINOR,
2058
                              constants.CONFIG_REVISION)))
2059
    if not config_already_modified:
2060
      # We have to start from the beginning; however, some daemons might have
2061
      # already been stopped, so the only way to get into a well-defined state
2062
      # is by starting all daemons again.
2063
      _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
2064
  else:
2065
    versionstring = opts.to
2066
    config_already_modified = False
2067
    version = utils.version.ParseVersion(versionstring)
2068
    if version is None:
2069
      ToStderr("Could not parse version string %s" % versionstring)
2070
      return 1
2071

    
2072
  msg = utils.version.UpgradeRange(version)
2073
  if msg is not None:
2074
    ToStderr("Cannot upgrade to %s: %s" % (versionstring, msg))
2075
    return 1
2076

    
2077
  if not config_already_modified:
2078
    success, rollback = _UpgradeBeforeConfigurationChange(versionstring)
2079
    if not success:
2080
      _ExecuteCommands(rollback)
2081
      return 1
2082
  else:
2083
    rollback = []
2084

    
2085
  downgrade = utils.version.ShouldCfgdowngrade(version)
2086

    
2087
  success, additionalrollback =  \
2088
      _SwitchVersionAndConfig(versionstring, downgrade)
2089
  if not success:
2090
    rollback.extend(additionalrollback)
2091
    _ExecuteCommands(rollback)
2092
    return 1
2093

    
2094
  return _UpgradeAfterConfigurationChange(oldversion)
2095

    
2096

    
2097
commands = {
2098
  "init": (
2099
    InitCluster, [ArgHost(min=1, max=1)],
2100
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
2101
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
2102
     NIC_PARAMS_OPT, NOMODIFY_ETCHOSTS_OPT, NOMODIFY_SSH_SETUP_OPT,
2103
     SECONDARY_IP_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT,
2104
     DRBD_HELPER_OPT, DEFAULT_IALLOCATOR_OPT, DEFAULT_IALLOCATOR_PARAMS_OPT,
2105
     PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT,
2106
     GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT,
2107
     HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
2108
     IPOLICY_STD_SPECS_OPT, GLOBAL_GLUSTER_FILEDIR_OPT]
2109
     + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
2110
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
2111
  "destroy": (
2112
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
2113
    "", "Destroy cluster"),
2114
  "rename": (
2115
    RenameCluster, [ArgHost(min=1, max=1)],
2116
    [FORCE_OPT, DRY_RUN_OPT],
2117
    "<new_name>",
2118
    "Renames the cluster"),
2119
  "redist-conf": (
2120
    RedistributeConfig, ARGS_NONE, SUBMIT_OPTS +
2121
    [DRY_RUN_OPT, PRIORITY_OPT, FORCE_DISTRIBUTION],
2122
    "", "Forces a push of the configuration file and ssconf files"
2123
    " to the nodes in the cluster"),
2124
  "verify": (
2125
    VerifyCluster, ARGS_NONE,
2126
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
2127
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
2128
    "", "Does a check on the cluster configuration"),
2129
  "verify-disks": (
2130
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
2131
    "", "Does a check on the cluster disk status"),
2132
  "repair-disk-sizes": (
2133
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
2134
    "[instance...]", "Updates mismatches in recorded disk sizes"),
2135
  "master-failover": (
2136
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
2137
    "", "Makes the current node the master"),
2138
  "master-ping": (
2139
    MasterPing, ARGS_NONE, [],
2140
    "", "Checks if the master is alive"),
2141
  "version": (
2142
    ShowClusterVersion, ARGS_NONE, [],
2143
    "", "Shows the cluster version"),
2144
  "getmaster": (
2145
    ShowClusterMaster, ARGS_NONE, [],
2146
    "", "Shows the cluster master"),
2147
  "copyfile": (
2148
    ClusterCopyFile, [ArgFile(min=1, max=1)],
2149
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
2150
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
2151
  "command": (
2152
    RunClusterCommand, [ArgCommand(min=1)],
2153
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
2154
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
2155
  "info": (
2156
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
2157
    "[--roman]", "Show cluster configuration"),
2158
  "list-tags": (
2159
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
2160
  "add-tags": (
2161
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
2162
    "tag...", "Add tags to the cluster"),
2163
  "remove-tags": (
2164
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
2165
    "tag...", "Remove tags from the cluster"),
2166
  "search-tags": (
2167
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
2168
    "Searches the tags on all objects on"
2169
    " the cluster for a given pattern (regex)"),
2170
  "queue": (
2171
    QueueOps,
2172
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
2173
    [], "drain|undrain|info", "Change queue properties"),
2174
  "watcher": (
2175
    WatcherOps,
2176
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
2177
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
2178
    [],
2179
    "{pause <timespec>|continue|info}", "Change watcher properties"),
2180
  "modify": (
2181
    SetClusterParams, ARGS_NONE,
2182
    [FORCE_OPT,
2183
     BACKEND_OPT, CP_SIZE_OPT, RQL_OPT,
2184
     ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
2185
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT,
2186
     UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT, DRBD_HELPER_OPT,
2187
     DEFAULT_IALLOCATOR_OPT, DEFAULT_IALLOCATOR_PARAMS_OPT, RESERVED_LVS_OPT,
2188
     DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT,
2189
     USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] +
2190
     SUBMIT_OPTS +
2191
     [ENABLED_DISK_TEMPLATES_OPT, IPOLICY_STD_SPECS_OPT, MODIFY_ETCHOSTS_OPT] +
2192
     INSTANCE_POLICY_OPTS + [GLOBAL_FILEDIR_OPT],
2193
    "[opts...]",
2194
    "Alters the parameters of the cluster"),
2195
  "renew-crypto": (
2196
    RenewCrypto, ARGS_NONE,
2197
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
2198
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
2199
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
2200
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT,
2201
     NEW_NODE_CERT_OPT],
2202
    "[opts...]",
2203
    "Renews cluster certificates, keys and secrets"),
2204
  "epo": (
2205
    Epo, [ArgUnknown()],
2206
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
2207
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
2208
    "[opts...] [args]",
2209
    "Performs an emergency power-off on given args"),
2210
  "activate-master-ip": (
2211
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
2212
  "deactivate-master-ip": (
2213
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
2214
    "Deactivates the master IP"),
2215
  "show-ispecs-cmd": (
2216
    ShowCreateCommand, ARGS_NONE, [], "",
2217
    "Show the command line to re-create the cluster"),
2218
  "upgrade": (
2219
    UpgradeGanetiCommand, ARGS_NONE, [TO_OPT, RESUME_OPT], "",
2220
    "Upgrade (or downgrade) to a new Ganeti version"),
2221
  }
2222

    
2223

    
2224
#: dictionary with aliases for commands
2225
aliases = {
2226
  "masterfailover": "master-failover",
2227
  "show": "info",
2228
}
2229

    
2230

    
2231
def Main():
2232
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
2233
                     aliases=aliases)