root / lib / client / gnt_cluster.py @ 0fcb3314
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013, 2014 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
from cStringIO import StringIO
30
import os
31
import time
32
import OpenSSL
33
import itertools
34

    
35
from ganeti.cli import *
36
from ganeti import bootstrap
37
from ganeti import compat
38
from ganeti import constants
39
from ganeti import errors
40
from ganeti import netutils
41
from ganeti import objects
42
from ganeti import opcodes
43
from ganeti import pathutils
44
from ganeti import qlang
45
from ganeti import serializer
46
from ganeti import ssconf
47
from ganeti import ssh
48
from ganeti import uidpool
49
from ganeti import utils
50
from ganeti.client import base
51

    
52

    
53
ON_OPT = cli_option("--on", default=False,
54
                    action="store_true", dest="on",
55
                    help="Recover from an EPO")
56

    
57
GROUPS_OPT = cli_option("--groups", default=False,
58
                        action="store_true", dest="groups",
59
                        help="Arguments are node groups instead of nodes")
60

    
61
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
62
                            help="Override interactive check for --no-voting",
63
                            default=False, action="store_true")
64

    
65
FORCE_DISTRIBUTION = cli_option("--yes-do-it", dest="yes_do_it",
66
                                help="Unconditionally distribute the"
67
                                " configuration, even if the queue"
68
                                " is drained",
69
                                default=False, action="store_true")
70

    
71
TO_OPT = cli_option("--to", default=None, type="string",
72
                    help="The Ganeti version to upgrade to")
73

    
74
RESUME_OPT = cli_option("--resume", default=False, action="store_true",
75
                        help="Resume any pending Ganeti upgrades")
76

    
77
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
78
_EPO_PING_TIMEOUT = 1 # 1 second
79
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
80

    
81

    
82
def _InitEnabledDiskTemplates(opts):
83
  """Initialize the list of enabled disk templates.
84

85
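  For example, --enabled-disk-templates=drbd,plain yields ["drbd", "plain"];
  without the option the built-in default list is used.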
  """
86
  if opts.enabled_disk_templates:
87
    return opts.enabled_disk_templates.split(",")
88
  else:
89
    return constants.DEFAULT_ENABLED_DISK_TEMPLATES
90

    
91

    
92
def _InitVgName(opts, enabled_disk_templates):
93
  """Initialize the volume group name.
94

95
  @type enabled_disk_templates: list of strings
96
  @param enabled_disk_templates: cluster-wide enabled disk templates
97

98
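  An explicit --vg-name value wins (with a warning if no lvm-based disk
  template is enabled), an empty --vg-name is rejected when lvm is enabled,
  and without the option the default volume group is used only if lvm is
  enabled.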
  """
99
  vg_name = None
100
  if opts.vg_name is not None:
101
    vg_name = opts.vg_name
102
    if vg_name:
103
      if not utils.IsLvmEnabled(enabled_disk_templates):
104
        ToStdout("You specified a volume group with --vg-name, but you did not"
105
                 " enable any disk template that uses lvm.")
106
    elif utils.IsLvmEnabled(enabled_disk_templates):
107
      raise errors.OpPrereqError(
108
          "LVM disk templates are enabled, but vg name not set.")
109
  elif utils.IsLvmEnabled(enabled_disk_templates):
110
    vg_name = constants.DEFAULT_VG
111
  return vg_name
112

    
113

    
114
def _InitDrbdHelper(opts, enabled_disk_templates):
115
  """Initialize the DRBD usermode helper.
116

117
  """
118
  drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
119

    
120
  if not drbd_enabled and opts.drbd_helper is not None:
121
    ToStdout("Note: You specified a DRBD usermode helper, while DRBD storage"
122
             " is not enabled.")
123

    
124
  if drbd_enabled:
125
    if opts.drbd_helper is None:
126
      return constants.DEFAULT_DRBD_HELPER
127
    if opts.drbd_helper == '':
128
      raise errors.OpPrereqError(
129
          "Unsetting the drbd usermode helper while enabling DRBD is not"
130
          " allowed.")
131

    
132
  return opts.drbd_helper
133

    
134

    
135
@UsesRPC
136
def InitCluster(opts, args):
137
  """Initialize the cluster.
138

139
  @param opts: the command line options selected by the user
140
  @type args: list
141
  @param args: should contain only one element, the desired
142
      cluster name
143
  @rtype: int
144
  @return: the desired exit code
145

146
  """
147
  enabled_disk_templates = _InitEnabledDiskTemplates(opts)
148

    
149
  try:
150
    vg_name = _InitVgName(opts, enabled_disk_templates)
151
    drbd_helper = _InitDrbdHelper(opts, enabled_disk_templates)
152
  except errors.OpPrereqError, e:
153
    ToStderr(str(e))
154
    return 1
155

    
156
  master_netdev = opts.master_netdev
157
  if master_netdev is None:
158
    nic_mode = opts.nicparams.get(constants.NIC_MODE, None)
159
    if not nic_mode:
160
      # default case, use bridging
161
      master_netdev = constants.DEFAULT_BRIDGE
162
    elif nic_mode == constants.NIC_MODE_OVS:
163
      # default ovs is different from default bridge
164
      master_netdev = constants.DEFAULT_OVS
165
      opts.nicparams[constants.NIC_LINK] = constants.DEFAULT_OVS
166

    
167
  hvlist = opts.enabled_hypervisors
168
  if hvlist is None:
169
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
170
  hvlist = hvlist.split(",")
171

    
172
  hvparams = dict(opts.hvparams)
173
  beparams = opts.beparams
174
  nicparams = opts.nicparams
175

    
176
  diskparams = dict(opts.diskparams)
177

    
178
  # check the disk template types here, as we cannot rely on the type check done
179
  # by the opcode parameter types
180
  diskparams_keys = set(diskparams.keys())
181
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
182
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
183
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
184
    return 1
185

    
186
  # prepare beparams dict
187
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
188
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
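  # FillDict returns a copy of the defaults updated with the user-supplied
  # values, so any parameter not given on the command line keeps its default;
  # ForceDictType then validates and coerces the value types.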
189

    
190
  # prepare nicparams dict
191
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
192
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
193

    
194
  # prepare ndparams dict
195
  if opts.ndparams is None:
196
    ndparams = dict(constants.NDC_DEFAULTS)
197
  else:
198
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
199
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
200

    
201
  # prepare hvparams dict
202
  for hv in constants.HYPER_TYPES:
203
    if hv not in hvparams:
204
      hvparams[hv] = {}
205
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
206
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
207

    
208
  # prepare diskparams dict
209
  for templ in constants.DISK_TEMPLATES:
210
    if templ not in diskparams:
211
      diskparams[templ] = {}
212
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
213
                                         diskparams[templ])
214
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
215

    
216
  # prepare ipolicy dict
217
  ipolicy = CreateIPolicyFromOpts(
218
    ispecs_mem_size=opts.ispecs_mem_size,
219
    ispecs_cpu_count=opts.ispecs_cpu_count,
220
    ispecs_disk_count=opts.ispecs_disk_count,
221
    ispecs_disk_size=opts.ispecs_disk_size,
222
    ispecs_nic_count=opts.ispecs_nic_count,
223
    minmax_ispecs=opts.ipolicy_bounds_specs,
224
    std_ispecs=opts.ipolicy_std_specs,
225
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
226
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
227
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
228
    fill_all=True)
229

    
230
  if opts.candidate_pool_size is None:
231
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
232

    
233
  if opts.mac_prefix is None:
234
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
235

    
236
  uid_pool = opts.uid_pool
237
  if uid_pool is not None:
238
    uid_pool = uidpool.ParseUidPool(uid_pool)
239

    
240
  if opts.prealloc_wipe_disks is None:
241
    opts.prealloc_wipe_disks = False
242

    
243
  external_ip_setup_script = opts.use_external_mip_script
244
  if external_ip_setup_script is None:
245
    external_ip_setup_script = False
246

    
247
  try:
248
    primary_ip_version = int(opts.primary_ip_version)
249
  except (ValueError, TypeError), err:
250
    ToStderr("Invalid primary ip version value: %s" % str(err))
251
    return 1
252

    
253
  master_netmask = opts.master_netmask
254
  try:
255
    if master_netmask is not None:
256
      master_netmask = int(master_netmask)
257
  except (ValueError, TypeError), err:
258
    ToStderr("Invalid master netmask value: %s" % str(err))
259
    return 1
260

    
261
  if opts.disk_state:
262
    disk_state = utils.FlatToDict(opts.disk_state)
263
  else:
264
    disk_state = {}
265

    
266
  hv_state = dict(opts.hv_state)
267

    
268
  default_ialloc_params = opts.default_iallocator_params
269
  bootstrap.InitCluster(cluster_name=args[0],
270
                        secondary_ip=opts.secondary_ip,
271
                        vg_name=vg_name,
272
                        mac_prefix=opts.mac_prefix,
273
                        master_netmask=master_netmask,
274
                        master_netdev=master_netdev,
275
                        file_storage_dir=opts.file_storage_dir,
276
                        shared_file_storage_dir=opts.shared_file_storage_dir,
277
                        gluster_storage_dir=opts.gluster_storage_dir,
278
                        enabled_hypervisors=hvlist,
279
                        hvparams=hvparams,
280
                        beparams=beparams,
281
                        nicparams=nicparams,
282
                        ndparams=ndparams,
283
                        diskparams=diskparams,
284
                        ipolicy=ipolicy,
285
                        candidate_pool_size=opts.candidate_pool_size,
286
                        modify_etc_hosts=opts.modify_etc_hosts,
287
                        modify_ssh_setup=opts.modify_ssh_setup,
288
                        maintain_node_health=opts.maintain_node_health,
289
                        drbd_helper=drbd_helper,
290
                        uid_pool=uid_pool,
291
                        default_iallocator=opts.default_iallocator,
292
                        default_iallocator_params=default_ialloc_params,
293
                        primary_ip_version=primary_ip_version,
294
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
295
                        use_external_mip_script=external_ip_setup_script,
296
                        hv_state=hv_state,
297
                        disk_state=disk_state,
298
                        enabled_disk_templates=enabled_disk_templates,
299
                        )
300
  op = opcodes.OpClusterPostInit()
301
  SubmitOpCode(op, opts=opts)
302
  return 0
303

    
304

    
305
@UsesRPC
306
def DestroyCluster(opts, args):
307
  """Destroy the cluster.
308

309
  @param opts: the command line options selected by the user
310
  @type args: list
311
  @param args: should be an empty list
312
  @rtype: int
313
  @return: the desired exit code
314

315
  """
316
  if not opts.yes_do_it:
317
    ToStderr("Destroying a cluster is irreversible. If you really want"
318
             " destroy this cluster, supply the --yes-do-it option.")
319
    return 1
320

    
321
  op = opcodes.OpClusterDestroy()
322
  master_uuid = SubmitOpCode(op, opts=opts)
323
  # if we reached this, the opcode didn't fail; we can proceed to
324
  # shutdown all the daemons
325
  bootstrap.FinalizeClusterDestroy(master_uuid)
326
  return 0
327

    
328

    
329
def RenameCluster(opts, args):
330
  """Rename the cluster.
331

332
  @param opts: the command line options selected by the user
333
  @type args: list
334
  @param args: should contain only one element, the new cluster name
335
  @rtype: int
336
  @return: the desired exit code
337

338
  """
339
  cl = GetClient()
340

    
341
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
342

    
343
  new_name = args[0]
344
  if not opts.force:
345
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
346
                " connected over the network to the cluster name, the"
347
                " operation is very dangerous as the IP address will be"
348
                " removed from the node and the change may not go through."
349
                " Continue?") % (cluster_name, new_name)
350
    if not AskUser(usertext):
351
      return 1
352

    
353
  op = opcodes.OpClusterRename(name=new_name)
354
  result = SubmitOpCode(op, opts=opts, cl=cl)
355

    
356
  if result:
357
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
358

    
359
  return 0
360

    
361

    
362
def ActivateMasterIp(opts, args):
363
  """Activates the master IP.
364

365
  """
366
  op = opcodes.OpClusterActivateMasterIp()
367
  SubmitOpCode(op)
368
  return 0
369

    
370

    
371
def DeactivateMasterIp(opts, args):
372
  """Deactivates the master IP.
373

374
  """
375
  if not opts.confirm:
376
    usertext = ("This will disable the master IP. All the open connections to"
377
                " the master IP will be closed. To reach the master you will"
378
                " need to use its node IP."
379
                " Continue?")
380
    if not AskUser(usertext):
381
      return 1
382

    
383
  op = opcodes.OpClusterDeactivateMasterIp()
384
  SubmitOpCode(op)
385
  return 0
386

    
387

    
388
def RedistributeConfig(opts, args):
389
  """Forces push of the cluster configuration.
390

391
  @param opts: the command line options selected by the user
392
  @type args: list
393
  @param args: empty list
394
  @rtype: int
395
  @return: the desired exit code
396

397
  """
398
  op = opcodes.OpClusterRedistConf()
399
  if opts.yes_do_it:
400
    SubmitOpCodeToDrainedQueue(op)
401
  else:
402
    SubmitOrSend(op, opts)
403
  return 0
404

    
405

    
406
def ShowClusterVersion(opts, args):
407
  """Write version of ganeti software to the standard output.
408

409
  @param opts: the command line options selected by the user
410
  @type args: list
411
  @param args: should be an empty list
412
  @rtype: int
413
  @return: the desired exit code
414

415
  """
416
  cl = GetClient()
417
  result = cl.QueryClusterInfo()
418
  ToStdout("Software version: %s", result["software_version"])
419
  ToStdout("Internode protocol: %s", result["protocol_version"])
420
  ToStdout("Configuration format: %s", result["config_version"])
421
  ToStdout("OS api version: %s", result["os_api_version"])
422
  ToStdout("Export interface: %s", result["export_version"])
423
  ToStdout("VCS version: %s", result["vcs_version"])
424
  return 0
425

    
426

    
427
def ShowClusterMaster(opts, args):
428
  """Write name of master node to the standard output.
429

430
  @param opts: the command line options selected by the user
431
  @type args: list
432
  @param args: should be an empty list
433
  @rtype: int
434
  @return: the desired exit code
435

436
  """
437
  master = bootstrap.GetMaster()
438
  ToStdout(master)
439
  return 0
440

    
441

    
442
def _FormatGroupedParams(paramsdict, roman=False):
443
  """Format Grouped parameters (be, nic, disk) by group.
444

445
  @type paramsdict: dict of dicts
446
  @param paramsdict: {group: {param: value, ...}, ...}
447
  @rtype: dict of dicts
448
  @return: copy of the input dictionaries with strings as values
449

450
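  Example (hypothetical values): {"xen-pvm": {"cpus": 2}} becomes
  {"xen-pvm": {"cpus": "2"}}; leaf values are stringified, or converted to
  Roman numerals when roman=True.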
  """
451
  ret = {}
452
  for (item, val) in paramsdict.items():
453
    if isinstance(val, dict):
454
      ret[item] = _FormatGroupedParams(val, roman=roman)
455
    elif roman and isinstance(val, int):
456
      ret[item] = compat.TryToRoman(val)
457
    else:
458
      ret[item] = str(val)
459
  return ret
460

    
461

    
462
def ShowClusterConfig(opts, args):
463
  """Shows cluster information.
464

465
  @param opts: the command line options selected by the user
466
  @type args: list
467
  @param args: should be an empty list
468
  @rtype: int
469
  @return: the desired exit code
470

471
  """
472
  cl = GetClient()
473
  result = cl.QueryClusterInfo()
474

    
475
  if result["tags"]:
476
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
477
  else:
478
    tags = "(none)"
479
  if result["reserved_lvs"]:
480
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
481
  else:
482
    reserved_lvs = "(none)"
483

    
484
  enabled_hv = result["enabled_hypervisors"]
485
  hvparams = dict((k, v) for k, v in result["hvparams"].iteritems()
486
                  if k in enabled_hv)
487

    
488
  info = [
489
    ("Cluster name", result["name"]),
490
    ("Cluster UUID", result["uuid"]),
491

    
492
    ("Creation time", utils.FormatTime(result["ctime"])),
493
    ("Modification time", utils.FormatTime(result["mtime"])),
494

    
495
    ("Master node", result["master"]),
496

    
497
    ("Architecture (this node)",
498
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),
499

    
500
    ("Tags", tags),
501

    
502
    ("Default hypervisor", result["default_hypervisor"]),
503
    ("Enabled hypervisors", utils.CommaJoin(enabled_hv)),
504

    
505
    ("Hypervisor parameters", _FormatGroupedParams(hvparams)),
506

    
507
    ("OS-specific hypervisor parameters",
508
     _FormatGroupedParams(result["os_hvp"])),
509

    
510
    ("OS parameters", _FormatGroupedParams(result["osparams"])),
511

    
512
    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
513
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),
514

    
515
    ("Cluster parameters", [
516
      ("candidate pool size",
517
       compat.TryToRoman(result["candidate_pool_size"],
518
                         convert=opts.roman_integers)),
519
      ("maximal number of jobs running simultaneously",
520
       compat.TryToRoman(result["max_running_jobs"],
521
                         convert=opts.roman_integers)),
522
      ("master netdev", result["master_netdev"]),
523
      ("master netmask", result["master_netmask"]),
524
      ("use external master IP address setup script",
525
       result["use_external_mip_script"]),
526
      ("lvm volume group", result["volume_group_name"]),
527
      ("lvm reserved volumes", reserved_lvs),
528
      ("drbd usermode helper", result["drbd_usermode_helper"]),
529
      ("file storage path", result["file_storage_dir"]),
530
      ("shared file storage path", result["shared_file_storage_dir"]),
531
      ("gluster storage path", result["gluster_storage_dir"]),
532
      ("maintenance of node health", result["maintain_node_health"]),
533
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
534
      ("default instance allocator", result["default_iallocator"]),
535
      ("default instance allocator parameters",
536
       result["default_iallocator_params"]),
537
      ("primary ip version", result["primary_ip_version"]),
538
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
539
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
540
      ("ExtStorage Providers search path",
541
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
542
      ("enabled disk templates",
543
       utils.CommaJoin(result["enabled_disk_templates"])),
544
      ("instance communication network",
545
       result["instance_communication_network"]),
546
      ]),
547

    
548
    ("Default node parameters",
549
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),
550

    
551
    ("Default instance parameters",
552
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),
553

    
554
    ("Default nic parameters",
555
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),
556

    
557
    ("Default disk parameters",
558
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),
559

    
560
    ("Instance policy - limits for instances",
561
     FormatPolicyInfo(result["ipolicy"], None, True)),
562
    ]
563

    
564
  PrintGenericInfo(info)
565
  return 0
566

    
567

    
568
def ClusterCopyFile(opts, args):
569
  """Copy a file from master to some nodes.
570

571
  @param opts: the command line options selected by the user
572
  @type args: list
573
  @param args: should contain only one element, the path of
574
      the file to be copied
575
  @rtype: int
576
  @return: the desired exit code
577

578
  """
579
  filename = args[0]
580
  if not os.path.exists(filename):
581
    raise errors.OpPrereqError("No such filename '%s'" % filename,
582
                               errors.ECODE_INVAL)
583

    
584
  cl = GetClient()
585
  qcl = GetClient()
586
  try:
587
    cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
588

    
589
    results = GetOnlineNodes(nodes=opts.nodes, cl=qcl, filter_master=True,
590
                             secondary_ips=opts.use_replication_network,
591
                             nodegroup=opts.nodegroup)
592
    ports = GetNodesSshPorts(opts.nodes, qcl)
593
  finally:
594
    cl.Close()
595
    qcl.Close()
596

    
597
  srun = ssh.SshRunner(cluster_name)
598
  for (node, port) in zip(results, ports):
599
    if not srun.CopyFileToNode(node, port, filename):
600
      ToStderr("Copy of file %s to node %s:%d failed", filename, node, port)
601

    
602
  return 0
603

    
604

    
605
def RunClusterCommand(opts, args):
606
  """Run a command on some nodes.
607

608
  @param opts: the command line options selected by the user
609
  @type args: list
610
  @param args: should contain the command to be run and its arguments
611
  @rtype: int
612
  @return: the desired exit code
613

614
  """
615
  cl = GetClient()
616
  qcl = GetClient()
617

    
618
  command = " ".join(args)
619

    
620
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=qcl, nodegroup=opts.nodegroup)
621
  ports = GetNodesSshPorts(nodes, qcl)
622

    
623
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
624
                                                    "master_node"])
625

    
626
  srun = ssh.SshRunner(cluster_name=cluster_name)
627

    
628
  # Make sure master node is at list end
629
  if master_node in nodes:
630
    nodes.remove(master_node)
631
    nodes.append(master_node)
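  # Running the command on the master last means a disruptive command (e.g.
  # a reboot or poweroff) does not take down the coordinating node before
  # the other nodes have been handled.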
632

    
633
  for (name, port) in zip(nodes, ports):
634
    result = srun.Run(name, constants.SSH_LOGIN_USER, command, port=port)
635

    
636
    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
637
      # Do not output anything for successful commands
638
      continue
639

    
640
    ToStdout("------------------------------------------------")
641
    if opts.show_machine_names:
642
      for line in result.output.splitlines():
643
        ToStdout("%s: %s", name, line)
644
    else:
645
      ToStdout("node: %s", name)
646
      ToStdout("%s", result.output)
647
    ToStdout("return code = %s", result.exit_code)
648

    
649
  return 0
650

    
651

    
652
def VerifyCluster(opts, args):
653
  """Verify integrity of cluster, performing various test on nodes.
654

655
  @param opts: the command line options selected by the user
656
  @type args: list
657
  @param args: should be an empty list
658
  @rtype: int
659
  @return: the desired exit code
660

661
  """
662
  skip_checks = []
663

    
664
  if opts.skip_nplusone_mem:
665
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
666

    
667
  cl = GetClient()
668

    
669
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
670
                               error_codes=opts.error_codes,
671
                               debug_simulate_errors=opts.simulate_errors,
672
                               skip_checks=skip_checks,
673
                               ignore_errors=opts.ignore_errors,
674
                               group_name=opts.nodegroup)
675
  result = SubmitOpCode(op, cl=cl, opts=opts)
676

    
677
  # Keep track of submitted jobs
678
  jex = JobExecutor(cl=cl, opts=opts)
679

    
680
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
681
    jex.AddJobId(None, status, job_id)
682

    
683
  results = jex.GetResults()
684

    
685
  (bad_jobs, bad_results) = \
686
    map(len,
687
        # Convert iterators to lists
688
        map(list,
689
            # Count errors
690
            map(compat.partial(itertools.ifilterfalse, bool),
691
                # Convert result to booleans in a tuple
692
                zip(*((job_success, len(op_results) == 1 and op_results[0])
693
                      for (job_success, op_results) in results)))))
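  # bad_jobs counts verification jobs that failed to run at all; bad_results
  # counts jobs that ran but whose single opcode result evaluated to false,
  # i.e. the verification itself reported problems.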
694

    
695
  if bad_jobs == 0 and bad_results == 0:
696
    rcode = constants.EXIT_SUCCESS
697
  else:
698
    rcode = constants.EXIT_FAILURE
699
    if bad_jobs > 0:
700
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
701

    
702
  return rcode
703

    
704

    
705
def VerifyDisks(opts, args):
706
  """Verify integrity of cluster disks.
707

708
  @param opts: the command line options selected by the user
709
  @type args: list
710
  @param args: should be an empty list
711
  @rtype: int
712
  @return: the desired exit code
713

714
  """
715
  cl = GetClient()
716

    
717
  op = opcodes.OpClusterVerifyDisks()
718

    
719
  result = SubmitOpCode(op, cl=cl, opts=opts)
720

    
721
  # Keep track of submitted jobs
722
  jex = JobExecutor(cl=cl, opts=opts)
723

    
724
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
725
    jex.AddJobId(None, status, job_id)
726

    
727
  retcode = constants.EXIT_SUCCESS
728

    
729
  for (status, result) in jex.GetResults():
730
    if not status:
731
      ToStdout("Job failed: %s", result)
732
      continue
733

    
734
    ((bad_nodes, instances, missing), ) = result
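    # Each job returns a single tuple: bad_nodes maps node names to the error
    # output collected there, instances lists instances whose disks need to be
    # reactivated, and missing maps instance names to (node, volume) pairs of
    # missing logical volumes.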
735

    
736
    for node, text in bad_nodes.items():
737
      ToStdout("Error gathering data on node %s: %s",
738
               node, utils.SafeEncode(text[-400:]))
739
      retcode = constants.EXIT_FAILURE
740
      ToStdout("You need to fix these nodes first before fixing instances")
741

    
742
    for iname in instances:
743
      if iname in missing:
744
        continue
745
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
746
      try:
747
        ToStdout("Activating disks for instance '%s'", iname)
748
        SubmitOpCode(op, opts=opts, cl=cl)
749
      except errors.GenericError, err:
750
        nret, msg = FormatError(err)
751
        retcode |= nret
752
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
753

    
754
    if missing:
755
      for iname, ival in missing.iteritems():
756
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
757
        if all_missing:
758
          ToStdout("Instance %s cannot be verified as it lives on"
759
                   " broken nodes", iname)
760
        else:
761
          ToStdout("Instance %s has missing logical volumes:", iname)
762
          ival.sort()
763
          for node, vol in ival:
764
            if node in bad_nodes:
765
              ToStdout("\tbroken node %s /dev/%s", node, vol)
766
            else:
767
              ToStdout("\t%s /dev/%s", node, vol)
768

    
769
      ToStdout("You need to replace or recreate disks for all the above"
770
               " instances if this message persists after fixing broken nodes.")
771
      retcode = constants.EXIT_FAILURE
772
    elif not instances:
773
      ToStdout("No disks need to be activated.")
774

    
775
  return retcode
776

    
777

    
778
def RepairDiskSizes(opts, args):
779
  """Verify sizes of cluster disks.
780

781
  @param opts: the command line options selected by the user
782
  @type args: list
783
  @param args: optional list of instances to restrict check to
784
  @rtype: int
785
  @return: the desired exit code
786

787
  """
788
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
789
  SubmitOpCode(op, opts=opts)
790

    
791

    
792
@UsesRPC
793
def MasterFailover(opts, args):
794
  """Failover the master node.
795

796
  This command, when run on a non-master node, will cause the current
797
  master to cease being master, and the non-master to become the new
798
  master.
799

800
  @param opts: the command line options selected by the user
801
  @type args: list
802
  @param args: should be an empty list
803
  @rtype: int
804
  @return: the desired exit code
805

806
  """
807
  if opts.no_voting and not opts.yes_do_it:
808
    usertext = ("This will perform the failover even if most other nodes"
809
                " are down, or if this node is outdated. This is dangerous"
810
                " as it can lead to a non-consistent cluster. Check the"
811
                " gnt-cluster(8) man page before proceeding. Continue?")
812
    if not AskUser(usertext):
813
      return 1
814

    
815
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
816

    
817

    
818
def MasterPing(opts, args):
819
  """Checks if the master is alive.
820

821
  @param opts: the command line options selected by the user
822
  @type args: list
823
  @param args: should be an empty list
824
  @rtype: int
825
  @return: the desired exit code
826

827
  """
828
  try:
829
    cl = GetClient()
830
    cl.QueryClusterInfo()
831
    return 0
832
  except Exception: # pylint: disable=W0703
833
    return 1
834

    
835

    
836
def SearchTags(opts, args):
837
  """Searches the tags on all the cluster.
838

839
  @param opts: the command line options selected by the user
840
  @type args: list
841
  @param args: should contain only one element, the tag pattern
842
  @rtype: int
843
  @return: the desired exit code
844

845
  """
846
  op = opcodes.OpTagsSearch(pattern=args[0])
847
  result = SubmitOpCode(op, opts=opts)
848
  if not result:
849
    return 1
850
  result = list(result)
851
  result.sort()
852
  for path, tag in result:
853
    ToStdout("%s %s", path, tag)
854

    
855

    
856
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
857
  """Reads and verifies an X509 certificate.
858

859
  @type cert_filename: string
860
  @param cert_filename: the path of the file containing the certificate to
861
                        verify encoded in PEM format
862
  @type verify_private_key: bool
863
  @param verify_private_key: whether to verify the private key in addition to
864
                             the public certificate
865
  @rtype: string
866
  @return: a string containing the PEM-encoded certificate.
867

868
  """
869
  try:
870
    pem = utils.ReadFile(cert_filename)
871
  except IOError, err:
872
    raise errors.X509CertError(cert_filename,
873
                               "Unable to read certificate: %s" % str(err))
874

    
875
  try:
876
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
877
  except Exception, err:
878
    raise errors.X509CertError(cert_filename,
879
                               "Unable to load certificate: %s" % str(err))
880

    
881
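  # Note: when verify_private_key is True, the private key must be contained
  # in the same PEM file as the certificate, since it is loaded from the same
  # buffer below.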
  if verify_private_key:
882
    try:
883
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
884
    except Exception, err:
885
      raise errors.X509CertError(cert_filename,
886
                                 "Unable to load private key: %s" % str(err))
887

    
888
  return pem
889

    
890

    
891
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
892
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
893
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
894
                 cds_filename, force, new_node_cert):
895
  """Renews cluster certificates, keys and secrets.
896

897
  @type new_cluster_cert: bool
898
  @param new_cluster_cert: Whether to generate a new cluster certificate
899
  @type new_rapi_cert: bool
900
  @param new_rapi_cert: Whether to generate a new RAPI certificate
901
  @type rapi_cert_filename: string
902
  @param rapi_cert_filename: Path to file containing new RAPI certificate
903
  @type new_spice_cert: bool
904
  @param new_spice_cert: Whether to generate a new SPICE certificate
905
  @type spice_cert_filename: string
906
  @param spice_cert_filename: Path to file containing new SPICE certificate
907
  @type spice_cacert_filename: string
908
  @param spice_cacert_filename: Path to file containing the certificate of the
909
                                CA that signed the SPICE certificate
910
  @type new_confd_hmac_key: bool
911
  @param new_confd_hmac_key: Whether to generate a new HMAC key
912
  @type new_cds: bool
913
  @param new_cds: Whether to generate a new cluster domain secret
914
  @type cds_filename: string
915
  @param cds_filename: Path to file containing new cluster domain secret
916
  @type force: bool
917
  @param force: Whether to skip asking the user for confirmation
918
  @type new_node_cert: bool
919
  @param new_node_cert: Whether to generate new node certificates
920

921
  """
922
  if new_rapi_cert and rapi_cert_filename:
923
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
924
             " options can be specified at the same time.")
925
    return 1
926

    
927
  if new_cds and cds_filename:
928
    ToStderr("Only one of the --new-cluster-domain-secret and"
929
             " --cluster-domain-secret options can be specified at"
930
             " the same time.")
931
    return 1
932

    
933
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
934
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
935
             " and --spice-ca-certificate must not be used.")
936
    return 1
937

    
938
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
939
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
940
             " specified.")
941
    return 1
942

    
943
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
944
  try:
945
    if rapi_cert_filename:
946
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
947
    if spice_cert_filename:
948
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
949
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
950
  except errors.X509CertError, err:
951
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
952
    return 1
953

    
954
  if cds_filename:
955
    try:
956
      cds = utils.ReadFile(cds_filename)
957
    except Exception, err: # pylint: disable=W0703
958
      ToStderr("Can't load new cluster domain secret from %s: %s" %
959
               (cds_filename, str(err)))
960
      return 1
961
  else:
962
    cds = None
963

    
964
  if not force:
965
    usertext = ("This requires all daemons on all nodes to be restarted and"
966
                " may take some time. Continue?")
967
    if not AskUser(usertext):
968
      return 1
969

    
970
  def _RenewCryptoInner(ctx):
971
    ctx.feedback_fn("Updating certificates and keys")
972
    # Note: the node certificate will be generated in the LU
973
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
974
                                    new_rapi_cert,
975
                                    new_spice_cert,
976
                                    new_confd_hmac_key,
977
                                    new_cds,
978
                                    rapi_cert_pem=rapi_cert_pem,
979
                                    spice_cert_pem=spice_cert_pem,
980
                                    spice_cacert_pem=spice_cacert_pem,
981
                                    cds=cds)
982

    
983
    files_to_copy = []
984

    
985
    if new_cluster_cert:
986
      files_to_copy.append(pathutils.NODED_CERT_FILE)
987

    
988
    if new_rapi_cert or rapi_cert_pem:
989
      files_to_copy.append(pathutils.RAPI_CERT_FILE)
990

    
991
    if new_spice_cert or spice_cert_pem:
992
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
993
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)
994

    
995
    if new_confd_hmac_key:
996
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)
997

    
998
    if new_cds or cds:
999
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)
1000

    
1001
    if files_to_copy:
1002
      for node_name in ctx.nonmaster_nodes:
1003
        port = ctx.ssh_ports[node_name]
1004
        ctx.feedback_fn("Copying %s to %s:%d" %
1005
                        (", ".join(files_to_copy), node_name, port))
1006
        for file_name in files_to_copy:
1007
          ctx.ssh.CopyFileToNode(node_name, port, file_name)
1008

    
1009
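  # The actual renewal runs with all Ganeti daemons stopped, so no daemon
  # keeps serving with the old certificates while the files on disk are being
  # replaced and copied to the other nodes.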
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
1010

    
1011
  ToStdout("All requested certificates and keys have been replaced."
1012
           " Running \"gnt-cluster verify\" now is recommended.")
1013

    
1014
  if new_node_cert:
1015
    cl = GetClient()
1016
    renew_op = opcodes.OpClusterRenewCrypto()
1017
    SubmitOpCode(renew_op, cl=cl)
1018

    
1019
  return 0
1020

    
1021

    
1022
def RenewCrypto(opts, args):
1023
  """Renews cluster certificates, keys and secrets.
1024

1025
  """
1026
  return _RenewCrypto(opts.new_cluster_cert,
1027
                      opts.new_rapi_cert,
1028
                      opts.rapi_cert,
1029
                      opts.new_spice_cert,
1030
                      opts.spice_cert,
1031
                      opts.spice_cacert,
1032
                      opts.new_confd_hmac_key,
1033
                      opts.new_cluster_domain_secret,
1034
                      opts.cluster_domain_secret,
1035
                      opts.force,
1036
                      opts.new_node_cert)
1037

    
1038

    
1039
def _GetEnabledDiskTemplates(opts):
1040
  """Determine the list of enabled disk templates.
1041

1042
  """
1043
  if opts.enabled_disk_templates:
1044
    return opts.enabled_disk_templates.split(",")
1045
  else:
1046
    return None
1047

    
1048

    
1049
def _GetVgName(opts, enabled_disk_templates):
1050
  """Determine the volume group name.
1051

1052
  @type enabled_disk_templates: list of strings
1053
  @param enabled_disk_templates: cluster-wide enabled disk-templates
1054

1055
  """
1056
  # consistency between vg name and enabled disk templates
1057
  vg_name = None
1058
  if opts.vg_name is not None:
1059
    vg_name = opts.vg_name
1060
  if enabled_disk_templates:
1061
    if vg_name and not utils.IsLvmEnabled(enabled_disk_templates):
1062
      ToStdout("You specified a volume group with --vg-name, but you did not"
1063
               " enable any of the following lvm-based disk templates: %s" %
1064
               utils.CommaJoin(constants.DTS_LVM))
1065
  return vg_name
1066

    
1067

    
1068
def _GetDrbdHelper(opts, enabled_disk_templates):
1069
  """Determine the DRBD usermode helper.
1070

1071
  """
1072
  drbd_helper = opts.drbd_helper
1073
  if enabled_disk_templates:
1074
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
1075
    if not drbd_enabled and opts.drbd_helper:
1076
      ToStdout("You specified a DRBD usermode helper with "
1077
               " --drbd-usermode-helper while DRBD is not enabled.")
1078
  return drbd_helper
1079

    
1080

    
1081
def SetClusterParams(opts, args):
1082
  """Modify the cluster.
1083

1084
  @param opts: the command line options selected by the user
1085
  @type args: list
1086
  @param args: should be an empty list
1087
  @rtype: int
1088
  @return: the desired exit code
1089

1090
  """
1091
  if not (opts.vg_name is not None or
1092
          opts.drbd_helper is not None or
1093
          opts.enabled_hypervisors or opts.hvparams or
1094
          opts.beparams or opts.nicparams or
1095
          opts.ndparams or opts.diskparams or
1096
          opts.candidate_pool_size is not None or
1097
          opts.max_running_jobs is not None or
1098
          opts.uid_pool is not None or
1099
          opts.maintain_node_health is not None or
1100
          opts.add_uids is not None or
1101
          opts.remove_uids is not None or
1102
          opts.default_iallocator is not None or
1103
          opts.default_iallocator_params or
1104
          opts.reserved_lvs is not None or
1105
          opts.master_netdev is not None or
1106
          opts.master_netmask is not None or
1107
          opts.use_external_mip_script is not None or
1108
          opts.prealloc_wipe_disks is not None or
1109
          opts.hv_state or
1110
          opts.enabled_disk_templates or
1111
          opts.disk_state or
1112
          opts.ipolicy_bounds_specs is not None or
1113
          opts.ipolicy_std_specs is not None or
1114
          opts.ipolicy_disk_templates is not None or
1115
          opts.ipolicy_vcpu_ratio is not None or
1116
          opts.ipolicy_spindle_ratio is not None or
1117
          opts.modify_etc_hosts is not None or
1118
          opts.file_storage_dir is not None or
1119
          opts.instance_communication_network is not None):
1120
    ToStderr("Please give at least one of the parameters.")
1121
    return 1
1122

    
1123
  enabled_disk_templates = _GetEnabledDiskTemplates(opts)
1124
  vg_name = _GetVgName(opts, enabled_disk_templates)
1125

    
1126
  try:
1127
    drbd_helper = _GetDrbdHelper(opts, enabled_disk_templates)
1128
  except errors.OpPrereqError, e:
1129
    ToStderr(str(e))
1130
    return 1
1131

    
1132
  hvlist = opts.enabled_hypervisors
1133
  if hvlist is not None:
1134
    hvlist = hvlist.split(",")
1135

    
1136
  # a list of (name, dict) we can pass directly to dict() (or [])
1137
  hvparams = dict(opts.hvparams)
1138
  for hv_params in hvparams.values():
1139
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1140

    
1141
  diskparams = dict(opts.diskparams)
1142

    
1143
  for dt_params in diskparams.values():
1144
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1145

    
1146
  beparams = opts.beparams
1147
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1148

    
1149
  nicparams = opts.nicparams
1150
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1151

    
1152
  ndparams = opts.ndparams
1153
  if ndparams is not None:
1154
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1155

    
1156
  ipolicy = CreateIPolicyFromOpts(
1157
    minmax_ispecs=opts.ipolicy_bounds_specs,
1158
    std_ispecs=opts.ipolicy_std_specs,
1159
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
1160
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1161
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1162
    )
1163

    
1164
  mnh = opts.maintain_node_health
1165

    
1166
  uid_pool = opts.uid_pool
1167
  if uid_pool is not None:
1168
    uid_pool = uidpool.ParseUidPool(uid_pool)
1169

    
1170
  add_uids = opts.add_uids
1171
  if add_uids is not None:
1172
    add_uids = uidpool.ParseUidPool(add_uids)
1173

    
1174
  remove_uids = opts.remove_uids
1175
  if remove_uids is not None:
1176
    remove_uids = uidpool.ParseUidPool(remove_uids)
1177

    
1178
  if opts.reserved_lvs is not None:
1179
    if opts.reserved_lvs == "":
1180
      opts.reserved_lvs = []
1181
    else:
1182
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1183

    
1184
  if opts.master_netmask is not None:
1185
    try:
1186
      opts.master_netmask = int(opts.master_netmask)
1187
    except ValueError:
1188
      ToStderr("The --master-netmask option expects an int parameter.")
1189
      return 1
1190

    
1191
  ext_ip_script = opts.use_external_mip_script
1192

    
1193
  if opts.disk_state:
1194
    disk_state = utils.FlatToDict(opts.disk_state)
1195
  else:
1196
    disk_state = {}
1197

    
1198
  hv_state = dict(opts.hv_state)
1199

    
1200
  op = opcodes.OpClusterSetParams(
1201
    vg_name=vg_name,
1202
    drbd_helper=drbd_helper,
1203
    enabled_hypervisors=hvlist,
1204
    hvparams=hvparams,
1205
    os_hvp=None,
1206
    beparams=beparams,
1207
    nicparams=nicparams,
1208
    ndparams=ndparams,
1209
    diskparams=diskparams,
1210
    ipolicy=ipolicy,
1211
    candidate_pool_size=opts.candidate_pool_size,
1212
    max_running_jobs=opts.max_running_jobs,
1213
    maintain_node_health=mnh,
1214
    modify_etc_hosts=opts.modify_etc_hosts,
1215
    uid_pool=uid_pool,
1216
    add_uids=add_uids,
1217
    remove_uids=remove_uids,
1218
    default_iallocator=opts.default_iallocator,
1219
    default_iallocator_params=opts.default_iallocator_params,
1220
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
1221
    master_netdev=opts.master_netdev,
1222
    master_netmask=opts.master_netmask,
1223
    reserved_lvs=opts.reserved_lvs,
1224
    use_external_mip_script=ext_ip_script,
1225
    hv_state=hv_state,
1226
    disk_state=disk_state,
1227
    enabled_disk_templates=enabled_disk_templates,
1228
    force=opts.force,
1229
    file_storage_dir=opts.file_storage_dir,
1230
    instance_communication_network=opts.instance_communication_network
1231
    )
1232
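  # SubmitOrSend either waits for the job to complete or, with --submit, only
  # queues it and prints the job ID; GetResult maps the outcome to the exit
  # code of the command.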
  return base.GetResult(None, opts, SubmitOrSend(op, opts))
1233

    
1234

    
1235
def QueueOps(opts, args):
1236
  """Queue operations.
1237

1238
  @param opts: the command line options selected by the user
1239
  @type args: list
1240
  @param args: should contain only one element, the subcommand
1241
  @rtype: int
1242
  @return: the desired exit code
1243

1244
  """
1245
  command = args[0]
1246
  client = GetClient()
1247
  if command in ("drain", "undrain"):
1248
    drain_flag = command == "drain"
1249
    client.SetQueueDrainFlag(drain_flag)
1250
  elif command == "info":
1251
    result = client.QueryConfigValues(["drain_flag"])
1252
    if result[0]:
1253
      val = "set"
1254
    else:
1255
      val = "unset"
1256
    ToStdout("The drain flag is %s" % val)
1257
  else:
1258
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1259
                               errors.ECODE_INVAL)
1260

    
1261
  return 0
1262

    
1263

    
1264
def _ShowWatcherPause(until):
1265
  if until is None or until < time.time():
1266
    ToStdout("The watcher is not paused.")
1267
  else:
1268
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1269

    
1270

    
1271
def WatcherOps(opts, args):
1272
  """Watcher operations.
1273

1274
  @param opts: the command line options selected by the user
1275
  @type args: list
1276
  @param args: should contain only one element, the subcommand
1277
  @rtype: int
1278
  @return: the desired exit code
1279

1280
  """
1281
  command = args[0]
1282
  client = GetClient()
1283

    
1284
  if command == "continue":
1285
    client.SetWatcherPause(None)
1286
    ToStdout("The watcher is no longer paused.")
1287

    
1288
  elif command == "pause":
1289
    if len(args) < 2:
1290
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1291

    
1292
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1293
    _ShowWatcherPause(result)
1294

    
1295
  elif command == "info":
1296
    result = client.QueryConfigValues(["watcher_pause"])
1297
    _ShowWatcherPause(result[0])
1298

    
1299
  else:
1300
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1301
                               errors.ECODE_INVAL)
1302

    
1303
  return 0
1304

    
1305

    
1306
def _OobPower(opts, node_list, power):
1307
  """Puts the node in the list to desired power state.
1308

1309
  @param opts: The command line options selected by the user
1310
  @param node_list: The list of nodes to operate on
1311
  @param power: True if they should be powered on, False otherwise
1312
  @return: The success of the operation (none failed)
1313

1314
  """
1315
  if power:
1316
    command = constants.OOB_POWER_ON
1317
  else:
1318
    command = constants.OOB_POWER_OFF
1319

    
1320
  op = opcodes.OpOobCommand(node_names=node_list,
1321
                            command=command,
1322
                            ignore_status=True,
1323
                            timeout=opts.oob_timeout,
1324
                            power_delay=opts.power_delay)
1325
  result = SubmitOpCode(op, opts=opts)
1326
  errs = 0
1327
  for node_result in result:
1328
    (node_tuple, data_tuple) = node_result
1329
    (_, node_name) = node_tuple
1330
    (data_status, _) = data_tuple
1331
    if data_status != constants.RS_NORMAL:
1332
      assert data_status != constants.RS_UNAVAIL
1333
      errs += 1
1334
      ToStderr("There was a problem changing power for %s, please investigate",
1335
               node_name)
1336

    
1337
  if errs > 0:
1338
    return False
1339

    
1340
  return True
1341

    
1342

    
1343
def _InstanceStart(opts, inst_list, start, no_remember=False):
1344
  """Puts the instances in the list to desired state.
1345

1346
  @param opts: The command line options selected by the user
1347
  @param inst_list: The list of instances to operate on
1348
  @param start: True if they should be started, False for shutdown
1349
  @param no_remember: If the instance state should be remembered
1350
  @return: The success of the operation (none failed)
1351

1352
  """
1353
  if start:
1354
    opcls = opcodes.OpInstanceStartup
1355
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1356
  else:
1357
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1358
                           timeout=opts.shutdown_timeout,
1359
                           no_remember=no_remember)
1360
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1361

    
1362
  jex = JobExecutor(opts=opts)
1363

    
1364
  for inst in inst_list:
1365
    ToStdout("Submit %s of instance %s", text_submit, inst)
1366
    op = opcls(instance_name=inst)
1367
    jex.QueueJob(inst, op)
1368

    
1369
  results = jex.GetResults()
1370
  bad_cnt = len([1 for (success, _) in results if not success])
1371

    
1372
  if bad_cnt == 0:
1373
    ToStdout("All instances have been %s successfully", text_success)
1374
  else:
1375
    ToStderr("There were errors while %s instances:\n"
1376
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1377
             len(results))
1378
    return False
1379

    
1380
  return True
1381

    
1382

    
1383
class _RunWhenNodesReachableHelper:
1384
  """Helper class to make shared internal state sharing easier.
1385

1386
  @ivar success: Indicates if all action_cb calls were successful
1387

1388
  """
1389
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1390
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1391
    """Init the object.
1392

1393
    @param node_list: The list of nodes to be reachable
1394
    @param action_cb: Callback called when a new host is reachable
1395
    @type node2ip: dict
1396
    @param node2ip: Node to ip mapping
1397
    @param port: The port to use for the TCP ping
1398
    @param feedback_fn: The function used for feedback
1399
    @param _ping_fn: Function to check reachability (for unittest use only)
1400
    @param _sleep_fn: Function to sleep (for unittest use only)
1401

1402
    """
1403
    self.down = set(node_list)
1404
    self.up = set()
1405
    self.node2ip = node2ip
1406
    self.success = True
1407
    self.action_cb = action_cb
1408
    self.port = port
1409
    self.feedback_fn = feedback_fn
1410
    self._ping_fn = _ping_fn
1411
    self._sleep_fn = _sleep_fn
1412

    
1413
  def __call__(self):
1414
    """When called we run action_cb.
1415

1416
    @raises utils.RetryAgain: When there are still down nodes
1417

1418
    """
1419
    if not self.action_cb(self.up):
1420
      self.success = False
1421

    
1422
    if self.down:
1423
      raise utils.RetryAgain()
1424
    else:
1425
      return self.success
1426

    
1427
  def Wait(self, secs):
1428
    """Checks if a host is up or waits remaining seconds.
1429

1430
    @param secs: The secs remaining
1431

1432
    """
1433
    start = time.time()
1434
    for node in self.down:
1435
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1436
                       live_port_needed=True):
1437
        self.feedback_fn("Node %s became available" % node)
1438
        self.up.add(node)
1439
        self.down -= self.up
1440
        # If we have a node available there is the possibility to run the
1441
        # action callback successfully, therefore we don't wait and return
1442
        return
1443

    
1444
    self._sleep_fn(max(0.0, start + secs - time.time()))
1445

    
1446

    
1447
def _RunWhenNodesReachable(node_list, action_cb, interval):
1448
  """Run action_cb when nodes become reachable.
1449

1450
  @param node_list: The list of nodes to be reachable
1451
  @param action_cb: Callback called when a new host is reachable
1452
  @param interval: The earliest time to retry
1453

1454
  """
1455
  client = GetClient()
1456
  cluster_info = client.QueryClusterInfo()
1457
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1458
    family = netutils.IP4Address.family
1459
  else:
1460
    family = netutils.IP6Address.family
1461

    
1462
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1463
                 for node in node_list)
1464

    
1465
  port = netutils.GetDaemonPort(constants.NODED)
1466
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1467
                                        ToStdout)
1468

    
1469
  try:
1470
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1471
                       wait_fn=helper.Wait)
1472
  except utils.RetryTimeout:
1473
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1474
             " again:\n  - %s", "  - ".join(helper.down))
1475
    return False
1476

    
1477

    
1478
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1479
                          _instance_start_fn=_InstanceStart):
1480
  """Start the instances conditional based on node_states.
1481

1482
  @param opts: The command line options selected by the user
1483
  @param inst_map: A dict of inst -> nodes mapping
1484
  @param nodes_online: A list of nodes online
1485
  @param _instance_start_fn: Callback to start instances (unittest use only)
1486
  @return: Success of the operation on all instances
1487

1488
  """
1489
  start_inst_list = []
1490
  for (inst, nodes) in inst_map.items():
1491
    if not (nodes - nodes_online):
1492
      # All nodes the instance lives on are back online
1493
      start_inst_list.append(inst)
1494

    
1495
  for inst in start_inst_list:
1496
    del inst_map[inst]
1497

    
1498
  if start_inst_list:
1499
    return _instance_start_fn(opts, start_inst_list, True)
1500

    
1501
  return True
1502

    
1503

    
1504
def _EpoOn(opts, full_node_list, node_list, inst_map):
1505
  """Does the actual power on.
1506

1507
  @param opts: The command line options selected by the user
1508
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1509
                         OOB)
1510
  @param node_list: The list of nodes to operate on (all need to support OOB)
1511
  @param inst_map: A dict of inst -> nodes mapping
1512
  @return: The desired exit status
1513

1514
  """
1515
  if node_list and not _OobPower(opts, node_list, True):
1516
    ToStderr("Not all nodes seem to get back up, investigate and start"
1517
             " manually if needed")
1518

    
1519
  # Wait for the nodes to be back up
1520
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))

  ToStdout("Waiting until all nodes are available again")
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
    ToStderr("Please investigate and start stopped instances manually")
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS


def _EpoOff(opts, node_list, inst_map):
  """Does the actual power off.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
    ToStderr("Please investigate and stop instances manually before continuing")
    return constants.EXIT_FAILURE

  if not node_list:
    return constants.EXIT_SUCCESS

  if _OobPower(opts, node_list, False):
    return constants.EXIT_SUCCESS
  else:
    return constants.EXIT_FAILURE


def Epo(opts, args, qcl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
        _confirm_fn=ConfirmOperation,
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
  """EPO operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  if opts.groups and opts.show_all:
    _stderr_fn("Only one of --groups or --all are allowed")
    return constants.EXIT_FAILURE
  elif args and opts.show_all:
    _stderr_fn("Arguments in combination with --all are not allowed")
    return constants.EXIT_FAILURE

  if qcl is None:
    # Query client
    qcl = GetClient()

  if opts.groups:
    node_query_list = \
      itertools.chain(*qcl.QueryGroups(args, ["node_list"], False))
  else:
    node_query_list = args

  result = qcl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
                                            "sinst_list", "powered", "offline"],
                          False)

  all_nodes = map(compat.fst, result)
  node_list = []
  inst_map = {}
  for (node, master, pinsts, sinsts, powered, offline) in result:
    if not offline:
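      # Build the instance -> nodes map; the master node is never added to
      # an instance's node set, as it is the node this command runs on and
      # is therefore up already.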
      for inst in (pinsts + sinsts):
        if inst in inst_map:
          if not master:
            inst_map[inst].add(node)
        elif master:
          inst_map[inst] = set()
        else:
          inst_map[inst] = set([node])

    if master and opts.on:
      # We ignore the master for turning on the machines, in fact we are
      # already operating on the master at this point :)
      continue
    elif master and not opts.show_all:
      _stderr_fn("%s is the master node, please do a master-failover to another"
                 " node not affected by the EPO or use --all if you intend to"
                 " shutdown the whole cluster", node)
      return constants.EXIT_FAILURE
    elif powered is None:
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
                 " handled in a fully automated manner", node)
    elif powered == opts.on:
      _stdout_fn("Node %s is already in desired power state, skipping", node)
    elif not offline or (offline and powered):
      node_list.append(node)

  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
    return constants.EXIT_FAILURE

  if opts.on:
    return _on_fn(opts, all_nodes, node_list, inst_map)
  else:
    return _off_fn(opts, node_list, inst_map)


def _GetCreateCommand(info):
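  """Compose the "gnt-cluster init" command line that re-creates the cluster.

  Currently only the instance policy (ipolicy) specs are included.

  @param info: the result of a cluster info query
  @rtype: string

  """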
  buf = StringIO()
  buf.write("gnt-cluster init")
  PrintIPolicyCommand(buf, info["ipolicy"], False)
  buf.write(" ")
  buf.write(info["name"])
  return buf.getvalue()


def ShowCreateCommand(opts, args):
  """Shows the command that can be used to re-create the cluster.

  Currently it works only for ipolicy specs.

  """
  cl = GetClient()
  result = cl.QueryClusterInfo()
  ToStdout(_GetCreateCommand(result))


def _RunCommandAndReport(cmd):
  """Run a command and report its output, iff it failed.

  @param cmd: the command to execute
  @type cmd: list
  @rtype: bool
  @return: False, if the execution failed.

  """
  result = utils.RunCmd(cmd)
  if result.failed:
    ToStderr("Command %s failed: %s; Output %s" %
             (cmd, result.fail_reason, result.output))
    return False
  return True


def _VerifyCommand(cmd):
  """Verify that a given command succeeds on all online nodes.

  As this function is intended to run during upgrades, it is implemented
  in such a way that it still works even if all Ganeti daemons are down.

  @param cmd: the command to execute
  @type cmd: list
  @rtype: list
  @return: the list of node names that are online where
      the command failed.

  """
  command = utils.text.ShellQuoteArgs([str(val) for val in cmd])

  nodes = ssconf.SimpleStore().GetOnlineNodeList()
  master_node = ssconf.SimpleStore().GetMasterNode()
  cluster_name = ssconf.SimpleStore().GetClusterName()

  # If master node is in 'nodes', make sure master node is at list end
  if master_node in nodes:
    nodes.remove(master_node)
    nodes.append(master_node)

  failed = []

  srun = ssh.SshRunner(cluster_name=cluster_name)
  for name in nodes:
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
    if result.exit_code != 0:
      failed.append(name)

  return failed


def _VerifyVersionInstalled(versionstring):
  """Verify that the given version of ganeti is installed on all online nodes.

  Do nothing if this is the case; otherwise print an appropriate
  message to stderr.

  @param versionstring: the version to check for
  @type versionstring: string
  @rtype: bool
  @return: True, if the version is installed on all online nodes

  """
  badnodes = _VerifyCommand(["test", "-d",
                             os.path.join(pathutils.PKGLIBDIR, versionstring)])
  if badnodes:
    ToStderr("Ganeti version %s not installed on nodes %s"
             % (versionstring, ", ".join(badnodes)))
    return False

  return True


def _GetRunning():
  """Determine the number of running jobs.

  @rtype: int
  @return: the number of jobs still running

  """
  cl = GetClient()
  qfilter = qlang.MakeSimpleFilter("status",
                                   frozenset([constants.JOB_STATUS_RUNNING]))
  return len(cl.Query(constants.QR_JOB, [], qfilter).data)


def _SetGanetiVersion(versionstring):
  """Set the active version of ganeti to the given versionstring.

  @type versionstring: string
  @rtype: list
  @return: the list of nodes where the version change failed

  """
  failed = []
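  # GNU ln supports "-T", which never treats the destination as a directory,
  # so an existing symlink can be replaced with a single command; without it
  # the old link has to be removed and re-created in two steps.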
  if constants.HAS_GNU_LN:
    failed.extend(_VerifyCommand(
        ["ln", "-s", "-f", "-T",
         os.path.join(pathutils.PKGLIBDIR, versionstring),
         os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
    failed.extend(_VerifyCommand(
        ["ln", "-s", "-f", "-T",
         os.path.join(pathutils.SHAREDIR, versionstring),
         os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
  else:
    failed.extend(_VerifyCommand(
        ["rm", "-f", os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
    failed.extend(_VerifyCommand(
        ["ln", "-s", "-f", os.path.join(pathutils.PKGLIBDIR, versionstring),
         os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
    failed.extend(_VerifyCommand(
        ["rm", "-f", os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
    failed.extend(_VerifyCommand(
        ["ln", "-s", "-f", os.path.join(pathutils.SHAREDIR, versionstring),
         os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
  return list(set(failed))


def _ExecuteCommands(fns):
  """Execute a list of functions, in reverse order.

  @type fns: list of functions.
  @param fns: the functions to be executed.

  """
  for fn in reversed(fns):
    fn()


def _GetConfigVersion():
  """Determine the version the configuration file currently has.

  @rtype: tuple or None
  @return: (major, minor, revision) if the version can be determined,
      None otherwise

  """
  config_data = serializer.LoadJson(utils.ReadFile(pathutils.CLUSTER_CONF_FILE))
  try:
    config_version = config_data["version"]
  except KeyError:
    return None
  return utils.SplitVersion(config_version)


def _ReadIntentToUpgrade():
  """Read the file documenting the intent to upgrade the cluster.

  @rtype: (string, string) or (None, None)
  @return: (old version, version to upgrade to), if the file exists,
      and (None, None) otherwise.

  """
  if not os.path.isfile(pathutils.INTENT_TO_UPGRADE):
    return (None, None)

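  # The file holds three escaped fields (see _WriteIntentToUpgrade): the
  # version we upgrade from, the version to upgrade to, and the pid of the
  # process that wrote it; only the first two are returned here.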
  contentstring = utils.ReadFile(pathutils.INTENT_TO_UPGRADE)
  contents = utils.UnescapeAndSplit(contentstring)
  if len(contents) != 3:
    # file syntactically malformed
    return (None, None)
  return (contents[0], contents[1])


def _WriteIntentToUpgrade(version):
  """Write file documenting the intent to upgrade the cluster.

  @type version: string
  @param version: the version we intend to upgrade to

  """
  utils.WriteFile(pathutils.INTENT_TO_UPGRADE,
                  data=utils.EscapeAndJoin([constants.RELEASE_VERSION, version,
                                            "%d" % os.getpid()]))


def _UpgradeBeforeConfigurationChange(versionstring):
  """
  Carry out all the tasks necessary for an upgrade that happen before
  the configuration file, or Ganeti version, changes.

  @type versionstring: string
  @param versionstring: the version to upgrade to
  @rtype: (bool, list)
  @return: tuple of a bool indicating success and a list of rollback tasks

  """
  rollback = []

  if not _VerifyVersionInstalled(versionstring):
    return (False, rollback)

  _WriteIntentToUpgrade(versionstring)
  rollback.append(
    lambda: utils.RunCmd(["rm", "-f", pathutils.INTENT_TO_UPGRADE]))

  ToStdout("Draining queue")
  client = GetClient()
  client.SetQueueDrainFlag(True)

  rollback.append(lambda: GetClient().SetQueueDrainFlag(False))

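  # Wait for all running jobs to finish; if some are still running once the
  # drain timeout expires, give up on the upgrade.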
  if utils.SimpleRetry(0, _GetRunning,
                       constants.UPGRADE_QUEUE_POLL_INTERVAL,
                       constants.UPGRADE_QUEUE_DRAIN_TIMEOUT):
    ToStderr("Failed to completely empty the queue.")
    return (False, rollback)

  ToStdout("Stopping daemons on master node.")
  if not _RunCommandAndReport([pathutils.DAEMON_UTIL, "stop-all"]):
    return (False, rollback)

  if not _VerifyVersionInstalled(versionstring):
    utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"])
    return (False, rollback)

  ToStdout("Stopping daemons everywhere.")
  rollback.append(lambda: _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
  if badnodes:
    ToStderr("Failed to stop daemons on %s." % (", ".join(badnodes),))
    return (False, rollback)

  backuptar = os.path.join(pathutils.LOCALSTATEDIR,
                           "lib/ganeti%d.tar" % time.time())
  ToStdout("Backing up configuration as %s" % backuptar)
  if not _RunCommandAndReport(["tar", "cf", backuptar,
                               pathutils.DATA_DIR]):
    return (False, rollback)

  return (True, rollback)


def _VersionSpecificDowngrade():
  """
  Perform any additional downgrade tasks that are version specific
  and need to be done just after the configuration downgrade. This
  function needs to be idempotent, so that it can be redone if the
  downgrade procedure gets interrupted after changing the
  configuration.

  Note that this function has to be reset with every version bump.

  @return: True upon success
  """
  ToStdout("Performing version-specific downgrade tasks.")
  return True


def _SwitchVersionAndConfig(versionstring, downgrade):
  """
  Switch to the new Ganeti version and change the configuration,
  in correct order.

  @type versionstring: string
  @param versionstring: the version to change to
  @type downgrade: bool
  @param downgrade: True, if the configuration should be downgraded
  @rtype: (bool, list)
  @return: tuple of a bool indicating success, and a list of
      additional rollback tasks

  """
  rollback = []
  if downgrade:
    ToStdout("Downgrading configuration")
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "--downgrade", "-f"]):
      return (False, rollback)
    # Note: version specific downgrades need to be done before switching
    # binaries, so that we still have the knowledgeable binary if the downgrade
    # process gets interrupted at this point.
    if not _VersionSpecificDowngrade():
      return (False, rollback)

  # Configuration change is the point of no return. From then onwards, it is
  # safer to push through the up/downgrade than to try to roll it back.

  ToStdout("Switching to version %s on all nodes" % versionstring)
  rollback.append(lambda: _SetGanetiVersion(constants.DIR_VERSION))
  badnodes = _SetGanetiVersion(versionstring)
  if badnodes:
    ToStderr("Failed to switch to Ganeti version %s on nodes %s"
             % (versionstring, ", ".join(badnodes)))
    if not downgrade:
      return (False, rollback)

  # Now that we have changed to the new version of Ganeti we should
  # not communicate over luxi any more, as luxi might have changed in
  # incompatible ways. Therefore, manually call the corresponding ganeti
  # commands using their canonical (version independent) path.

  if not downgrade:
    ToStdout("Upgrading configuration")
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "-f"]):
      return (False, rollback)

  return (True, rollback)


def _UpgradeAfterConfigurationChange(oldversion):
  """
  Carry out the upgrade actions necessary after switching to the new
  Ganeti version and updating the configuration.

  As this part is run at a time where the new version of Ganeti is already
  running, no communication should happen via luxi, as this is not a stable
  interface. Also, as the configuration change is the point of no return,
  all actions are pushed through, even if some of them fail.

  @param oldversion: the version the upgrade started from
  @type oldversion: string
  @rtype: int
  @return: the intended return value

  """
  returnvalue = 0

  ToStdout("Ensuring directories everywhere.")
  badnodes = _VerifyCommand([pathutils.ENSURE_DIRS])
  if badnodes:
    ToStderr("Warning: failed to ensure directories on %s." %
             (", ".join(badnodes)))
    returnvalue = 1

  ToStdout("Starting daemons everywhere.")
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
  if badnodes:
    ToStderr("Warning: failed to start daemons on %s." % (", ".join(badnodes),))
    returnvalue = 1

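  # The queue is still drained at this point, hence --yes-do-it; it is
  # undrained again further down.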
  ToStdout("Redistributing the configuration.")
  if not _RunCommandAndReport(["gnt-cluster", "redist-conf", "--yes-do-it"]):
    returnvalue = 1

  ToStdout("Restarting daemons everywhere.")
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
  badnodes.extend(_VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
  if badnodes:
    ToStderr("Warning: failed to start daemons on %s." %
             (", ".join(list(set(badnodes))),))
    returnvalue = 1

  ToStdout("Undraining the queue.")
  if not _RunCommandAndReport(["gnt-cluster", "queue", "undrain"]):
    returnvalue = 1

  _RunCommandAndReport(["rm", "-f", pathutils.INTENT_TO_UPGRADE])

  ToStdout("Running post-upgrade hooks")
  if not _RunCommandAndReport([pathutils.POST_UPGRADE, oldversion]):
    returnvalue = 1

  ToStdout("Verifying cluster.")
  if not _RunCommandAndReport(["gnt-cluster", "verify"]):
    returnvalue = 1

  return returnvalue


def UpgradeGanetiCommand(opts, args):
  """Upgrade a cluster to a new ganeti version.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if ((not opts.resume and opts.to is None)
      or (opts.resume and opts.to is not None)):
    ToStderr("Precisely one of the options --to and --resume"
             " has to be given")
    return 1

  oldversion = constants.RELEASE_VERSION

  if opts.resume:
    ssconf.CheckMaster(False)
    oldversion, versionstring = _ReadIntentToUpgrade()
    if versionstring is None:
      return 0
    version = utils.version.ParseVersion(versionstring)
    if version is None:
      return 1
    configversion = _GetConfigVersion()
    if configversion is None:
      return 1
    # If the upgrade we resume was an upgrade between compatible
    # versions (like 2.10.0 to 2.10.1), the correct configversion
    # does not guarantee that the config has been updated.
    # However, in the case of a compatible update with the configuration
    # not touched, we are running a different dirversion with the same
    # config version.
    config_already_modified = \
      (utils.IsCorrectConfigVersion(version, configversion) and
       not (versionstring != constants.DIR_VERSION and
            configversion == (constants.CONFIG_MAJOR, constants.CONFIG_MINOR,
                              constants.CONFIG_REVISION)))
    if not config_already_modified:
      # We have to start from the beginning; however, some daemons might have
      # already been stopped, so the only way to get into a well-defined state
      # is by starting all daemons again.
      _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
  else:
    versionstring = opts.to
    config_already_modified = False
    version = utils.version.ParseVersion(versionstring)
    if version is None:
      ToStderr("Could not parse version string %s" % versionstring)
      return 1

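  # Refuse target versions that are outside the supported upgrade range
  # relative to the currently running version.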
  msg = utils.version.UpgradeRange(version)
  if msg is not None:
    ToStderr("Cannot upgrade to %s: %s" % (versionstring, msg))
    return 1

  if not config_already_modified:
    success, rollback = _UpgradeBeforeConfigurationChange(versionstring)
    if not success:
      _ExecuteCommands(rollback)
      return 1
  else:
    rollback = []

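  # Decide whether the configuration has to be downgraded rather than
  # upgraded for the requested target version.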
  downgrade = utils.version.ShouldCfgdowngrade(version)

  success, additionalrollback = \
      _SwitchVersionAndConfig(versionstring, downgrade)
  if not success:
    rollback.extend(additionalrollback)
    _ExecuteCommands(rollback)
    return 1

  return _UpgradeAfterConfigurationChange(oldversion)


commands = {
  "init": (
    InitCluster, [ArgHost(min=1, max=1)],
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
     NIC_PARAMS_OPT, NOMODIFY_ETCHOSTS_OPT, NOMODIFY_SSH_SETUP_OPT,
     SECONDARY_IP_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT,
     DRBD_HELPER_OPT, DEFAULT_IALLOCATOR_OPT, DEFAULT_IALLOCATOR_PARAMS_OPT,
     PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT,
     GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT,
     HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
     IPOLICY_STD_SPECS_OPT, GLOBAL_GLUSTER_FILEDIR_OPT]
     + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
  "destroy": (
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
    "", "Destroy cluster"),
  "rename": (
    RenameCluster, [ArgHost(min=1, max=1)],
    [FORCE_OPT, DRY_RUN_OPT],
    "<new_name>",
    "Renames the cluster"),
  "redist-conf": (
    RedistributeConfig, ARGS_NONE, SUBMIT_OPTS +
    [DRY_RUN_OPT, PRIORITY_OPT, FORCE_DISTRIBUTION],
    "", "Forces a push of the configuration file and ssconf files"
    " to the nodes in the cluster"),
  "verify": (
    VerifyCluster, ARGS_NONE,
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
    "", "Does a check on the cluster configuration"),
  "verify-disks": (
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
    "", "Does a check on the cluster disk status"),
  "repair-disk-sizes": (
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
    "[instance...]", "Updates mismatches in recorded disk sizes"),
  "master-failover": (
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
    "", "Makes the current node the master"),
  "master-ping": (
    MasterPing, ARGS_NONE, [],
    "", "Checks if the master is alive"),
  "version": (
    ShowClusterVersion, ARGS_NONE, [],
    "", "Shows the cluster version"),
  "getmaster": (
    ShowClusterMaster, ARGS_NONE, [],
    "", "Shows the cluster master"),
  "copyfile": (
    ClusterCopyFile, [ArgFile(min=1, max=1)],
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
  "command": (
    RunClusterCommand, [ArgCommand(min=1)],
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
  "info": (
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
    "[--roman]", "Show cluster configuration"),
  "list-tags": (
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
  "add-tags": (
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Add tags to the cluster"),
  "remove-tags": (
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Remove tags from the cluster"),
  "search-tags": (
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
    "Searches the tags on all objects on"
    " the cluster for a given pattern (regex)"),
  "queue": (
    QueueOps,
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
    [], "drain|undrain|info", "Change queue properties"),
  "watcher": (
    WatcherOps,
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
    [],
    "{pause <timespec>|continue|info}", "Change watcher properties"),
  "modify": (
    SetClusterParams, ARGS_NONE,
    [FORCE_OPT,
     BACKEND_OPT, CP_SIZE_OPT, RQL_OPT, INSTANCE_COMMUNICATION_NETWORK_OPT,
     ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT,
     UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT, DRBD_HELPER_OPT,
     DEFAULT_IALLOCATOR_OPT, DEFAULT_IALLOCATOR_PARAMS_OPT, RESERVED_LVS_OPT,
     DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT,
     USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] +
     SUBMIT_OPTS +
     [ENABLED_DISK_TEMPLATES_OPT, IPOLICY_STD_SPECS_OPT, MODIFY_ETCHOSTS_OPT] +
     INSTANCE_POLICY_OPTS + [GLOBAL_FILEDIR_OPT],
    "[opts...]",
    "Alters the parameters of the cluster"),
  "renew-crypto": (
    RenewCrypto, ARGS_NONE,
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT,
     NEW_NODE_CERT_OPT],
    "[opts...]",
    "Renews cluster certificates, keys and secrets"),
  "epo": (
    Epo, [ArgUnknown()],
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
    "[opts...] [args]",
    "Performs an emergency power-off on given args"),
  "activate-master-ip": (
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
  "deactivate-master-ip": (
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
    "Deactivates the master IP"),
  "show-ispecs-cmd": (
    ShowCreateCommand, ARGS_NONE, [], "",
    "Show the command line to re-create the cluster"),
  "upgrade": (
    UpgradeGanetiCommand, ARGS_NONE, [TO_OPT, RESUME_OPT], "",
    "Upgrade (or downgrade) to a new Ganeti version"),
  }


#: dictionary with aliases for commands
aliases = {
  "masterfailover": "master-failover",
  "show": "info",
}


def Main():
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
                     aliases=aliases)